completion.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. import json
  2. import logging
  3. from collections.abc import Generator
  4. import requests
  5. from flask import request
  6. from flask_restful import Resource, reqparse # type: ignore
  7. from werkzeug.exceptions import BadRequest, InternalServerError, NotFound
  8. import services
  9. from controllers.service_api import api
  10. from controllers.service_api.app.error import (
  11. AppUnavailableError,
  12. CompletionRequestError,
  13. ConversationCompletedError,
  14. NotChatAppError,
  15. ProviderModelCurrentlyNotSupportError,
  16. ProviderNotInitializeError,
  17. ProviderQuotaExceededError,
  18. )
  19. from controllers.service_api.wraps import FetchUserArg, WhereisUserArg, validate_app_token
  20. from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
  21. from core.app.apps.base_app_queue_manager import AppQueueManager
  22. from core.app.entities.app_invoke_entities import InvokeFrom
  23. from core.errors.error import (
  24. ModelCurrentlyNotSupportError,
  25. ProviderTokenNotInitError,
  26. QuotaExceededError,
  27. )
  28. from core.model_runtime.errors.invoke import InvokeError
  29. from libs import helper
  30. from libs.helper import uuid_value
  31. from models.model import App, AppMode, EndUser
  32. from services.app_generate_service import AppGenerateService
  33. from services.errors.llm import InvokeRateLimitError
  34. class CompletionApi(Resource):
  35. @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
  36. def post(self, app_model: App, end_user: EndUser):
  37. if app_model.mode != "completion":
  38. raise AppUnavailableError()
  39. parser = reqparse.RequestParser()
  40. parser.add_argument("inputs", type=dict, required=True, location="json")
  41. parser.add_argument("query", type=str, location="json", default="")
  42. parser.add_argument("files", type=list, required=False, location="json")
  43. parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
  44. parser.add_argument("retriever_from", type=str, required=False, default="dev", location="json")
  45. args = parser.parse_args()
  46. streaming = args["response_mode"] == "streaming"
  47. args["auto_generate_name"] = False
  48. try:
  49. response = AppGenerateService.generate(
  50. app_model=app_model,
  51. user=end_user,
  52. args=args,
  53. invoke_from=InvokeFrom.SERVICE_API,
  54. streaming=streaming,
  55. )
  56. return helper.compact_generate_response(response)
  57. except services.errors.conversation.ConversationNotExistsError:
  58. raise NotFound("Conversation Not Exists.")
  59. except services.errors.conversation.ConversationCompletedError:
  60. raise ConversationCompletedError()
  61. except services.errors.app_model_config.AppModelConfigBrokenError:
  62. logging.exception("App model config broken.")
  63. raise AppUnavailableError()
  64. except ProviderTokenNotInitError as ex:
  65. raise ProviderNotInitializeError(ex.description)
  66. except QuotaExceededError:
  67. raise ProviderQuotaExceededError()
  68. except ModelCurrentlyNotSupportError:
  69. raise ProviderModelCurrentlyNotSupportError()
  70. except InvokeError as e:
  71. raise CompletionRequestError(e.description)
  72. except ValueError as e:
  73. raise e
  74. except Exception:
  75. logging.exception("internal server error.")
  76. raise InternalServerError()
  77. class CompletionStopApi(Resource):
  78. @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
  79. def post(self, app_model: App, end_user: EndUser, task_id):
  80. if app_model.mode != "completion":
  81. raise AppUnavailableError()
  82. AppQueueManager.set_stop_flag(task_id, InvokeFrom.SERVICE_API, end_user.id)
  83. return {"result": "success"}, 200
  84. class ChatApi(Resource):
  85. @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
  86. def post(self, app_model: App, end_user: EndUser):
  87. app_mode = AppMode.value_of(app_model.mode)
  88. if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
  89. raise NotChatAppError()
  90. parser = reqparse.RequestParser()
  91. parser.add_argument("inputs", type=dict, required=True, location="json")
  92. parser.add_argument("query", type=str, required=True, location="json")
  93. parser.add_argument("files", type=list, required=False, location="json")
  94. parser.add_argument("response_mode", type=str, choices=["blocking", "streaming"], location="json")
  95. parser.add_argument("conversation_id", type=uuid_value, location="json")
  96. parser.add_argument("retriever_from", type=str, required=False, default="dev", location="json")
  97. parser.add_argument("auto_generate_name", type=bool, required=False, default=True, location="json")
  98. args = parser.parse_args()
  99. streaming = args["response_mode"] == "streaming"
  100. try:
  101. response = AppGenerateService.generate(
  102. app_model=app_model, user=end_user, args=args, invoke_from=InvokeFrom.SERVICE_API, streaming=streaming
  103. )
  104. return helper.compact_generate_response(response)
  105. except services.errors.conversation.ConversationNotExistsError:
  106. raise NotFound("Conversation Not Exists.")
  107. except services.errors.conversation.ConversationCompletedError:
  108. raise ConversationCompletedError()
  109. except services.errors.app_model_config.AppModelConfigBrokenError:
  110. logging.exception("App model config broken.")
  111. raise AppUnavailableError()
  112. except ProviderTokenNotInitError as ex:
  113. raise ProviderNotInitializeError(ex.description)
  114. except QuotaExceededError:
  115. raise ProviderQuotaExceededError()
  116. except ModelCurrentlyNotSupportError:
  117. raise ProviderModelCurrentlyNotSupportError()
  118. except InvokeRateLimitError as ex:
  119. raise InvokeRateLimitHttpError(ex.description)
  120. except InvokeError as e:
  121. raise CompletionRequestError(e.description)
  122. except ValueError as e:
  123. raise e
  124. except Exception:
  125. logging.exception("internal server error.")
  126. raise InternalServerError()
  127. class ChatStopApi(Resource):
  128. @validate_app_token(fetch_user_arg=FetchUserArg(fetch_from=WhereisUserArg.JSON, required=True))
  129. def post(self, app_model: App, end_user: EndUser, task_id):
  130. app_mode = AppMode.value_of(app_model.mode)
  131. if app_mode not in {AppMode.CHAT, AppMode.AGENT_CHAT, AppMode.ADVANCED_CHAT}:
  132. raise NotChatAppError()
  133. AppQueueManager.set_stop_flag(task_id, InvokeFrom.SERVICE_API, end_user.id)
  134. return {"result": "success"}, 200
  135. class ChatApiForRobot(Resource):
  136. def post(self):
  137. parser = reqparse.RequestParser()
  138. parser.add_argument("id", type=str, required=True, location="json")
  139. parser.add_argument("enterprise_id", type=str, required=False, location="json")
  140. parser.add_argument("device_id", type=str, required=False, location="json")
  141. parser.add_argument("messages", type=list, required=True, location="json")
  142. parser.add_argument("max_tokens", type=int, required=True, location="json")
  143. parser.add_argument("stream", type=bool, required=True, location="json")
  144. args = parser.parse_args()
  145. messages = args["messages"]
  146. if messages is None or len(messages) == 0:
  147. raise BadRequest("messages is empty.")
  148. id = args["id"]
  149. query = messages[len(messages) - 1]["content"]
  150. response_mode = "streaming" if args["stream"] else "blocking"
  151. device_id = args["device_id"]
  152. data = {
  153. "inputs": {},
  154. "query": query,
  155. "response_mode": response_mode,
  156. "conversation_id": "",
  157. "user": device_id if device_id else "abc-123",
  158. "files": []
  159. }
  160. chat_message_url = request.host_url + "v1/chat-messages"
  161. logging.info("Sending request to %s", chat_message_url)
  162. response = requests.post(chat_message_url, data = json.dumps(data), headers = request.headers)
  163. if args["stream"]:
  164. def after_response_generator():
  165. i = 0
  166. for line in response.iter_lines():
  167. line_str = line.decode("utf-8")
  168. if not line_str.startswith('data:'):
  169. continue
  170. content = json.loads(line_str[6:])
  171. event = content["event"]
  172. if event not in ["message", "message_end"]:
  173. continue
  174. new_content = {
  175. "id": id,
  176. "model": "advanced-chat",
  177. "created": content["created_at"],
  178. "choices": [],
  179. }
  180. if i == 0:
  181. choice = {
  182. "index": 0,
  183. "delta": {
  184. "role": "assistant",
  185. "content": ""
  186. },
  187. "finish_reason": None
  188. }
  189. new_content["choices"].append(choice)
  190. yield f"id: {i}\ndata: {json.dumps(new_content)}\n\n"
  191. new_content["choices"].pop()
  192. i = i + 1
  193. if content["event"] == "message":
  194. choice = {
  195. "index": 0,
  196. "delta": {
  197. "content": content["answer"]
  198. },
  199. "finish_reason": None
  200. }
  201. new_content["choices"].append(choice)
  202. else:
  203. choice = {
  204. "index": 0,
  205. "delta": {},
  206. "finish_reason": "stop"
  207. }
  208. new_content["choices"].append(choice)
  209. yield f"id: {i}\ndata: {json.dumps(new_content)}\n\n"
  210. new_response = after_response_generator()
  211. def generate() -> Generator:
  212. yield from new_response
  213. return helper.compact_generate_response(generate())
  214. else:
  215. content = json.loads(response.text)
  216. new_response = {
  217. "id": id,
  218. "model": "advanced-chat",
  219. "created": content["created_at"],
  220. "answer": content ["answer"],
  221. }
  222. return new_response
# Route registrations for the service-API blueprint.
api.add_resource(CompletionApi, "/completion-messages")
api.add_resource(CompletionStopApi, "/completion-messages/<string:task_id>/stop")
api.add_resource(ChatApi, "/chat-messages")
api.add_resource(ChatStopApi, "/chat-messages/<string:task_id>/stop")
api.add_resource(ChatApiForRobot, "/chat-messages-for-robot")