tools_transform_service.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. import json
  2. import logging
  3. from typing import Optional, Union
  4. from flask import current_app
  5. from core.model_runtime.entities.common_entities import I18nObject
  6. from core.tools.entities.tool_bundle import ApiBasedToolBundle
  7. from core.tools.entities.tool_entities import ApiProviderAuthType, ToolParameter, ToolProviderCredentials
  8. from core.tools.entities.user_entities import UserTool, UserToolProvider
  9. from core.tools.provider.api_tool_provider import ApiBasedToolProviderController
  10. from core.tools.provider.builtin_tool_provider import BuiltinToolProviderController
  11. from core.tools.provider.model_tool_provider import ModelToolProviderController
  12. from core.tools.tool.tool import Tool
  13. from core.tools.utils.configuration import ToolConfigurationManager
  14. from models.tools import ApiToolProvider, BuiltinToolProvider
  15. logger = logging.getLogger(__name__)
  16. class ToolTransformService:
  17. @staticmethod
  18. def get_tool_provider_icon_url(provider_type: str, provider_name: str, icon: str) -> Union[str, dict]:
  19. """
  20. get tool provider icon url
  21. """
  22. url_prefix = (current_app.config.get("CONSOLE_API_URL")
  23. + "/console/api/workspaces/current/tool-provider/")
  24. if provider_type == UserToolProvider.ProviderType.BUILTIN.value:
  25. return url_prefix + 'builtin/' + provider_name + '/icon'
  26. elif provider_type == UserToolProvider.ProviderType.MODEL.value:
  27. return url_prefix + 'model/' + provider_name + '/icon'
  28. elif provider_type == UserToolProvider.ProviderType.API.value:
  29. try:
  30. return json.loads(icon)
  31. except:
  32. return {
  33. "background": "#252525",
  34. "content": "\ud83d\ude01"
  35. }
  36. return ''
  37. @staticmethod
  38. def repack_provider(provider: Union[dict, UserToolProvider]):
  39. """
  40. repack provider
  41. :param provider: the provider dict
  42. """
  43. if isinstance(provider, dict) and 'icon' in provider:
  44. provider['icon'] = ToolTransformService.get_tool_provider_icon_url(
  45. provider_type=provider['type'],
  46. provider_name=provider['name'],
  47. icon=provider['icon']
  48. )
  49. elif isinstance(provider, UserToolProvider):
  50. provider.icon = ToolTransformService.get_tool_provider_icon_url(
  51. provider_type=provider.type.value,
  52. provider_name=provider.name,
  53. icon=provider.icon
  54. )
  55. @staticmethod
  56. def builtin_provider_to_user_provider(
  57. provider_controller: BuiltinToolProviderController,
  58. db_provider: Optional[BuiltinToolProvider],
  59. decrypt_credentials: bool = True
  60. ) -> UserToolProvider:
  61. """
  62. convert provider controller to user provider
  63. """
  64. result = UserToolProvider(
  65. id=provider_controller.identity.name,
  66. author=provider_controller.identity.author,
  67. name=provider_controller.identity.name,
  68. description=I18nObject(
  69. en_US=provider_controller.identity.description.en_US,
  70. zh_Hans=provider_controller.identity.description.zh_Hans,
  71. ),
  72. icon=provider_controller.identity.icon,
  73. label=I18nObject(
  74. en_US=provider_controller.identity.label.en_US,
  75. zh_Hans=provider_controller.identity.label.zh_Hans,
  76. ),
  77. type=UserToolProvider.ProviderType.BUILTIN,
  78. masked_credentials={},
  79. is_team_authorization=False,
  80. tools=[]
  81. )
  82. # get credentials schema
  83. schema = provider_controller.get_credentials_schema()
  84. for name, value in schema.items():
  85. result.masked_credentials[name] = \
  86. ToolProviderCredentials.CredentialsType.default(value.type)
  87. # check if the provider need credentials
  88. if not provider_controller.need_credentials:
  89. result.is_team_authorization = True
  90. result.allow_delete = False
  91. elif db_provider:
  92. result.is_team_authorization = True
  93. if decrypt_credentials:
  94. credentials = db_provider.credentials
  95. # init tool configuration
  96. tool_configuration = ToolConfigurationManager(
  97. tenant_id=db_provider.tenant_id,
  98. provider_controller=provider_controller
  99. )
  100. # decrypt the credentials and mask the credentials
  101. decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials)
  102. masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials)
  103. result.masked_credentials = masked_credentials
  104. result.original_credentials = decrypted_credentials
  105. return result
  106. @staticmethod
  107. def api_provider_to_controller(
  108. db_provider: ApiToolProvider,
  109. ) -> ApiBasedToolProviderController:
  110. """
  111. convert provider controller to user provider
  112. """
  113. # package tool provider controller
  114. controller = ApiBasedToolProviderController.from_db(
  115. db_provider=db_provider,
  116. auth_type=ApiProviderAuthType.API_KEY if db_provider.credentials['auth_type'] == 'api_key' else
  117. ApiProviderAuthType.NONE
  118. )
  119. return controller
  120. @staticmethod
  121. def api_provider_to_user_provider(
  122. provider_controller: ApiBasedToolProviderController,
  123. db_provider: ApiToolProvider,
  124. decrypt_credentials: bool = True
  125. ) -> UserToolProvider:
  126. """
  127. convert provider controller to user provider
  128. """
  129. username = 'Anonymous'
  130. try:
  131. username = db_provider.user.name
  132. except Exception as e:
  133. logger.error(f'failed to get user name for api provider {db_provider.id}: {str(e)}')
  134. # add provider into providers
  135. credentials = db_provider.credentials
  136. result = UserToolProvider(
  137. id=db_provider.id,
  138. author=username,
  139. name=db_provider.name,
  140. description=I18nObject(
  141. en_US=db_provider.description,
  142. zh_Hans=db_provider.description,
  143. ),
  144. icon=db_provider.icon,
  145. label=I18nObject(
  146. en_US=db_provider.name,
  147. zh_Hans=db_provider.name,
  148. ),
  149. type=UserToolProvider.ProviderType.API,
  150. masked_credentials={},
  151. is_team_authorization=True,
  152. tools=[]
  153. )
  154. if decrypt_credentials:
  155. # init tool configuration
  156. tool_configuration = ToolConfigurationManager(
  157. tenant_id=db_provider.tenant_id,
  158. provider_controller=provider_controller
  159. )
  160. # decrypt the credentials and mask the credentials
  161. decrypted_credentials = tool_configuration.decrypt_tool_credentials(credentials=credentials)
  162. masked_credentials = tool_configuration.mask_tool_credentials(credentials=decrypted_credentials)
  163. result.masked_credentials = masked_credentials
  164. return result
  165. @staticmethod
  166. def model_provider_to_user_provider(
  167. db_provider: ModelToolProviderController,
  168. ) -> UserToolProvider:
  169. """
  170. convert provider controller to user provider
  171. """
  172. return UserToolProvider(
  173. id=db_provider.identity.name,
  174. author=db_provider.identity.author,
  175. name=db_provider.identity.name,
  176. description=I18nObject(
  177. en_US=db_provider.identity.description.en_US,
  178. zh_Hans=db_provider.identity.description.zh_Hans,
  179. ),
  180. icon=db_provider.identity.icon,
  181. label=I18nObject(
  182. en_US=db_provider.identity.label.en_US,
  183. zh_Hans=db_provider.identity.label.zh_Hans,
  184. ),
  185. type=UserToolProvider.ProviderType.MODEL,
  186. masked_credentials={},
  187. is_team_authorization=db_provider.is_active,
  188. )
  189. @staticmethod
  190. def tool_to_user_tool(
  191. tool: Union[ApiBasedToolBundle, Tool], credentials: dict = None, tenant_id: str = None
  192. ) -> UserTool:
  193. """
  194. convert tool to user tool
  195. """
  196. if isinstance(tool, Tool):
  197. # fork tool runtime
  198. tool = tool.fork_tool_runtime(meta={
  199. 'credentials': credentials,
  200. 'tenant_id': tenant_id,
  201. })
  202. # get tool parameters
  203. parameters = tool.parameters or []
  204. # get tool runtime parameters
  205. runtime_parameters = tool.get_runtime_parameters() or []
  206. # override parameters
  207. current_parameters = parameters.copy()
  208. for runtime_parameter in runtime_parameters:
  209. found = False
  210. for index, parameter in enumerate(current_parameters):
  211. if parameter.name == runtime_parameter.name and parameter.form == runtime_parameter.form:
  212. current_parameters[index] = runtime_parameter
  213. found = True
  214. break
  215. if not found and runtime_parameter.form == ToolParameter.ToolParameterForm.FORM:
  216. current_parameters.append(runtime_parameter)
  217. user_tool = UserTool(
  218. author=tool.identity.author,
  219. name=tool.identity.name,
  220. label=tool.identity.label,
  221. description=tool.description.human,
  222. parameters=current_parameters
  223. )
  224. return user_tool
  225. if isinstance(tool, ApiBasedToolBundle):
  226. return UserTool(
  227. author=tool.author,
  228. name=tool.operation_id,
  229. label=I18nObject(
  230. en_US=tool.operation_id,
  231. zh_Hans=tool.operation_id
  232. ),
  233. description=I18nObject(
  234. en_US=tool.summary or '',
  235. zh_Hans=tool.summary or ''
  236. ),
  237. parameters=tool.parameters
  238. )