# model_manager.py
import logging
import os
from collections.abc import Callable, Generator
from typing import IO, Optional, Union, cast

from core.entities.provider_configuration import ProviderConfiguration, ProviderModelBundle
from core.entities.provider_entities import ModelLoadBalancingConfiguration
from core.errors.error import ProviderTokenNotInitError
from core.model_runtime.callbacks.base_callback import Callback
from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import PromptMessage, PromptMessageTool
from core.model_runtime.entities.model_entities import ModelType
from core.model_runtime.entities.rerank_entities import RerankResult
from core.model_runtime.entities.text_embedding_entities import TextEmbeddingResult
from core.model_runtime.errors.invoke import InvokeAuthorizationError, InvokeConnectionError, InvokeRateLimitError
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
from core.model_runtime.model_providers.__base.rerank_model import RerankModel
from core.model_runtime.model_providers.__base.speech2text_model import Speech2TextModel
from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
from core.model_runtime.model_providers.__base.tts_model import TTSModel
from core.provider_manager import ProviderManager
from extensions.ext_redis import redis_client
from models.provider import ProviderType
  24. logger = logging.getLogger(__name__)
  25. class ModelInstance:
  26. """
  27. Model instance class
  28. """
  29. def __init__(self, provider_model_bundle: ProviderModelBundle, model: str) -> None:
  30. self.provider_model_bundle = provider_model_bundle
  31. self.model = model
  32. self.provider = provider_model_bundle.configuration.provider.provider
  33. self.credentials = self._fetch_credentials_from_bundle(provider_model_bundle, model)
  34. self.model_type_instance = self.provider_model_bundle.model_type_instance
  35. self.load_balancing_manager = self._get_load_balancing_manager(
  36. configuration=provider_model_bundle.configuration,
  37. model_type=provider_model_bundle.model_type_instance.model_type,
  38. model=model,
  39. credentials=self.credentials
  40. )
  41. def _fetch_credentials_from_bundle(self, provider_model_bundle: ProviderModelBundle, model: str) -> dict:
  42. """
  43. Fetch credentials from provider model bundle
  44. :param provider_model_bundle: provider model bundle
  45. :param model: model name
  46. :return:
  47. """
  48. configuration = provider_model_bundle.configuration
  49. model_type = provider_model_bundle.model_type_instance.model_type
  50. credentials = configuration.get_current_credentials(
  51. model_type=model_type,
  52. model=model
  53. )
  54. if credentials is None:
  55. raise ProviderTokenNotInitError(f"Model {model} credentials is not initialized.")
  56. return credentials
  57. def _get_load_balancing_manager(self, configuration: ProviderConfiguration,
  58. model_type: ModelType,
  59. model: str,
  60. credentials: dict) -> Optional["LBModelManager"]:
  61. """
  62. Get load balancing model credentials
  63. :param configuration: provider configuration
  64. :param model_type: model type
  65. :param model: model name
  66. :param credentials: model credentials
  67. :return:
  68. """
  69. if configuration.model_settings and configuration.using_provider_type == ProviderType.CUSTOM:
  70. current_model_setting = None
  71. # check if model is disabled by admin
  72. for model_setting in configuration.model_settings:
  73. if (model_setting.model_type == model_type
  74. and model_setting.model == model):
  75. current_model_setting = model_setting
  76. break
  77. # check if load balancing is enabled
  78. if current_model_setting and current_model_setting.load_balancing_configs:
  79. # use load balancing proxy to choose credentials
  80. lb_model_manager = LBModelManager(
  81. tenant_id=configuration.tenant_id,
  82. provider=configuration.provider.provider,
  83. model_type=model_type,
  84. model=model,
  85. load_balancing_configs=current_model_setting.load_balancing_configs,
  86. managed_credentials=credentials if configuration.custom_configuration.provider else None
  87. )
  88. return lb_model_manager
  89. return None
  90. def invoke_llm(self, prompt_messages: list[PromptMessage], model_parameters: Optional[dict] = None,
  91. tools: Optional[list[PromptMessageTool]] = None, stop: Optional[list[str]] = None,
  92. stream: bool = True, user: Optional[str] = None, callbacks: list[Callback] = None) \
  93. -> Union[LLMResult, Generator]:
  94. """
  95. Invoke large language model
  96. :param prompt_messages: prompt messages
  97. :param model_parameters: model parameters
  98. :param tools: tools for tool calling
  99. :param stop: stop words
  100. :param stream: is stream response
  101. :param user: unique user id
  102. :param callbacks: callbacks
  103. :return: full response or stream response chunk generator result
  104. """
  105. if not isinstance(self.model_type_instance, LargeLanguageModel):
  106. raise Exception("Model type instance is not LargeLanguageModel")
  107. self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
  108. return self._round_robin_invoke(
  109. function=self.model_type_instance.invoke,
  110. model=self.model,
  111. credentials=self.credentials,
  112. prompt_messages=prompt_messages,
  113. model_parameters=model_parameters,
  114. tools=tools,
  115. stop=stop,
  116. stream=stream,
  117. user=user,
  118. callbacks=callbacks
  119. )
  120. def get_llm_num_tokens(self, prompt_messages: list[PromptMessage],
  121. tools: Optional[list[PromptMessageTool]] = None) -> int:
  122. """
  123. Get number of tokens for llm
  124. :param prompt_messages: prompt messages
  125. :param tools: tools for tool calling
  126. :return:
  127. """
  128. if not isinstance(self.model_type_instance, LargeLanguageModel):
  129. raise Exception("Model type instance is not LargeLanguageModel")
  130. self.model_type_instance = cast(LargeLanguageModel, self.model_type_instance)
  131. return self._round_robin_invoke(
  132. function=self.model_type_instance.get_num_tokens,
  133. model=self.model,
  134. credentials=self.credentials,
  135. prompt_messages=prompt_messages,
  136. tools=tools
  137. )
  138. def invoke_text_embedding(self, texts: list[str], user: Optional[str] = None) \
  139. -> TextEmbeddingResult:
  140. """
  141. Invoke large language model
  142. :param texts: texts to embed
  143. :param user: unique user id
  144. :return: embeddings result
  145. """
  146. if not isinstance(self.model_type_instance, TextEmbeddingModel):
  147. raise Exception("Model type instance is not TextEmbeddingModel")
  148. self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
  149. return self._round_robin_invoke(
  150. function=self.model_type_instance.invoke,
  151. model=self.model,
  152. credentials=self.credentials,
  153. texts=texts,
  154. user=user
  155. )
  156. def get_text_embedding_num_tokens(self, texts: list[str]) -> int:
  157. """
  158. Get number of tokens for text embedding
  159. :param texts: texts to embed
  160. :return:
  161. """
  162. if not isinstance(self.model_type_instance, TextEmbeddingModel):
  163. raise Exception("Model type instance is not TextEmbeddingModel")
  164. self.model_type_instance = cast(TextEmbeddingModel, self.model_type_instance)
  165. return self._round_robin_invoke(
  166. function=self.model_type_instance.get_num_tokens,
  167. model=self.model,
  168. credentials=self.credentials,
  169. texts=texts
  170. )
  171. def invoke_rerank(self, query: str, docs: list[str], score_threshold: Optional[float] = None,
  172. top_n: Optional[int] = None,
  173. user: Optional[str] = None) \
  174. -> RerankResult:
  175. """
  176. Invoke rerank model
  177. :param query: search query
  178. :param docs: docs for reranking
  179. :param score_threshold: score threshold
  180. :param top_n: top n
  181. :param user: unique user id
  182. :return: rerank result
  183. """
  184. if not isinstance(self.model_type_instance, RerankModel):
  185. raise Exception("Model type instance is not RerankModel")
  186. self.model_type_instance = cast(RerankModel, self.model_type_instance)
  187. return self._round_robin_invoke(
  188. function=self.model_type_instance.invoke,
  189. model=self.model,
  190. credentials=self.credentials,
  191. query=query,
  192. docs=docs,
  193. score_threshold=score_threshold,
  194. top_n=top_n,
  195. user=user
  196. )
  197. def invoke_moderation(self, text: str, user: Optional[str] = None) \
  198. -> bool:
  199. """
  200. Invoke moderation model
  201. :param text: text to moderate
  202. :param user: unique user id
  203. :return: false if text is safe, true otherwise
  204. """
  205. if not isinstance(self.model_type_instance, ModerationModel):
  206. raise Exception("Model type instance is not ModerationModel")
  207. self.model_type_instance = cast(ModerationModel, self.model_type_instance)
  208. return self._round_robin_invoke(
  209. function=self.model_type_instance.invoke,
  210. model=self.model,
  211. credentials=self.credentials,
  212. text=text,
  213. user=user
  214. )
  215. def invoke_speech2text(self, file: IO[bytes], user: Optional[str] = None) \
  216. -> str:
  217. """
  218. Invoke large language model
  219. :param file: audio file
  220. :param user: unique user id
  221. :return: text for given audio file
  222. """
  223. if not isinstance(self.model_type_instance, Speech2TextModel):
  224. raise Exception("Model type instance is not Speech2TextModel")
  225. self.model_type_instance = cast(Speech2TextModel, self.model_type_instance)
  226. return self._round_robin_invoke(
  227. function=self.model_type_instance.invoke,
  228. model=self.model,
  229. credentials=self.credentials,
  230. file=file,
  231. user=user
  232. )
  233. def invoke_tts(self, content_text: str, tenant_id: str, voice: str, streaming: bool, user: Optional[str] = None) \
  234. -> str:
  235. """
  236. Invoke large language tts model
  237. :param content_text: text content to be translated
  238. :param tenant_id: user tenant id
  239. :param user: unique user id
  240. :param voice: model timbre
  241. :param streaming: output is streaming
  242. :return: text for given audio file
  243. """
  244. if not isinstance(self.model_type_instance, TTSModel):
  245. raise Exception("Model type instance is not TTSModel")
  246. self.model_type_instance = cast(TTSModel, self.model_type_instance)
  247. return self._round_robin_invoke(
  248. function=self.model_type_instance.invoke,
  249. model=self.model,
  250. credentials=self.credentials,
  251. content_text=content_text,
  252. user=user,
  253. tenant_id=tenant_id,
  254. voice=voice,
  255. streaming=streaming
  256. )
  257. def _round_robin_invoke(self, function: callable, *args, **kwargs):
  258. """
  259. Round-robin invoke
  260. :param function: function to invoke
  261. :param args: function args
  262. :param kwargs: function kwargs
  263. :return:
  264. """
  265. if not self.load_balancing_manager:
  266. return function(*args, **kwargs)
  267. last_exception = None
  268. while True:
  269. lb_config = self.load_balancing_manager.fetch_next()
  270. if not lb_config:
  271. if not last_exception:
  272. raise ProviderTokenNotInitError("Model credentials is not initialized.")
  273. else:
  274. raise last_exception
  275. try:
  276. if 'credentials' in kwargs:
  277. del kwargs['credentials']
  278. return function(*args, **kwargs, credentials=lb_config.credentials)
  279. except InvokeRateLimitError as e:
  280. # expire in 60 seconds
  281. self.load_balancing_manager.cooldown(lb_config, expire=60)
  282. last_exception = e
  283. continue
  284. except (InvokeAuthorizationError, InvokeConnectionError) as e:
  285. # expire in 10 seconds
  286. self.load_balancing_manager.cooldown(lb_config, expire=10)
  287. last_exception = e
  288. continue
  289. except Exception as e:
  290. raise e
  291. def get_tts_voices(self, language: Optional[str] = None) -> list:
  292. """
  293. Invoke large language tts model voices
  294. :param language: tts language
  295. :return: tts model voices
  296. """
  297. if not isinstance(self.model_type_instance, TTSModel):
  298. raise Exception("Model type instance is not TTSModel")
  299. self.model_type_instance = cast(TTSModel, self.model_type_instance)
  300. return self.model_type_instance.get_tts_model_voices(
  301. model=self.model,
  302. credentials=self.credentials,
  303. language=language
  304. )
  305. class ModelManager:
  306. def __init__(self) -> None:
  307. self._provider_manager = ProviderManager()
  308. def get_model_instance(self, tenant_id: str, provider: str, model_type: ModelType, model: str) -> ModelInstance:
  309. """
  310. Get model instance
  311. :param tenant_id: tenant id
  312. :param provider: provider name
  313. :param model_type: model type
  314. :param model: model name
  315. :return:
  316. """
  317. if not provider:
  318. return self.get_default_model_instance(tenant_id, model_type)
  319. provider_model_bundle = self._provider_manager.get_provider_model_bundle(
  320. tenant_id=tenant_id,
  321. provider=provider,
  322. model_type=model_type
  323. )
  324. return ModelInstance(provider_model_bundle, model)
  325. def get_default_model_instance(self, tenant_id: str, model_type: ModelType) -> ModelInstance:
  326. """
  327. Get default model instance
  328. :param tenant_id: tenant id
  329. :param model_type: model type
  330. :return:
  331. """
  332. default_model_entity = self._provider_manager.get_default_model(
  333. tenant_id=tenant_id,
  334. model_type=model_type
  335. )
  336. if not default_model_entity:
  337. raise ProviderTokenNotInitError(f"Default model not found for {model_type}")
  338. return self.get_model_instance(
  339. tenant_id=tenant_id,
  340. provider=default_model_entity.provider.provider,
  341. model_type=model_type,
  342. model=default_model_entity.model
  343. )
  344. class LBModelManager:
  345. def __init__(self, tenant_id: str,
  346. provider: str,
  347. model_type: ModelType,
  348. model: str,
  349. load_balancing_configs: list[ModelLoadBalancingConfiguration],
  350. managed_credentials: Optional[dict] = None) -> None:
  351. """
  352. Load balancing model manager
  353. :param load_balancing_configs: all load balancing configurations
  354. :param managed_credentials: credentials if load balancing configuration name is __inherit__
  355. """
  356. self._tenant_id = tenant_id
  357. self._provider = provider
  358. self._model_type = model_type
  359. self._model = model
  360. self._load_balancing_configs = load_balancing_configs
  361. for load_balancing_config in self._load_balancing_configs:
  362. if load_balancing_config.name == "__inherit__":
  363. if not managed_credentials:
  364. # remove __inherit__ if managed credentials is not provided
  365. self._load_balancing_configs.remove(load_balancing_config)
  366. else:
  367. load_balancing_config.credentials = managed_credentials
  368. def fetch_next(self) -> Optional[ModelLoadBalancingConfiguration]:
  369. """
  370. Get next model load balancing config
  371. Strategy: Round Robin
  372. :return:
  373. """
  374. cache_key = "model_lb_index:{}:{}:{}:{}".format(
  375. self._tenant_id,
  376. self._provider,
  377. self._model_type.value,
  378. self._model
  379. )
  380. cooldown_load_balancing_configs = []
  381. max_index = len(self._load_balancing_configs)
  382. while True:
  383. current_index = redis_client.incr(cache_key)
  384. if current_index >= 10000000:
  385. current_index = 1
  386. redis_client.set(cache_key, current_index)
  387. redis_client.expire(cache_key, 3600)
  388. if current_index > max_index:
  389. current_index = current_index % max_index
  390. real_index = current_index - 1
  391. if real_index > max_index:
  392. real_index = 0
  393. config = self._load_balancing_configs[real_index]
  394. if self.in_cooldown(config):
  395. cooldown_load_balancing_configs.append(config)
  396. if len(cooldown_load_balancing_configs) >= len(self._load_balancing_configs):
  397. # all configs are in cooldown
  398. return None
  399. continue
  400. if bool(os.environ.get("DEBUG", 'False').lower() == 'true'):
  401. logger.info(f"Model LB\nid: {config.id}\nname:{config.name}\n"
  402. f"tenant_id: {self._tenant_id}\nprovider: {self._provider}\n"
  403. f"model_type: {self._model_type.value}\nmodel: {self._model}")
  404. return config
  405. return None
  406. def cooldown(self, config: ModelLoadBalancingConfiguration, expire: int = 60) -> None:
  407. """
  408. Cooldown model load balancing config
  409. :param config: model load balancing config
  410. :param expire: cooldown time
  411. :return:
  412. """
  413. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  414. self._tenant_id,
  415. self._provider,
  416. self._model_type.value,
  417. self._model,
  418. config.id
  419. )
  420. redis_client.setex(cooldown_cache_key, expire, 'true')
  421. def in_cooldown(self, config: ModelLoadBalancingConfiguration) -> bool:
  422. """
  423. Check if model load balancing config is in cooldown
  424. :param config: model load balancing config
  425. :return:
  426. """
  427. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  428. self._tenant_id,
  429. self._provider,
  430. self._model_type.value,
  431. self._model,
  432. config.id
  433. )
  434. return redis_client.exists(cooldown_cache_key)
  435. @classmethod
  436. def get_config_in_cooldown_and_ttl(cls, tenant_id: str,
  437. provider: str,
  438. model_type: ModelType,
  439. model: str,
  440. config_id: str) -> tuple[bool, int]:
  441. """
  442. Get model load balancing config is in cooldown and ttl
  443. :param tenant_id: workspace id
  444. :param provider: provider name
  445. :param model_type: model type
  446. :param model: model name
  447. :param config_id: model load balancing config id
  448. :return:
  449. """
  450. cooldown_cache_key = "model_lb_index:cooldown:{}:{}:{}:{}:{}".format(
  451. tenant_id,
  452. provider,
  453. model_type.value,
  454. model,
  455. config_id
  456. )
  457. ttl = redis_client.ttl(cooldown_cache_key)
  458. if ttl == -2:
  459. return False, 0
  460. return True, ttl