app_generate_service.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from collections.abc import Generator
  2. from typing import Any, Union
  3. from openai._exceptions import RateLimitError
  4. from configs import dify_config
  5. from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
  6. from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
  7. from core.app.apps.chat.app_generator import ChatAppGenerator
  8. from core.app.apps.completion.app_generator import CompletionAppGenerator
  9. from core.app.apps.workflow.app_generator import WorkflowAppGenerator
  10. from core.app.entities.app_invoke_entities import InvokeFrom
  11. from core.app.features.rate_limiting import RateLimit
  12. from models.model import Account, App, AppMode, EndUser
  13. from services.errors.llm import InvokeRateLimitError
  14. from services.workflow_service import WorkflowService
  15. class AppGenerateService:
  16. @classmethod
  17. def generate(
  18. cls,
  19. app_model: App,
  20. user: Union[Account, EndUser],
  21. args: Any,
  22. invoke_from: InvokeFrom,
  23. streaming: bool = True,
  24. ):
  25. """
  26. App Content Generate
  27. :param app_model: app model
  28. :param user: user
  29. :param args: args
  30. :param invoke_from: invoke from
  31. :param streaming: streaming
  32. :return:
  33. """
  34. max_active_request = AppGenerateService._get_max_active_requests(app_model)
  35. rate_limit = RateLimit(app_model.id, max_active_request)
  36. request_id = RateLimit.gen_request_key()
  37. try:
  38. request_id = rate_limit.enter(request_id)
  39. if app_model.mode == AppMode.COMPLETION.value:
  40. return rate_limit.generate(
  41. CompletionAppGenerator.convert_to_event_stream(
  42. CompletionAppGenerator().generate(
  43. app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
  44. ),
  45. ),
  46. request_id,
  47. )
  48. elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
  49. return rate_limit.generate(
  50. AgentChatAppGenerator.convert_to_event_stream(
  51. AgentChatAppGenerator().generate(
  52. app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
  53. ),
  54. ),
  55. request_id,
  56. )
  57. elif app_model.mode == AppMode.CHAT.value:
  58. return rate_limit.generate(
  59. ChatAppGenerator.convert_to_event_stream(
  60. ChatAppGenerator().generate(
  61. app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming
  62. ),
  63. ),
  64. request_id,
  65. )
  66. elif app_model.mode == AppMode.ADVANCED_CHAT.value:
  67. workflow = cls._get_workflow(app_model, invoke_from)
  68. return rate_limit.generate(
  69. AdvancedChatAppGenerator.convert_to_event_stream(
  70. AdvancedChatAppGenerator().generate(
  71. app_model=app_model,
  72. workflow=workflow,
  73. user=user,
  74. args=args,
  75. invoke_from=invoke_from,
  76. stream=streaming,
  77. ),
  78. ),
  79. request_id,
  80. )
  81. elif app_model.mode == AppMode.WORKFLOW.value:
  82. workflow = cls._get_workflow(app_model, invoke_from)
  83. return rate_limit.generate(
  84. WorkflowAppGenerator.convert_to_event_stream(
  85. WorkflowAppGenerator().generate(
  86. app_model=app_model,
  87. workflow=workflow,
  88. user=user,
  89. args=args,
  90. invoke_from=invoke_from,
  91. stream=streaming,
  92. ),
  93. ),
  94. request_id,
  95. )
  96. else:
  97. raise ValueError(f"Invalid app mode {app_model.mode}")
  98. except RateLimitError as e:
  99. raise InvokeRateLimitError(str(e))
  100. finally:
  101. if not streaming:
  102. rate_limit.exit(request_id)
  103. @staticmethod
  104. def _get_max_active_requests(app_model: App) -> int:
  105. max_active_requests = app_model.max_active_requests
  106. if app_model.max_active_requests is None:
  107. max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS)
  108. return max_active_requests
  109. @classmethod
  110. def generate_single_iteration(
  111. cls, app_model: App, user: Union[Account, EndUser], node_id: str, args: Any, streaming: bool = True
  112. ):
  113. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  114. workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
  115. return AdvancedChatAppGenerator.convert_to_event_stream(
  116. AdvancedChatAppGenerator().single_iteration_generate(
  117. app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
  118. )
  119. )
  120. elif app_model.mode == AppMode.WORKFLOW.value:
  121. workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
  122. return AdvancedChatAppGenerator.convert_to_event_stream(
  123. WorkflowAppGenerator().single_iteration_generate(
  124. app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming
  125. )
  126. )
  127. else:
  128. raise ValueError(f"Invalid app mode {app_model.mode}")
  129. @classmethod
  130. def generate_more_like_this(
  131. cls,
  132. app_model: App,
  133. user: Union[Account, EndUser],
  134. message_id: str,
  135. invoke_from: InvokeFrom,
  136. streaming: bool = True,
  137. ) -> Union[dict, Generator]:
  138. """
  139. Generate more like this
  140. :param app_model: app model
  141. :param user: user
  142. :param message_id: message id
  143. :param invoke_from: invoke from
  144. :param streaming: streaming
  145. :return:
  146. """
  147. return CompletionAppGenerator().generate_more_like_this(
  148. app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
  149. )
  150. @classmethod
  151. def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Any:
  152. """
  153. Get workflow
  154. :param app_model: app model
  155. :param invoke_from: invoke from
  156. :return:
  157. """
  158. workflow_service = WorkflowService()
  159. if invoke_from == InvokeFrom.DEBUGGER:
  160. # fetch draft workflow by app_model
  161. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  162. if not workflow:
  163. raise ValueError("Workflow not initialized")
  164. else:
  165. # fetch published workflow by app_model
  166. workflow = workflow_service.get_published_workflow(app_model=app_model)
  167. if not workflow:
  168. raise ValueError("Workflow not published")
  169. return workflow