io.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package aws_manager
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "net/url"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. // consume data from data stream
  14. func (r *AWSPluginRuntime) consume() {
  15. for {
  16. select {
  17. case data := <-r.data_stream:
  18. fmt.Println(data)
  19. }
  20. }
  21. }
  22. func (r *AWSPluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] {
  23. l := entities.NewBroadcast[plugin_entities.SessionMessage]()
  24. l.OnClose(func() {
  25. // close the pipe writer
  26. writer, exists := r.session_pool.Load(session_id)
  27. if exists {
  28. writer.Close()
  29. }
  30. })
  31. return l
  32. }
  33. func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
  34. // check if session exists
  35. var pw *io.PipeWriter
  36. var exists bool
  37. if pw, exists = r.session_pool.Load(session_id); !exists {
  38. url, err := url.JoinPath(r.lambda_url, "invoke")
  39. if err != nil {
  40. r.Error(fmt.Sprintf("Error creating request: %v", err))
  41. return
  42. }
  43. // create a new http request here
  44. npr, npw := io.Pipe()
  45. r.session_pool.Store(session_id, npw)
  46. pw = npw
  47. // create a new http request
  48. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  49. defer cancel()
  50. req, err := http.NewRequestWithContext(ctx, "POST", url, npr)
  51. if err != nil {
  52. r.Error(fmt.Sprintf("Error creating request: %v", err))
  53. return
  54. }
  55. req.Header.Set("Content-Type", "application/octet-stream")
  56. req.Header.Set("Accept", "application/octet-stream")
  57. routine.Submit(func() {
  58. response, err := http.DefaultClient.Do(req)
  59. if err != nil {
  60. r.Error(fmt.Sprintf("Error sending request to aws lambda: %v", err))
  61. return
  62. }
  63. // write to data stream
  64. for {
  65. buf := make([]byte, 1024)
  66. n, err := response.Body.Read(buf)
  67. if err != nil {
  68. if err == io.EOF {
  69. break
  70. } else {
  71. r.Error(fmt.Sprintf("Error reading response from aws lambda: %v", err))
  72. break
  73. }
  74. }
  75. // write to data stream
  76. select {
  77. case r.data_stream <- buf[:n]:
  78. default:
  79. r.Error("Data stream is full")
  80. }
  81. }
  82. // remove the session from the pool
  83. r.session_pool.Delete(session_id)
  84. })
  85. }
  86. if pw != nil {
  87. if _, err := pw.Write(data); err != nil {
  88. r.Error(fmt.Sprintf("Error writing to pipe writer: %v", err))
  89. }
  90. }
  91. }