model_manager.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. import logging
  2. import os
  3. from collections.abc import Callable, Generator, Sequence
  4. from typing import IO, Literal, Optional, Union, cast, overload
  5. from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
  6. from core.entities.provider_entities import ModelLoadBalancingConfiguration
  7. from core.errors.error import ProviderTokenNotInitError
  8. from core.model_runtime.callbacks.base_callback import Callback
  9. from core.model_runtime.entities.llm_entities import LLMResult
  10. from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
  11. from core.model_runtime.entities.model_entities import ModelType
  12. from core.model_runtime.entities.rerank_entities import RerankResult
  13. from core.model_runtime.entities.text_embedding_entities import TextEmbeddingResult
  14. from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeConnectionError, InvokeRateLimitError
  15. from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
  16. from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
  17. from core.model_runtime.model_providers.__base.rerank_model import RerankModel
  18. from core.model_runtime.model_providers.__base.speech2text_model import Speech2TextModel
  19. from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
  20. from core.model_runtime.model_providers.__base.tts_model import TTSModel
  21. from core.provider_manager import ProviderManager
  22. from extensions.ext_redis import redis_client
  23. from models.provider import ProviderType
# Module-level logger named after this module; used for the load balancing debug output.
logger = logging.getLogger(__name__)
  25. class ModelInstance:
  26. """
  27. Model instance class
  28. """
  29. def __init__(self, provider_model_bundle: ProviderModelBundle, model: str) -> None:
  30. self.provider_model_bundle = provider_model_bundle
  31. self.model = model
  32. self.provider = provider_model_bundle.configuration.provider.provider
  33. self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
  34. self.model_type_instance = self.provider_model_bundle.model_type_instance
  35. self.load_balancing_manager = self._get_load_balancing_manager(
  36. configuration=provider_model_bundle.configuration,
  37. model_type=provider_model_bundle.model_type_instance.model_type,
  38. model=model,
  39. credentials=self.credentials,
  40. )
  41. @staticmethod
  42. def _fetch_credentials_from_bundle(provider_model_bundle: ProviderModelBundle, model: str) -> dict:
  43. """
  44. Fetch credentials from provider model bundle
  45. :param provider_model_bundle: provider model bundle
  46. :param model: model name
  47. :return:
  48. """
  49. configuration = provider_model_bundle.configuration
  50. model_type = provider_model_bundle.model_type_instance.model_type
  51. credentials = configuration.get_current_credentials(model_type=model_type, model=model)
  52. if credentials is None:
  53. raise ProviderTokenNotInitError(f"Model {model} credentials is not initialized.")
  54. return credentials
  55. @staticmethod
  56. def _get_load_balancing_manager(
  57. configuration: ProviderConfiguration, model_type: ModelType, model: str, credentials: dict
  58. ) -> Optional["LBModelManager"]:
  59. """
  60. Get load balancing model credentials
  61. :param configuration: provider configuration
  62. :param model_type: model type
  63. :param model: model name
  64. :param credentials: model credentials
  65. :return:
  66. """
  67. if configuration.model_settings and configuration.using_provider_type == ProviderType.CUSTOM:
  68. current_model_setting = None
  69. # check if model is disabled by admin
  70. for model_setting in configuration.model_settings:
  71. if model_setting.model_type == model_type and model_setting.model == model:
  72. current_model_setting = model_setting
  73. break
  74. # check if load balancing is enabled
  75. if current_model_setting and current_model_setting.load_balancing_configs:
  76. # use load balancing proxy to choose credentials
  77. lb_model_manager = LBModelManager(
  78. tenant_id=configuration.tenant_id,
  79. provider=configuration.provider.provider,
  80. model_type=model_type,
  81. model=model,
  82. load_balancing_configs=current_model_setting.load_balancing_configs,
  83. managed_credentials=credentials if configuration.custom_configuration.provider else None,
  84. )
  85. return lb_model_manager
  86. return None
  87. @overload
  88. def invoke_llm(
  89. self,
  90. prompt_messages: list[PromptMessage],
  91. model_parameters: Optional[dict] = None,
  92. tools: Sequence[PromptMessageTool] | None = None,
  93. stop: Optional[list[str]] = None,
  94. stream: Literal[True] = True,
  95. user: Optional[str] = None,
  96. callbacks: Optional[list[Callback]] = None,
  97. ) -> Generator: ...
  98. @overload
  99. def invoke_llm(
  100. self,
  101. prompt_messages: list[PromptMessage],
  102. model_parameters: Optional[dict] = None,
  103. tools: Sequence[PromptMessageTool] | None = None,
  104. stop: Optional[list[str]] = None,
  105. stream: Literal[False] = False,
  106. user: Optional[str] = None,
  107. callbacks: Optional[list[Callback]] = None,
  108. ) -> LLMResult: ...
  109. @overload
  110. def invoke_llm(
  111. self,
  112. prompt_messages: list[PromptMessage],
  113. model_parameters: Optional[dict] = None,
  114. tools: Sequence[PromptMessageTool] | None = None,
  115. stop: Optional[list[str]] = None,
  116. stream: bool = True,
  117. user: Optional[str] = None,
  118. callbacks: Optional[list[Callback]] = None,
  119. ) -> Union[LLMResult, Generator]: ...
  120. def invoke_llm(
  121. self,
  122. prompt_messages: list[PromptMessage],
  123. model_parameters: Optional[dict] = None,
  124. tools: Sequence[PromptMessageTool] | None = None,
  125. stop: Optional[list[str]] = None,
  126. stream: bool = True,
  127. user: Optional[str] = None,
  128. callbacks: Optional[list[Callback]] = None,
  129. ) -> Union[LLMResult, Generator]:
  130. """
  131. Invoke large language model
  132. :param prompt_messages: prompt messages
  133. :param model_parameters: model parameters
  134. :param tools: tools for tool calling
  135. :param stop: stop words
  136. :param stream: is stream response
  137. :param user: unique user id
  138. :param callbacks: callbacks
  139. :return: full response or stream response chunk generator result
  140. """
  141. if not isinstance(self.model_type_instance, LargeLanguageModel):
  142. raise Exception("Model type instance is not LargeLanguageModel")
  143. self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
  144. return self._round_robin_invoke(
  145. function=self.model_type_instance.invoke,
  146. model=self.model,
  147. credentials=self.credentials,
  148. prompt_messages=prompt_messages,
  149. model_parameters=model_parameters,
  150. tools=tools,
  151. stop=stop,
  152. stream=stream,
  153. user=user,
  154. callbacks=callbacks,
  155. )
  156. def get_llm_num_tokens(
  157. self, prompt_messages: list[PromptMessage], tools: Optional[list[PromptMessageTool]] = None
  158. ) -> int:
  159. """
  160. Get number of tokens for llm
  161. :param prompt_messages: prompt messages
  162. :param tools: tools for tool calling
  163. :return:
  164. """
  165. if not isinstance(self.model_type_instance, LargeLanguageModel):
  166. raise Exception("Model type instance is not LargeLanguageModel")
  167. self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
  168. return self._round_robin_invoke(
  169. function=self.model_type_instance.get_num_tokens,
  170. model=self.model,
  171. credentials=self.credentials,
  172. prompt_messages=prompt_messages,
  173. tools=tools,
  174. )
  175. def invoke_text_embedding(self, texts: list[str], user: Optional[str] = None) -> TextEmbeddingResult:
  176. """
  177. Invoke large language model
  178. :param texts: texts to embed
  179. :param user: unique user id
  180. :return: embeddings result
  181. """
  182. if not isinstance(self.model_type_instance, TextEmbeddingModel):
  183. raise Exception("Model type instance is not TextEmbeddingModel")
  184. self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
  185. return self._round_robin_invoke(
  186. function=self.model_type_instance.invoke,
  187. model=self.model,
  188. credentials=self.credentials,
  189. texts=texts,
  190. user=user,
  191. )
  192. def get_text_embedding_num_tokens(self, texts: list[str]) -> int:
  193. """
  194. Get number of tokens for text embedding
  195. :param texts: texts to embed
  196. :return:
  197. """
  198. if not isinstance(self.model_type_instance, TextEmbeddingModel):
  199. raise Exception("Model type instance is not TextEmbeddingModel")
  200. self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
  201. return self._round_robin_invoke(
  202. function=self.model_type_instance.get_num_tokens,
  203. model=self.model,
  204. credentials=self.credentials,
  205. texts=texts,
  206. )
  207. def invoke_rerank(
  208. self,
  209. query: str,
  210. docs: list[str],
  211. score_threshold: Optional[float] = None,
  212. top_n: Optional[int] = None,
  213. user: Optional[str] = None,
  214. ) -> RerankResult:
  215. """
  216. Invoke rerank model
  217. :param query: search query
  218. :param docs: docs for reranking
  219. :param score_threshold: score threshold
  220. :param top_n: top n
  221. :param user: unique user id
  222. :return: rerank result
  223. """
  224. if not isinstance(self.model_type_instance, RerankModel):
  225. raise Exception("Model type instance is not RerankModel")
  226. self.model_type_instance = cast(RerankModel, self.model_type_instance)
  227. return self._round_robin_invoke(
  228. function=self.model_type_instance.invoke,
  229. model=self.model,
  230. credentials=self.credentials,
  231. query=query,
  232. docs=docs,
  233. score_threshold=score_threshold,
  234. top_n=top_n,
  235. user=user,
  236. )
  237. def invoke_moderation(self, text: str, user: Optional[str] = None) -> bool:
  238. """
  239. Invoke moderation model
  240. :param text: text to moderate
  241. :param user: unique user id
  242. :return: false if text is safe, true otherwise
  243. """
  244. if not isinstance(self.model_type_instance, ModerationModel):
  245. raise Exception("Model type instance is not ModerationModel")
  246. self.model_type_instance = cast(ModerationModel, self.model_type_instance)
  247. return self._round_robin_invoke(
  248. function=self.model_type_instance.invoke,
  249. model=self.model,
  250. credentials=self.credentials,
  251. text=text,
  252. user=user,
  253. )
  254. def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) -> str:
  255. """
  256. Invoke large language model
  257. :param file: audio file
  258. :param user: unique user id
  259. :return: text for given audio file
  260. """
  261. if not isinstance(self.model_type_instance, Speech2TextModel):
  262. raise Exception("Model type instance is not Speech2TextModel")
  263. self.model_type_instance = cast(Speech2TextModel, self.model_type_instance)
  264. return self._round_robin_invoke(
  265. function=self.model_type_instance.invoke,
  266. model=self.model,
  267. credentials=self.credentials,
  268. file=file,
  269. user=user,
  270. )
  271. def invoke_tts(self, content_text: str, tenant_id: str, voice: str, user: Optional[str] = None) -> str:
  272. """
  273. Invoke large language tts model
  274. :param content_text: text content to be translated
  275. :param tenant_id: user tenant id
  276. :param voice: model timbre
  277. :param user: unique user id
  278. :return: text for given audio file
  279. """
  280. if not isinstance(self.model_type_instance, TTSModel):
  281. raise Exception("Model type instance is not TTSModel")
  282. self.model_type_instance = cast(TTSModel, self.model_type_instance)
  283. return self._round_robin_invoke(
  284. function=self.model_type_instance.invoke,
  285. model=self.model,
  286. credentials=self.credentials,
  287. content_text=content_text,
  288. user=user,
  289. tenant_id=tenant_id,
  290. voice=voice,
  291. )
  292. def _round_robin_invoke(self, function: Callable, *args, **kwargs):
  293. """
  294. Round-robin invoke
  295. :param function: function to invoke
  296. :param args: function args
  297. :param kwargs: function kwargs
  298. :return:
  299. """
  300. if not self.load_balancing_manager:
  301. return function(*args, **kwargs)
  302. last_exception = None
  303. while True:
  304. lb_config = self.load_balancing_manager.fetch_next()
  305. if not lb_config:
  306. if not last_exception:
  307. raise ProviderTokenNotInitError("Model credentials is not initialized.")
  308. else:
  309. raise last_exception
  310. try:
  311. if "credentials" in kwargs:
  312. del kwargs["credentials"]
  313. return function(*args, **kwargs, credentials=lb_config.credentials)
  314. except InvokeRateLimitError as e:
  315. # expire in 60 seconds
  316. self.load_balancing_manager.cooldown(lb_config, expire=60)
  317. last_exception = e
  318. continue
  319. except (InvokeAuthorizationError, InvokeConnectionError) as e:
  320. # expire in 10 seconds
  321. self.load_balancing_manager.cooldown(lb_config, expire=10)
  322. last_exception = e
  323. continue
  324. except Exception as e:
  325. raise e
  326. def get_tts_voices(self, language: Optional[str] = None) -> list:
  327. """
  328. Invoke large language tts model voices
  329. :param language: tts language
  330. :return: tts model voices
  331. """
  332. if not isinstance(self.model_type_instance, TTSModel):
  333. raise Exception("Model type instance is not TTSModel")
  334. self.model_type_instance = cast(TTSModel, self.model_type_instance)
  335. return self.model_type_instance.get_tts_model_voices(
  336. model=self.model, credentials=self.credentials, language=language
  337. )
  338. class ModelManager:
  339. def __init__(self) -> None:
  340. self._provider_manager = ProviderManager()
  341. def get_model_instance(self, tenant_id: str, provider: str, model_type: ModelType, model: str) -> ModelInstance:
  342. """
  343. Get model instance
  344. :param tenant_id: tenant id
  345. :param provider: provider name
  346. :param model_type: model type
  347. :param model: model name
  348. :return:
  349. """
  350. if not provider:
  351. return self.get_default_model_instance(tenant_id, model_type)
  352. provider_model_bundle = self._provider_manager.get_provider_model_bundle(
  353. tenant_id=tenant_id, provider=provider, model_type=model_type
  354. )
  355. return ModelInstance(provider_model_bundle, model)
  356. def get_default_provider_model_name(self, tenant_id: str, model_type: ModelType) -> tuple[str, str]:
  357. """
  358. Return first provider and the first model in the provider
  359. :param tenant_id: tenant id
  360. :param model_type: model type
  361. :return: provider name, model name
  362. """
  363. return self._provider_manager.get_first_provider_first_model(tenant_id, model_type)
  364. def get_default_model_instance(self, tenant_id: str, model_type: ModelType) -> ModelInstance:
  365. """
  366. Get default model instance
  367. :param tenant_id: tenant id
  368. :param model_type: model type
  369. :return:
  370. """
  371. default_model_entity = self._provider_manager.get_default_model(tenant_id=tenant_id, model_type=model_type)
  372. if not default_model_entity:
  373. raise ProviderTokenNotInitError(f"Default model not found for {model_type}")
  374. return self.get_model_instance(
  375. tenant_id=tenant_id,
  376. provider=default_model_entity.provider.provider,
  377. model_type=model_type,
  378. model=default_model_entity.model,
  379. )
  380. class LBModelManager:
  381. def __init__(
  382. self,
  383. tenant_id: str,
  384. provider: str,
  385. model_type: ModelType,
  386. model: str,
  387. load_balancing_configs: list[ModelLoadBalancingConfiguration],
  388. managed_credentials: Optional[dict] = None,
  389. ) -> None:
  390. """
  391. Load balancing model manager
  392. :param tenant_id: tenant_id
  393. :param provider: provider
  394. :param model_type: model_type
  395. :param model: model name
  396. :param load_balancing_configs: all load balancing configurations
  397. :param managed_credentials: credentials if load balancing configuration name is __inherit__
  398. """
  399. self._tenant_id = tenant_id
  400. self._provider = provider
  401. self._model_type = model_type
  402. self._model = model
  403. self._load_balancing_configs = load_balancing_configs
  404. for load_balancing_config in self._load_balancing_configs[:]: # Iterate over a shallow copy of the list
  405. if load_balancing_config.name == "__inherit__":
  406. if not managed_credentials:
  407. # remove __inherit__ if managed credentials is not provided
  408. self._load_balancing_configs.remove(load_balancing_config)
  409. else:
  410. load_balancing_config.credentials = managed_credentials
  411. def fetch_next(self) -> Optional[ModelLoadBalancingConfiguration]:
  412. """
  413. Get next model load balancing config
  414. Strategy: Round Robin
  415. :return:
  416. """
  417. cache_key = "model_lb_index:{}:{}:{}:{}".format(
  418. self._tenant_id, self._provider, self._model_type.value, self._model
  419. )
  420. cooldown_load_balancing_configs = []
  421. max_index = len(self._load_balancing_configs)
  422. while True:
  423. current_index = redis_client.incr(cache_key)
  424. current_index = cast(int, current_index)
  425. if current_index >= 10000000:
  426. current_index = 1
  427. redis_client.set(cache_key, current_index)
  428. redis_client.expire(cache_key, 3600)
  429. if current_index > max_index:
  430. current_index = current_index % max_index
  431. real_index = current_index - 1
  432. if real_index > max_index:
  433. real_index = 0
  434. config = self._load_balancing_configs[real_index]
  435. if self.in_cooldown(config):
  436. cooldown_load_balancing_configs.append(config)
  437. if len(cooldown_load_balancing_configs) >= len(self._load_balancing_configs):
  438. # all configs are in cooldown
  439. return None
  440. continue
  441. if bool(os.environ.get("DEBUG", "False").lower() == "true"):
  442. logger.info(
  443. f"Model LB\nid: {config.id}\nname:{config.name}\n"
  444. f"tenant_id: {self._tenant_id}\nprovider: {self._provider}\n"
  445. f"model_type: {self._model_type.value}\nmodel: {self._model}"
  446. )
  447. return config
  448. return None
  449. def cooldown(self, config: ModelLoadBalancingConfiguration, expire: int = 60) -> None:
  450. """
  451. Cooldown model load balancing config
  452. :param config: model load balancing config
  453. :param expire: cooldown time
  454. :return:
  455. """
  456. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  457. self._tenant_id, self._provider, self._model_type.value, self._model, config.id
  458. )
  459. redis_client.setex(cooldown_cache_key, expire, "true")
  460. def in_cooldown(self, config: ModelLoadBalancingConfiguration) -> bool:
  461. """
  462. Check if model load balancing config is in cooldown
  463. :param config: model load balancing config
  464. :return:
  465. """
  466. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  467. self._tenant_id, self._provider, self._model_type.value, self._model, config.id
  468. )
  469. res = redis_client.exists(cooldown_cache_key)
  470. res = cast(bool, res)
  471. return res
  472. @staticmethod
  473. def get_config_in_cooldown_and_ttl(
  474. tenant_id: str, provider: str, model_type: ModelType, model: str, config_id: str
  475. ) -> tuple[bool, int]:
  476. """
  477. Get model load balancing config is in cooldown and ttl
  478. :param tenant_id: workspace id
  479. :param provider: provider name
  480. :param model_type: model type
  481. :param model: model name
  482. :param config_id: model load balancing config id
  483. :return:
  484. """
  485. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  486. tenant_id, provider, model_type.value, model, config_id
  487. )
  488. ttl = redis_client.ttl(cooldown_cache_key)
  489. if ttl == -2:
  490. return False, 0
  491. ttl = cast(int, ttl)
  492. return True, ttl