| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 | package http_requestsimport (	"bufio"	"bytes"	"encoding/json"	"fmt"	"io"	"net/http"	"time"	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream")func parseJsonBody(resp *http.Response, ret interface{}) error {	defer resp.Body.Close()	jsonDecoder := json.NewDecoder(resp.Body)	return jsonDecoder.Decode(ret)}func RequestAndParse[T any](client *http.Client, url string, method string, options ...HttpOptions) (*T, error) {	var ret T	// check if ret is a map, if so, create a new map	if _, ok := any(ret).(map[string]any); ok {		ret = *new(T)	}	resp, err := Request(client, url, method, options...)	if err != nil {		return nil, err	}	// get read timeout	readTimeout := int64(60000)	for _, option := range options {		if option.Type == "read_timeout" {			readTimeout = option.Value.(int64)			break		}	}	time.AfterFunc(time.Millisecond*time.Duration(readTimeout), func() {		// close the response body if timeout		resp.Body.Close()	})	err = parseJsonBody(resp, &ret)	if err != nil {		return nil, err	}	return &ret, nil}func GetAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "GET", options...)}func PostAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "POST", options...)}func PutAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "PUT", options...)}func DeleteAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "DELETE", options...)}func PatchAndParse[T any](client *http.Client, url string, options ...HttpOptions) (*T, error) {	return RequestAndParse[T](client, url, "PATCH", options...)}func RequestAndParseStream[T any](client *http.Client, url string, method string, options ...HttpOptions) (*stream.Stream[T], error) {	resp, err := Request(client, url, method, options...)	if err != nil {		return nil, err	}	if resp.StatusCode != http.StatusOK {		defer resp.Body.Close()		errorText, _ := io.ReadAll(resp.Body)		return nil, fmt.Errorf("request failed with status code: %d and respond with: %s", resp.StatusCode, errorText)	}	ch := stream.NewStream[T](1024)	// get read timeout	readTimeout := int64(60000)	raiseErrorWhenStreamDataNotMatch := false	for _, option := range options {		if option.Type == "read_timeout" {			readTimeout = option.Value.(int64)			break		} else if option.Type == "raiseErrorWhenStreamDataNotMatch" {			raiseErrorWhenStreamDataNotMatch = option.Value.(bool)		}	}	time.AfterFunc(time.Millisecond*time.Duration(readTimeout), func() {		// close the response body if timeout		resp.Body.Close()	})	routine.Submit(map[string]string{		"module":   "http_requests",		"function": "RequestAndParseStream",	}, func() {		scanner := bufio.NewScanner(resp.Body)		defer resp.Body.Close()		for scanner.Scan() {			data := scanner.Bytes()			if len(data) == 0 {				continue			}			if bytes.HasPrefix(data, []byte("data:")) {				// split				data = data[5:]			}			if bytes.HasPrefix(data, []byte("event:")) {				// TODO: handle event				continue			}			// trim space			data = bytes.TrimSpace(data)			// unmarshal			t, err := parser.UnmarshalJsonBytes[T](data)			if err != nil {				if raiseErrorWhenStreamDataNotMatch {					ch.WriteError(err)					break				} else {					log.Warn("stream data not match for %s, got %s", url, string(data))				}				continue			}			ch.Write(t)		}		ch.Close()	})	return ch, nil}func GetAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.Stream[T], error) {	return RequestAndParseStream[T](client, url, "GET", options...)}func PostAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.Stream[T], error) {	return RequestAndParseStream[T](client, url, "POST", options...)}func PutAndParseStream[T any](client *http.Client, url string, options ...HttpOptions) (*stream.Stream[T], error) {	return RequestAndParseStream[T](client, url, "PUT", options...)}
 |