app_generate_service.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. from collections.abc import Generator, Mapping
  2. from typing import Any, Union
  3. from openai._exceptions import RateLimitError
  4. from configs import dify_config
  5. from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
  6. from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
  7. from core.app.apps.chat.app_generator import ChatAppGenerator
  8. from core.app.apps.completion.app_generator import CompletionAppGenerator
  9. from core.app.apps.workflow.app_generator import WorkflowAppGenerator
  10. from core.app.entities.app_invoke_entities import InvokeFrom
  11. from core.app.features.rate_limiting import RateLimit
  12. from libs.helper import RateLimiter
  13. from models.model import Account, App, AppMode, EndUser
  14. from models.workflow import Workflow
  15. from services.billing_service import BillingService
  16. from services.errors.llm import InvokeRateLimitError
  17. from services.workflow_service import WorkflowService
  18. class AppGenerateService:
  19. system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400)
  20. @classmethod
  21. def generate(
  22. cls,
  23. app_model: App,
  24. user: Union[Account, EndUser],
  25. args: Mapping[str, Any],
  26. invoke_from: InvokeFrom,
  27. streaming: bool = True,
  28. ):
  29. """
  30. App Content Generate
  31. :param app_model: app model
  32. :param user: user
  33. :param args: args
  34. :param invoke_from: invoke from
  35. :param streaming: streaming
  36. :return:
  37. """
  38. # system level rate limiter
  39. if dify_config.BILLING_ENABLED:
  40. # check if it's free plan
  41. limit_info = BillingService.get_info(app_model.tenant_id)
  42. if limit_info["subscription"]["plan"] == "sandbox":
  43. if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id):
  44. raise InvokeRateLimitError(
  45. "Rate limit exceeded, please upgrade your plan "
  46. f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day"
  47. )
  48. cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id)
  49. # app level rate limiter
  50. max_active_request = AppGenerateService._get_max_active_requests(app_model)
  51. rate_limit = RateLimit(app_model.id, max_active_request)
  52. request_id = RateLimit.gen_request_key()
  53. try:
  54. request_id = rate_limit.enter(request_id)
  55. if app_model.mode == AppMode.COMPLETION.value:
  56. return rate_limit.generate(
  57. CompletionAppGenerator.convert_to_event_stream(
  58. CompletionAppGenerator().generate(
  59. app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
  60. ),
  61. ),
  62. request_id=request_id,
  63. )
  64. elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
  65. return rate_limit.generate(
  66. AgentChatAppGenerator.convert_to_event_stream(
  67. AgentChatAppGenerator().generate(
  68. app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
  69. ),
  70. ),
  71. request_id,
  72. )
  73. elif app_model.mode == AppMode.CHAT.value:
  74. return rate_limit.generate(
  75. ChatAppGenerator.convert_to_event_stream(
  76. ChatAppGenerator().generate(
  77. app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
  78. ),
  79. ),
  80. request_id=request_id,
  81. )
  82. elif app_model.mode == AppMode.ADVANCED_CHAT.value:
  83. workflow = cls._get_workflow(app_model, invoke_from)
  84. return rate_limit.generate(
  85. AdvancedChatAppGenerator.convert_to_event_stream(
  86. AdvancedChatAppGenerator().generate(
  87. app_model=app_model,
  88. workflow=workflow,
  89. user=user,
  90. args=args,
  91. invoke_from=invoke_from,
  92. streaming=streaming,
  93. ),
  94. ),
  95. request_id=request_id,
  96. )
  97. elif app_model.mode == AppMode.WORKFLOW.value:
  98. workflow = cls._get_workflow(app_model, invoke_from)
  99. return rate_limit.generate(
  100. WorkflowAppGenerator.convert_to_event_stream(
  101. WorkflowAppGenerator().generate(
  102. app_model=app_model,
  103. workflow=workflow,
  104. user=user,
  105. args=args,
  106. invoke_from=invoke_from,
  107. streaming=streaming,
  108. call_depth=0,
  109. workflow_thread_pool_id=None,
  110. ),
  111. ),
  112. request_id,
  113. )
  114. else:
  115. raise ValueError(f"Invalid app mode {app_model.mode}")
  116. except RateLimitError as e:
  117. raise InvokeRateLimitError(str(e))
  118. except Exception:
  119. rate_limit.exit(request_id)
  120. raise
  121. finally:
  122. if not streaming:
  123. rate_limit.exit(request_id)
  124. @staticmethod
  125. def _get_max_active_requests(app_model: App) -> int:
  126. max_active_requests = app_model.max_active_requests
  127. if max_active_requests is None:
  128. max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS)
  129. return max_active_requests
  130. @classmethod
  131. def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
  132. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  133. workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
  134. return AdvancedChatAppGenerator.convert_to_event_stream(
  135. AdvancedChatAppGenerator().single_iteration_generate(
  136. app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
  137. )
  138. )
  139. elif app_model.mode == AppMode.WORKFLOW.value:
  140. workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
  141. return AdvancedChatAppGenerator.convert_to_event_stream(
  142. WorkflowAppGenerator().single_iteration_generate(
  143. app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
  144. )
  145. )
  146. else:
  147. raise ValueError(f"Invalid app mode {app_model.mode}")
  148. @classmethod
  149. def generate_single_loop(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
  150. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  151. workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
  152. return AdvancedChatAppGenerator.convert_to_event_stream(
  153. AdvancedChatAppGenerator().single_loop_generate(
  154. app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
  155. )
  156. )
  157. elif app_model.mode == AppMode.WORKFLOW.value:
  158. workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
  159. return AdvancedChatAppGenerator.convert_to_event_stream(
  160. WorkflowAppGenerator().single_loop_generate(
  161. app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
  162. )
  163. )
  164. else:
  165. raise ValueError(f"Invalid app mode {app_model.mode}")
  166. @classmethod
  167. def generate_more_like_this(
  168. cls,
  169. app_model: App,
  170. user: Union[Account, EndUser],
  171. message_id: str,
  172. invoke_from: InvokeFrom,
  173. streaming: bool = True,
  174. ) -> Union[Mapping, Generator]:
  175. """
  176. Generate more like this
  177. :param app_model: app model
  178. :param user: user
  179. :param message_id: message id
  180. :param invoke_from: invoke from
  181. :param streaming: streaming
  182. :return:
  183. """
  184. return CompletionAppGenerator().generate_more_like_this(
  185. app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
  186. )
  187. @classmethod
  188. def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow:
  189. """
  190. Get workflow
  191. :param app_model: app model
  192. :param invoke_from: invoke from
  193. :return:
  194. """
  195. workflow_service = WorkflowService()
  196. if invoke_from == InvokeFrom.DEBUGGER:
  197. # fetch draft workflow by app_model
  198. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  199. if not workflow:
  200. raise ValueError("Workflow not initialized")
  201. else:
  202. # fetch published workflow by app_model
  203. workflow = workflow_service.get_published_workflow(app_model=app_model)
  204. if not workflow:
  205. raise ValueError("Workflow not published")
  206. return workflow