request.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package backwards_invocation
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. )
  7. type BackwardsInvocationType = dify_invocation.InvokeType
  8. type BackwardsInvocationWriter interface {
  9. Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) error
  10. Done()
  11. }
  12. // BackwardsInvocation is a struct that represents a backwards invocation
  13. // For different plugin runtime type, stream handler is different
  14. // 1. Local and Remote: they are both full duplex, multiplexing could be implemented by different session
  15. // different session share the same physical channel.
  16. // 2. AWS: it is half duplex, one request could have multiple channels, we need to combine them into one stream
  17. //
  18. // That's why it has a writer, for different transaction, the writer is unique
  19. type BackwardsInvocation struct {
  20. typ BackwardsInvocationType
  21. id string
  22. detailedRequest map[string]any
  23. session *session_manager.Session
  24. // writer is the writer that writes the data to the session
  25. // NOTE: write operation will not raise errors
  26. writer BackwardsInvocationWriter
  27. // backwardsInvocation is the backwards invocation that is used to invoke dify
  28. backwardsInvocation dify_invocation.BackwardsInvocation
  29. }
  30. func NewBackwardsInvocation(
  31. typ BackwardsInvocationType,
  32. id string,
  33. session *session_manager.Session,
  34. writer BackwardsInvocationWriter,
  35. detailedRequest map[string]any,
  36. ) *BackwardsInvocation {
  37. return &BackwardsInvocation{
  38. typ: typ,
  39. id: id,
  40. detailedRequest: detailedRequest,
  41. session: session,
  42. writer: writer,
  43. backwardsInvocation: session.BackwardsInvocation(),
  44. }
  45. }
  46. func (bi *BackwardsInvocation) GetID() string {
  47. return bi.id
  48. }
  49. func (bi *BackwardsInvocation) WriteError(err error) {
  50. bi.writer.Write(
  51. session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
  52. NewErrorEvent(bi.id, err.Error()),
  53. )
  54. }
  55. func (bi *BackwardsInvocation) WriteResponse(message string, data any) {
  56. bi.writer.Write(
  57. session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
  58. NewResponseEvent(bi.id, message, data),
  59. )
  60. }
  61. func (bi *BackwardsInvocation) EndResponse() {
  62. bi.writer.Write(
  63. session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
  64. NewEndEvent(bi.id),
  65. )
  66. bi.writer.Done()
  67. }
  68. func (bi *BackwardsInvocation) Type() BackwardsInvocationType {
  69. return bi.typ
  70. }
  71. func (bi *BackwardsInvocation) RequestData() map[string]any {
  72. return bi.detailedRequest
  73. }
  74. func (bi *BackwardsInvocation) TenantID() (string, error) {
  75. if bi.session == nil {
  76. return "", fmt.Errorf("session is nil")
  77. }
  78. return bi.session.TenantID, nil
  79. }
  80. func (bi *BackwardsInvocation) UserID() (string, error) {
  81. if bi.session == nil {
  82. return "", fmt.Errorf("session is nil")
  83. }
  84. return bi.session.UserID, nil
  85. }