import io
import logging
import uuid
from typing import Optional

from werkzeug.datastructures import FileStorage

from core.model_manager import ModelManager
from core.model_runtime.entities.model_entities import ModelType
from models.model import App, AppMode, AppModelConfig, Message
from services.errors.audio import (
    AudioTooLargeServiceError,
    NoAudioUploadedServiceError,
    ProviderNotSupportSpeechToTextServiceError,
    ProviderNotSupportTextToSpeechServiceError,
    UnsupportedAudioTypeServiceError,
)

# Maximum accepted upload size, in megabytes and in bytes.
FILE_SIZE = 30
FILE_SIZE_LIMIT = FILE_SIZE * 1024 * 1024
# Uploads are accepted only when their mimetype is "audio/<one of these>".
ALLOWED_EXTENSIONS = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm", "amr"]

logger = logging.getLogger(__name__)


class AudioService:
    """Speech-to-text and text-to-speech entry points backed by the tenant's default models."""

    @classmethod
    def transcript_asr(cls, app_model: App, file: FileStorage, end_user: Optional[str] = None):
        """Transcribe an uploaded audio file with the tenant's default speech-to-text model.

        Args:
            app_model: The app on whose behalf the transcription runs; its mode
                decides whether the feature flag is read from the workflow or
                from the app model config.
            file: The uploaded audio file.
            end_user: Passed through to the model as ``user``.

        Returns:
            A dict ``{"text": <result of invoke_speech2text>}``.

        Raises:
            ValueError: Speech to text is not enabled for the app.
            NoAudioUploadedServiceError: ``file`` is ``None``.
            UnsupportedAudioTypeServiceError: The file mimetype is not
                ``audio/<ext>`` for an entry of ``ALLOWED_EXTENSIONS``.
            AudioTooLargeServiceError: The file exceeds ``FILE_SIZE_LIMIT`` bytes.
            ProviderNotSupportSpeechToTextServiceError: No default
                speech-to-text model instance is available.
        """
        if app_model.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
            workflow = app_model.workflow
            if workflow is None:
                raise ValueError("Speech to text is not enabled")

            features_dict = workflow.features_dict
            if "speech_to_text" not in features_dict or not features_dict["speech_to_text"].get("enabled"):
                raise ValueError("Speech to text is not enabled")
        else:
            app_model_config: Optional[AppModelConfig] = app_model.app_model_config
            # A missing config or a missing "enabled" key means the feature is
            # off; report that instead of failing with AttributeError/KeyError.
            if app_model_config is None or not app_model_config.speech_to_text_dict.get("enabled"):
                raise ValueError("Speech to text is not enabled")

        if file is None:
            raise NoAudioUploadedServiceError()

        allowed_mimetypes = [f"audio/{ext}" for ext in ALLOWED_EXTENSIONS]
        if file.mimetype not in allowed_mimetypes:
            raise UnsupportedAudioTypeServiceError()

        file_content = file.read()
        if len(file_content) > FILE_SIZE_LIMIT:
            message = f"Audio size larger than {FILE_SIZE} mb"
            raise AudioTooLargeServiceError(message)

        model_manager = ModelManager()
        model_instance = model_manager.get_default_model_instance(
            tenant_id=app_model.tenant_id, model_type=ModelType.SPEECH2TEXT
        )
        if model_instance is None:
            raise ProviderNotSupportSpeechToTextServiceError()

        buffer = io.BytesIO(file_content)
        # NOTE(review): the name is fixed regardless of the uploaded type;
        # presumably the model backend needs a named file-like object - confirm.
        buffer.name = "temp.mp3"

        return {"text": model_instance.invoke_speech2text(file=buffer, user=end_user)}

    @classmethod
    def transcript_tts(
        cls,
        app_model: App,
        text: Optional[str] = None,
        voice: Optional[str] = None,
        end_user: Optional[str] = None,
        message_id: Optional[str] = None,
    ):
        """Synthesize speech for a stored message answer or for the given text.

        When ``message_id`` is truthy the answer of that message is spoken and
        ``text`` is ignored; otherwise ``text`` is spoken.

        Args:
            app_model: The app on whose behalf the synthesis runs.
            text: Text to speak when no ``message_id`` is given.
            voice: Voice to use. When ``None`` the voice configured for the app
                is used; when that is empty too, the first voice reported by
                the model is used.
            end_user: Passed through to the model as ``user``.
            message_id: Id of the message whose answer should be spoken.

        Returns:
            ``None`` when ``message_id`` is not a valid UUID, when no such
            message exists, or when the message has an empty answer and status
            ``"normal"``. Otherwise the model result, wrapped in a streaming
            ``audio/mpeg`` Flask ``Response`` when that result is a generator.

        Raises:
            ValueError: TTS is not enabled, the app model config is missing,
                no voice is available, or neither ``message_id`` nor ``text``
                was supplied.
            ProviderNotSupportTextToSpeechServiceError: No default TTS model
                instance is available.
        """
        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid a circular import with the Flask app - confirm.
        from collections.abc import Generator

        from flask import Response, stream_with_context

        from app import app
        from extensions.ext_database import db

        def invoke_tts(text_content: str, app_model: App, voice: Optional[str] = None):
            """Check that TTS is enabled, resolve the voice and invoke the default TTS model."""
            with app.app_context():
                if app_model.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
                    workflow = app_model.workflow
                    if workflow is None:
                        raise ValueError("TTS is not enabled")

                    features_dict = workflow.features_dict
                    if "text_to_speech" not in features_dict or not features_dict["text_to_speech"].get("enabled"):
                        raise ValueError("TTS is not enabled")

                    voice = features_dict["text_to_speech"].get("voice") if voice is None else voice
                else:
                    if app_model.app_model_config is None:
                        raise ValueError("AppModelConfig not found")

                    text_to_speech_dict = app_model.app_model_config.text_to_speech_dict
                    if not text_to_speech_dict.get("enabled"):
                        raise ValueError("TTS is not enabled")

                    voice = text_to_speech_dict.get("voice") if voice is None else voice

                model_manager = ModelManager()
                model_instance = model_manager.get_default_model_instance(
                    tenant_id=app_model.tenant_id, model_type=ModelType.TTS
                )
                # Same guard as the other entry points of this service, so a
                # missing model surfaces as a service error, not AttributeError.
                if model_instance is None:
                    raise ProviderNotSupportTextToSpeechServiceError()

                if not voice:
                    # Fall back to the first voice the model reports.
                    voices = model_instance.get_tts_voices()
                    voice = voices[0].get("value") if voices else None
                    if not voice:
                        raise ValueError("Sorry, no voice available.")

                return model_instance.invoke_tts(
                    content_text=text_content.strip(), user=end_user, tenant_id=app_model.tenant_id, voice=voice
                )

        if message_id:
            try:
                uuid.UUID(message_id)
            except ValueError:
                return None

            message = db.session.query(Message).filter(Message.id == message_id).first()
            if message is None:
                return None
            if message.answer == "" and message.status == "normal":
                return None

            response = invoke_tts(message.answer, app_model=app_model, voice=voice)
        else:
            if text is None:
                raise ValueError("Text is required")
            response = invoke_tts(text, app_model, voice)

        # A generator result is streamed to the client; anything else is
        # returned unchanged.
        if isinstance(response, Generator):
            return Response(stream_with_context(response), content_type="audio/mpeg")
        return response

    @classmethod
    def transcript_tts_voices(cls, tenant_id: str, language: str):
        """Return the voices the tenant's default TTS model reports for ``language``.

        Args:
            tenant_id: Tenant whose default TTS model is queried.
            language: Passed to the model's ``get_tts_voices``.

        Raises:
            ProviderNotSupportTextToSpeechServiceError: No default TTS model
                instance is available.
        """
        model_manager = ModelManager()
        model_instance = model_manager.get_default_model_instance(tenant_id=tenant_id, model_type=ModelType.TTS)
        if model_instance is None:
            raise ProviderNotSupportTextToSpeechServiceError()

        return model_instance.get_tts_voices(language)