io.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package aws_manager
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "net/url"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  11. )
  12. // consume data from data stream
  13. func (r *AWSPluginRuntime) consume() {
  14. for {
  15. select {
  16. case data := <-r.data_stream:
  17. fmt.Println(data)
  18. }
  19. }
  20. }
  21. func (r *AWSPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
  22. l := entities.NewIOListener[[]byte]()
  23. l.OnClose(func() {
  24. // close the pipe writer
  25. writer, exists := r.session_pool.Load(session_id)
  26. if exists {
  27. writer.Close()
  28. }
  29. })
  30. return l
  31. }
  32. func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
  33. // check if session exists
  34. var pw *io.PipeWriter
  35. var exists bool
  36. if pw, exists = r.session_pool.Load(session_id); !exists {
  37. url, err := url.JoinPath(r.lambda_url, "invoke")
  38. if err != nil {
  39. r.Error(fmt.Sprintf("Error creating request: %v", err))
  40. return
  41. }
  42. // create a new http request here
  43. npr, npw := io.Pipe()
  44. r.session_pool.Store(session_id, npw)
  45. pw = npw
  46. // create a new http request
  47. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  48. defer cancel()
  49. req, err := http.NewRequestWithContext(ctx, "POST", url, npr)
  50. if err != nil {
  51. r.Error(fmt.Sprintf("Error creating request: %v", err))
  52. return
  53. }
  54. req.Header.Set("Content-Type", "application/octet-stream")
  55. req.Header.Set("Accept", "application/octet-stream")
  56. routine.Submit(func() {
  57. response, err := http.DefaultClient.Do(req)
  58. if err != nil {
  59. r.Error(fmt.Sprintf("Error sending request to aws lambda: %v", err))
  60. return
  61. }
  62. // write to data stream
  63. for {
  64. buf := make([]byte, 1024)
  65. n, err := response.Body.Read(buf)
  66. if err != nil {
  67. if err == io.EOF {
  68. break
  69. } else {
  70. r.Error(fmt.Sprintf("Error reading response from aws lambda: %v", err))
  71. break
  72. }
  73. }
  74. // write to data stream
  75. select {
  76. case r.data_stream <- buf[:n]:
  77. default:
  78. r.Error("Data stream is full")
  79. }
  80. }
  81. // remove the session from the pool
  82. r.session_pool.Delete(session_id)
  83. })
  84. }
  85. if pw != nil {
  86. if _, err := pw.Write(data); err != nil {
  87. r.Error(fmt.Sprintf("Error writing to pipe writer: %v", err))
  88. }
  89. }
  90. }