tool_provider.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. from abc import ABC, abstractmethod
  2. from copy import deepcopy
  3. from typing import Any
  4. from core.entities.provider_entities import ProviderConfig
  5. from core.tools.__base.tool import Tool
  6. from core.tools.entities.tool_entities import (
  7. ToolProviderEntity,
  8. ToolProviderType,
  9. )
  10. from core.tools.errors import ToolProviderCredentialValidationError
  11. class ToolProviderController(ABC):
  12. entity: ToolProviderEntity
  13. def __init__(self, entity: ToolProviderEntity) -> None:
  14. self.entity = entity
  15. def get_credentials_schema(self) -> list[ProviderConfig]:
  16. """
  17. returns the credentials schema of the provider
  18. :return: the credentials schema
  19. """
  20. return deepcopy(self.entity.credentials_schema)
  21. @abstractmethod
  22. def get_tool(self, tool_name: str) -> Tool:
  23. """
  24. returns a tool that the provider can provide
  25. :return: tool
  26. """
  27. pass
  28. @property
  29. def provider_type(self) -> ToolProviderType:
  30. """
  31. returns the type of the provider
  32. :return: type of the provider
  33. """
  34. return ToolProviderType.BUILT_IN
  35. def validate_credentials_format(self, credentials: dict[str, Any]) -> None:
  36. """
  37. validate the format of the credentials of the provider and set the default value if needed
  38. :param credentials: the credentials of the tool
  39. """
  40. credentials_schema = dict[str, ProviderConfig]()
  41. if credentials_schema is None:
  42. return
  43. for credential in self.entity.credentials_schema:
  44. credentials_schema[credential.name] = credential
  45. credentials_need_to_validate: dict[str, ProviderConfig] = {}
  46. for credential_name in credentials_schema:
  47. credentials_need_to_validate[credential_name] = credentials_schema[credential_name]
  48. for credential_name in credentials:
  49. if credential_name not in credentials_need_to_validate:
  50. raise ToolProviderCredentialValidationError(
  51. f"credential {credential_name} not found in provider {self.entity.identity.name}"
  52. )
  53. # check type
  54. credential_schema = credentials_need_to_validate[credential_name]
  55. if not credential_schema.required and credentials[credential_name] is None:
  56. continue
  57. if credential_schema.type in {ProviderConfig.Type.SECRET_INPUT, ProviderConfig.Type.TEXT_INPUT}:
  58. if not isinstance(credentials[credential_name], str):
  59. raise ToolProviderCredentialValidationError(f"credential {credential_name} should be string")
  60. elif credential_schema.type == ProviderConfig.Type.SELECT:
  61. if not isinstance(credentials[credential_name], str):
  62. raise ToolProviderCredentialValidationError(f"credential {credential_name} should be string")
  63. options = credential_schema.options
  64. if not isinstance(options, list):
  65. raise ToolProviderCredentialValidationError(f"credential {credential_name} options should be list")
  66. if credentials[credential_name] not in [x.value for x in options]:
  67. raise ToolProviderCredentialValidationError(
  68. f"credential {credential_name} should be one of {options}"
  69. )
  70. credentials_need_to_validate.pop(credential_name)
  71. for credential_name in credentials_need_to_validate:
  72. credential_schema = credentials_need_to_validate[credential_name]
  73. if credential_schema.required:
  74. raise ToolProviderCredentialValidationError(f"credential {credential_name} is required")
  75. # the credential is not set currently, set the default value if needed
  76. if credential_schema.default is not None:
  77. default_value = credential_schema.default
  78. # parse default value into the correct type
  79. if credential_schema.type in {
  80. ProviderConfig.Type.SECRET_INPUT,
  81. ProviderConfig.Type.TEXT_INPUT,
  82. ProviderConfig.Type.SELECT,
  83. }:
  84. default_value = str(default_value)
  85. credentials[credential_name] = default_value