import logging
from mimetypes import guess_extension
from typing import Optional

from core.file import File, FileTransferMethod, FileType
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.tool_file_manager import ToolFileManager

logger = logging.getLogger(__name__)


class ToolFileMessageTransformer:
    """Turn file-bearing tool messages into messages that link to ``/files/tools/...`` URLs."""

    @classmethod
    def transform_tool_invoke_messages(
        cls, messages: list[ToolInvokeMessage], user_id: str, tenant_id: str, conversation_id: str | None
    ) -> list[ToolInvokeMessage]:
        """
        Transform tool message and handle file download

        Each message is handled according to its type:

        * IMAGE with a ``str`` payload: the payload is passed as ``file_url`` to
          ``ToolFileManager.create_file_by_url`` and the message becomes an IMAGE_LINK.
          If that step raises, a TEXT message asking the user to download the image
          manually is emitted instead.
        * BLOB: the payload (a ``str`` is UTF-8 encoded first) is passed to
          ``ToolFileManager.create_file_by_raw``; the message becomes an IMAGE_LINK when
          the mime type contains ``"image"``, otherwise a LINK.
        * FILE: when ``meta["file"]`` is a ``File`` transferred as TOOL_FILE the message
          becomes an IMAGE_LINK or LINK; a ``File`` with another transfer method passes
          through unchanged; anything that is not a ``File`` is dropped (with a warning).
        * Every other message (TEXT, LINK, IMAGE with a non-``str`` payload, ...) passes
          through unchanged.

        The input messages are not modified.

        :param messages: messages produced by a tool invocation
        :param user_id: forwarded to ``ToolFileManager`` when a file is created
        :param tenant_id: forwarded to ``ToolFileManager`` when a file is created
        :param conversation_id: forwarded to ``ToolFileManager`` when a file is created
        :return: a new list holding the transformed messages in input order
        :raises ValueError: a BLOB or FILE message has no ``meta``, or a TOOL_FILE file
            has no ``related_id``
        :raises TypeError: a BLOB payload is neither ``str`` nor ``bytes``
        """
        result: list[ToolInvokeMessage] = []

        for message in messages:
            if message.type == ToolInvokeMessage.MessageType.IMAGE and isinstance(message.message, str):
                result.append(cls._transform_image(message, user_id, tenant_id, conversation_id))
            elif message.type == ToolInvokeMessage.MessageType.BLOB:
                result.append(cls._transform_blob(message, user_id, tenant_id, conversation_id))
            elif message.type == ToolInvokeMessage.MessageType.FILE:
                transformed = cls._transform_file(message)
                if transformed is not None:
                    result.append(transformed)
            else:
                # TEXT, LINK and every type not handled above are forwarded untouched.
                result.append(message)

        return result

    @classmethod
    def get_tool_file_url(cls, tool_file_id: str, extension: Optional[str]) -> str:
        """
        Build the ``/files/tools/`` path for a tool file

        :param tool_file_id: identifier placed in the path
        :param extension: suffix appended after the identifier, written as given (e.g. ``".png"``);
            ``".bin"`` is used when it is ``None`` or empty
        :return: ``/files/tools/<tool_file_id><extension>``
        """
        return f'/files/tools/{tool_file_id}{extension or ".bin"}'

    @classmethod
    def _derive_message(
        cls, source: ToolInvokeMessage, message_type: ToolInvokeMessage.MessageType, content: str
    ) -> ToolInvokeMessage:
        """Create a message of ``message_type`` carrying ``content`` plus ``source``'s save_as and a copy of its meta."""
        return ToolInvokeMessage(
            type=message_type,
            message=content,
            save_as=source.save_as,
            # Copy so the new message does not share its meta dict with the source message.
            meta=source.meta.copy() if source.meta is not None else {},
        )

    @classmethod
    def _transform_image(
        cls, message: ToolInvokeMessage, user_id: str, tenant_id: str, conversation_id: str | None
    ) -> ToolInvokeMessage:
        """Store the image behind the URL in ``message.message``; fall back to a TEXT notice on any failure."""
        try:
            file = ToolFileManager.create_file_by_url(
                user_id=user_id, tenant_id=tenant_id, conversation_id=conversation_id, file_url=message.message
            )
            # Images default to ".png" rather than the generic ".bin" when the mime type is unknown.
            url = cls.get_tool_file_url(tool_file_id=file.id, extension=guess_extension(file.mimetype) or ".png")
            return cls._derive_message(message, ToolInvokeMessage.MessageType.IMAGE_LINK, url)
        except Exception:
            # Deliberate best effort: a failed download must not abort the whole transformation.
            logger.exception("Failed to download image for tool message")
            return cls._derive_message(
                message,
                ToolInvokeMessage.MessageType.TEXT,
                f"Failed to download image: {message.message}, please try to download it manually.",
            )

    @classmethod
    def _transform_blob(
        cls, message: ToolInvokeMessage, user_id: str, tenant_id: str, conversation_id: str | None
    ) -> ToolInvokeMessage:
        """Store a BLOB payload through ``ToolFileManager.create_file_by_raw`` and return a link message to it."""
        # Explicit checks instead of ``assert`` so validation also runs under ``python -O``.
        if message.meta is None:
            raise ValueError("BLOB tool message has no meta")
        mimetype = message.meta.get("mime_type", "octet/stream")

        # Encode into a local variable so the caller's message object is left untouched.
        payload = message.message
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        if not isinstance(payload, bytes):
            raise TypeError(f"BLOB tool message payload must be str or bytes, got {type(payload).__name__}")

        file = ToolFileManager.create_file_by_raw(
            user_id=user_id,
            tenant_id=tenant_id,
            conversation_id=conversation_id,
            file_binary=payload,
            mimetype=mimetype,
        )
        url = cls.get_tool_file_url(tool_file_id=file.id, extension=guess_extension(file.mimetype))

        # The link type is chosen from the mime type declared in meta, not from the stored file.
        if "image" in mimetype:
            link_type = ToolInvokeMessage.MessageType.IMAGE_LINK
        else:
            link_type = ToolInvokeMessage.MessageType.LINK
        return cls._derive_message(message, link_type, url)

    @classmethod
    def _transform_file(cls, message: ToolInvokeMessage) -> Optional[ToolInvokeMessage]:
        """Convert a FILE message to a link message; return ``None`` when the message is to be dropped."""
        if message.meta is None:
            raise ValueError("FILE tool message has no meta")

        file = message.meta.get("file")
        if not isinstance(file, File):
            # Such messages have always been omitted from the result; log it so the omission is visible.
            logger.warning(
                "Dropping FILE tool message: meta['file'] is %s, expected File", type(file).__name__
            )
            return None

        if file.transfer_method != FileTransferMethod.TOOL_FILE:
            # Only TOOL_FILE transfers are rewritten to /files/tools/ links.
            return message

        if file.related_id is None:
            raise ValueError("FILE tool message refers to a tool file without related_id")

        url = cls.get_tool_file_url(tool_file_id=file.related_id, extension=file.extension)
        if file.type == FileType.IMAGE:
            link_type = ToolInvokeMessage.MessageType.IMAGE_LINK
        else:
            link_type = ToolInvokeMessage.MessageType.LINK
        return cls._derive_message(message, link_type, url)