base.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import json
  2. from collections.abc import Callable, Generator
  3. from typing import Optional, TypeVar
  4. import requests
  5. from pydantic import BaseModel
  6. from yarl import URL
  7. from configs import dify_config
  8. from core.model_runtime.errors.invoke import (
  9. InvokeAuthorizationError,
  10. InvokeBadRequestError,
  11. InvokeConnectionError,
  12. InvokeRateLimitError,
  13. InvokeServerUnavailableError,
  14. )
  15. from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse, PluginDaemonError, PluginDaemonInnerError
  16. plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
  17. plugin_daemon_inner_api_key = dify_config.PLUGIN_API_KEY
  18. T = TypeVar("T", bound=(BaseModel | dict | list | bool | str))
  19. class BasePluginManager:
  20. def _request(
  21. self,
  22. method: str,
  23. path: str,
  24. headers: dict | None = None,
  25. data: bytes | dict | str | None = None,
  26. params: dict | None = None,
  27. files: dict | None = None,
  28. stream: bool = False,
  29. ) -> requests.Response:
  30. """
  31. Make a request to the plugin daemon inner API.
  32. """
  33. url = URL(str(plugin_daemon_inner_api_baseurl)) / path
  34. headers = headers or {}
  35. headers["X-Api-Key"] = plugin_daemon_inner_api_key
  36. headers["Accept-Encoding"] = "gzip, deflate, br"
  37. if headers.get("Content-Type") == "application/json" and isinstance(data, dict):
  38. data = json.dumps(data)
  39. response = requests.request(
  40. method=method, url=str(url), headers=headers, data=data, params=params, stream=stream, files=files
  41. )
  42. return response
  43. def _stream_request(
  44. self,
  45. method: str,
  46. path: str,
  47. params: dict | None = None,
  48. headers: dict | None = None,
  49. data: bytes | dict | None = None,
  50. files: dict | None = None,
  51. ) -> Generator[bytes, None, None]:
  52. """
  53. Make a stream request to the plugin daemon inner API
  54. """
  55. response = self._request(method, path, headers, data, params, files, stream=True)
  56. for line in response.iter_lines():
  57. line = line.decode("utf-8").strip()
  58. if line.startswith("data:"):
  59. line = line[5:].strip()
  60. if line:
  61. yield line
  62. def _stream_request_with_model(
  63. self,
  64. method: str,
  65. path: str,
  66. type: type[T],
  67. headers: dict | None = None,
  68. data: bytes | dict | None = None,
  69. params: dict | None = None,
  70. files: dict | None = None,
  71. ) -> Generator[T, None, None]:
  72. """
  73. Make a stream request to the plugin daemon inner API and yield the response as a model.
  74. """
  75. for line in self._stream_request(method, path, params, headers, data, files):
  76. yield type(**json.loads(line))
  77. def _request_with_model(
  78. self,
  79. method: str,
  80. path: str,
  81. type: type[T],
  82. headers: dict | None = None,
  83. data: bytes | None = None,
  84. params: dict | None = None,
  85. files: dict | None = None,
  86. ) -> T:
  87. """
  88. Make a request to the plugin daemon inner API and return the response as a model.
  89. """
  90. response = self._request(method, path, headers, data, params, files)
  91. return type(**response.json())
  92. def _request_with_plugin_daemon_response(
  93. self,
  94. method: str,
  95. path: str,
  96. type: type[T],
  97. headers: dict | None = None,
  98. data: bytes | dict | None = None,
  99. params: dict | None = None,
  100. files: dict | None = None,
  101. transformer: Callable[[dict], dict] | None = None,
  102. ) -> T:
  103. """
  104. Make a request to the plugin daemon inner API and return the response as a model.
  105. """
  106. response = self._request(method, path, headers, data, params, files)
  107. json_response = response.json()
  108. if transformer:
  109. json_response = transformer(json_response)
  110. rep = PluginDaemonBasicResponse[type](**json_response)
  111. if rep.code != 0:
  112. if rep.code == -500:
  113. try:
  114. error = PluginDaemonError(**json.loads(rep.message))
  115. except Exception as e:
  116. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  117. self._handle_plugin_daemon_error(error.error_type, error.message, error.args)
  118. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  119. if rep.data is None:
  120. raise ValueError("got empty data from plugin daemon")
  121. return rep.data
  122. def _request_with_plugin_daemon_response_stream(
  123. self,
  124. method: str,
  125. path: str,
  126. type: type[T],
  127. headers: dict | None = None,
  128. data: bytes | dict | None = None,
  129. params: dict | None = None,
  130. files: dict | None = None,
  131. ) -> Generator[T, None, None]:
  132. """
  133. Make a stream request to the plugin daemon inner API and yield the response as a model.
  134. """
  135. for line in self._stream_request(method, path, params, headers, data, files):
  136. line_data = json.loads(line)
  137. rep = PluginDaemonBasicResponse[type](**line_data)
  138. if rep.code != 0:
  139. if rep.code == -500:
  140. try:
  141. error = PluginDaemonError(**json.loads(rep.message))
  142. except Exception as e:
  143. raise PluginDaemonInnerError(code=rep.code, message=rep.message)
  144. self._handle_plugin_daemon_error(error.error_type, error.message, error.args)
  145. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  146. if rep.data is None:
  147. raise ValueError("got empty data from plugin daemon")
  148. yield rep.data
  149. def _handle_plugin_daemon_error(self, error_type: str, message: str, args: Optional[dict] = None):
  150. """
  151. handle the error from plugin daemon
  152. """
  153. args = args or {}
  154. if error_type == PluginDaemonInnerError.__name__:
  155. raise PluginDaemonInnerError(code=-500, message=message)
  156. elif error_type == InvokeRateLimitError.__name__:
  157. raise InvokeRateLimitError(description=args.get("description"))
  158. elif error_type == InvokeAuthorizationError.__name__:
  159. raise InvokeAuthorizationError(description=args.get("description"))
  160. elif error_type == InvokeBadRequestError.__name__:
  161. raise InvokeBadRequestError(description=args.get("description"))
  162. elif error_type == InvokeConnectionError.__name__:
  163. raise InvokeConnectionError(description=args.get("description"))
  164. elif error_type == InvokeServerUnavailableError.__name__:
  165. raise InvokeServerUnavailableError(description=args.get("description"))
  166. else:
  167. raise ValueError(f"got unknown error from plugin daemon: {error_type}, message: {message}, args: {args}")