| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 | package http_requestsimport (	"bufio"	"bytes"	"encoding/json"	"fmt"	"io"	"net/http"	"time"	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream")func parseJsonBody(resp *http.Response, ret interface{}) error {	defer resp.Body.Close()	json_decoder := json.NewDecoder(resp.Body)	return json_decoder.Decode(ret)}func RequestAndParse[T any](client *http.Client, url string, method string, options ...HttpOptions) (*T, error) {	var ret T	resp, err := Request(client, url, method, options...)	if err != nil {		return nil, err	}	// get read timeout	read_timeout := int64(60000)	for _, option := range options {		if option.Type == "read_timeout" {			read_timeout = option.Value.(int64)			break		}	}	time.AfterFunc(time.Millisecond*time.Duration(read_timeout), func() {		// close the response body if timeout		resp.Body.Close()	})	err = parseJsonBody(resp, &ret)	if err != nil {		return nil, err	}	return &ret, nil}func GetAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "GET", options...)}func PostAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "POST", options...)}func PutAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "PUT", options...)}func DeleteAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "DELETE", options...)}func PatchAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "PATCH", options...)}func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.StreamResponse[T], error) {	resp, err := Request(client, url, method, options...)	if err != nil {		return nil, err	}	if resp.StatusCode != http.StatusOK {		defer resp.Body.Close()		error_text, _ := io.ReadAll(resp.Body)		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, error_text)	}	ch := stream.NewStreamResponse[T](1024)	// get read timeout	read_timeout := int64(60000)	for _, option := range options {		if option.Type == "read_timeout" {			read_timeout = option.Value.(int64)			break		}	}	time.AfterFunc(time.Millisecond*time.Duration(read_timeout), func() {		// close the response body if timeout		resp.Body.Close()	})	routine.Submit(func() {		scanner := bufio.NewScanner(resp.Body)		defer resp.Body.Close()		for scanner.Scan() {			data := scanner.Bytes()			if len(data) == 0 {				continue			}			if bytes.HasPrefix(data, []byte("data: ")) {				// split				data = data[6:]			}			// unmarshal			t, err := parser.UnmarshalJsonBytes[T](data)			if err != nil {				ch.WriteError(err)				break			}			ch.Write(t)		}		ch.Close()	})	return ch, nil}func GetAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {	return RequestAndParseStream[T](client, url, "GET", options...)}func PostAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {	return RequestAndParseStream[T](client, url, "POST", options...)}func PutAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.StreamResponse[T], error) {	return RequestAndParseStream[T](client, url, "PUT", options...)}
 |