request.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package backwards_invocation
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. )
  7. type BackwardsInvocationType = dify_invocation.InvokeType
  8. type BackwardsInvocationWriter interface {
  9. Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) error
  10. Done()
  11. }
  12. // BackwardsInvocation is a struct that represents a backwards invocation
  13. // For different plugin runtime type, stream handler is different
  14. // 1. Local and Remote: they are both full duplex, multiplexing could be implemented by different session
  15. // different session share the same physical channel.
  16. // 2. AWS: it is half duplex, one request could have multiple channels, we need to combine them into one stream
  17. //
  18. // That's why it has a writer, for different transaction, the writer is unique
  19. type BackwardsInvocation struct {
  20. typ BackwardsInvocationType
  21. id string
  22. detailed_request map[string]any
  23. session *session_manager.Session
  24. // writer is the writer that writes the data to the session
  25. // NOTE: write operation will not raise errors
  26. writer BackwardsInvocationWriter
  27. }
  28. func NewBackwardsInvocation(
  29. typ BackwardsInvocationType,
  30. id string,
  31. session *session_manager.Session,
  32. writer BackwardsInvocationWriter,
  33. detailed_request map[string]any,
  34. ) *BackwardsInvocation {
  35. return &BackwardsInvocation{
  36. typ: typ,
  37. id: id,
  38. detailed_request: detailed_request,
  39. session: session,
  40. writer: writer,
  41. }
  42. }
  43. func (bi *BackwardsInvocation) GetID() string {
  44. return bi.id
  45. }
  46. func (bi *BackwardsInvocation) WriteError(err error) {
  47. bi.writer.Write(
  48. session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
  49. NewErrorEvent(bi.id, err.Error()),
  50. )
  51. }
  52. func (bi *BackwardsInvocation) WriteResponse(message string, data any) {
  53. bi.writer.Write(
  54. session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
  55. NewResponseEvent(bi.id, message, data),
  56. )
  57. }
  58. func (bi *BackwardsInvocation) EndResponse() {
  59. bi.writer.Write(
  60. session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
  61. NewEndEvent(bi.id),
  62. )
  63. bi.writer.Done()
  64. }
  65. func (bi *BackwardsInvocation) Type() BackwardsInvocationType {
  66. return bi.typ
  67. }
  68. func (bi *BackwardsInvocation) RequestData() map[string]any {
  69. return bi.detailed_request
  70. }
  71. func (bi *BackwardsInvocation) TenantID() (string, error) {
  72. if bi.session == nil {
  73. return "", fmt.Errorf("session is nil")
  74. }
  75. return bi.session.TenantID(), nil
  76. }
  77. func (bi *BackwardsInvocation) UserID() (string, error) {
  78. if bi.session == nil {
  79. return "", fmt.Errorf("session is nil")
  80. }
  81. return bi.session.UserID(), nil
  82. }