install_plugin.go 16 KB


  1. package service
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  6. "github.com/langgenius/dify-plugin-daemon/internal/db"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  16. "gorm.io/gorm"
  17. )
  18. type InstallPluginResponse struct {
  19. AllInstalled bool `json:"all_installed"`
  20. TaskID string `json:"task_id"`
  21. }
  22. type InstallPluginOnDoneHandler func(
  23. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  24. declaration *plugin_entities.PluginDeclaration,
  25. ) error
  26. func InstallPluginRuntimeToTenant(
  27. config *app.Config,
  28. tenant_id string,
  29. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  30. source string,
  31. meta map[string]any,
  32. onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
  33. ) (*InstallPluginResponse, error) {
  34. response := &InstallPluginResponse{}
  35. pluginsWaitForInstallation := []plugin_entities.PluginUniqueIdentifier{}
  36. runtimeType := plugin_entities.PluginRuntimeType("")
  37. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  38. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  39. } else if config.Platform == app.PLATFORM_LOCAL {
  40. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  41. } else {
  42. return nil, fmt.Errorf("unsupported platform: %s", config.Platform)
  43. }
  44. task := &models.InstallTask{
  45. Status: models.InstallTaskStatusRunning,
  46. TenantID: tenant_id,
  47. TotalPlugins: len(plugin_unique_identifiers),
  48. CompletedPlugins: 0,
  49. Plugins: []models.InstallTaskPluginStatus{},
  50. }
  51. for i, pluginUniqueIdentifier := range plugin_unique_identifiers {
  52. // fetch plugin declaration first, before installing, we need to ensure pkg is uploaded
  53. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(
  54. pluginUniqueIdentifier,
  55. tenant_id,
  56. runtimeType,
  57. )
  58. if err != nil {
  59. return nil, err
  60. }
  61. // check if plugin is already installed
  62. _, err = db.GetOne[models.Plugin](
  63. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  64. )
  65. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  66. PluginUniqueIdentifier: pluginUniqueIdentifier,
  67. PluginID: pluginUniqueIdentifier.PluginID(),
  68. Status: models.InstallTaskStatusPending,
  69. Icon: pluginDeclaration.Icon,
  70. Labels: pluginDeclaration.Label,
  71. Message: "",
  72. })
  73. if err == nil {
  74. task.CompletedPlugins++
  75. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  76. task.Plugins[i].Message = "Installed"
  77. onDone(pluginUniqueIdentifier, pluginDeclaration)
  78. continue
  79. }
  80. if err != db.ErrDatabaseNotFound {
  81. return nil, err
  82. }
  83. pluginsWaitForInstallation = append(pluginsWaitForInstallation, pluginUniqueIdentifier)
  84. }
  85. if len(pluginsWaitForInstallation) == 0 {
  86. response.AllInstalled = true
  87. response.TaskID = ""
  88. return response, nil
  89. }
  90. err := db.Create(task)
  91. if err != nil {
  92. return nil, err
  93. }
  94. response.TaskID = task.ID
  95. manager := plugin_manager.Manager()
  96. tasks := []func(){}
  97. for _, pluginUniqueIdentifier := range pluginsWaitForInstallation {
  98. // copy the variable to avoid race condition
  99. pluginUniqueIdentifier := pluginUniqueIdentifier
  100. declaration, err := helper.CombinedGetPluginDeclaration(
  101. pluginUniqueIdentifier,
  102. tenant_id,
  103. runtimeType,
  104. )
  105. if err != nil {
  106. return nil, err
  107. }
  108. tasks = append(tasks, func() {
  109. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  110. if err := db.WithTransaction(func(tx *gorm.DB) error {
  111. task, err := db.GetOne[models.InstallTask](
  112. db.WithTransactionContext(tx),
  113. db.Equal("id", task.ID),
  114. db.WLock(), // write lock, multiple tasks can't update the same task
  115. )
  116. if err == db.ErrDatabaseNotFound {
  117. return nil
  118. }
  119. if err != nil {
  120. return err
  121. }
  122. taskPointer := &task
  123. var pluginStatus *models.InstallTaskPluginStatus
  124. for i := range task.Plugins {
  125. if task.Plugins[i].PluginUniqueIdentifier == pluginUniqueIdentifier {
  126. pluginStatus = &task.Plugins[i]
  127. break
  128. }
  129. }
  130. if pluginStatus == nil {
  131. return nil
  132. }
  133. modifier(taskPointer, pluginStatus)
  134. successes := 0
  135. for _, plugin := range taskPointer.Plugins {
  136. if plugin.Status == models.InstallTaskStatusSuccess {
  137. successes++
  138. }
  139. }
  140. // delete the task if all plugins are installed successfully,
  141. // otherwise update the task status
  142. if successes == len(taskPointer.Plugins) {
  143. return db.Delete(taskPointer, tx)
  144. } else {
  145. return db.Update(taskPointer, tx)
  146. }
  147. }); err != nil {
  148. log.Error("failed to update install task status %s", err.Error())
  149. }
  150. }
  151. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  152. plugin.Status = models.InstallTaskStatusRunning
  153. plugin.Message = "Installing"
  154. })
  155. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  156. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  157. var zipDecoder *decoder.ZipPluginDecoder
  158. var pkgFile []byte
  159. pkgFile, err = manager.GetPackage(pluginUniqueIdentifier)
  160. if err != nil {
  161. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  162. task.Status = models.InstallTaskStatusFailed
  163. plugin.Status = models.InstallTaskStatusFailed
  164. plugin.Message = "Failed to read plugin package"
  165. })
  166. return
  167. }
  168. zipDecoder, err = decoder.NewZipPluginDecoder(pkgFile)
  169. if err != nil {
  170. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  171. task.Status = models.InstallTaskStatusFailed
  172. plugin.Status = models.InstallTaskStatusFailed
  173. plugin.Message = err.Error()
  174. })
  175. return
  176. }
  177. stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, meta)
  178. } else if config.Platform == app.PLATFORM_LOCAL {
  179. stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, meta)
  180. } else {
  181. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  182. task.Status = models.InstallTaskStatusFailed
  183. plugin.Status = models.InstallTaskStatusFailed
  184. plugin.Message = "Unsupported platform"
  185. })
  186. return
  187. }
  188. if err != nil {
  189. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  190. task.Status = models.InstallTaskStatusFailed
  191. plugin.Status = models.InstallTaskStatusFailed
  192. plugin.Message = err.Error()
  193. })
  194. return
  195. }
  196. for stream.Next() {
  197. message, err := stream.Read()
  198. if err != nil {
  199. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  200. task.Status = models.InstallTaskStatusFailed
  201. plugin.Status = models.InstallTaskStatusFailed
  202. plugin.Message = err.Error()
  203. })
  204. return
  205. }
  206. if message.Event == plugin_manager.PluginInstallEventError {
  207. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  208. task.Status = models.InstallTaskStatusFailed
  209. plugin.Status = models.InstallTaskStatusFailed
  210. plugin.Message = message.Data
  211. })
  212. return
  213. }
  214. if message.Event == plugin_manager.PluginInstallEventDone {
  215. if err := onDone(pluginUniqueIdentifier, declaration); err != nil {
  216. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  217. task.Status = models.InstallTaskStatusFailed
  218. plugin.Status = models.InstallTaskStatusFailed
  219. plugin.Message = "Failed to create plugin"
  220. })
  221. return
  222. }
  223. }
  224. }
  225. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  226. plugin.Status = models.InstallTaskStatusSuccess
  227. plugin.Message = "Installed"
  228. task.CompletedPlugins++
  229. // check if all plugins are installed
  230. if task.CompletedPlugins == task.TotalPlugins {
  231. task.Status = models.InstallTaskStatusSuccess
  232. }
  233. })
  234. })
  235. }
  236. // submit async tasks
  237. routine.WithMaxRoutine(3, tasks)
  238. return response, nil
  239. }
  240. func InstallPluginFromIdentifiers(
  241. config *app.Config,
  242. tenant_id string,
  243. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  244. source string,
  245. meta map[string]any,
  246. ) *entities.Response {
  247. response, err := InstallPluginRuntimeToTenant(
  248. config,
  249. tenant_id,
  250. plugin_unique_identifiers,
  251. source,
  252. meta,
  253. func(
  254. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  255. declaration *plugin_entities.PluginDeclaration,
  256. ) error {
  257. runtimeType := plugin_entities.PluginRuntimeType("")
  258. switch config.Platform {
  259. case app.PLATFORM_AWS_LAMBDA:
  260. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  261. case app.PLATFORM_LOCAL:
  262. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  263. default:
  264. return fmt.Errorf("unsupported platform: %s", config.Platform)
  265. }
  266. _, _, err := curd.InstallPlugin(
  267. tenant_id,
  268. pluginUniqueIdentifier,
  269. runtimeType,
  270. declaration,
  271. source,
  272. meta,
  273. )
  274. return err
  275. })
  276. if err != nil {
  277. return entities.NewErrorResponse(-500, err.Error())
  278. }
  279. return entities.NewSuccessResponse(response)
  280. }
  281. func UpgradePlugin(
  282. config *app.Config,
  283. tenant_id string,
  284. source string,
  285. meta map[string]any,
  286. original_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  287. new_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  288. ) *entities.Response {
  289. if original_plugin_unique_identifier == new_plugin_unique_identifier {
  290. return entities.NewErrorResponse(-400, "original and new plugin unique identifier are the same")
  291. }
  292. if original_plugin_unique_identifier.PluginID() != new_plugin_unique_identifier.PluginID() {
  293. return entities.NewErrorResponse(-400, "original and new plugin id are different")
  294. }
  295. // uninstall the original plugin
  296. installation, err := db.GetOne[models.PluginInstallation](
  297. db.Equal("tenant_id", tenant_id),
  298. db.Equal("plugin_unique_identifier", original_plugin_unique_identifier.String()),
  299. db.Equal("source", source),
  300. )
  301. if err == db.ErrDatabaseNotFound {
  302. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  303. }
  304. if err != nil {
  305. return entities.NewErrorResponse(-500, err.Error())
  306. }
  307. // install the new plugin runtime
  308. response, err := InstallPluginRuntimeToTenant(
  309. config,
  310. tenant_id,
  311. []plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
  312. source,
  313. meta,
  314. func(
  315. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  316. declaration *plugin_entities.PluginDeclaration,
  317. ) error {
  318. // uninstall the original plugin
  319. upgradeResponse, err := curd.UpgradePlugin(
  320. tenant_id,
  321. original_plugin_unique_identifier,
  322. new_plugin_unique_identifier,
  323. declaration,
  324. plugin_entities.PluginRuntimeType(installation.RuntimeType),
  325. source,
  326. meta,
  327. )
  328. if err != nil {
  329. return err
  330. }
  331. if upgradeResponse.IsOriginalPluginDeleted {
  332. // delete the plugin if no installation left
  333. manager := plugin_manager.Manager()
  334. if string(upgradeResponse.DeletedPlugin.InstallType) == string(
  335. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  336. ) {
  337. err = manager.UninstallFromLocal(
  338. plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier),
  339. )
  340. if err != nil {
  341. return err
  342. }
  343. }
  344. }
  345. return nil
  346. },
  347. )
  348. if err != nil {
  349. return entities.NewErrorResponse(-500, err.Error())
  350. }
  351. return entities.NewSuccessResponse(response)
  352. }
  353. func FetchPluginInstallationTasks(
  354. tenant_id string,
  355. page int,
  356. page_size int,
  357. ) *entities.Response {
  358. tasks, err := db.GetAll[models.InstallTask](
  359. db.Equal("tenant_id", tenant_id),
  360. db.OrderBy("created_at", true),
  361. db.Page(page, page_size),
  362. )
  363. if err != nil {
  364. return entities.NewErrorResponse(-500, err.Error())
  365. }
  366. return entities.NewSuccessResponse(tasks)
  367. }
  368. func FetchPluginInstallationTask(
  369. tenant_id string,
  370. task_id string,
  371. ) *entities.Response {
  372. task, err := db.GetOne[models.InstallTask](
  373. db.Equal("id", task_id),
  374. db.Equal("tenant_id", tenant_id),
  375. )
  376. if err != nil {
  377. return entities.NewErrorResponse(-500, err.Error())
  378. }
  379. return entities.NewSuccessResponse(task)
  380. }
  381. func DeletePluginInstallationTask(
  382. tenant_id string,
  383. task_id string,
  384. ) *entities.Response {
  385. err := db.DeleteByCondition(
  386. models.InstallTask{
  387. Model: models.Model{
  388. ID: task_id,
  389. },
  390. TenantID: tenant_id,
  391. },
  392. )
  393. if err != nil {
  394. return entities.NewErrorResponse(-500, err.Error())
  395. }
  396. return entities.NewSuccessResponse(true)
  397. }
  398. func DeletePluginInstallationItemFromTask(
  399. tenant_id string,
  400. task_id string,
  401. identifier plugin_entities.PluginUniqueIdentifier,
  402. ) *entities.Response {
  403. err := db.WithTransaction(func(tx *gorm.DB) error {
  404. item, err := db.GetOne[models.InstallTask](
  405. db.WithTransactionContext(tx),
  406. db.Equal("task_id", task_id),
  407. db.Equal("tenant_id", tenant_id),
  408. db.WLock(),
  409. )
  410. if err != nil {
  411. return err
  412. }
  413. plugins := []models.InstallTaskPluginStatus{}
  414. for _, plugin := range item.Plugins {
  415. if plugin.PluginUniqueIdentifier != identifier {
  416. plugins = append(plugins, plugin)
  417. }
  418. }
  419. successes := 0
  420. for _, plugin := range plugins {
  421. if plugin.Status == models.InstallTaskStatusSuccess {
  422. successes++
  423. }
  424. }
  425. if len(plugins) == successes {
  426. // delete the task if all plugins are installed successfully
  427. err = db.Delete(&item, tx)
  428. } else {
  429. item.Plugins = plugins
  430. err = db.Update(&item, tx)
  431. }
  432. return err
  433. })
  434. if err != nil {
  435. return entities.NewErrorResponse(-500, err.Error())
  436. }
  437. return entities.NewSuccessResponse(true)
  438. }
  439. func FetchPluginFromIdentifier(
  440. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  441. ) *entities.Response {
  442. _, err := db.GetOne[models.Plugin](
  443. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  444. )
  445. if err == db.ErrDatabaseNotFound {
  446. return entities.NewSuccessResponse(false)
  447. }
  448. if err != nil {
  449. return entities.NewErrorResponse(-500, err.Error())
  450. }
  451. return entities.NewSuccessResponse(true)
  452. }
  453. func UninstallPlugin(
  454. tenant_id string,
  455. plugin_installation_id string,
  456. ) *entities.Response {
  457. // Check if the plugin exists for the tenant
  458. installation, err := db.GetOne[models.PluginInstallation](
  459. db.Equal("tenant_id", tenant_id),
  460. db.Equal("id", plugin_installation_id),
  461. )
  462. if err == db.ErrDatabaseNotFound {
  463. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  464. }
  465. if err != nil {
  466. return entities.NewErrorResponse(-500, err.Error())
  467. }
  468. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  469. if err != nil {
  470. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  471. }
  472. // Uninstall the plugin
  473. deleteResponse, err := curd.UninstallPlugin(
  474. tenant_id,
  475. pluginUniqueIdentifier,
  476. installation.ID,
  477. )
  478. if err != nil {
  479. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  480. }
  481. if deleteResponse.IsPluginDeleted {
  482. // delete the plugin if no installation left
  483. manager := plugin_manager.Manager()
  484. if deleteResponse.Installation.RuntimeType == string(
  485. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  486. ) {
  487. err = manager.UninstallFromLocal(pluginUniqueIdentifier)
  488. if err != nil {
  489. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  490. }
  491. }
  492. }
  493. return entities.NewSuccessResponse(true)
  494. }