base.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import inspect
  2. import json
  3. import logging
  4. from collections.abc import Callable, Generator
  5. from typing import TypeVar
  6. import requests
  7. from pydantic import BaseModel
  8. from yarl import URL
  9. from configs import dify_config
  10. from core.model_runtime.errors.invoke import (
  11. InvokeAuthorizationError,
  12. InvokeBadRequestError,
  13. InvokeConnectionError,
  14. InvokeRateLimitError,
  15. InvokeServerUnavailableError,
  16. )
  17. from core.model_runtime.errors.validate import CredentialsValidateFailedError
  18. from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse, PluginDaemonError, PluginDaemonInnerError
  19. from core.plugin.manager.exc import (
  20. PluginDaemonBadRequestError,
  21. PluginDaemonInternalServerError,
  22. PluginDaemonNotFoundError,
  23. PluginDaemonUnauthorizedError,
  24. PluginInvokeError,
  25. PluginPermissionDeniedError,
  26. PluginUniqueIdentifierError,
  27. )
  28. plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
  29. plugin_daemon_inner_api_key = dify_config.PLUGIN_API_KEY
  30. T = TypeVar("T", bound=(BaseModel | dict | list | bool | str))
  31. logger = logging.getLogger(__name__)
  32. class BasePluginManager:
  33. def _request(
  34. self,
  35. method: str,
  36. path: str,
  37. headers: dict | None = None,
  38. data: bytes | dict | str | None = None,
  39. params: dict | None = None,
  40. files: dict | None = None,
  41. stream: bool = False,
  42. ) -> requests.Response:
  43. """
  44. Make a request to the plugin daemon inner API.
  45. """
  46. url = URL(str(plugin_daemon_inner_api_baseurl)) / path
  47. headers = headers or {}
  48. headers["X-Api-Key"] = plugin_daemon_inner_api_key
  49. headers["Accept-Encoding"] = "gzip, deflate, br"
  50. if headers.get("Content-Type") == "application/json" and isinstance(data, dict):
  51. data = json.dumps(data)
  52. try:
  53. response = requests.request(
  54. method=method, url=str(url), headers=headers, data=data, params=params, stream=stream, files=files
  55. )
  56. except requests.exceptions.ConnectionError:
  57. logger.exception("Request to Plugin Daemon Service failed")
  58. raise PluginDaemonInnerError(code=-500, message="Request to Plugin Daemon Service failed")
  59. return response
  60. def _stream_request(
  61. self,
  62. method: str,
  63. path: str,
  64. params: dict | None = None,
  65. headers: dict | None = None,
  66. data: bytes | dict | None = None,
  67. files: dict | None = None,
  68. ) -> Generator[bytes, None, None]:
  69. """
  70. Make a stream request to the plugin daemon inner API
  71. """
  72. response = self._request(method, path, headers, data, params, files, stream=True)
  73. for line in response.iter_lines():
  74. line = line.decode("utf-8").strip()
  75. if line.startswith("data:"):
  76. line = line[5:].strip()
  77. if line:
  78. yield line
  79. def _stream_request_with_model(
  80. self,
  81. method: str,
  82. path: str,
  83. type: type[T],
  84. headers: dict | None = None,
  85. data: bytes | dict | None = None,
  86. params: dict | None = None,
  87. files: dict | None = None,
  88. ) -> Generator[T, None, None]:
  89. """
  90. Make a stream request to the plugin daemon inner API and yield the response as a model.
  91. """
  92. for line in self._stream_request(method, path, params, headers, data, files):
  93. yield type(**json.loads(line))
  94. def _request_with_model(
  95. self,
  96. method: str,
  97. path: str,
  98. type: type[T],
  99. headers: dict | None = None,
  100. data: bytes | None = None,
  101. params: dict | None = None,
  102. files: dict | None = None,
  103. ) -> T:
  104. """
  105. Make a request to the plugin daemon inner API and return the response as a model.
  106. """
  107. response = self._request(method, path, headers, data, params, files)
  108. return type(**response.json())
  109. def _request_with_plugin_daemon_response(
  110. self,
  111. method: str,
  112. path: str,
  113. type: type[T],
  114. headers: dict | None = None,
  115. data: bytes | dict | None = None,
  116. params: dict | None = None,
  117. files: dict | None = None,
  118. transformer: Callable[[dict], dict] | None = None,
  119. ) -> T:
  120. """
  121. Make a request to the plugin daemon inner API and return the response as a model.
  122. """
  123. response = self._request(method, path, headers, data, params, files)
  124. json_response = response.json()
  125. if transformer:
  126. json_response = transformer(json_response)
  127. rep = PluginDaemonBasicResponse[type](**json_response)
  128. if rep.code != 0:
  129. try:
  130. error = PluginDaemonError(**json.loads(rep.message))
  131. except Exception as e:
  132. raise ValueError(f"{rep.message}, code: {rep.code}")
  133. self._handle_plugin_daemon_error(error.error_type, error.message)
  134. if rep.data is None:
  135. frame = inspect.currentframe()
  136. raise ValueError(f"got empty data from plugin daemon: {frame.f_lineno if frame else 'unknown'}")
  137. return rep.data
  138. def _request_with_plugin_daemon_response_stream(
  139. self,
  140. method: str,
  141. path: str,
  142. type: type[T],
  143. headers: dict | None = None,
  144. data: bytes | dict | None = None,
  145. params: dict | None = None,
  146. files: dict | None = None,
  147. ) -> Generator[T, None, None]:
  148. """
  149. Make a stream request to the plugin daemon inner API and yield the response as a model.
  150. """
  151. for line in self._stream_request(method, path, params, headers, data, files):
  152. line_data = None
  153. try:
  154. line_data = json.loads(line)
  155. rep = PluginDaemonBasicResponse[type](**line_data)
  156. except Exception as e:
  157. # TODO modify this when line_data has code and message
  158. if line_data and "error" in line_data:
  159. raise ValueError(line_data["error"])
  160. else:
  161. raise ValueError(line)
  162. if rep.code != 0:
  163. if rep.code == -500:
  164. try:
  165. error = PluginDaemonError(**json.loads(rep.message))
  166. except Exception as e:
  167. raise PluginDaemonInnerError(code=rep.code, message=rep.message)
  168. self._handle_plugin_daemon_error(error.error_type, error.message)
  169. raise ValueError(f"plugin daemon: {rep.message}, code: {rep.code}")
  170. if rep.data is None:
  171. frame = inspect.currentframe()
  172. raise ValueError(f"got empty data from plugin daemon: {frame.f_lineno if frame else 'unknown'}")
  173. yield rep.data
  174. def _handle_plugin_daemon_error(self, error_type: str, message: str):
  175. """
  176. handle the error from plugin daemon
  177. """
  178. match error_type:
  179. case PluginDaemonInnerError.__name__:
  180. raise PluginDaemonInnerError(code=-500, message=message)
  181. case PluginInvokeError.__name__:
  182. error_object = json.loads(message)
  183. invoke_error_type = error_object.get("error_type")
  184. args = error_object.get("args")
  185. match invoke_error_type:
  186. case InvokeRateLimitError.__name__:
  187. raise InvokeRateLimitError(description=args.get("description"))
  188. case InvokeAuthorizationError.__name__:
  189. raise InvokeAuthorizationError(description=args.get("description"))
  190. case InvokeBadRequestError.__name__:
  191. raise InvokeBadRequestError(description=args.get("description"))
  192. case InvokeConnectionError.__name__:
  193. raise InvokeConnectionError(description=args.get("description"))
  194. case InvokeServerUnavailableError.__name__:
  195. raise InvokeServerUnavailableError(description=args.get("description"))
  196. case CredentialsValidateFailedError.__name__:
  197. raise CredentialsValidateFailedError(error_object.get("message"))
  198. case _:
  199. raise PluginInvokeError(description=message)
  200. case PluginDaemonInternalServerError.__name__:
  201. raise PluginDaemonInternalServerError(description=message)
  202. case PluginDaemonBadRequestError.__name__:
  203. raise PluginDaemonBadRequestError(description=message)
  204. case PluginDaemonNotFoundError.__name__:
  205. raise PluginDaemonNotFoundError(description=message)
  206. case PluginUniqueIdentifierError.__name__:
  207. raise PluginUniqueIdentifierError(description=message)
  208. case PluginDaemonUnauthorizedError.__name__:
  209. raise PluginDaemonUnauthorizedError(description=message)
  210. case PluginPermissionDeniedError.__name__:
  211. raise PluginPermissionDeniedError(description=message)
  212. case _:
  213. raise Exception(f"got unknown error from plugin daemon: {error_type}, message: {message}")