task.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. )
  15. // returns error only if payload is not correct
  16. func InvokeDify(
  17. declaration *plugin_entities.PluginDeclaration,
  18. invoke_from access_types.PluginAccessType,
  19. session *session_manager.Session,
  20. writer BackwardsInvocationWriter,
  21. data []byte,
  22. ) error {
  23. // unmarshal invoke data
  24. request, err := parser.UnmarshalJsonBytes2Map(data)
  25. if err != nil {
  26. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  27. }
  28. if request == nil {
  29. return fmt.Errorf("invoke request is empty")
  30. }
  31. // prepare invocation arguments
  32. requestHandle, err := prepareDifyInvocationArguments(
  33. session,
  34. writer,
  35. request,
  36. )
  37. if err != nil {
  38. return err
  39. }
  40. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  41. requestHandle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  42. requestHandle.EndResponse()
  43. return nil
  44. }
  45. // check permission
  46. if err := checkPermission(declaration, requestHandle); err != nil {
  47. requestHandle.WriteError(err)
  48. requestHandle.EndResponse()
  49. return nil
  50. }
  51. // dispatch invocation task
  52. routine.Submit(func() {
  53. dispatchDifyInvocationTask(requestHandle)
  54. defer requestHandle.EndResponse()
  55. })
  56. return nil
  57. }
  58. var (
  59. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  60. dify_invocation.INVOKE_TYPE_TOOL: {
  61. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  62. return declaration.Resource.Permission.AllowInvokeTool()
  63. },
  64. "error": "permission denied, you need to enable tool access in plugin manifest",
  65. },
  66. dify_invocation.INVOKE_TYPE_LLM: {
  67. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  68. return declaration.Resource.Permission.AllowInvokeLLM()
  69. },
  70. "error": "permission denied, you need to enable llm access in plugin manifest",
  71. },
  72. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  73. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  74. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  75. },
  76. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  77. },
  78. dify_invocation.INVOKE_TYPE_RERANK: {
  79. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  80. return declaration.Resource.Permission.AllowInvokeRerank()
  81. },
  82. "error": "permission denied, you need to enable rerank access in plugin manifest",
  83. },
  84. dify_invocation.INVOKE_TYPE_TTS: {
  85. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  86. return declaration.Resource.Permission.AllowInvokeTTS()
  87. },
  88. "error": "permission denied, you need to enable tts access in plugin manifest",
  89. },
  90. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  91. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  92. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  93. },
  94. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  95. },
  96. dify_invocation.INVOKE_TYPE_MODERATION: {
  97. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  98. return declaration.Resource.Permission.AllowInvokeModeration()
  99. },
  100. "error": "permission denied, you need to enable moderation access in plugin manifest",
  101. },
  102. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: {
  103. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  104. return declaration.Resource.Permission.AllowInvokeNode()
  105. },
  106. "error": "permission denied, you need to enable node access in plugin manifest",
  107. },
  108. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: {
  109. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  110. return declaration.Resource.Permission.AllowInvokeNode()
  111. },
  112. "error": "permission denied, you need to enable node access in plugin manifest",
  113. },
  114. dify_invocation.INVOKE_TYPE_APP: {
  115. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  116. return declaration.Resource.Permission.AllowInvokeApp()
  117. },
  118. "error": "permission denied, you need to enable app access in plugin manifest",
  119. },
  120. dify_invocation.INVOKE_TYPE_STORAGE: {
  121. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  122. return declaration.Resource.Permission.AllowInvokeStorage()
  123. },
  124. "error": "permission denied, you need to enable storage access in plugin manifest",
  125. },
  126. dify_invocation.INVOKE_TYPE_SYSTEM_SUMMARY: {
  127. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  128. return declaration.Resource.Permission.AllowInvokeLLM()
  129. },
  130. "error": "permission denied, you need to enable llm access in plugin manifest",
  131. },
  132. dify_invocation.INVOKE_TYPE_UPLOAD_FILE: {
  133. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  134. return true
  135. },
  136. "error": "permission denied, you need to enable storage access in plugin manifest",
  137. },
  138. }
  139. )
  140. func checkPermission(runtime *plugin_entities.PluginDeclaration, requestHandle *BackwardsInvocation) error {
  141. permission, ok := permissionMapping[requestHandle.Type()]
  142. if !ok {
  143. return fmt.Errorf("unsupported invoke type: %s", requestHandle.Type())
  144. }
  145. permissionFunc, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  146. if !ok {
  147. return fmt.Errorf("permission function not found: %s", requestHandle.Type())
  148. }
  149. if !permissionFunc(runtime) {
  150. return fmt.Errorf(permission["error"].(string))
  151. }
  152. return nil
  153. }
  154. func prepareDifyInvocationArguments(
  155. session *session_manager.Session,
  156. writer BackwardsInvocationWriter,
  157. request map[string]any,
  158. ) (*BackwardsInvocation, error) {
  159. typ, ok := request["type"].(string)
  160. if !ok {
  161. return nil, fmt.Errorf("invoke request missing type: %s", request)
  162. }
  163. // get request id
  164. backwardsRequestId, ok := request["backwards_request_id"].(string)
  165. if !ok {
  166. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  167. }
  168. // get request
  169. detailedRequest, ok := request["request"].(map[string]any)
  170. if !ok {
  171. return nil, fmt.Errorf("invoke request missing request: %s", request)
  172. }
  173. return NewBackwardsInvocation(
  174. BackwardsInvocationType(typ),
  175. backwardsRequestId,
  176. session,
  177. writer,
  178. detailedRequest,
  179. ), nil
  180. }
  181. var (
  182. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  183. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  184. genericDispatchTask(handle, executeDifyInvocationToolTask)
  185. },
  186. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  187. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  188. },
  189. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  190. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  191. },
  192. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  193. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  194. },
  195. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  196. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  197. },
  198. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  199. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  200. },
  201. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  202. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  203. },
  204. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  205. genericDispatchTask(handle, executeDifyInvocationAppTask)
  206. },
  207. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: func(handle *BackwardsInvocation) {
  208. genericDispatchTask(handle, executeDifyInvocationParameterExtractor)
  209. },
  210. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: func(handle *BackwardsInvocation) {
  211. genericDispatchTask(handle, executeDifyInvocationQuestionClassifier)
  212. },
  213. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  214. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  215. },
  216. dify_invocation.INVOKE_TYPE_SYSTEM_SUMMARY: func(handle *BackwardsInvocation) {
  217. genericDispatchTask(handle, executeDifyInvocationSystemSummaryTask)
  218. },
  219. dify_invocation.INVOKE_TYPE_UPLOAD_FILE: func(handle *BackwardsInvocation) {
  220. genericDispatchTask(handle, executeDifyInvocationUploadFileTask)
  221. },
  222. }
  223. )
  224. func genericDispatchTask[T any](
  225. handle *BackwardsInvocation,
  226. dispatch func(
  227. handle *BackwardsInvocation,
  228. request *T,
  229. ),
  230. ) {
  231. r, err := parser.MapToStruct[T](handle.RequestData())
  232. if err != nil {
  233. handle.WriteError(fmt.Errorf("unmarshal backwards invoke request failed: %s", err.Error()))
  234. return
  235. }
  236. dispatch(handle, r)
  237. }
  238. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  239. requestData := handle.RequestData()
  240. tenantId, err := handle.TenantID()
  241. if err != nil {
  242. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  243. return
  244. }
  245. requestData["tenant_id"] = tenantId
  246. userId, err := handle.UserID()
  247. if err != nil {
  248. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  249. return
  250. }
  251. requestData["user_id"] = userId
  252. typ := handle.Type()
  253. requestData["type"] = typ
  254. for t, v := range dispatchMapping {
  255. if t == handle.Type() {
  256. v(handle)
  257. return
  258. }
  259. }
  260. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  261. }
  262. func executeDifyInvocationToolTask(
  263. handle *BackwardsInvocation,
  264. request *dify_invocation.InvokeToolRequest,
  265. ) {
  266. response, err := handle.backwardsInvocation.InvokeTool(request)
  267. if err != nil {
  268. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  269. return
  270. }
  271. for response.Next() {
  272. value, err := response.Read()
  273. if err != nil {
  274. handle.WriteError(fmt.Errorf("read tool response failed: %s", err.Error()))
  275. return
  276. }
  277. handle.WriteResponse("stream", value)
  278. }
  279. }
  280. func executeDifyInvocationLLMTask(
  281. handle *BackwardsInvocation,
  282. request *dify_invocation.InvokeLLMRequest,
  283. ) {
  284. response, err := handle.backwardsInvocation.InvokeLLM(request)
  285. if err != nil {
  286. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  287. return
  288. }
  289. for response.Next() {
  290. value, err := response.Read()
  291. if err != nil {
  292. handle.WriteError(fmt.Errorf("read llm model failed: %s", err.Error()))
  293. return
  294. }
  295. handle.WriteResponse("stream", value)
  296. }
  297. }
  298. func executeDifyInvocationTextEmbeddingTask(
  299. handle *BackwardsInvocation,
  300. request *dify_invocation.InvokeTextEmbeddingRequest,
  301. ) {
  302. response, err := handle.backwardsInvocation.InvokeTextEmbedding(request)
  303. if err != nil {
  304. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  305. return
  306. }
  307. handle.WriteResponse("struct", response)
  308. }
  309. func executeDifyInvocationRerankTask(
  310. handle *BackwardsInvocation,
  311. request *dify_invocation.InvokeRerankRequest,
  312. ) {
  313. response, err := handle.backwardsInvocation.InvokeRerank(request)
  314. if err != nil {
  315. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  316. return
  317. }
  318. handle.WriteResponse("struct", response)
  319. }
  320. func executeDifyInvocationTTSTask(
  321. handle *BackwardsInvocation,
  322. request *dify_invocation.InvokeTTSRequest,
  323. ) {
  324. response, err := handle.backwardsInvocation.InvokeTTS(request)
  325. if err != nil {
  326. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  327. return
  328. }
  329. for response.Next() {
  330. value, err := response.Read()
  331. if err != nil {
  332. handle.WriteError(fmt.Errorf("read tts model failed: %s", err.Error()))
  333. return
  334. }
  335. handle.WriteResponse("stream", value)
  336. }
  337. }
  338. func executeDifyInvocationSpeech2TextTask(
  339. handle *BackwardsInvocation,
  340. request *dify_invocation.InvokeSpeech2TextRequest,
  341. ) {
  342. response, err := handle.backwardsInvocation.InvokeSpeech2Text(request)
  343. if err != nil {
  344. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  345. return
  346. }
  347. handle.WriteResponse("struct", response)
  348. }
  349. func executeDifyInvocationModerationTask(
  350. handle *BackwardsInvocation,
  351. request *dify_invocation.InvokeModerationRequest,
  352. ) {
  353. response, err := handle.backwardsInvocation.InvokeModeration(request)
  354. if err != nil {
  355. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  356. return
  357. }
  358. handle.WriteResponse("struct", response)
  359. }
  360. func executeDifyInvocationAppTask(
  361. handle *BackwardsInvocation,
  362. request *dify_invocation.InvokeAppRequest,
  363. ) {
  364. response, err := handle.backwardsInvocation.InvokeApp(request)
  365. if err != nil {
  366. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  367. return
  368. }
  369. userId, err := handle.UserID()
  370. if err != nil {
  371. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  372. return
  373. }
  374. request.User = userId
  375. for response.Next() {
  376. value, err := response.Read()
  377. if err != nil {
  378. handle.WriteError(fmt.Errorf("read app failed: %s", err.Error()))
  379. return
  380. }
  381. handle.WriteResponse("stream", value)
  382. }
  383. }
  384. func executeDifyInvocationParameterExtractor(
  385. handle *BackwardsInvocation,
  386. request *dify_invocation.InvokeParameterExtractorRequest,
  387. ) {
  388. response, err := handle.backwardsInvocation.InvokeParameterExtractor(request)
  389. if err != nil {
  390. handle.WriteError(fmt.Errorf("invoke parameter extractor failed: %s", err.Error()))
  391. return
  392. }
  393. handle.WriteResponse("struct", response)
  394. }
  395. func executeDifyInvocationQuestionClassifier(
  396. handle *BackwardsInvocation,
  397. request *dify_invocation.InvokeQuestionClassifierRequest,
  398. ) {
  399. response, err := handle.backwardsInvocation.InvokeQuestionClassifier(request)
  400. if err != nil {
  401. handle.WriteError(fmt.Errorf("invoke question classifier failed: %s", err.Error()))
  402. return
  403. }
  404. handle.WriteResponse("struct", response)
  405. }
  406. func executeDifyInvocationStorageTask(
  407. handle *BackwardsInvocation,
  408. request *dify_invocation.InvokeStorageRequest,
  409. ) {
  410. if handle.session == nil {
  411. handle.WriteError(fmt.Errorf("session not found"))
  412. return
  413. }
  414. persistence := persistence.GetPersistence()
  415. if persistence == nil {
  416. handle.WriteError(fmt.Errorf("persistence not found"))
  417. return
  418. }
  419. tenantId, err := handle.TenantID()
  420. if err != nil {
  421. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  422. return
  423. }
  424. pluginId := handle.session.PluginUniqueIdentifier
  425. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  426. data, err := persistence.Load(tenantId, pluginId.PluginID(), request.Key)
  427. if err != nil {
  428. log.Error("load data failed: %s", err.Error())
  429. handle.WriteError(errors.New("load data failed, please check if the key is correct or you have not set it"))
  430. return
  431. }
  432. handle.WriteResponse("struct", map[string]any{
  433. "data": hex.EncodeToString(data),
  434. })
  435. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  436. data, err := hex.DecodeString(request.Value)
  437. if err != nil {
  438. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  439. return
  440. }
  441. session := handle.session
  442. if session == nil {
  443. handle.WriteError(fmt.Errorf("session not found"))
  444. return
  445. }
  446. declaration := session.Declaration
  447. if declaration == nil {
  448. handle.WriteError(fmt.Errorf("declaration not found"))
  449. return
  450. }
  451. resource := declaration.Resource.Permission
  452. if resource == nil {
  453. handle.WriteError(fmt.Errorf("resource not found"))
  454. return
  455. }
  456. maxStorageSize := int64(-1)
  457. storage := resource.Storage
  458. if storage != nil {
  459. maxStorageSize = int64(storage.Size)
  460. }
  461. if err := persistence.Save(tenantId, pluginId.PluginID(), maxStorageSize, request.Key, data); err != nil {
  462. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  463. return
  464. }
  465. handle.WriteResponse("struct", map[string]any{
  466. "data": "ok",
  467. })
  468. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  469. if err := persistence.Delete(tenantId, pluginId.PluginID(), request.Key); err != nil {
  470. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  471. return
  472. }
  473. handle.WriteResponse("struct", map[string]any{
  474. "data": "ok",
  475. })
  476. }
  477. }
  478. func executeDifyInvocationSystemSummaryTask(
  479. handle *BackwardsInvocation,
  480. request *dify_invocation.InvokeSummaryRequest,
  481. ) {
  482. response, err := handle.backwardsInvocation.InvokeSummary(request)
  483. if err != nil {
  484. handle.WriteError(fmt.Errorf("invoke summary failed: %s", err.Error()))
  485. return
  486. }
  487. handle.WriteResponse("struct", response)
  488. }
  489. func executeDifyInvocationUploadFileTask(
  490. handle *BackwardsInvocation,
  491. request *dify_invocation.UploadFileRequest,
  492. ) {
  493. response, err := handle.backwardsInvocation.UploadFile(request)
  494. if err != nil {
  495. handle.WriteError(fmt.Errorf("upload file failed: %s", err.Error()))
  496. return
  497. }
  498. handle.WriteResponse("struct", response)
  499. }