123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package aws_manager
- import (
- "context"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "time"
- "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- )
- // consume data from data stream
- func (r *AWSPluginRuntime) consume() {
- for {
- select {
- case data := <-r.data_stream:
- fmt.Println(data)
- }
- }
- }
- func (r *AWSPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
- l := entities.NewIOListener[[]byte]()
- l.OnClose(func() {
- // close the pipe writer
- writer, exists := r.session_pool.Load(session_id)
- if exists {
- writer.Close()
- }
- })
- return l
- }
- func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
- // check if session exists
- var pw *io.PipeWriter
- var exists bool
- if pw, exists = r.session_pool.Load(session_id); !exists {
- url, err := url.JoinPath(r.lambda_url, "invoke")
- if err != nil {
- r.Error(fmt.Sprintf("Error creating request: %v", err))
- return
- }
- // create a new http request here
- npr, npw := io.Pipe()
- r.session_pool.Store(session_id, npw)
- pw = npw
- // create a new http request
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
- req, err := http.NewRequestWithContext(ctx, "POST", url, npr)
- if err != nil {
- r.Error(fmt.Sprintf("Error creating request: %v", err))
- return
- }
- req.Header.Set("Content-Type", "application/octet-stream")
- req.Header.Set("Accept", "application/octet-stream")
- routine.Submit(func() {
- response, err := http.DefaultClient.Do(req)
- if err != nil {
- r.Error(fmt.Sprintf("Error sending request to aws lambda: %v", err))
- return
- }
- // write to data stream
- for {
- buf := make([]byte, 1024)
- n, err := response.Body.Read(buf)
- if err != nil {
- if err == io.EOF {
- break
- } else {
- r.Error(fmt.Sprintf("Error reading response from aws lambda: %v", err))
- break
- }
- }
- // write to data stream
- select {
- case r.data_stream <- buf[:n]:
- default:
- r.Error("Data stream is full")
- }
- }
- // remove the session from the pool
- r.session_pool.Delete(session_id)
- })
- }
- if pw != nil {
- if _, err := pw.Write(data); err != nil {
- r.Error(fmt.Sprintf("Error writing to pipe writer: %v", err))
- }
- }
- }
|