app_generate_service.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. from collections.abc import Generator, Mapping
  2. from typing import Any, Union
  3. from openai._exceptions import RateLimitError
  4. from configs import dify_config
  5. from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
  6. from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
  7. from core.app.apps.chat.app_generator import ChatAppGenerator
  8. from core.app.apps.completion.app_generator import CompletionAppGenerator
  9. from core.app.apps.workflow.app_generator import WorkflowAppGenerator
  10. from core.app.entities.app_invoke_entities import InvokeFrom
  11. from core.app.features.rate_limiting import RateLimit
  12. from models.model import Account, App, AppMode, EndUser
  13. from models.workflow import Workflow
  14. from services.errors.llm import InvokeRateLimitError
  15. from services.workflow_service import WorkflowService
  class AppGenerateService:
      """Service-layer entry points for running app generations.

      Each public method selects the generator class that matches ``app_model.mode``
      and returns that generator's output passed through ``convert_to_event_stream``.
      """

      @classmethod
      def generate(
          cls,
          app_model: App,
          user: Union[Account, EndUser],
          args: Mapping[str, Any],
          invoke_from: InvokeFrom,
          streaming: bool = True,
      ):
          """
          App Content Generate

          The request is registered with the app's ``RateLimit`` via ``enter()``
          before generation starts. ``rate_limit.exit`` is called exactly once on
          every failure path, and in ``finally`` for non-streaming calls. For a
          successful streaming call the request is not exited here; presumably the
          wrapper returned by ``rate_limit.generate`` does that (RateLimit is not
          visible from this file -- confirm).

          :param app_model: app model
          :param user: user
          :param args: args
          :param invoke_from: invoke from
          :param streaming: streaming
          :return: the result of ``rate_limit.generate`` wrapping the event stream
          :raises InvokeRateLimitError: if an OpenAI ``RateLimitError`` is raised
          :raises ValueError: if the app mode is unsupported or its workflow is missing
          """
          max_active_request = cls._get_max_active_requests(app_model)
          rate_limit = RateLimit(app_model.id, max_active_request)
          request_id = RateLimit.gen_request_key()
          try:
              request_id = rate_limit.enter(request_id)
              event_stream = cls._generate_by_mode(app_model, user, args, invoke_from, streaming)
              return rate_limit.generate(event_stream, request_id=request_id)
          except RateLimitError as e:
              # Non-streaming requests are exited in ``finally``; exit streaming
              # ones here so a failed request is not left registered.
              if streaming:
                  rate_limit.exit(request_id)
              raise InvokeRateLimitError(str(e)) from e
          except Exception:
              # Same rule as above: exactly one exit per failed request.
              if streaming:
                  rate_limit.exit(request_id)
              raise
          finally:
              if not streaming:
                  rate_limit.exit(request_id)

      @classmethod
      def _generate_by_mode(
          cls,
          app_model: App,
          user: Union[Account, EndUser],
          args: Mapping[str, Any],
          invoke_from: InvokeFrom,
          streaming: bool,
      ):
          """
          Run the generator matching ``app_model.mode`` and convert its output
          with that generator's ``convert_to_event_stream``.

          :raises ValueError: if the app mode is not supported, or the workflow
              for ADVANCED_CHAT / WORKFLOW apps is missing
          """
          if app_model.mode == AppMode.COMPLETION.value:
              return CompletionAppGenerator.convert_to_event_stream(
                  CompletionAppGenerator().generate(
                      app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                  ),
              )
          # This branch precedes the CHAT / ADVANCED_CHAT / WORKFLOW checks, so any
          # non-completion app with ``is_agent`` set is routed to the agent generator.
          if app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
              return AgentChatAppGenerator.convert_to_event_stream(
                  AgentChatAppGenerator().generate(
                      app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                  ),
              )
          if app_model.mode == AppMode.CHAT.value:
              return ChatAppGenerator.convert_to_event_stream(
                  ChatAppGenerator().generate(
                      app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
                  ),
              )
          if app_model.mode == AppMode.ADVANCED_CHAT.value:
              workflow = cls._get_workflow(app_model, invoke_from)
              return AdvancedChatAppGenerator.convert_to_event_stream(
                  AdvancedChatAppGenerator().generate(
                      app_model=app_model,
                      workflow=workflow,
                      user=user,
                      args=args,
                      invoke_from=invoke_from,
                      streaming=streaming,
                  ),
              )
          if app_model.mode == AppMode.WORKFLOW.value:
              workflow = cls._get_workflow(app_model, invoke_from)
              return WorkflowAppGenerator.convert_to_event_stream(
                  WorkflowAppGenerator().generate(
                      app_model=app_model,
                      workflow=workflow,
                      user=user,
                      args=args,
                      invoke_from=invoke_from,
                      streaming=streaming,
                      call_depth=0,
                      workflow_thread_pool_id=None,
                  ),
              )
          raise ValueError(f"Invalid app mode {app_model.mode}")

      @staticmethod
      def _get_max_active_requests(app_model: App) -> int:
          """
          Return the ``max_active_requests`` value handed to ``RateLimit``.

          Uses the app's own ``max_active_requests`` and falls back to the global
          ``dify_config.APP_MAX_ACTIVE_REQUESTS`` when the app's value is ``None``.
          """
          max_active_requests = app_model.max_active_requests
          if max_active_requests is None:
              max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS)
          return max_active_requests

      @staticmethod
      def _get_single_node_generator_class(app_model: App):
          """
          Return the generator class used for single-node (iteration / loop) runs.

          :raises ValueError: if the app mode is neither ADVANCED_CHAT nor WORKFLOW
          """
          if app_model.mode == AppMode.ADVANCED_CHAT.value:
              return AdvancedChatAppGenerator
          if app_model.mode == AppMode.WORKFLOW.value:
              return WorkflowAppGenerator
          raise ValueError(f"Invalid app mode {app_model.mode}")

      @classmethod
      def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
          """
          Run ``single_iteration_generate`` for ``node_id`` against the app's draft
          workflow (fetched with ``InvokeFrom.DEBUGGER``).

          :param app_model: app model (ADVANCED_CHAT or WORKFLOW mode)
          :param user: user
          :param node_id: id of the node to run
          :param args: args
          :param streaming: streaming
          :return: the generator output passed through ``convert_to_event_stream``
          :raises ValueError: if the app mode is unsupported or no draft workflow exists
          """
          # Mode is validated before the workflow lookup, matching the original order.
          generator_cls = cls._get_single_node_generator_class(app_model)
          workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
          return generator_cls.convert_to_event_stream(
              generator_cls().single_iteration_generate(
                  app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
              )
          )

      @classmethod
      def generate_single_loop(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
          """
          Run ``single_loop_generate`` for ``node_id`` against the app's draft
          workflow (fetched with ``InvokeFrom.DEBUGGER``).

          :param app_model: app model (ADVANCED_CHAT or WORKFLOW mode)
          :param user: user
          :param node_id: id of the node to run
          :param args: args
          :param streaming: streaming
          :return: the generator output passed through ``convert_to_event_stream``
          :raises ValueError: if the app mode is unsupported or no draft workflow exists
          """
          generator_cls = cls._get_single_node_generator_class(app_model)
          workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
          return generator_cls.convert_to_event_stream(
              generator_cls().single_loop_generate(
                  app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
              )
          )

      @classmethod
      def generate_more_like_this(
          cls,
          app_model: App,
          user: Union[Account, EndUser],
          message_id: str,
          invoke_from: InvokeFrom,
          streaming: bool = True,
      ) -> Union[Mapping, Generator]:
          """
          Generate more like this

          Delegates to ``CompletionAppGenerator.generate_more_like_this``; note the
          ``streaming`` flag is forwarded as that method's ``stream`` argument.
          This path does not go through ``RateLimit``.

          :param app_model: app model
          :param user: user
          :param message_id: message id
          :param invoke_from: invoke from
          :param streaming: streaming
          :return: the generator's result (a mapping or a generator)
          """
          return CompletionAppGenerator().generate_more_like_this(
              app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
          )

      @classmethod
      def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow:
          """
          Get workflow

          Debugger invocations use the draft workflow; every other invocation
          source uses the published workflow.

          :param app_model: app model
          :param invoke_from: invoke from
          :return: the draft or published workflow
          :raises ValueError: if the required workflow does not exist
          """
          workflow_service = WorkflowService()
          if invoke_from == InvokeFrom.DEBUGGER:
              # fetch draft workflow by app_model
              workflow = workflow_service.get_draft_workflow(app_model=app_model)
              if not workflow:
                  raise ValueError("Workflow not initialized")
          else:
              # fetch published workflow by app_model
              workflow = workflow_service.get_published_workflow(app_model=app_model)
              if not workflow:
                  raise ValueError("Workflow not published")
          return workflow