import logging
from collections.abc import Callable, Generator, Iterable, Sequence
from typing import IO, Any, Optional, Union, cast

from configs import dify_config
from core.entities.embedding_type import EmbeddingInputType
from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
from core.entities.provider_entities import ModelLoadBalancingConfiguration
from core.errors.error import ProviderTokenNotInitError
from core.model_runtime.callbacks.base_callback import Callback
from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
from core.model_runtime.entities.model_entities import ModelType
from core.model_runtime.entities.rerank_entities import RerankResult
from core.model_runtime.entities.text_embedding_entities import TextEmbeddingResult
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeConnectionError, InvokeRateLimitError
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
from core.model_runtime.model_providers.__base.rerank_model import RerankModel
from core.model_runtime.model_providers.__base.speech2text_model import Speech2TextModel
from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
from core.model_runtime.model_providers.__base.tts_model import TTSModel
from core.provider_manager import ProviderManager
from extensions.ext_redis import redis_client
from models.provider import ProviderType

logger = logging.getLogger(__name__)


class ModelInstance:
    """
    Model instance class
    """

    def __init__(self, provider_model_bundle: ProviderModelBundle, model: str) -> None:
        self.provider_model_bundle = provider_model_bundle
        self.model = model
        self.provider = provider_model_bundle.configuration.provider.provider
        self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
        self.model_type_instance = self.provider_model_bundle.model_type_instance
        self.load_balancing_manager = self._get_load_balancing_manager(
            configuration=provider_model_bundle.configuration,
            model_type=provider_model_bundle.model_type_instance.model_type,
            model=model,
            credentials=self.credentials,
        )

    @staticmethod
    def _fetch_credentials_from_bundle(provider_model_bundle: ProviderModelBundle, model: str) -> dict:
        """
        Fetch credentials from provider model bundle
        :param provider_model_bundle: provider model bundle
        :param model: model name
        :return: model credentials
        """
        configuration = provider_model_bundle.configuration
        model_type = provider_model_bundle.model_type_instance.model_type
        credentials = configuration.get_current_credentials(model_type=model_type, model=model)

        if credentials is None:
            raise ProviderTokenNotInitError(f"Model {model} credentials are not initialized.")

        return credentials

    @staticmethod
    def _get_load_balancing_manager(
        configuration: ProviderConfiguration, model_type: ModelType, model: str, credentials: dict
    ) -> Optional["LBModelManager"]:
        """
        Get the load balancing manager for the model, if load balancing is enabled
        :param configuration: provider configuration
        :param model_type: model type
        :param model: model name
        :param credentials: model credentials
        :return: load balancing manager, or None if load balancing is not enabled
        """
        if configuration.model_settings and configuration.using_provider_type == ProviderType.CUSTOM:
            current_model_setting = None
            # check if model is disabled by admin
            for model_setting in configuration.model_settings:
                if model_setting.model_type == model_type and model_setting.model == model:
                    current_model_setting = model_setting
                    break

            # check if load balancing is enabled
            if current_model_setting and current_model_setting.load_balancing_configs:
                # use load balancing proxy to choose credentials
                lb_model_manager = LBModelManager(
                    tenant_id=configuration.tenant_id,
                    provider=configuration.provider.provider,
                    model_type=model_type,
                    model=model,
                    load_balancing_configs=current_model_setting.load_balancing_configs,
                    managed_credentials=credentials if configuration.custom_configuration.provider else None,
                )

                return lb_model_manager

        return None

    def invoke_llm(
        self,
        prompt_messages: Sequence[PromptMessage],
        model_parameters: Optional[dict] = None,
        tools: Sequence[PromptMessageTool] | None = None,
        stop: Optional[Sequence[str]] = None,
        stream: bool = True,
        user: Optional[str] = None,
        callbacks: Optional[list[Callback]] = None,
    ) -> Union[LLMResult, Generator]:
        """
        Invoke large language model
        :param prompt_messages: prompt messages
        :param model_parameters: model parameters
        :param tools: tools for tool calling
        :param stop: stop words
        :param stream: is stream response
        :param user: unique user id
        :param callbacks: callbacks
        :return: full response or stream response chunk generator result
        """
        if not isinstance(self.model_type_instance, LargeLanguageModel):
            raise Exception("Model type instance is not LargeLanguageModel")

        self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
        return cast(
            Union[LLMResult, Generator],
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                prompt_messages=prompt_messages,
                model_parameters=model_parameters,
                tools=tools,
                stop=stop,
                stream=stream,
                user=user,
                callbacks=callbacks,
            ),
        )
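
    # A minimal usage sketch (hypothetical tenant/model values; UserPromptMessage
    # is the PromptMessage subclass for user turns in core.model_runtime):
    #
    #   instance = ModelManager().get_model_instance(
    #       tenant_id="tenant-1", provider="openai",
    #       model_type=ModelType.LLM, model="gpt-4o",
    #   )
    #   result = instance.invoke_llm(
    #       prompt_messages=[UserPromptMessage(content="Hello")],
    #       model_parameters={"temperature": 0.7},
    #       stream=False,
    #   )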

    def get_llm_num_tokens(
        self, prompt_messages: list[PromptMessage], tools: Optional[list[PromptMessageTool]] = None
    ) -> int:
        """
        Get number of tokens for llm
        :param prompt_messages: prompt messages
        :param tools: tools for tool calling
        :return: number of tokens
        """
        if not isinstance(self.model_type_instance, LargeLanguageModel):
            raise Exception("Model type instance is not LargeLanguageModel")

        self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
        return cast(
            int,
            self._round_robin_invoke(
                function=self.model_type_instance.get_num_tokens,
                model=self.model,
                credentials=self.credentials,
                prompt_messages=prompt_messages,
                tools=tools,
            ),
        )

    def invoke_text_embedding(
        self, texts: list[str], user: Optional[str] = None, input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT
    ) -> TextEmbeddingResult:
        """
        Invoke text embedding model
        :param texts: texts to embed
        :param user: unique user id
        :param input_type: input type
        :return: embeddings result
        """
        if not isinstance(self.model_type_instance, TextEmbeddingModel):
            raise Exception("Model type instance is not TextEmbeddingModel")

        self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
        return cast(
            TextEmbeddingResult,
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                texts=texts,
                user=user,
                input_type=input_type,
            ),
        )

    def get_text_embedding_num_tokens(self, texts: list[str]) -> int:
        """
        Get number of tokens for text embedding
        :param texts: texts to embed
        :return: number of tokens
        """
        if not isinstance(self.model_type_instance, TextEmbeddingModel):
            raise Exception("Model type instance is not TextEmbeddingModel")

        self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
        return cast(
            int,
            self._round_robin_invoke(
                function=self.model_type_instance.get_num_tokens,
                model=self.model,
                credentials=self.credentials,
                texts=texts,
            ),
        )
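
    # Sketch: embedding a query with the same instance pattern (values are
    # placeholders; any provider exposing a TEXT_EMBEDDING model works):
    #
    #   embedding_instance = ModelManager().get_model_instance(
    #       tenant_id="tenant-1", provider="openai",
    #       model_type=ModelType.TEXT_EMBEDDING, model="text-embedding-3-small",
    #   )
    #   result = embedding_instance.invoke_text_embedding(
    #       texts=["what is load balancing?"],
    #       input_type=EmbeddingInputType.QUERY,
    #   )
    #   vector = result.embeddings[0]  # one vector per input text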

    def invoke_rerank(
        self,
        query: str,
        docs: list[str],
        score_threshold: Optional[float] = None,
        top_n: Optional[int] = None,
        user: Optional[str] = None,
    ) -> RerankResult:
        """
        Invoke rerank model
        :param query: search query
        :param docs: docs for reranking
        :param score_threshold: score threshold
        :param top_n: top n
        :param user: unique user id
        :return: rerank result
        """
        if not isinstance(self.model_type_instance, RerankModel):
            raise Exception("Model type instance is not RerankModel")

        self.model_type_instance = cast(RerankModel, self.model_type_instance)
        return cast(
            RerankResult,
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                query=query,
                docs=docs,
                score_threshold=score_threshold,
                top_n=top_n,
                user=user,
            ),
        )

    def invoke_moderation(self, text: str, user: Optional[str] = None) -> bool:
        """
        Invoke moderation model
        :param text: text to moderate
        :param user: unique user id
        :return: false if text is safe, true otherwise
        """
        if not isinstance(self.model_type_instance, ModerationModel):
            raise Exception("Model type instance is not ModerationModel")

        self.model_type_instance = cast(ModerationModel, self.model_type_instance)
        return cast(
            bool,
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                text=text,
                user=user,
            ),
        )

    def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) -> str:
        """
        Invoke speech-to-text model
        :param file: audio file
        :param user: unique user id
        :return: text for given audio file
        """
        if not isinstance(self.model_type_instance, Speech2TextModel):
            raise Exception("Model type instance is not Speech2TextModel")

        self.model_type_instance = cast(Speech2TextModel, self.model_type_instance)
        return cast(
            str,
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                file=file,
                user=user,
            ),
        )

    def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Optional[str] = None) -> Iterable[bytes]:
        """
        Invoke TTS model
        :param content_text: text content to be synthesized
        :param tenant_id: user tenant id
        :param voice: model timbre
        :param user: unique user id
        :return: generator of audio bytes
        """
        if not isinstance(self.model_type_instance, TTSModel):
            raise Exception("Model type instance is not TTSModel")

        self.model_type_instance = cast(TTSModel, self.model_type_instance)
        return cast(
            Iterable[bytes],
            self._round_robin_invoke(
                function=self.model_type_instance.invoke,
                model=self.model,
                credentials=self.credentials,
                content_text=content_text,
                user=user,
                tenant_id=tenant_id,
                voice=voice,
            ),
        )

    def _round_robin_invoke(self, function: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Round-robin invoke
        :param function: function to invoke
        :param args: function args
        :param kwargs: function kwargs
        :return: result of the invoked function
        """
        if not self.load_balancing_manager:
            return function(*args, **kwargs)

        last_exception: Union[InvokeRateLimitError, InvokeAuthorizationError, InvokeConnectionError, None] = None
        while True:
            lb_config = self.load_balancing_manager.fetch_next()
            if not lb_config:
                if not last_exception:
                    raise ProviderTokenNotInitError("Model credentials are not initialized.")
                else:
                    raise last_exception

            try:
                if "credentials" in kwargs:
                    del kwargs["credentials"]
                return function(*args, **kwargs, credentials=lb_config.credentials)
            except InvokeRateLimitError as e:
                # rate limited: cool this config down for 60 seconds
                self.load_balancing_manager.cooldown(lb_config, expire=60)
                last_exception = e
                continue
            except (InvokeAuthorizationError, InvokeConnectionError) as e:
                # transient auth/connection failure: cool down for 10 seconds
                self.load_balancing_manager.cooldown(lb_config, expire=10)
                last_exception = e
                continue
            except Exception as e:
                raise e
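
    # Retry flow sketch for _round_robin_invoke with two load-balancing
    # credential sets (illustrative scenario, not a fixed trace):
    #
    #   1. fetch_next() -> config A; provider raises InvokeRateLimitError
    #      -> cooldown(A, expire=60), remember the error, loop again
    #   2. fetch_next() -> config B; call succeeds -> result returned
    #   3. if every config were cooling down, fetch_next() would return None
    #      and the last remembered error (or ProviderTokenNotInitError) is raised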

    def get_tts_voices(self, language: Optional[str] = None) -> list:
        """
        Get TTS model voices
        :param language: tts language
        :return: tts model voices
        """
        if not isinstance(self.model_type_instance, TTSModel):
            raise Exception("Model type instance is not TTSModel")

        self.model_type_instance = cast(TTSModel, self.model_type_instance)
        return self.model_type_instance.get_tts_model_voices(
            model=self.model, credentials=self.credentials, language=language
        )


class ModelManager:
    def __init__(self) -> None:
        self._provider_manager = ProviderManager()

    def get_model_instance(self, tenant_id: str, provider: str, model_type: ModelType, model: str) -> ModelInstance:
        """
        Get model instance
        :param tenant_id: tenant id
        :param provider: provider name
        :param model_type: model type
        :param model: model name
        :return: model instance
        """
        if not provider:
            return self.get_default_model_instance(tenant_id, model_type)

        provider_model_bundle = self._provider_manager.get_provider_model_bundle(
            tenant_id=tenant_id, provider=provider, model_type=model_type
        )

        return ModelInstance(provider_model_bundle, model)

    def get_default_provider_model_name(self, tenant_id: str, model_type: ModelType) -> tuple[str, str]:
        """
        Return the first provider and the first model in that provider
        :param tenant_id: tenant id
        :param model_type: model type
        :return: provider name, model name
        """
        return self._provider_manager.get_first_provider_first_model(tenant_id, model_type)

    def get_default_model_instance(self, tenant_id: str, model_type: ModelType) -> ModelInstance:
        """
        Get default model instance
        :param tenant_id: tenant id
        :param model_type: model type
        :return: default model instance
        """
        default_model_entity = self._provider_manager.get_default_model(tenant_id=tenant_id, model_type=model_type)

        if not default_model_entity:
            raise ProviderTokenNotInitError(f"Default model not found for {model_type}")

        return self.get_model_instance(
            tenant_id=tenant_id,
            provider=default_model_entity.provider.provider,
            model_type=model_type,
            model=default_model_entity.model,
        )
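
# Sketch: resolving the workspace default model when no provider is pinned
# (tenant id is a placeholder):
#
#   manager = ModelManager()
#   instance = manager.get_model_instance(
#       tenant_id="tenant-1", provider="", model_type=ModelType.LLM, model="",
#   )  # an empty provider name routes through get_default_model_instance()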


class LBModelManager:
    def __init__(
        self,
        tenant_id: str,
        provider: str,
        model_type: ModelType,
        model: str,
        load_balancing_configs: list[ModelLoadBalancingConfiguration],
        managed_credentials: Optional[dict] = None,
    ) -> None:
        """
        Load balancing model manager
        :param tenant_id: tenant id
        :param provider: provider name
        :param model_type: model type
        :param model: model name
        :param load_balancing_configs: all load balancing configurations
        :param managed_credentials: credentials if load balancing configuration name is __inherit__
        """
        self._tenant_id = tenant_id
        self._provider = provider
        self._model_type = model_type
        self._model = model
        self._load_balancing_configs = load_balancing_configs

        for load_balancing_config in self._load_balancing_configs[:]:  # iterate over a shallow copy of the list
            if load_balancing_config.name == "__inherit__":
                if not managed_credentials:
                    # remove __inherit__ if managed credentials are not provided
                    self._load_balancing_configs.remove(load_balancing_config)
                else:
                    load_balancing_config.credentials = managed_credentials

    def fetch_next(self) -> Optional[ModelLoadBalancingConfiguration]:
        """
        Get next model load balancing config
        Strategy: Round Robin
        :return: next available config, or None if all configs are in cooldown
        """
        cache_key = "model_lb_index:{}:{}:{}:{}".format(
            self._tenant_id, self._provider, self._model_type.value, self._model
        )

        cooldown_load_balancing_configs = []
        max_index = len(self._load_balancing_configs)

        while True:
            current_index = redis_client.incr(cache_key)
            current_index = cast(int, current_index)
            if current_index >= 10000000:
                # reset the shared counter before it grows unboundedly
                current_index = 1
                redis_client.set(cache_key, current_index)

            redis_client.expire(cache_key, 3600)

            if current_index > max_index:
                current_index = current_index % max_index

            real_index = current_index - 1
            if real_index > max_index:
                real_index = 0

            config: ModelLoadBalancingConfiguration = self._load_balancing_configs[real_index]

            if self.in_cooldown(config):
                cooldown_load_balancing_configs.append(config)
                if len(cooldown_load_balancing_configs) >= len(self._load_balancing_configs):
                    # all configs are in cooldown
                    return None

                continue

            if dify_config.DEBUG:
                logger.info(
                    f"Model LB\nid: {config.id}\nname:{config.name}\n"
                    f"tenant_id: {self._tenant_id}\nprovider: {self._provider}\n"
                    f"model_type: {self._model_type.value}\nmodel: {self._model}"
                )

            return config
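
    # Worked example of the index arithmetic above with max_index = 3:
    # INCR yields 1, 2, 3, 4, 5, 6, ... -> current_index 1, 2, 3, 1, 2, 0
    # (values above 3 are taken modulo 3) -> real_index 0, 1, 2, 0, 1, -1.
    # A real_index of -1 resolves to the last config via Python's negative
    # indexing, so the rotation still visits every entry.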

    def cooldown(self, config: ModelLoadBalancingConfiguration, expire: int = 60) -> None:
        """
        Cool down a model load balancing config
        :param config: model load balancing config
        :param expire: cooldown time in seconds
        :return:
        """
        cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
            self._tenant_id, self._provider, self._model_type.value, self._model, config.id
        )

        redis_client.setex(cooldown_cache_key, expire, "true")

    def in_cooldown(self, config: ModelLoadBalancingConfiguration) -> bool:
        """
        Check if a model load balancing config is in cooldown
        :param config: model load balancing config
        :return: whether the config is in cooldown
        """
        cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
            self._tenant_id, self._provider, self._model_type.value, self._model, config.id
        )

        res: bool = redis_client.exists(cooldown_cache_key)
        return res

    @staticmethod
    def get_config_in_cooldown_and_ttl(
        tenant_id: str, provider: str, model_type: ModelType, model: str, config_id: str
    ) -> tuple[bool, int]:
        """
        Get whether a model load balancing config is in cooldown, and its remaining TTL
        :param tenant_id: workspace id
        :param provider: provider name
        :param model_type: model type
        :param model: model name
        :param config_id: model load balancing config id
        :return: (in_cooldown, ttl in seconds)
        """
        cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
            tenant_id, provider, model_type.value, model, config_id
        )

        ttl = redis_client.ttl(cooldown_cache_key)
        if ttl == -2:
            # Redis returns -2 when the key does not exist: not in cooldown
            return False, 0

        ttl = cast(int, ttl)
        return True, ttl
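
# Sketch: inspecting cooldown state from outside the manager (ids are
# placeholders):
#
#   in_cooldown, ttl = LBModelManager.get_config_in_cooldown_and_ttl(
#       tenant_id="tenant-1", provider="openai",
#       model_type=ModelType.LLM, model="gpt-4o", config_id="lb-config-1",
#   )
#   if in_cooldown:
#       logger.info("config cooling down for %d more seconds", ttl)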