base.py 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import json
  2. from collections.abc import Generator
  3. from typing import TypeVar
  4. import requests
  5. from pydantic import BaseModel
  6. from yarl import URL
  7. from configs import dify_config
  8. from core.plugin.entities.plugin_daemon import PluginDaemonBasicResponse
  9. plugin_daemon_inner_api_baseurl = dify_config.PLUGIN_API_URL
  10. plugin_daemon_inner_api_key = dify_config.INNER_API_KEY_FOR_PLUGIN
  11. T = TypeVar("T", bound=(BaseModel | dict))
  12. class BasePluginManager:
  13. def _request(
  14. self, method: str, path: str, headers: dict | None = None, data: bytes | None = None, stream: bool = False
  15. ) -> requests.Response:
  16. """
  17. Make a request to the plugin daemon inner API.
  18. """
  19. url = URL(str(plugin_daemon_inner_api_baseurl)) / path
  20. headers = headers or {}
  21. headers["X-Api-Key"] = plugin_daemon_inner_api_key
  22. response = requests.request(method=method, url=str(url), headers=headers, data=data, stream=stream)
  23. return response
  24. def _stream_request(
  25. self, method: str, path: str, headers: dict | None = None, data: bytes | None = None
  26. ) -> Generator[bytes, None, None]:
  27. """
  28. Make a stream request to the plugin daemon inner API
  29. """
  30. response = self._request(method, path, headers, data, stream=True)
  31. yield from response.iter_lines()
  32. def _stream_request_with_model(
  33. self,
  34. method: str,
  35. path: str,
  36. type: type[T],
  37. headers: dict | None = None,
  38. data: bytes | None = None,
  39. ) -> Generator[T, None, None]:
  40. """
  41. Make a stream request to the plugin daemon inner API and yield the response as a model.
  42. """
  43. for line in self._stream_request(method, path, headers, data):
  44. yield type(**json.loads(line))
  45. def _request_with_model(
  46. self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | None = None
  47. ) -> T:
  48. """
  49. Make a request to the plugin daemon inner API and return the response as a model.
  50. """
  51. response = self._request(method, path, headers, data)
  52. return type(**response.json())
  53. def _request_with_plugin_daemon_response(
  54. self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | None = None
  55. ) -> T:
  56. """
  57. Make a request to the plugin daemon inner API and return the response as a model.
  58. """
  59. response = self._request(method, path, headers, data)
  60. rep = PluginDaemonBasicResponse[type](**response.json())
  61. if rep.code != 0:
  62. raise Exception(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  63. if rep.data is None:
  64. raise Exception("got empty data from plugin daemon")
  65. return rep.data
  66. def _request_with_plugin_daemon_response_stream(
  67. self, method: str, path: str, type: type[T], headers: dict | None = None, data: bytes | None = None
  68. ) -> Generator[T, None, None]:
  69. """
  70. Make a stream request to the plugin daemon inner API and yield the response as a model.
  71. """
  72. for line in self._stream_request(method, path, headers, data):
  73. line_data = json.loads(line)
  74. rep = PluginDaemonBasicResponse[type](**line_data)
  75. if rep.code != 0:
  76. raise Exception(f"got error from plugin daemon: {rep.message}, code: {rep.code}")
  77. if rep.data is None:
  78. raise Exception("got empty data from plugin daemon")
  79. yield rep.data