base.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import json
  2. from collections.abc import Callable, Generator
  3. from typing import TypeVar
  4. import requests
  5. from pydantic import BaseModel
  6. from yarl import URL
  7. from configs import dify_config
  8. from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse
  9. plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
  10. plugin_daemon_inner_api_key = dify_config.PLUGIN_API_KEY
  11. T = TypeVar("T", bound=(BaseModel | dict | list | bool))
  12. class BasePluginManager:
  13. def _request(
  14. self,
  15. method: str,
  16. path: str,
  17. headers: dict | None = None,
  18. data: bytes | dict | str | None = None,
  19. params: dict | None = None,
  20. stream: bool = False,
  21. ) -> requests.Response:
  22. """
  23. Make a request to the plugin daemon inner API.
  24. """
  25. url = URL(str(plugin_daemon_inner_api_baseurl)) / path
  26. headers = headers or {}
  27. headers["X-Api-Key"] = plugin_daemon_inner_api_key
  28. headers["Accept-Encoding"] = "gzip, deflate, br"
  29. if headers.get("Content-Type") == "application/json" and isinstance(data, dict):
  30. data = json.dumps(data)
  31. response = requests.request(
  32. method=method, url=str(url), headers=headers, data=data, params=params, stream=stream
  33. )
  34. return response
  35. def _stream_request(
  36. self,
  37. method: str,
  38. path: str,
  39. params: dict | None = None,
  40. headers: dict | None = None,
  41. data: bytes | dict | None = None,
  42. ) -> Generator[bytes, None, None]:
  43. """
  44. Make a stream request to the plugin daemon inner API
  45. """
  46. response = self._request(method, path, headers, data, params, stream=True)
  47. for line in response.iter_lines():
  48. line = line.decode("utf-8").strip()
  49. if line.startswith("data:"):
  50. line = line[5:].strip()
  51. if line:
  52. yield line
  53. def _stream_request_with_model(
  54. self,
  55. method: str,
  56. path: str,
  57. type: type[T],
  58. headers: dict | None = None,
  59. data: bytes | dict | None = None,
  60. params: dict | None = None,
  61. ) -> Generator[T, None, None]:
  62. """
  63. Make a stream request to the plugin daemon inner API and yield the response as a model.
  64. """
  65. for line in self._stream_request(method, path, params, headers, data):
  66. yield type(**json.loads(line))
  67. def _request_with_model(
  68. self,
  69. method: str,
  70. path: str,
  71. type: type[T],
  72. headers: dict | None = None,
  73. data: bytes | None = None,
  74. params: dict | None = None,
  75. ) -> T:
  76. """
  77. Make a request to the plugin daemon inner API and return the response as a model.
  78. """
  79. response = self._request(method, path, headers, data, params)
  80. return type(**response.json())
  81. def _request_with_plugin_daemon_response(
  82. self,
  83. method: str,
  84. path: str,
  85. type: type[T],
  86. headers: dict | None = None,
  87. data: bytes | dict | None = None,
  88. params: dict | None = None,
  89. transformer: Callable[[dict], dict] | None = None,
  90. ) -> T:
  91. """
  92. Make a request to the plugin daemon inner API and return the response as a model.
  93. """
  94. response = self._request(method, path, headers, data, params)
  95. json_response = response.json()
  96. if transformer:
  97. json_response = transformer(json_response)
  98. rep = PluginDaemonBasicResponse[type](**json_response)
  99. if rep.code != 0:
  100. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  101. if rep.data is None:
  102. raise ValueError("got empty data from plugin daemon")
  103. return rep.data
  104. def _request_with_plugin_daemon_response_stream(
  105. self,
  106. method: str,
  107. path: str,
  108. type: type[T],
  109. headers: dict | None = None,
  110. data: bytes | dict | None = None,
  111. params: dict | None = None,
  112. ) -> Generator[T, None, None]:
  113. """
  114. Make a stream request to the plugin daemon inner API and yield the response as a model.
  115. """
  116. for line in self._stream_request(method, path, params, headers, data):
  117. line_data = json.loads(line)
  118. rep = PluginDaemonBasicResponse[type](**line_data)
  119. if rep.code != 0:
  120. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  121. if rep.data is None:
  122. raise ValueError("got empty data from plugin daemon")
  123. yield rep.data