task.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542
  1. package backwards_invocation
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. // returns error only if payload is not correct
  14. func InvokeDify(
  15. declaration *plugin_entities.PluginDeclaration,
  16. invoke_from access_types.PluginAccessType,
  17. session *session_manager.Session,
  18. writer BackwardsInvocationWriter,
  19. data []byte,
  20. ) error {
  21. // unmarshal invoke data
  22. request, err := parser.UnmarshalJsonBytes2Map(data)
  23. if err != nil {
  24. return fmt.Errorf("unmarshal invoke request failed: %s", err.Error())
  25. }
  26. if request == nil {
  27. return fmt.Errorf("invoke request is empty")
  28. }
  29. // prepare invocation arguments
  30. request_handle, err := prepareDifyInvocationArguments(
  31. session,
  32. writer,
  33. request,
  34. )
  35. if err != nil {
  36. return err
  37. }
  38. if invoke_from == access_types.PLUGIN_ACCESS_TYPE_MODEL {
  39. request_handle.WriteError(fmt.Errorf("you can not invoke dify from %s", invoke_from))
  40. request_handle.EndResponse()
  41. return nil
  42. }
  43. // check permission
  44. if err := checkPermission(declaration, request_handle); err != nil {
  45. request_handle.WriteError(err)
  46. request_handle.EndResponse()
  47. return nil
  48. }
  49. // dispatch invocation task
  50. routine.Submit(func() {
  51. dispatchDifyInvocationTask(request_handle)
  52. defer request_handle.EndResponse()
  53. })
  54. return nil
  55. }
  56. var (
  57. permissionMapping = map[dify_invocation.InvokeType]map[string]any{
  58. dify_invocation.INVOKE_TYPE_TOOL: {
  59. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  60. return declaration.Resource.Permission.AllowInvokeTool()
  61. },
  62. "error": "permission denied, you need to enable tool access in plugin manifest",
  63. },
  64. dify_invocation.INVOKE_TYPE_LLM: {
  65. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  66. return declaration.Resource.Permission.AllowInvokeLLM()
  67. },
  68. "error": "permission denied, you need to enable llm access in plugin manifest",
  69. },
  70. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
  71. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  72. return declaration.Resource.Permission.AllowInvokeTextEmbedding()
  73. },
  74. "error": "permission denied, you need to enable text-embedding access in plugin manifest",
  75. },
  76. dify_invocation.INVOKE_TYPE_RERANK: {
  77. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  78. return declaration.Resource.Permission.AllowInvokeRerank()
  79. },
  80. "error": "permission denied, you need to enable rerank access in plugin manifest",
  81. },
  82. dify_invocation.INVOKE_TYPE_TTS: {
  83. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  84. return declaration.Resource.Permission.AllowInvokeTTS()
  85. },
  86. "error": "permission denied, you need to enable tts access in plugin manifest",
  87. },
  88. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
  89. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  90. return declaration.Resource.Permission.AllowInvokeSpeech2Text()
  91. },
  92. "error": "permission denied, you need to enable speech2text access in plugin manifest",
  93. },
  94. dify_invocation.INVOKE_TYPE_MODERATION: {
  95. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  96. return declaration.Resource.Permission.AllowInvokeModeration()
  97. },
  98. "error": "permission denied, you need to enable moderation access in plugin manifest",
  99. },
  100. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: {
  101. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  102. return declaration.Resource.Permission.AllowInvokeNode()
  103. },
  104. "error": "permission denied, you need to enable node access in plugin manifest",
  105. },
  106. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: {
  107. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  108. return declaration.Resource.Permission.AllowInvokeNode()
  109. },
  110. "error": "permission denied, you need to enable node access in plugin manifest",
  111. },
  112. dify_invocation.INVOKE_TYPE_APP: {
  113. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  114. return declaration.Resource.Permission.AllowInvokeApp()
  115. },
  116. "error": "permission denied, you need to enable app access in plugin manifest",
  117. },
  118. dify_invocation.INVOKE_TYPE_STORAGE: {
  119. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  120. return declaration.Resource.Permission.AllowInvokeStorage()
  121. },
  122. "error": "permission denied, you need to enable storage access in plugin manifest",
  123. },
  124. dify_invocation.INVOKE_TYPE_SUMMARY: {
  125. "func": func(declaration *plugin_entities.PluginDeclaration) bool {
  126. return declaration.Resource.Permission.AllowInvokeLLM()
  127. },
  128. "error": "permission denied, you need to enable llm access in plugin manifest",
  129. },
  130. }
  131. )
  132. func checkPermission(runtime *plugin_entities.PluginDeclaration, request_handle *BackwardsInvocation) error {
  133. permission, ok := permissionMapping[request_handle.Type()]
  134. if !ok {
  135. return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
  136. }
  137. permission_func, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
  138. if !ok {
  139. return fmt.Errorf("permission function not found: %s", request_handle.Type())
  140. }
  141. if !permission_func(runtime) {
  142. return fmt.Errorf(permission["error"].(string))
  143. }
  144. return nil
  145. }
  146. func prepareDifyInvocationArguments(
  147. session *session_manager.Session,
  148. writer BackwardsInvocationWriter,
  149. request map[string]any,
  150. ) (*BackwardsInvocation, error) {
  151. typ, ok := request["type"].(string)
  152. if !ok {
  153. return nil, fmt.Errorf("invoke request missing type: %s", request)
  154. }
  155. // get request id
  156. backwards_request_id, ok := request["backwards_request_id"].(string)
  157. if !ok {
  158. return nil, fmt.Errorf("invoke request missing request_id: %s", request)
  159. }
  160. // get request
  161. detailed_request, ok := request["request"].(map[string]any)
  162. if !ok {
  163. return nil, fmt.Errorf("invoke request missing request: %s", request)
  164. }
  165. return NewBackwardsInvocation(
  166. BackwardsInvocationType(typ),
  167. backwards_request_id,
  168. session,
  169. writer,
  170. detailed_request,
  171. ), nil
  172. }
  173. var (
  174. dispatchMapping = map[dify_invocation.InvokeType]func(handle *BackwardsInvocation){
  175. dify_invocation.INVOKE_TYPE_TOOL: func(handle *BackwardsInvocation) {
  176. genericDispatchTask(handle, executeDifyInvocationToolTask)
  177. },
  178. dify_invocation.INVOKE_TYPE_LLM: func(handle *BackwardsInvocation) {
  179. genericDispatchTask(handle, executeDifyInvocationLLMTask)
  180. },
  181. dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: func(handle *BackwardsInvocation) {
  182. genericDispatchTask(handle, executeDifyInvocationTextEmbeddingTask)
  183. },
  184. dify_invocation.INVOKE_TYPE_RERANK: func(handle *BackwardsInvocation) {
  185. genericDispatchTask(handle, executeDifyInvocationRerankTask)
  186. },
  187. dify_invocation.INVOKE_TYPE_TTS: func(handle *BackwardsInvocation) {
  188. genericDispatchTask(handle, executeDifyInvocationTTSTask)
  189. },
  190. dify_invocation.INVOKE_TYPE_SPEECH2TEXT: func(handle *BackwardsInvocation) {
  191. genericDispatchTask(handle, executeDifyInvocationSpeech2TextTask)
  192. },
  193. dify_invocation.INVOKE_TYPE_MODERATION: func(handle *BackwardsInvocation) {
  194. genericDispatchTask(handle, executeDifyInvocationModerationTask)
  195. },
  196. dify_invocation.INVOKE_TYPE_APP: func(handle *BackwardsInvocation) {
  197. genericDispatchTask(handle, executeDifyInvocationAppTask)
  198. },
  199. dify_invocation.INVOKE_TYPE_NODE_PARAMETER_EXTRACTOR: func(handle *BackwardsInvocation) {
  200. genericDispatchTask(handle, executeDifyInvocationParameterExtractor)
  201. },
  202. dify_invocation.INVOKE_TYPE_NODE_QUESTION_CLASSIFIER: func(handle *BackwardsInvocation) {
  203. genericDispatchTask(handle, executeDifyInvocationQuestionClassifier)
  204. },
  205. dify_invocation.INVOKE_TYPE_STORAGE: func(handle *BackwardsInvocation) {
  206. genericDispatchTask(handle, executeDifyInvocationStorageTask)
  207. },
  208. dify_invocation.INVOKE_TYPE_SUMMARY: func(handle *BackwardsInvocation) {
  209. genericDispatchTask(handle, executeDifyInvocationSummaryTask)
  210. },
  211. }
  212. )
  213. func genericDispatchTask[T any](
  214. handle *BackwardsInvocation,
  215. dispatch func(
  216. handle *BackwardsInvocation,
  217. request *T,
  218. ),
  219. ) {
  220. r, err := parser.MapToStruct[T](handle.RequestData())
  221. if err != nil {
  222. handle.WriteError(fmt.Errorf("unmarshal invoke tool request failed: %s", err.Error()))
  223. return
  224. }
  225. dispatch(handle, r)
  226. }
  227. func dispatchDifyInvocationTask(handle *BackwardsInvocation) {
  228. request_data := handle.RequestData()
  229. tenant_id, err := handle.TenantID()
  230. if err != nil {
  231. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  232. return
  233. }
  234. request_data["tenant_id"] = tenant_id
  235. user_id, err := handle.UserID()
  236. if err != nil {
  237. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  238. return
  239. }
  240. request_data["user_id"] = user_id
  241. typ := handle.Type()
  242. request_data["type"] = typ
  243. for t, v := range dispatchMapping {
  244. if t == handle.Type() {
  245. v(handle)
  246. return
  247. }
  248. }
  249. handle.WriteError(fmt.Errorf("unsupported invoke type: %s", handle.Type()))
  250. }
  251. func executeDifyInvocationToolTask(
  252. handle *BackwardsInvocation,
  253. request *dify_invocation.InvokeToolRequest,
  254. ) {
  255. response, err := handle.backwardsInvocation.InvokeTool(request)
  256. if err != nil {
  257. handle.WriteError(fmt.Errorf("invoke tool failed: %s", err.Error()))
  258. return
  259. }
  260. for response.Next() {
  261. value, err := response.Read()
  262. if err != nil {
  263. handle.WriteError(fmt.Errorf("read tool response failed: %s", err.Error()))
  264. return
  265. }
  266. handle.WriteResponse("stream", value)
  267. }
  268. }
  269. func executeDifyInvocationLLMTask(
  270. handle *BackwardsInvocation,
  271. request *dify_invocation.InvokeLLMRequest,
  272. ) {
  273. response, err := handle.backwardsInvocation.InvokeLLM(request)
  274. if err != nil {
  275. handle.WriteError(fmt.Errorf("invoke llm model failed: %s", err.Error()))
  276. return
  277. }
  278. for response.Next() {
  279. value, err := response.Read()
  280. if err != nil {
  281. handle.WriteError(fmt.Errorf("read llm model failed: %s", err.Error()))
  282. return
  283. }
  284. handle.WriteResponse("stream", value)
  285. }
  286. }
  287. func executeDifyInvocationTextEmbeddingTask(
  288. handle *BackwardsInvocation,
  289. request *dify_invocation.InvokeTextEmbeddingRequest,
  290. ) {
  291. response, err := handle.backwardsInvocation.InvokeTextEmbedding(request)
  292. if err != nil {
  293. handle.WriteError(fmt.Errorf("invoke text-embedding model failed: %s", err.Error()))
  294. return
  295. }
  296. handle.WriteResponse("struct", response)
  297. }
  298. func executeDifyInvocationRerankTask(
  299. handle *BackwardsInvocation,
  300. request *dify_invocation.InvokeRerankRequest,
  301. ) {
  302. response, err := handle.backwardsInvocation.InvokeRerank(request)
  303. if err != nil {
  304. handle.WriteError(fmt.Errorf("invoke rerank model failed: %s", err.Error()))
  305. return
  306. }
  307. handle.WriteResponse("struct", response)
  308. }
  309. func executeDifyInvocationTTSTask(
  310. handle *BackwardsInvocation,
  311. request *dify_invocation.InvokeTTSRequest,
  312. ) {
  313. response, err := handle.backwardsInvocation.InvokeTTS(request)
  314. if err != nil {
  315. handle.WriteError(fmt.Errorf("invoke tts model failed: %s", err.Error()))
  316. return
  317. }
  318. for response.Next() {
  319. value, err := response.Read()
  320. if err != nil {
  321. handle.WriteError(fmt.Errorf("read tts model failed: %s", err.Error()))
  322. return
  323. }
  324. handle.WriteResponse("stream", value)
  325. }
  326. }
  327. func executeDifyInvocationSpeech2TextTask(
  328. handle *BackwardsInvocation,
  329. request *dify_invocation.InvokeSpeech2TextRequest,
  330. ) {
  331. response, err := handle.backwardsInvocation.InvokeSpeech2Text(request)
  332. if err != nil {
  333. handle.WriteError(fmt.Errorf("invoke speech2text model failed: %s", err.Error()))
  334. return
  335. }
  336. handle.WriteResponse("struct", response)
  337. }
  338. func executeDifyInvocationModerationTask(
  339. handle *BackwardsInvocation,
  340. request *dify_invocation.InvokeModerationRequest,
  341. ) {
  342. response, err := handle.backwardsInvocation.InvokeModeration(request)
  343. if err != nil {
  344. handle.WriteError(fmt.Errorf("invoke moderation model failed: %s", err.Error()))
  345. return
  346. }
  347. handle.WriteResponse("struct", response)
  348. }
  349. func executeDifyInvocationAppTask(
  350. handle *BackwardsInvocation,
  351. request *dify_invocation.InvokeAppRequest,
  352. ) {
  353. response, err := handle.backwardsInvocation.InvokeApp(request)
  354. if err != nil {
  355. handle.WriteError(fmt.Errorf("invoke app failed: %s", err.Error()))
  356. return
  357. }
  358. user_id, err := handle.UserID()
  359. if err != nil {
  360. handle.WriteError(fmt.Errorf("get user id failed: %s", err.Error()))
  361. return
  362. }
  363. request.User = user_id
  364. for response.Next() {
  365. value, err := response.Read()
  366. if err != nil {
  367. handle.WriteError(fmt.Errorf("read app failed: %s", err.Error()))
  368. return
  369. }
  370. handle.WriteResponse("stream", value)
  371. }
  372. }
  373. func executeDifyInvocationParameterExtractor(
  374. handle *BackwardsInvocation,
  375. request *dify_invocation.InvokeParameterExtractorRequest,
  376. ) {
  377. response, err := handle.backwardsInvocation.InvokeParameterExtractor(request)
  378. if err != nil {
  379. handle.WriteError(fmt.Errorf("invoke parameter extractor failed: %s", err.Error()))
  380. return
  381. }
  382. handle.WriteResponse("struct", response)
  383. }
  384. func executeDifyInvocationQuestionClassifier(
  385. handle *BackwardsInvocation,
  386. request *dify_invocation.InvokeQuestionClassifierRequest,
  387. ) {
  388. response, err := handle.backwardsInvocation.InvokeQuestionClassifier(request)
  389. if err != nil {
  390. handle.WriteError(fmt.Errorf("invoke question classifier failed: %s", err.Error()))
  391. return
  392. }
  393. handle.WriteResponse("struct", response)
  394. }
  395. func executeDifyInvocationStorageTask(
  396. handle *BackwardsInvocation,
  397. request *dify_invocation.InvokeStorageRequest,
  398. ) {
  399. if handle.session == nil {
  400. handle.WriteError(fmt.Errorf("session not found"))
  401. return
  402. }
  403. persistence := persistence.GetPersistence()
  404. if persistence == nil {
  405. handle.WriteError(fmt.Errorf("persistence not found"))
  406. return
  407. }
  408. tenant_id, err := handle.TenantID()
  409. if err != nil {
  410. handle.WriteError(fmt.Errorf("get tenant id failed: %s", err.Error()))
  411. return
  412. }
  413. plugin_id := handle.session.PluginUniqueIdentifier
  414. if request.Opt == dify_invocation.STORAGE_OPT_GET {
  415. data, err := persistence.Load(tenant_id, plugin_id.PluginID(), request.Key)
  416. if err != nil {
  417. handle.WriteError(fmt.Errorf("load data failed: %s", err.Error()))
  418. return
  419. }
  420. handle.WriteResponse("struct", map[string]any{
  421. "data": hex.EncodeToString(data),
  422. })
  423. } else if request.Opt == dify_invocation.STORAGE_OPT_SET {
  424. data, err := hex.DecodeString(request.Value)
  425. if err != nil {
  426. handle.WriteError(fmt.Errorf("decode data failed: %s", err.Error()))
  427. return
  428. }
  429. session := handle.session
  430. if session == nil {
  431. handle.WriteError(fmt.Errorf("session not found"))
  432. return
  433. }
  434. declaration := session.Declaration
  435. if declaration == nil {
  436. handle.WriteError(fmt.Errorf("declaration not found"))
  437. return
  438. }
  439. resource := declaration.Resource.Permission
  440. if resource == nil {
  441. handle.WriteError(fmt.Errorf("resource not found"))
  442. return
  443. }
  444. max_storage_size := int64(-1)
  445. storage := resource.Storage
  446. if storage != nil {
  447. max_storage_size = int64(storage.Size)
  448. }
  449. if err := persistence.Save(tenant_id, plugin_id.PluginID(), max_storage_size, request.Key, data); err != nil {
  450. handle.WriteError(fmt.Errorf("save data failed: %s", err.Error()))
  451. return
  452. }
  453. handle.WriteResponse("struct", map[string]any{
  454. "data": "ok",
  455. })
  456. } else if request.Opt == dify_invocation.STORAGE_OPT_DEL {
  457. if err := persistence.Delete(tenant_id, plugin_id.PluginID(), request.Key); err != nil {
  458. handle.WriteError(fmt.Errorf("delete data failed: %s", err.Error()))
  459. return
  460. }
  461. handle.WriteResponse("struct", map[string]any{
  462. "data": "ok",
  463. })
  464. }
  465. }
  466. func executeDifyInvocationSummaryTask(
  467. handle *BackwardsInvocation,
  468. request *dify_invocation.InvokeSummaryRequest,
  469. ) {
  470. response, err := handle.backwardsInvocation.InvokeSummary(request)
  471. if err != nil {
  472. handle.WriteError(fmt.Errorf("invoke summary failed: %s", err.Error()))
  473. return
  474. }
  475. handle.WriteResponse("struct", response)
  476. }