http_warpper.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package requests
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "net/http"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  10. )
  // parseJsonBody decodes the first JSON value found in resp.Body into ret and
  // closes the body before returning, whether or not decoding succeeded.
  //
  // ret must be a pointer the JSON decoder can populate. The decoder's error is
  // returned unchanged.
  func parseJsonBody(resp *http.Response, ret any) error {
  	body := resp.Body
  	defer body.Close()

  	return json.NewDecoder(body).Decode(ret)
  }
  16. func RequestAndParse[T any](client *http.Client, url string, method string, options ...HttpOptions) (*T, error) {
  17. var ret T
  18. resp, err := Request(client, url, method, options...)
  19. if err != nil {
  20. return nil, err
  21. }
  22. err = parseJsonBody(resp, &ret)
  23. if err != nil {
  24. return nil, err
  25. }
  26. return &ret, nil
  27. }
  28. func GetAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {
  29. return RequestAndParse[T](client, url, "GET", options...)
  30. }
  31. func PostAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {
  32. return RequestAndParse[T](client, url, "POST", options...)
  33. }
  34. func PutAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {
  35. return RequestAndParse[T](client, url, "PUT", options...)
  36. }
  37. func DeleteAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {
  38. return RequestAndParse[T](client, url, "DELETE", options...)
  39. }
  40. func PatchAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {
  41. return RequestAndParse[T](client, url, "PATCH", options...)
  42. }
  43. func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
  44. resp, err := Request(client, url, method, options...)
  45. if err != nil {
  46. return nil, err
  47. }
  48. ch := stream.NewStreamResponse[T](1024)
  49. routine.Submit(func() {
  50. scanner := bufio.NewScanner(resp.Body)
  51. for scanner.Scan() {
  52. data := scanner.Bytes()
  53. if bytes.HasPrefix(data, []byte("data: ")) {
  54. // split
  55. data = data[6:]
  56. // unmarshal
  57. t, err := parser.UnmarshalJsonBytes[T](data)
  58. if err != nil {
  59. continue
  60. }
  61. ch.Write(t)
  62. }
  63. }
  64. ch.Close()
  65. })
  66. return ch, nil
  67. }
  68. func GetAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
  69. return RequestAndParseStream[T](client, url, "GET", options...)
  70. }
  71. func PostAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
  72. return RequestAndParseStream[T](client, url, "POST", options...)
  73. }
  74. func PutAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {
  75. return RequestAndParseStream[T](client, url, "PUT", options...)
  76. }