1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- package backwards_invocation
- import (
- "fmt"
- "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
- "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
- )
- type BackwardsInvocationType = dify_invocation.InvokeType
- type BackwardsInvocationWriter interface {
- Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) error
- Done()
- }
- // BackwardsInvocation is a struct that represents a backwards invocation
- // For different plugin runtime type, stream handler is different
- // 1. Local and Remote: they are both full duplex, multiplexing could be implemented by different session
- // different session share the same physical channel.
- // 2. AWS: it is half duplex, one request could have multiple channels, we need to combine them into one stream
- //
- // That's why it has a writer, for different transaction, the writer is unique
- type BackwardsInvocation struct {
- typ BackwardsInvocationType
- id string
- detailed_request map[string]any
- session *session_manager.Session
- // writer is the writer that writes the data to the session
- // NOTE: write operation will not raise errors
- writer BackwardsInvocationWriter
- }
- func NewBackwardsInvocation(
- typ BackwardsInvocationType,
- id string,
- session *session_manager.Session,
- writer BackwardsInvocationWriter,
- detailed_request map[string]any,
- ) *BackwardsInvocation {
- return &BackwardsInvocation{
- typ: typ,
- id: id,
- detailed_request: detailed_request,
- session: session,
- writer: writer,
- }
- }
- func (bi *BackwardsInvocation) GetID() string {
- return bi.id
- }
- func (bi *BackwardsInvocation) WriteError(err error) {
- bi.writer.Write(
- session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
- NewErrorEvent(bi.id, err.Error()),
- )
- }
- func (bi *BackwardsInvocation) WriteResponse(message string, data any) {
- bi.writer.Write(
- session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
- NewResponseEvent(bi.id, message, data),
- )
- }
- func (bi *BackwardsInvocation) EndResponse() {
- bi.writer.Write(
- session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
- NewEndEvent(bi.id),
- )
- bi.writer.Done()
- }
- func (bi *BackwardsInvocation) Type() BackwardsInvocationType {
- return bi.typ
- }
- func (bi *BackwardsInvocation) RequestData() map[string]any {
- return bi.detailed_request
- }
- func (bi *BackwardsInvocation) TenantID() (string, error) {
- if bi.session == nil {
- return "", fmt.Errorf("session is nil")
- }
- return bi.session.TenantID(), nil
- }
- func (bi *BackwardsInvocation) UserID() (string, error) {
- if bi.session == nil {
- return "", fmt.Errorf("session is nil")
- }
- return bi.session.UserID(), nil
- }
|