tool_provider.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from abc import ABC, abstractmethod
  2. from typing import Any
  3. from core.entities.provider_entities import ProviderConfig
  4. from core.tools.__base.tool import Tool
  5. from core.tools.entities.tool_entities import (
  6. ToolProviderEntity,
  7. ToolProviderType,
  8. )
  9. from core.tools.errors import ToolProviderCredentialValidationError
  10. class ToolProviderController(ABC):
  11. entity: ToolProviderEntity
  12. def __init__(self, entity: ToolProviderEntity) -> None:
  13. self.entity = entity
  14. def get_credentials_schema(self) -> dict[str, ProviderConfig]:
  15. """
  16. returns the credentials schema of the provider
  17. :return: the credentials schema
  18. """
  19. return self.entity.credentials_schema.copy()
  20. @abstractmethod
  21. def get_tool(self, tool_name: str) -> Tool:
  22. """
  23. returns a tool that the provider can provide
  24. :return: tool
  25. """
  26. pass
  27. @property
  28. def provider_type(self) -> ToolProviderType:
  29. """
  30. returns the type of the provider
  31. :return: type of the provider
  32. """
  33. return ToolProviderType.BUILT_IN
  34. def validate_credentials_format(self, credentials: dict[str, Any]) -> None:
  35. """
  36. validate the format of the credentials of the provider and set the default value if needed
  37. :param credentials: the credentials of the tool
  38. """
  39. credentials_schema = self.entity.credentials_schema
  40. if credentials_schema is None:
  41. return
  42. credentials_need_to_validate: dict[str, ProviderConfig] = {}
  43. for credential_name in credentials_schema:
  44. credentials_need_to_validate[credential_name] = credentials_schema[credential_name]
  45. for credential_name in credentials:
  46. if credential_name not in credentials_need_to_validate:
  47. raise ToolProviderCredentialValidationError(
  48. f"credential {credential_name} not found in provider {self.entity.identity.name}"
  49. )
  50. # check type
  51. credential_schema = credentials_need_to_validate[credential_name]
  52. if credential_schema.type in {ProviderConfig.Type.SECRET_INPUT, ProviderConfig.Type.TEXT_INPUT}:
  53. if not isinstance(credentials[credential_name], str):
  54. raise ToolProviderCredentialValidationError(f"credential {credential_name} should be string")
  55. elif credential_schema.type == ProviderConfig.Type.SELECT:
  56. if not isinstance(credentials[credential_name], str):
  57. raise ToolProviderCredentialValidationError(f"credential {credential_name} should be string")
  58. options = credential_schema.options
  59. if not isinstance(options, list):
  60. raise ToolProviderCredentialValidationError(f"credential {credential_name} options should be list")
  61. if credentials[credential_name] not in [x.value for x in options]:
  62. raise ToolProviderCredentialValidationError(
  63. f"credential {credential_name} should be one of {options}"
  64. )
  65. credentials_need_to_validate.pop(credential_name)
  66. for credential_name in credentials_need_to_validate:
  67. credential_schema = credentials_need_to_validate[credential_name]
  68. if credential_schema.required:
  69. raise ToolProviderCredentialValidationError(f"credential {credential_name} is required")
  70. # the credential is not set currently, set the default value if needed
  71. if credential_schema.default is not None:
  72. default_value = credential_schema.default
  73. # parse default value into the correct type
  74. if credential_schema.type in {
  75. ProviderConfig.Type.SECRET_INPUT,
  76. ProviderConfig.Type.TEXT_INPUT,
  77. ProviderConfig.Type.SELECT,
  78. }:
  79. default_value = str(default_value)
  80. credentials[credential_name] = default_value