completion.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. import json
  2. import logging
  3. import time
  4. from collections.abc import Generator
  5. from datetime import UTC, datetime
  6. import requests
  7. from flask import request
  8. from flask_login import current_user # type: ignore
  9. from flask_restful import Resource, reqparse # type: ignore
  10. from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
  11. import services
  12. from controllers.console.app.error import (
  13. AppUnavailableError,
  14. CompletionRequestError,
  15. ConversationCompletedError,
  16. ProviderModelCurrentlyNotSupportError,
  17. ProviderNotInitializeError,
  18. ProviderQuotaExceededError,
  19. )
  20. from controllers.console.explore.error import NotChatAppError, NotCompletionAppError
  21. from controllers.console.explore.wraps import InstalledAppResource
  22. from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
  23. from core.app.apps.base_app_queue_manager import AppQueueManager
  24. from core.app.entities.app_invoke_entities import InvokeFrom
  25. from core.errors.error import (
  26. ModelCurrentlyNotSupportError,
  27. ProviderTokenNotInitError,
  28. QuotaExceededError,
  29. )
  30. from core.model_runtime.errors.invoke import InvokeError
  31. from extensions.ext_database import db
  32. from libs import helper
  33. from libs.helper import extract_remote_ip, uuid_value
  34. from models.model import AppMode
  35. from services.app_generate_service import AppGenerateService
  36. from services.errors.llm import InvokeRateLimitError
  37. from services.robot_account_service import RobotAccountService
  38. # Define the completion API for users.
  class CompletionApi(InstalledAppResource):
      """Completion endpoint for installed apps whose mode is ``"completion"``."""

      def post(self, installed_app):
          """Run a completion for ``installed_app`` as the current user.

          The JSON body is parsed for ``inputs`` (required dict), ``query``
          (defaults to ""), ``files``, ``response_mode`` ("blocking" or
          "streaming") and ``retriever_from`` (defaults to "explore_app").
          Streaming is used only when ``response_mode`` equals "streaming".

          Args:
              installed_app: object exposing ``.app`` (the app model) and a
                  writable ``last_used_at`` attribute.

          Returns:
              Whatever ``helper.compact_generate_response`` builds from the
              generation result.

          Raises:
              NotCompletionAppError: the app mode is not "completion".
              NotFound: the referenced conversation does not exist.
              ConversationCompletedError: the conversation is already completed.
              AppUnavailableError: the app model config is broken.
              ProviderNotInitializeError: provider token is not initialised.
              ProviderQuotaExceededError: provider quota is exhausted.
              ProviderModelCurrentlyNotSupportError: model currently unsupported.
              CompletionRequestError: the model invocation failed.
              InvokeRateLimitHttpError: the invocation was rate limited.
              ValueError: re-raised unchanged.
              InternalServerError: any other unexpected failure.
          """
          app_model = installed_app.app
          if app_model.mode != "completion":
              raise NotCompletionAppError()

          parser = reqparse.RequestParser()
          parser.add_argument("inputs", type=dict, required=True, location="json")
          parser.add_argument("query", type=str, location="json", default="")
          parser.add_argument("files", type=list, required=False, location="json")
          parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
          parser.add_argument("retriever_from", type=str, required=False, default="explore_app", location="json")
          args = parser.parse_args()

          streaming = args["response_mode"] == "streaming"
          args["auto_generate_name"] = False

          # Record the usage time as a naive UTC timestamp before generating.
          installed_app.last_used_at = datetime.now(UTC).replace(tzinfo=None)
          db.session.commit()

          try:
              response = AppGenerateService.generate(
                  app_model=app_model,
                  user=current_user,
                  args=args,
                  invoke_from=InvokeFrom.EXPLORE,
                  streaming=streaming,
              )
              return helper.compact_generate_response(response)
          except services.errors.conversation.ConversationNotExistsError:
              raise NotFound("Conversation Not Exists.")
          except services.errors.conversation.ConversationCompletedError:
              raise ConversationCompletedError()
          except services.errors.app_model_config.AppModelConfigBrokenError:
              logging.exception("App model config broken.")
              raise AppUnavailableError()
          except ProviderTokenNotInitError as ex:
              raise ProviderNotInitializeError(ex.description)
          except QuotaExceededError:
              raise ProviderQuotaExceededError()
          except ModelCurrentlyNotSupportError:
              raise ProviderModelCurrentlyNotSupportError()
          except InvokeError as e:
              raise CompletionRequestError(e.description)
          except InvokeRateLimitError as ex:
              # Same mapping as ChatApi; without it a rate-limit failure would
              # fall through to the generic handler and be reported as a 500.
              raise InvokeRateLimitHttpError(ex.description)
          except ValueError:
              # Deliberately propagated untouched.
              raise
          except Exception:
              logging.exception("internal server error.")
              raise InternalServerError()
  class CompletionStopApi(InstalledAppResource):
      """Stop endpoint for a running completion task of an installed app."""

      def post(self, installed_app, task_id):
          """Set the stop flag for ``task_id`` on behalf of the current user.

          Args:
              installed_app: object exposing ``.app`` (the app model).
              task_id: identifier handed to ``AppQueueManager.set_stop_flag``.

          Returns:
              ``({"result": "success"}, 200)``.

          Raises:
              NotCompletionAppError: the app mode is not "completion".
          """
          if installed_app.app.mode != "completion":
              raise NotCompletionAppError()

          requester_id = current_user.id
          AppQueueManager.set_stop_flag(task_id, InvokeFrom.EXPLORE, requester_id)

          return {"result": "success"}, 200
  class ChatApi(InstalledAppResource):
      """Chat endpoint for installed apps in chat, agent-chat or advanced-chat mode."""

      def post(self, installed_app):
          """Send a chat message to ``installed_app`` and return the streamed reply.

          The JSON body is parsed for ``inputs`` (required dict), ``query``
          (required), ``files``, ``conversation_id``, ``parent_message_id`` and
          ``retriever_from`` (defaults to "explore_app"). Generation is always
          requested with ``streaming=True``.

          Args:
              installed_app: object exposing ``.app`` (the app model) and a
                  writable ``last_used_at`` attribute.

          Returns:
              Whatever ``helper.compact_generate_response`` builds from the
              generation result.

          Raises:
              NotChatAppError: the app mode is not one of the chat modes.
              NotFound: the referenced conversation does not exist.
              ConversationCompletedError: the conversation is already completed.
              AppUnavailableError: the app model config is broken.
              ProviderNotInitializeError: provider token is not initialised.
              ProviderQuotaExceededError: provider quota is exhausted.
              ProviderModelCurrentlyNotSupportError: model currently unsupported.
              CompletionRequestError: the model invocation failed.
              InvokeRateLimitHttpError: the invocation was rate limited.
              ValueError: re-raised unchanged.
              InternalServerError: any other unexpected failure.
          """
          target_app = installed_app.app
          current_mode = AppMode.value_of(target_app.mode)
          chat_modes = {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}
          if current_mode not in chat_modes:
              raise NotChatAppError()

          # Declarative list of the JSON-body arguments, registered in order.
          json_arguments = (
              ("inputs", {"type": dict, "required": True}),
              ("query", {"type": str, "required": True}),
              ("files", {"type": list, "required": False}),
              ("conversation_id", {"type": uuid_value}),
              ("parent_message_id", {"type": uuid_value, "required": False}),
              ("retriever_from", {"type": str, "required": False, "default": "explore_app"}),
          )
          body_parser = reqparse.RequestParser()
          for arg_name, options in json_arguments:
              body_parser.add_argument(arg_name, location="json", **options)

          payload = body_parser.parse_args()
          payload["auto_generate_name"] = False

          # Record the usage time as a naive UTC timestamp before generating.
          used_at = datetime.now(UTC).replace(tzinfo=None)
          installed_app.last_used_at = used_at
          db.session.commit()

          try:
              generated = AppGenerateService.generate(
                  app_model=target_app,
                  user=current_user,
                  args=payload,
                  invoke_from=InvokeFrom.EXPLORE,
                  streaming=True,
              )
              return helper.compact_generate_response(generated)
          except services.errors.conversation.ConversationNotExistsError:
              raise NotFound("Conversation Not Exists.")
          except services.errors.conversation.ConversationCompletedError:
              raise ConversationCompletedError()
          except services.errors.app_model_config.AppModelConfigBrokenError:
              logging.exception("App model config broken.")
              raise AppUnavailableError()
          except ProviderTokenNotInitError as token_error:
              raise ProviderNotInitializeError(token_error.description)
          except QuotaExceededError:
              raise ProviderQuotaExceededError()
          except ModelCurrentlyNotSupportError:
              raise ProviderModelCurrentlyNotSupportError()
          except InvokeError as invoke_error:
              raise CompletionRequestError(invoke_error.description)
          except InvokeRateLimitError as rate_error:
              raise InvokeRateLimitHttpError(rate_error.description)
          except ValueError:
              # Deliberately propagated untouched.
              raise
          except Exception:
              logging.exception("internal server error.")
              raise InternalServerError()
  class ChatApiForRobot(Resource):
      """Chat endpoint for robot clients, relayed to the installed-app chat API.

      The JSON body carries ``id``, ``messages``, ``max_tokens`` and ``stream``
      (all required) plus optional ``enterprise_id`` and ``device_id``. Only the
      ``content`` of the last message is forwarded as the query; ``max_tokens``
      and ``enterprise_id`` are parsed but not used. The upstream reply is
      reshaped into chunks (streaming) or a dict (blocking) tagged with the
      caller's ``id``.

      NOTE(security): this class derives from plain ``Resource`` and calls the
      upstream API with a fixed robot account; whether the route itself is
      access-controlled cannot be seen from this file - confirm at the routing
      layer.
      """

      # SECURITY: these credentials used to be inlined in ``post``. They remain
      # only as a backward-compatible fallback; set ROBOT_ACCOUNT_EMAIL and
      # ROBOT_ACCOUNT_PASSWORD in the environment and rotate the password.
      _DEFAULT_ROBOT_EMAIL = "suhh@mail.com.cn"
      _DEFAULT_ROBOT_PASSWORD = "tj123456"

      # Value used for "user" when the request carries no device_id.
      _FALLBACK_USER = "abc-123"
      _MODEL_NAME = "advanced-chat"
      # Placeholder message sent to the client before the upstream call starts.
      _WAITING_NOTICE = "您好!您的问题已收到,正在为您检索,请稍等..."
      # (connect, read) timeouts in seconds, so a stuck upstream cannot block
      # the worker indefinitely.
      _UPSTREAM_TIMEOUT = (10, 300)

      def post(self, installed_app_id):
          """Relay a robot chat request to the installed app's chat-messages API.

          Args:
              installed_app_id: id interpolated into the upstream URL.

          Returns:
              For ``stream`` true, the result of
              ``helper.compact_generate_response`` over a generator of
              ``id:``/``data:`` frames; otherwise a dict with ``id``, ``model``,
              ``created`` and ``answer``.

          Raises:
              BadRequest: ``messages`` is empty or its last item has no
                  ``content``.
              InternalServerError: the upstream call failed in blocking mode.
          """
          import os  # local import: the module does not import os at file level

          # 1) Parse the robot chat request.
          parser = reqparse.RequestParser()
          parser.add_argument("id", type=str, required=True, location="json")
          parser.add_argument("enterprise_id", type=str, required=False, location="json")
          parser.add_argument("device_id", type=str, required=False, location="json")
          parser.add_argument("messages", type=list, required=True, location="json")
          parser.add_argument("max_tokens", type=int, required=True, location="json")
          parser.add_argument("stream", type=bool, required=True, location="json")
          args = parser.parse_args()

          # 2) Convert it into the chat-messages request body.
          messages = args["messages"]
          if not messages:
              raise BadRequest("messages is empty.")
          last_message = messages[-1]
          if not isinstance(last_message, dict) or "content" not in last_message:
              raise BadRequest("the last message must be an object with a 'content' field.")

          request_id = args["id"]
          payload = {
              "inputs": {},
              "query": last_message["content"],
              "response_mode": "streaming" if args["stream"] else "blocking",
              "conversation_id": "",
              "user": args["device_id"] or self._FALLBACK_USER,
              "files": [],
          }

          # 3) Obtain an access token by logging in on every request.
          # Alternative noted by the original author: cache the token in redis
          # keyed by the account e-mail and log in only when no token is cached.
          email = os.environ.get("ROBOT_ACCOUNT_EMAIL", self._DEFAULT_ROBOT_EMAIL)
          password = os.environ.get("ROBOT_ACCOUNT_PASSWORD", self._DEFAULT_ROBOT_PASSWORD)
          access_token = RobotAccountService.login(email, password)
          headers = {
              "Content-Type": "application/json",
              "User-Agent": "Robot",
              "Accept": "*/*",
              "Authorization": f"Bearer {access_token}",
              "Connection": "keep-alive",
          }
          logging.info("Robot request: %s", extract_remote_ip(request))

          # 4) Build the upstream URL.
          # NOTE(security): request.host_url comes from the incoming request, and
          # the bearer token above is sent to it. Prefer a configured base URL if
          # the Host header is not validated in front of this service - confirm.
          chat_message_url = f"{request.host_url}console/api/installed-apps/{installed_app_id}/chat-messages"
          logging.info("Sending request to %s", chat_message_url)

          # 5) Call upstream and reshape the reply.
          if args["stream"]:
              return helper.compact_generate_response(
                  self._stream_chunks(request_id, chat_message_url, payload, headers)
              )
          return self._blocking_answer(request_id, chat_message_url, payload, headers)

      @staticmethod
      def _sse_frame(event_id, body) -> str:
          """Format ``body`` as one ``id:``/``data:`` frame ending in a blank line."""
          return f"id: {event_id}\ndata: {json.dumps(body)}\n\n"

      def _stream_chunks(self, request_id, chat_message_url, payload, headers) -> Generator[str, None, None]:
          """Yield reshaped frames: two preamble chunks, then one per upstream event.

          Only upstream ``message`` and ``message_end`` events are forwarded.
          Frame ids are consecutive, starting at 0.
          """
          envelope = {
              "id": request_id,
              "model": self._MODEL_NAME,
              "created": int(time.time()),
              "choices": [],
          }
          choice = {
              "index": 0,
              "delta": {"role": "assistant", "content": ""},
              "finish_reason": None,
          }
          envelope["choices"] = [choice]

          # Preamble: an empty assistant chunk, then the "please wait" notice.
          yield self._sse_frame(0, envelope)
          choice["delta"] = {"role": None, "content": self._WAITING_NOTICE}
          yield self._sse_frame(1, envelope)
          next_id = 2

          # stream=True is required for iter_lines() to deliver lines as they
          # arrive; the context manager releases the connection afterwards.
          with requests.post(
              chat_message_url,
              json=payload,
              headers=headers,
              stream=True,
              timeout=self._UPSTREAM_TIMEOUT,
          ) as upstream:
              if not upstream.ok:
                  # Response headers are already sent, so the failure can only
                  # be logged here.
                  logging.error("Robot chat upstream returned HTTP %s", upstream.status_code)
                  return

              for raw_line in upstream.iter_lines():
                  line = raw_line.decode("utf-8")
                  if not line.startswith("data:"):
                      continue
                  try:
                      # json.loads tolerates the optional space after "data:".
                      event = json.loads(line[len("data:"):])
                  except json.JSONDecodeError:
                      logging.warning("Robot chat upstream sent a non-JSON data line; skipped.")
                      continue
                  if not isinstance(event, dict):
                      continue

                  event_type = event.get("event")
                  if event_type not in ("message", "message_end"):
                      continue

                  envelope["created"] = event.get("created_at", envelope["created"])
                  if event_type == "message":
                      choice["delta"]["content"] = event.get("answer", "")
                  else:
                      choice["delta"]["content"] = ""
                      choice["finish_reason"] = "stop"

                  yield self._sse_frame(next_id, envelope)
                  next_id += 1

      def _blocking_answer(self, request_id, chat_message_url, payload, headers):
          """Call upstream in blocking mode and return the reshaped answer dict."""
          upstream = requests.post(
              chat_message_url,
              json=payload,
              headers=headers,
              timeout=self._UPSTREAM_TIMEOUT,
          )
          if not upstream.ok:
              logging.error("Robot chat upstream returned HTTP %s", upstream.status_code)
              raise InternalServerError()

          content = upstream.json()
          return {
              "id": request_id,
              "model": self._MODEL_NAME,
              "created": content["created_at"],
              "answer": content["answer"],
          }
  class ChatStopApi(InstalledAppResource):
      """Stop endpoint for a running chat task of an installed app."""

      def post(self, installed_app, task_id):
          """Set the stop flag for ``task_id`` on behalf of the current user.

          Args:
              installed_app: object exposing ``.app`` (the app model).
              task_id: identifier handed to ``AppQueueManager.set_stop_flag``.

          Returns:
              ``({"result": "success"}, 200)``.

          Raises:
              NotChatAppError: the app mode is not chat, agent-chat or
                  advanced-chat.
          """
          current_mode = AppMode.value_of(installed_app.app.mode)
          chat_modes = {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}
          if current_mode not in chat_modes:
              raise NotChatAppError()

          requester_id = current_user.id
          AppQueueManager.set_stop_flag(task_id, InvokeFrom.EXPLORE, requester_id)

          return {"result": "success"}, 200