base.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import json
  2. from collections.abc import Generator
  3. from typing import TypeVar
  4. import requests
  5. from pydantic import BaseModel
  6. from yarl import URL
  7. from configs import dify_config
  8. from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse
  9. plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
  10. plugin_daemon_inner_api_key = dify_config.PLUGIN_API_KEY
  11. T = TypeVar("T", bound=(BaseModel | dict | bool))
  12. class BasePluginManager:
  13. def _request(
  14. self,
  15. method: str,
  16. path: str,
  17. headers: dict | None = None,
  18. data: bytes | dict | None = None,
  19. stream: bool = False,
  20. ) -> requests.Response:
  21. """
  22. Make a request to the plugin daemon inner API.
  23. """
  24. url = URL(str(plugin_daemon_inner_api_baseurl)) / path
  25. headers = headers or {}
  26. headers["X-Api-Key"] = plugin_daemon_inner_api_key
  27. response = requests.request(method=method, url=str(url), headers=headers, data=data, stream=stream)
  28. return response
  29. def _stream_request(
  30. self, method: str, path: str, headers: dict | None = None, data: bytes | dict | None = None
  31. ) -> Generator[bytes, None, None]:
  32. """
  33. Make a stream request to the plugin daemon inner API
  34. """
  35. response = self._request(method, path, headers, data, stream=True)
  36. yield from response.iter_lines()
  37. def _stream_request_with_model(
  38. self,
  39. method: str,
  40. path: str,
  41. type: type[T],
  42. headers: dict | None = None,
  43. data: bytes | dict | None = None,
  44. ) -> Generator[T, None, None]:
  45. """
  46. Make a stream request to the plugin daemon inner API and yield the response as a model.
  47. """
  48. for line in self._stream_request(method, path, headers, data):
  49. yield type(**json.loads(line))
  50. def _request_with_model(
  51. self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | None = None
  52. ) -> T:
  53. """
  54. Make a request to the plugin daemon inner API and return the response as a model.
  55. """
  56. response = self._request(method, path, headers, data)
  57. return type(**response.json())
  58. def _request_with_plugin_daemon_response(
  59. self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | dict | None = None
  60. ) -> T:
  61. """
  62. Make a request to the plugin daemon inner API and return the response as a model.
  63. """
  64. response = self._request(method, path, headers, data)
  65. rep = PluginDaemonBasicResponse[type](**response.json())
  66. if rep.code != 0:
  67. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  68. if rep.data is None:
  69. raise ValueError("got empty data from plugin daemon")
  70. return rep.data
  71. def _request_with_plugin_daemon_response_stream(
  72. self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | dict | None = None
  73. ) -> Generator[T, None, None]:
  74. """
  75. Make a stream request to the plugin daemon inner API and yield the response as a model.
  76. """
  77. for line in self._stream_request(method, path, headers, data):
  78. line_data = json.loads(line)
  79. rep = PluginDaemonBasicResponse[type](**line_data)
  80. if rep.code != 0:
  81. raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  82. if rep.data is None:
  83. raise ValueError("got empty data from plugin daemon")
  84. yield rep.data