file_obj.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import enum
  2. from typing import Optional
  3. from pydantic import BaseModel
  4. from core.app.app_config.entities import FileExtraConfig
  5. from core.file.tool_file_parser import ToolFileParser
  6. from core.file.upload_file_parser import UploadFileParser
  7. from core.model_runtime.entities.message_entities import ImagePromptMessageContent
  8. from extensions.ext_database import db
  9. from models.model import UploadFile
  10. class FileType(enum.Enum):
  11. IMAGE = 'image'
  12. @staticmethod
  13. def value_of(value):
  14. for member in FileType:
  15. if member.value == value:
  16. return member
  17. raise ValueError(f"No matching enum found for value '{value}'")
  18. class FileTransferMethod(enum.Enum):
  19. REMOTE_URL = 'remote_url'
  20. LOCAL_FILE = 'local_file'
  21. TOOL_FILE = 'tool_file'
  22. @staticmethod
  23. def value_of(value):
  24. for member in FileTransferMethod:
  25. if member.value == value:
  26. return member
  27. raise ValueError(f"No matching enum found for value '{value}'")
  28. class FileBelongsTo(enum.Enum):
  29. USER = 'user'
  30. ASSISTANT = 'assistant'
  31. @staticmethod
  32. def value_of(value):
  33. for member in FileBelongsTo:
  34. if member.value == value:
  35. return member
  36. raise ValueError(f"No matching enum found for value '{value}'")
  37. class FileVar(BaseModel):
  38. id: Optional[str] = None # message file id
  39. tenant_id: str
  40. type: FileType
  41. transfer_method: FileTransferMethod
  42. url: Optional[str] = None # remote url
  43. related_id: Optional[str] = None
  44. extra_config: Optional[FileExtraConfig] = None
  45. filename: Optional[str] = None
  46. extension: Optional[str] = None
  47. mime_type: Optional[str] = None
  48. def to_dict(self) -> dict:
  49. return {
  50. '__variant': self.__class__.__name__,
  51. 'tenant_id': self.tenant_id,
  52. 'type': self.type.value,
  53. 'transfer_method': self.transfer_method.value,
  54. 'url': self.preview_url,
  55. 'related_id': self.related_id,
  56. 'filename': self.filename,
  57. 'extension': self.extension,
  58. 'mime_type': self.mime_type,
  59. }
  60. def to_markdown(self) -> str:
  61. """
  62. Convert file to markdown
  63. :return:
  64. """
  65. preview_url = self.preview_url
  66. if self.type == FileType.IMAGE:
  67. text = f'![{self.filename or ""}]({preview_url})'
  68. else:
  69. text = f'[{self.filename or preview_url}]({preview_url})'
  70. return text
  71. @property
  72. def data(self) -> Optional[str]:
  73. """
  74. Get image data, file signed url or base64 data
  75. depending on config MULTIMODAL_SEND_IMAGE_FORMAT
  76. :return:
  77. """
  78. return self._get_data()
  79. @property
  80. def preview_url(self) -> Optional[str]:
  81. """
  82. Get signed preview url
  83. :return:
  84. """
  85. return self._get_data(force_url=True)
  86. @property
  87. def prompt_message_content(self) -> ImagePromptMessageContent:
  88. if self.type == FileType.IMAGE:
  89. image_config = self.extra_config.image_config
  90. return ImagePromptMessageContent(
  91. data=self.data,
  92. detail=ImagePromptMessageContent.DETAIL.HIGH
  93. if image_config.get("detail") == "high" else ImagePromptMessageContent.DETAIL.LOW
  94. )
  95. def _get_data(self, force_url: bool = False) -> Optional[str]:
  96. if self.type == FileType.IMAGE:
  97. if self.transfer_method == FileTransferMethod.REMOTE_URL:
  98. return self.url
  99. elif self.transfer_method == FileTransferMethod.LOCAL_FILE:
  100. upload_file = (db.session.query(UploadFile)
  101. .filter(
  102. UploadFile.id == self.related_id,
  103. UploadFile.tenant_id == self.tenant_id
  104. ).first())
  105. return UploadFileParser.get_image_data(
  106. upload_file=upload_file,
  107. force_url=force_url
  108. )
  109. elif self.transfer_method == FileTransferMethod.TOOL_FILE:
  110. extension = self.extension
  111. # add sign url
  112. return ToolFileParser.get_tool_file_manager().sign_file(tool_file_id=self.related_id, extension=extension)
  113. return None