import re
import uuid
from json import dumps as json_dumps
from json import loads as json_loads
from json.decoder import JSONDecodeError
from typing import Optional

from requests import get
from yaml import YAMLError, safe_load

from core.tools.entities.common_entities import I18nObject
from core.tools.entities.tool_bundle import ApiToolBundle
from core.tools.entities.tool_entities import ApiProviderSchemaType, ToolParameter
from core.tools.errors import ToolApiSchemaError, ToolNotSupportedError, ToolProviderNotFoundError


class ApiBasedToolSchemaParser:
    """Convert OpenAPI, Swagger and OpenAI-plugin schema documents into ``ApiToolBundle`` lists."""

    # Keys of a path item that are treated as operations, in the order they are scanned.
    _HTTP_METHODS = ("get", "post", "put", "delete", "patch", "head", "options", "trace")

    @staticmethod
    def parse_openapi_to_tool_bundle(
        openapi: dict, extra_info: Optional[dict], warning: Optional[dict]
    ) -> list[ApiToolBundle]:
        """
        parse an openapi dict to tool bundles, one bundle per (path, method) operation

        :param openapi: the openapi dict
        :param extra_info: optional dict; its "description" key is set to the schema's info description
        :param warning: optional dict; receives a "duplicated_parameter" entry when an operation
            declares the same parameter name more than once
        :return: the tool bundles
        :raises ToolProviderNotFoundError: if the schema has no (or an empty) "servers" list
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        # set description to extra_info
        extra_info["description"] = openapi["info"].get("description", "")

        # a missing "servers" key is reported the same way as an empty list
        servers = openapi.get("servers") or []
        if not servers:
            raise ToolProviderNotFoundError("No server found in the openapi yaml.")

        # only the first server is used
        server_url = servers[0]["url"]

        # list all interfaces
        interfaces = []
        for path, path_item in openapi["paths"].items():
            for method in ApiBasedToolSchemaParser._HTTP_METHODS:
                if method in path_item:
                    interfaces.append({"path": path, "method": method, "operation": path_item[method]})

        bundles = []
        for interface in interfaces:
            operation = interface["operation"]

            # explicit parameters first, then the ones derived from the request body
            parameters = ApiBasedToolSchemaParser._parse_operation_parameters(operation)
            parameters.extend(ApiBasedToolSchemaParser._parse_request_body_parameters(openapi, operation))

            # check if parameters is duplicated
            parameters_count: dict = {}
            for parameter in parameters:
                parameters_count[parameter.name] = parameters_count.get(parameter.name, 0) + 1
            for name, count in parameters_count.items():
                if count > 1:
                    # a single key is used, so only the last duplicated name is kept
                    warning["duplicated_parameter"] = f"Parameter {name} is duplicated."

            # check if there is a operation id, use $path_$method as operation id if not
            if "operationId" not in operation:
                ApiBasedToolSchemaParser._assign_operation_id(interface)

            bundles.append(
                ApiToolBundle(
                    server_url=server_url + interface["path"],
                    method=interface["method"],
                    # prefer the description, fall back to the summary (or None)
                    summary=operation["description"]
                    if "description" in operation
                    else operation.get("summary", None),
                    operation_id=operation["operationId"],
                    parameters=parameters,
                    author="",
                    icon=None,
                    openapi=operation,
                )
            )

        return bundles

    @staticmethod
    def _parse_operation_parameters(operation: dict) -> list:
        """Build one ToolParameter for each entry of the operation's "parameters" list."""
        parameters = []
        if "parameters" not in operation:
            return parameters

        for parameter in operation["parameters"]:
            tool_parameter = ToolParameter(
                name=parameter["name"],
                label=I18nObject(en_US=parameter["name"], zh_Hans=parameter["name"]),
                human_description=I18nObject(
                    en_US=parameter.get("description", ""), zh_Hans=parameter.get("description", "")
                ),
                type=ToolParameter.ToolParameterType.STRING,
                required=parameter.get("required", False),
                form=ToolParameter.ToolParameterForm.LLM,
                llm_description=parameter.get("description"),
                default=parameter["schema"]["default"]
                if "schema" in parameter and "default" in parameter["schema"]
                else None,
            )

            # check if there is a type; STRING stays as the default otherwise
            typ = ApiBasedToolSchemaParser._get_tool_parameter_type(parameter)
            if typ:
                tool_parameter.type = typ

            parameters.append(tool_parameter)

        return parameters

    @staticmethod
    def _parse_request_body_parameters(openapi: dict, operation: dict) -> list:
        """
        Build ToolParameters from the properties of the operation's request body schema.

        Top-level "$ref" schemas are resolved against ``openapi`` and written back into
        ``operation`` in place. Only the schema of the last listed content type is
        turned into parameters.
        """
        parameters = []
        if "requestBody" not in operation:
            return parameters

        request_body = operation["requestBody"]
        if "content" not in request_body:
            return parameters

        last_content = None
        for content in request_body["content"].values():
            last_content = content
            if "schema" not in content:
                continue

            # if there is a reference, get the reference and overwrite the content
            if "$ref" in content["schema"]:
                root = openapi
                # drop the leading "#" segment and walk the remaining path segments
                for ref in content["schema"]["$ref"].split("/")[1:]:
                    root = root[ref]
                content["schema"] = root

        # an empty "content" map has nothing to parse (previously this hit an unbound loop variable)
        if last_content is None or "schema" not in last_content:
            return parameters

        # parse body parameters
        body_schema = last_content["schema"]
        required = body_schema.get("required", [])
        properties = body_schema.get("properties", {})
        for name, prop in properties.items():
            tool = ToolParameter(
                name=name,
                label=I18nObject(en_US=name, zh_Hans=name),
                human_description=I18nObject(
                    en_US=prop.get("description", ""), zh_Hans=prop.get("description", "")
                ),
                type=ToolParameter.ToolParameterType.STRING,
                required=name in required,
                form=ToolParameter.ToolParameterForm.LLM,
                llm_description=prop.get("description", ""),
                default=prop.get("default", None),
            )

            # check if there is a type; STRING stays as the default otherwise
            typ = ApiBasedToolSchemaParser._get_tool_parameter_type(prop)
            if typ:
                tool.type = typ

            parameters.append(tool)

        return parameters

    @staticmethod
    def _assign_operation_id(interface: dict) -> None:
        """Store a generated ``<path>_<method>`` operationId on the interface's operation."""
        path = interface["path"]
        if path.startswith("/"):
            path = path[1:]
        # remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
        path = re.sub(r"[^a-zA-Z0-9_-]", "", path)
        if not path:
            # nothing usable left of the path (e.g. "/"), so use a random id instead
            path = str(uuid.uuid4())

        interface["operation"]["operationId"] = f'{path}_{interface["method"]}'

    @staticmethod
    def _get_tool_parameter_type(parameter: dict) -> Optional[ToolParameter.ToolParameterType]:
        """
        map the declared type of a parameter or schema property to a tool parameter type

        :param parameter: a parameter or property dict; its "type" key is read first,
            then the "type" of its "schema"
        :return: NUMBER, BOOLEAN or STRING, or None when no supported type is declared
        """
        parameter = parameter or {}
        typ = None
        if "type" in parameter:
            typ = parameter["type"]
        elif "schema" in parameter and "type" in parameter["schema"]:
            typ = parameter["schema"]["type"]

        # tuple membership (not a set) so an unhashable value such as a list of types
        # falls through to None instead of raising TypeError
        if typ in ("integer", "number"):
            return ToolParameter.ToolParameterType.NUMBER
        if typ == "boolean":
            return ToolParameter.ToolParameterType.BOOLEAN
        if typ == "string":
            return ToolParameter.ToolParameterType.STRING
        return None

    @staticmethod
    def parse_openapi_yaml_to_tool_bundle(
        yaml: str, extra_info: Optional[dict], warning: Optional[dict]
    ) -> list[ApiToolBundle]:
        """
        parse openapi yaml to tool bundle

        :param yaml: the yaml string
        :param extra_info: optional dict, passed on to parse_openapi_to_tool_bundle
        :param warning: optional dict, passed on to parse_openapi_to_tool_bundle
        :return: the tool bundle
        :raises ToolApiSchemaError: if the yaml does not load to a dict
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        openapi = safe_load(yaml)
        # safe_load may also return a scalar or a list, which cannot be a schema
        if not isinstance(openapi, dict):
            raise ToolApiSchemaError("Invalid openapi yaml.")
        return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(openapi, extra_info=extra_info, warning=warning)

    @staticmethod
    def parse_swagger_to_openapi(swagger: dict, extra_info: Optional[dict], warning: Optional[dict]) -> dict:
        """
        parse swagger to openapi

        :param swagger: the swagger dict
        :param extra_info: not used by this method
        :param warning: optional dict; receives a "missing_summary" entry when an operation
            has neither a summary nor a description
        :return: the openapi dict
        :raises ToolApiSchemaError: if there are no servers, no paths, or an operation
            without an operationId
        """
        warning = warning if warning is not None else {}

        # convert swagger to openapi
        info = swagger.get("info", {"title": "Swagger", "description": "Swagger", "version": "1.0.0"})

        servers = swagger.get("servers", [])
        if not servers:
            raise ToolApiSchemaError("No server found in the swagger yaml.")

        openapi = {
            "openapi": "3.0.0",
            "info": {
                "title": info.get("title", "Swagger"),
                "description": info.get("description", "Swagger"),
                "version": info.get("version", "1.0.0"),
            },
            "servers": servers,
            "paths": {},
            "components": {"schemas": {}},
        }

        # check paths
        if not swagger.get("paths"):
            raise ToolApiSchemaError("No paths found in the swagger yaml.")

        # convert paths
        for path, path_item in swagger["paths"].items():
            openapi["paths"][path] = {}
            for method, operation in path_item.items():
                if "operationId" not in operation:
                    raise ToolApiSchemaError(f"No operationId found in operation {method} {path}.")

                if not operation.get("summary") and not operation.get("description"):
                    warning["missing_summary"] = f"No summary or description found in operation {method} {path}."

                converted = {
                    "operationId": operation["operationId"],
                    "summary": operation.get("summary", ""),
                    "description": operation.get("description", ""),
                    "parameters": operation.get("parameters", []),
                    "responses": operation.get("responses", {}),
                }
                if "requestBody" in operation:
                    converted["requestBody"] = operation["requestBody"]
                openapi["paths"][path][method] = converted

        # convert definitions; the key is optional, so a missing one means no schemas
        for name, definition in swagger.get("definitions", {}).items():
            openapi["components"]["schemas"][name] = definition

        return openapi

    @staticmethod
    def parse_openai_plugin_json_to_tool_bundle(
        json: str, extra_info: Optional[dict], warning: Optional[dict]
    ) -> list[ApiToolBundle]:
        """
        parse openai plugin json to tool bundle

        The manifest's api url is fetched over HTTP and the response body is parsed as openapi yaml.

        :param json: the json string
        :param extra_info: optional dict, passed on to parse_openapi_yaml_to_tool_bundle
        :param warning: optional dict, passed on to parse_openapi_yaml_to_tool_bundle
        :return: the tool bundle
        :raises ToolProviderNotFoundError: if the manifest is not valid or the url does not answer with 200
        :raises ToolNotSupportedError: if the manifest's api type is not "openapi"
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        try:
            openai_plugin = json_loads(json)
            api = openai_plugin["api"]
            api_url = api["url"]
            api_type = api["type"]
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers JSONDecodeError; KeyError / TypeError cover a manifest of the wrong shape
            raise ToolProviderNotFoundError("Invalid openai plugin json.") from e

        if api_type != "openapi":
            raise ToolNotSupportedError("Only openapi is supported now.")

        # get openapi yaml
        # NOTE(review): api_url comes straight from the supplied manifest - confirm callers only pass trusted input
        response = get(api_url, headers={"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "}, timeout=5)

        if response.status_code != 200:
            raise ToolProviderNotFoundError("cannot get openapi yaml from url.")

        return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle(
            response.text, extra_info=extra_info, warning=warning
        )

    @staticmethod
    def auto_parse_to_tool_bundle(
        content: str, extra_info: Optional[dict] = None, warning: Optional[dict] = None
    ) -> tuple[list[ApiToolBundle], str]:
        """
        auto parse to tool bundle

        The content is loaded as json, then as yaml, and parsed as openapi, then swagger,
        then openai plugin. A later format is only tried when the earlier parser raised
        the error type handled below.

        :param content: the content
        :param extra_info: optional dict, passed on to the parsers
        :param warning: optional dict, passed on to the parsers
        :return: tools bundle, schema_type
        :raises ToolApiSchemaError: if the content is neither json nor yaml, does not load
            to a dict, or is rejected by every parser
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        content = content.strip()
        loaded_content = None
        json_error = None
        yaml_error = None

        try:
            loaded_content = json_loads(content)
        except JSONDecodeError as e:
            json_error = e

        if loaded_content is None:
            try:
                loaded_content = safe_load(content)
            except YAMLError as e:
                yaml_error = e
        if loaded_content is None:
            raise ToolApiSchemaError(
                f"Invalid api schema, schema is neither json nor yaml. json error: {str(json_error)},"
                f" yaml error: {str(yaml_error)}"
            )

        # both loaders can return scalars or lists; every parser below indexes by string key
        if not isinstance(loaded_content, dict):
            raise ToolApiSchemaError("Invalid api schema, schema is not a dict.")

        swagger_error = None
        openapi_error = None
        openapi_plugin_error = None

        try:
            openapi = ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(
                loaded_content, extra_info=extra_info, warning=warning
            )
            return openapi, ApiProviderSchemaType.OPENAPI.value
        except ToolApiSchemaError as e:
            openapi_error = e

        # openapi parse error, fallback to swagger
        try:
            converted_swagger = ApiBasedToolSchemaParser.parse_swagger_to_openapi(
                loaded_content, extra_info=extra_info, warning=warning
            )
            return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(
                converted_swagger, extra_info=extra_info, warning=warning
            ), ApiProviderSchemaType.SWAGGER.value
        except ToolApiSchemaError as e:
            swagger_error = e

        # swagger parse error, fallback to openai plugin
        try:
            openapi_plugin = ApiBasedToolSchemaParser.parse_openai_plugin_json_to_tool_bundle(
                json_dumps(loaded_content), extra_info=extra_info, warning=warning
            )
            return openapi_plugin, ApiProviderSchemaType.OPENAI_PLUGIN.value
        except ToolNotSupportedError as e:
            # maybe it's not plugin at all
            openapi_plugin_error = e

        raise ToolApiSchemaError(
            f"Invalid api schema, openapi error: {str(openapi_error)}, swagger error: {str(swagger_error)},"
            f" openapi plugin error: {str(openapi_plugin_error)}"
        )