| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 | 
							
- import re
 
- import uuid
 
- from json import dumps as json_dumps
 
- from json import loads as json_loads
 
- from json.decoder import JSONDecodeError
 
- from requests import get
 
- from yaml import YAMLError, safe_load
 
- from core.tools.entities.common_entities import I18nObject
 
- from core.tools.entities.tool_bundle import ApiToolBundle
 
- from core.tools.entities.tool_entities import ApiProviderSchemaType, ToolParameter
 
- from core.tools.errors import ToolApiSchemaError, ToolNotSupportedError, ToolProviderNotFoundError
 
class ApiBasedToolSchemaParser:
    """
    Static helpers that turn API descriptions into lists of ApiToolBundle.

    Supported inputs are OpenAPI documents (dict or YAML text), Swagger-style documents
    (converted to an OpenAPI dict first) and OpenAI plugin manifests (JSON text that
    points at an OpenAPI document). The class is only used as a namespace.
    """

    # HTTP verbs that are treated as operations inside a path item.
    _HTTP_METHODS = ('get', 'post', 'put', 'delete', 'patch', 'head', 'options', 'trace')

    @staticmethod
    def parse_openapi_to_tool_bundle(openapi: dict, extra_info: dict = None, warning: dict = None) -> list[ApiToolBundle]:
        """
            parse an openapi dict to tool bundles, one bundle per (path, method) operation

            :param openapi: the openapi dict, must contain 'info', 'servers' and 'paths'
            :param extra_info: optional dict, its 'description' key is set in place from the schema info
            :param warning: optional dict, non-fatal findings are written into it in place
            :return: the tool bundles
            :raises ToolApiSchemaError: if the schema is not a mapping, misses a required field
                                        or contains a reference that cannot be resolved
            :raises ToolProviderNotFoundError: if the schema declares no server
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        # Reject structurally invalid documents with a schema error (instead of a bare
        # KeyError/TypeError) so that auto_parse_to_tool_bundle can fall back to the next format.
        if not isinstance(openapi, dict):
            raise ToolApiSchemaError('Invalid openapi schema, the schema must be an object.')
        for field in ('info', 'servers', 'paths'):
            if field not in openapi:
                raise ToolApiSchemaError(f'Invalid openapi schema, missing required field: {field}.')

        # set description to extra_info
        extra_info['description'] = openapi['info'].get('description', '')

        if not openapi['servers']:
            raise ToolProviderNotFoundError('No server found in the openapi yaml.')

        # only the first declared server is used
        server_url = openapi['servers'][0]['url']

        # list all interfaces
        interfaces = []
        for path, path_item in openapi['paths'].items():
            for method in ApiBasedToolSchemaParser._HTTP_METHODS:
                if method in path_item:
                    interfaces.append({
                        'path': path,
                        'method': method,
                        'operation': path_item[method],
                    })

        bundles = []
        for interface in interfaces:
            operation = interface['operation']

            # query/path/header parameters first, then the request body properties
            parameters = ApiBasedToolSchemaParser._parse_operation_parameters(operation)
            parameters.extend(ApiBasedToolSchemaParser._parse_request_body_parameters(openapi, operation))

            # check if parameters is duplicated, only the last duplicated name is reported
            parameters_count = {}
            for parameter in parameters:
                parameters_count[parameter.name] = parameters_count.get(parameter.name, 0) + 1
            for name, count in parameters_count.items():
                if count > 1:
                    warning['duplicated_parameter'] = f'Parameter {name} is duplicated.'

            # check if there is a operation id, use $path_$method as operation id if not
            # (the generated id is written back into the operation dict)
            if 'operationId' not in operation:
                operation['operationId'] = ApiBasedToolSchemaParser._default_operation_id(
                    interface['path'], interface['method']
                )

            # create tool bundle, the description is preferred over the summary when the key exists
            if 'description' in operation:
                summary = operation['description']
            else:
                summary = operation.get('summary', None)

            bundles.append(ApiToolBundle(
                server_url=server_url + interface['path'],
                method=interface['method'],
                summary=summary,
                operation_id=operation['operationId'],
                parameters=parameters,
                author='',
                icon=None,
                openapi=operation,
            ))

        return bundles

    @staticmethod
    def _new_tool_parameter(name: str, spec: dict, required: bool, llm_description, default) -> ToolParameter:
        """
            build one LLM-form tool parameter from a parameter or property definition

            :param name: the parameter name, also used as label
            :param spec: the parameter/property dict, source of the description and the type
            :param required: whether the parameter is required
            :param llm_description: the description handed to the llm
            :param default: the default value, None if there is none
            :return: the tool parameter
        """
        description = spec.get('description', '')
        tool_parameter = ToolParameter(
            name=name,
            label=I18nObject(
                en_US=name,
                zh_Hans=name
            ),
            human_description=I18nObject(
                en_US=description,
                zh_Hans=description
            ),
            type=ToolParameter.ToolParameterType.STRING,
            required=required,
            form=ToolParameter.ToolParameterForm.LLM,
            llm_description=llm_description,
            default=default,
        )

        # string is the fallback, override it when the definition declares a known type
        typ = ApiBasedToolSchemaParser._get_tool_parameter_type(spec)
        if typ:
            tool_parameter.type = typ

        return tool_parameter

    @staticmethod
    def _parse_operation_parameters(operation: dict) -> list[ToolParameter]:
        """
            convert the 'parameters' entries of an operation to tool parameters

            :param operation: the operation dict
            :return: the tool parameters, empty if the operation declares none
        """
        tool_parameters = []
        for parameter in operation.get('parameters') or []:
            schema = parameter.get('schema') or {}
            tool_parameters.append(ApiBasedToolSchemaParser._new_tool_parameter(
                name=parameter['name'],
                spec=parameter,
                required=parameter.get('required', False),
                llm_description=parameter.get('description'),
                default=schema.get('default'),
            ))
        return tool_parameters

    @staticmethod
    def _parse_request_body_parameters(openapi: dict, operation: dict) -> list[ToolParameter]:
        """
            convert the request body properties of an operation to tool parameters

            References of every content type are resolved in place, but only the schema of the
            last declared content type is converted to parameters.

            :param openapi: the whole openapi dict, used to resolve references
            :param operation: the operation dict
            :return: the tool parameters, empty if there is no usable request body
            :raises ToolApiSchemaError: if a reference cannot be resolved
        """
        request_body = operation.get('requestBody') or {}
        contents = request_body.get('content') or {}
        if not contents:
            # nothing declared, there is no content type to read a schema from
            return []

        for content in contents.values():
            # if there is a reference, get the reference and overwrite the content
            if 'schema' not in content:
                continue
            if '$ref' in content['schema']:
                content['schema'] = ApiBasedToolSchemaParser._resolve_reference(openapi, content['schema']['$ref'])

        # parse body parameters
        body_content = list(contents.values())[-1]
        if 'schema' not in body_content:
            return []

        body_schema = body_content['schema']
        required = body_schema.get('required', [])
        properties = body_schema.get('properties', {})

        return [
            ApiBasedToolSchemaParser._new_tool_parameter(
                name=name,
                spec=prop,
                required=name in required,
                llm_description=prop.get('description', ''),
                default=prop.get('default', None),
            )
            for name, prop in properties.items()
        ]

    @staticmethod
    def _resolve_reference(openapi: dict, reference: str) -> dict:
        """
            follow a local reference like '#/components/schemas/Name' through the openapi dict

            :param openapi: the whole openapi dict
            :param reference: the reference string
            :return: the referenced node
            :raises ToolApiSchemaError: if the reference does not exist in the schema
        """
        node = openapi
        try:
            # the first segment (the part before the first '/') is skipped
            for key in reference.split('/')[1:]:
                node = node[key]
        except (KeyError, IndexError, TypeError) as e:
            raise ToolApiSchemaError(f'Cannot resolve reference {reference} in the openapi schema.') from e
        return node

    @staticmethod
    def _default_operation_id(path: str, method: str) -> str:
        """
            build an operation id of the form $path_$method for operations without operationId

            :param path: the path of the operation
            :param method: the http method of the operation
            :return: the operation id
        """
        if path.startswith('/'):
            path = path[1:]
        # remove special characters like / to ensure the operation id is valid ^[a-zA-Z0-9_-]{1,64}$
        # NOTE(review): the length is not limited here, long paths can still exceed 64 characters
        path = re.sub(r'[^a-zA-Z0-9_-]', '', path)
        if not path:
            # nothing usable is left of the path (e.g. '/'), use a random id instead
            path = str(uuid.uuid4())

        return f'{path}_{method}'

    @staticmethod
    def _get_tool_parameter_type(parameter: dict) -> ToolParameter.ToolParameterType:
        """
            map the declared type of a parameter or property to a tool parameter type

            The type is read from parameter['type'] if that key exists, else from
            parameter['schema']['type'].

            :param parameter: the parameter or property dict, may be None
            :return: the tool parameter type, None if the type is missing or not supported
        """
        parameter = parameter or {}
        if 'type' in parameter:
            typ = parameter['type']
        else:
            typ = (parameter.get('schema') or {}).get('type')

        if typ in ('integer', 'number'):
            return ToolParameter.ToolParameterType.NUMBER
        if typ == 'boolean':
            return ToolParameter.ToolParameterType.BOOLEAN
        if typ == 'string':
            return ToolParameter.ToolParameterType.STRING
        return None

    @staticmethod
    def parse_openapi_yaml_to_tool_bundle(yaml: str, extra_info: dict = None, warning: dict = None) -> list[ApiToolBundle]:
        """
            parse openapi yaml to tool bundle

            :param yaml: the yaml string
            :param extra_info: optional dict, passed through to parse_openapi_to_tool_bundle
            :param warning: optional dict, passed through to parse_openapi_to_tool_bundle
            :return: the tool bundle
            :raises ToolApiSchemaError: if the yaml is empty or not a valid openapi schema
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        # errors raised by safe_load itself are not caught here
        openapi: dict = safe_load(yaml)
        if openapi is None:
            raise ToolApiSchemaError('Invalid openapi yaml.')
        return ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(openapi, extra_info=extra_info, warning=warning)

    @staticmethod
    def parse_swagger_to_openapi(swagger: dict, extra_info: dict = None, warning: dict = None) -> dict:
        """
            parse swagger to openapi

            :param swagger: the swagger dict
            :param extra_info: unused, kept for a signature parallel to the other parsers
            :param warning: optional dict, non-fatal findings are written into it in place
            :return: the openapi dict
            :raises ToolApiSchemaError: if the swagger is not a mapping, has no servers, has no
                                        paths or contains an operation without operationId
        """
        warning = warning if warning is not None else {}

        if not isinstance(swagger, dict):
            raise ToolApiSchemaError('Invalid swagger schema, the schema must be an object.')

        # convert swagger to openapi
        info = swagger.get('info') or {}

        servers = swagger.get('servers', [])
        if not servers:
            raise ToolApiSchemaError('No server found in the swagger yaml.')

        openapi = {
            'openapi': '3.0.0',
            'info': {
                'title': info.get('title', 'Swagger'),
                'description': info.get('description', 'Swagger'),
                'version': info.get('version', '1.0.0')
            },
            'servers': servers,
            'paths': {},
            'components': {
                'schemas': {}
            }
        }

        # check paths
        if not swagger.get('paths'):
            raise ToolApiSchemaError('No paths found in the swagger yaml.')

        # convert paths
        for path, path_item in swagger['paths'].items():
            converted_path = openapi['paths'][path] = {}
            for method, operation in path_item.items():
                # a path item may also hold non-operation keys (e.g. shared parameters),
                # those are not operations and must not be required to have an operationId
                if str(method).lower() not in ApiBasedToolSchemaParser._HTTP_METHODS:
                    continue

                if 'operationId' not in operation:
                    raise ToolApiSchemaError(f'No operationId found in operation {method} {path}.')

                if not operation.get('summary') and not operation.get('description'):
                    warning['missing_summary'] = f'No summary or description found in operation {method} {path}.'

                converted_path[method] = {
                    'operationId': operation['operationId'],
                    'summary': operation.get('summary', ''),
                    'description': operation.get('description', ''),
                    'parameters': operation.get('parameters', []),
                    'responses': operation.get('responses', {}),
                }

                if 'requestBody' in operation:
                    converted_path[method]['requestBody'] = operation['requestBody']

        # convert definitions, the section is optional
        openapi['components']['schemas'].update(swagger.get('definitions') or {})

        return openapi

    @staticmethod
    def parse_openai_plugin_json_to_tool_bundle(json: str, extra_info: dict = None, warning: dict = None) -> list[ApiToolBundle]:
        """
            parse openai plugin json to tool bundle

            The manifest is expected to carry api.url and api.type; the openapi document is
            downloaded from api.url and parsed as yaml.

            :param json: the json string
            :param extra_info: optional dict, passed through to the openapi parser
            :param warning: optional dict, passed through to the openapi parser
            :return: the tool bundle
            :raises ToolProviderNotFoundError: if the manifest is invalid or the document cannot be downloaded
            :raises ToolNotSupportedError: if api.type is not 'openapi'
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        try:
            openai_plugin = json_loads(json)
            api = openai_plugin['api']
            api_url = api['url']
            api_type = api['type']
        except Exception as e:
            # any failure to read the manifest is reported the same way; unlike a bare except
            # this lets KeyboardInterrupt/SystemExit through and keeps the cause attached
            raise ToolProviderNotFoundError('Invalid openai plugin json.') from e

        if api_type != 'openapi':
            raise ToolNotSupportedError('Only openapi is supported now.')

        # get openapi yaml
        # NOTE(review): api_url comes straight from the supplied manifest and is requested
        # as is - confirm the caller restricts which hosts may be reached
        response = get(api_url, headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) '
        }, timeout=5)

        if response.status_code != 200:
            raise ToolProviderNotFoundError('cannot get openapi yaml from url.')

        return ApiBasedToolSchemaParser.parse_openapi_yaml_to_tool_bundle(response.text, extra_info=extra_info, warning=warning)

    @staticmethod
    def auto_parse_to_tool_bundle(content: str, extra_info: dict = None, warning: dict = None) -> tuple[list[ApiToolBundle], str]:
        """
            auto parse to tool bundle

            The content is loaded as json, then as yaml, and is tried as openapi, as swagger
            and as openai plugin in that order.

            :param content: the content
            :param extra_info: optional dict, passed through to the parsers
            :param warning: optional dict, passed through to the parsers
            :return: tools bundle, schema_type
            :raises ToolApiSchemaError: if the content is neither json nor yaml, is not an object
                                        or matches none of the supported schema types
        """
        warning = warning if warning is not None else {}
        extra_info = extra_info if extra_info is not None else {}

        content = content.strip()
        loaded_content = None
        json_error = None
        yaml_error = None

        try:
            loaded_content = json_loads(content)
        except JSONDecodeError as e:
            json_error = e

        if loaded_content is None:
            try:
                loaded_content = safe_load(content)
            except YAMLError as e:
                yaml_error = e
        if loaded_content is None:
            raise ToolApiSchemaError(f'Invalid api schema, schema is neither json nor yaml. json error: {str(json_error)}, yaml error: {str(yaml_error)}')

        # scalars and lists are valid json/yaml but can never be an api schema
        if not isinstance(loaded_content, dict):
            raise ToolApiSchemaError('Invalid api schema, the schema must be a json or yaml object.')

        swagger_error = None
        openapi_error = None
        openapi_plugin_error = None

        try:
            openapi = ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(loaded_content, extra_info=extra_info, warning=warning)
            return openapi, ApiProviderSchemaType.OPENAPI.value
        except ToolApiSchemaError as e:
            openapi_error = e

        # openapi parse error, fallback to swagger
        try:
            converted_swagger = ApiBasedToolSchemaParser.parse_swagger_to_openapi(loaded_content, extra_info=extra_info, warning=warning)
            swagger_bundles = ApiBasedToolSchemaParser.parse_openapi_to_tool_bundle(converted_swagger, extra_info=extra_info, warning=warning)
            return swagger_bundles, ApiProviderSchemaType.SWAGGER.value
        except ToolApiSchemaError as e:
            swagger_error = e

        # swagger parse error, fallback to openai plugin
        try:
            openapi_plugin = ApiBasedToolSchemaParser.parse_openai_plugin_json_to_tool_bundle(json_dumps(loaded_content), extra_info=extra_info, warning=warning)
            return openapi_plugin, ApiProviderSchemaType.OPENAI_PLUGIN.value
        except ToolNotSupportedError as e:
            # maybe it's not plugin at all
            openapi_plugin_error = e

        raise ToolApiSchemaError(f'Invalid api schema, openapi error: {str(openapi_error)}, swagger error: {str(swagger_error)}, openapi plugin error: {str(openapi_plugin_error)}')
 
 
  |