base.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import json
  2. from collections.abc import Generator
  3. from typing import TypeVar
  4. import requests
  5. from pydantic import BaseModel
  6. from yarl import URL
  7. from configs import dify_config
  8. from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse
  # Base URL of the plugin daemon inner API, taken from the Dify configuration.
  plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
  # Key sent in the ``X-Api-Key`` header of every request to the plugin daemon.
  plugin_daemon_inner_api_key = dify_config.PLUGIN_API_KEY

  # Payload types the request helpers can deserialize a daemon response into.
  T = TypeVar("T", bound=BaseModel | dict | list | bool)
  class BasePluginManager:
      """
      Base class providing helpers for calling the plugin daemon inner API.

      Every request targets ``plugin_daemon_inner_api_baseurl`` and carries the
      ``X-Api-Key`` header set to ``plugin_daemon_inner_api_key``.
      """

      def _request(
          self,
          method: str,
          path: str,
          headers: dict | None = None,
          data: bytes | dict | None = None,
          params: dict | None = None,
          stream: bool = False,
      ) -> requests.Response:
          """
          Make a request to the plugin daemon inner API.

          :param method: HTTP method handed to ``requests.request``
          :param path: path appended to the daemon base URL
          :param headers: optional extra headers; the caller's dict is not modified
          :param data: optional request body
          :param params: optional query parameters
          :param stream: forwarded to ``requests.request``
          :return: the raw response; the HTTP status code is not checked here
          """
          url = URL(str(plugin_daemon_inner_api_baseurl)) / path

          # Copy before adding the key so the secret never leaks into the caller's dict.
          request_headers = dict(headers or {})
          request_headers["X-Api-Key"] = plugin_daemon_inner_api_key

          # NOTE(review): no timeout is passed, so an unresponsive daemon blocks this
          # call indefinitely - consider adding one once acceptable limits are known.
          return requests.request(
              method=method,
              url=str(url),
              headers=request_headers,
              data=data,
              params=params,
              stream=stream,
          )

      def _stream_request(
          self,
          method: str,
          path: str,
          params: dict | None = None,
          headers: dict | None = None,
          data: bytes | dict | None = None,
      ) -> Generator[bytes, None, None]:
          """
          Make a stream request to the plugin daemon inner API.

          Note that ``params`` comes before ``headers`` here, unlike in ``_request``.

          :return: generator over the lines of the response body as produced by
              ``Response.iter_lines()`` (blank lines included)
          """
          response = self._request(method, path, headers, data, params, stream=True)
          try:
              yield from response.iter_lines()
          finally:
              # Release the connection even if the consumer stops early or raises.
              response.close()

      def _stream_request_with_model(
          self,
          method: str,
          path: str,
          type: type[T],
          headers: dict | None = None,
          data: bytes | dict | None = None,
          params: dict | None = None,
      ) -> Generator[T, None, None]:
          """
          Make a stream request to the plugin daemon inner API and yield the response as a model.

          Each non-empty line is parsed as JSON and passed to ``type`` as keyword arguments.
          """
          for line in self._stream_request(method, path, params, headers, data):
              # iter_lines() can emit empty keep-alive lines, which are not valid JSON.
              if not line:
                  continue
              yield type(**json.loads(line))

      def _request_with_model(
          self,
          method: str,
          path: str,
          type: type[T],
          headers: dict | None = None,
          data: bytes | dict | None = None,
          params: dict | None = None,
      ) -> T:
          """
          Make a request to the plugin daemon inner API and return the response as a model.

          The JSON body is passed to ``type`` as keyword arguments.
          """
          response = self._request(method, path, headers, data, params)
          return type(**response.json())

      @staticmethod
      def _inject_provider_into_tools(payload: object) -> None:
          """
          Copy each provider's identity name into the identity of its tools, in place.

          Only payloads shaped like ``{"data": [{"declaration": {...}}, ...]}`` are
          touched; anything else (``data`` missing, null, a dict, a bool, ...) is left
          as is so that the caller's own validation can report on it.
          """
          providers = payload.get("data") if isinstance(payload, dict) else None
          if not isinstance(providers, list):
              return

          for provider in providers:
              if not isinstance(provider, dict):
                  continue
              declaration = provider.get("declaration") or {}
              if not isinstance(declaration, dict):
                  continue

              identity = declaration.get("identity")
              provider_name = identity.get("name") if isinstance(identity, dict) else None

              for tool in declaration.get("tools") or []:
                  tool_identity = tool.get("identity") if isinstance(tool, dict) else None
                  # Tools without an identity mapping are left for model validation.
                  if isinstance(tool_identity, dict):
                      tool_identity["provider"] = provider_name

      @staticmethod
      def _unwrap_plugin_daemon_response(model_type: type[T], payload: dict) -> T:
          """
          Validate a daemon response envelope and return its ``data`` field.

          :raises ValueError: if the envelope's ``code`` is non-zero or ``data`` is None
          """
          rep = PluginDaemonBasicResponse[model_type](**payload)
          if rep.code != 0:
              raise ValueError(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
          if rep.data is None:
              raise ValueError("got empty data from plugin daemon")
          return rep.data

      def _request_with_plugin_daemon_response(
          self,
          method: str,
          path: str,
          type: type[T],
          headers: dict | None = None,
          data: bytes | dict | None = None,
          params: dict | None = None,
      ) -> T:
          """
          Make a request to the plugin daemon inner API and return the ``data`` of its response envelope.

          Before validation, provider names are copied into the tool identities of
          provider-list payloads (see ``_inject_provider_into_tools``).

          :raises ValueError: if the daemon reports a non-zero code or returns no data
          """
          response = self._request(method, path, headers, data, params)
          json_response = response.json()
          # Guarded so that an error reply with ``"data": null`` reaches the code check
          # below instead of failing while iterating over None.
          self._inject_provider_into_tools(json_response)
          return self._unwrap_plugin_daemon_response(type, json_response)

      def _request_with_plugin_daemon_response_stream(
          self,
          method: str,
          path: str,
          type: type[T],
          headers: dict | None = None,
          data: bytes | dict | None = None,
          params: dict | None = None,
      ) -> Generator[T, None, None]:
          """
          Make a stream request to the plugin daemon inner API and yield the ``data`` of each response envelope.

          Each non-empty line of the stream is parsed as one JSON envelope.

          :raises ValueError: if any envelope has a non-zero code or no data
          """
          for line in self._stream_request(method, path, params, headers, data):
              # iter_lines() can emit empty keep-alive lines, which are not valid JSON.
              if not line:
                  continue
              yield self._unwrap_plugin_daemon_response(type, json.loads(line))