install_plugin.go 16 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  17. "gorm.io/gorm"
  18. )
  19. type InstallPluginResponse struct {
  20. AllInstalled bool `json:"all_installed"`
  21. TaskID string `json:"task_id"`
  22. }
  23. type InstallPluginOnDoneHandler func(
  24. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  25. declaration *plugin_entities.PluginDeclaration,
  26. ) error
  27. func InstallPluginRuntimeToTenant(
  28. config *app.Config,
  29. tenant_id string,
  30. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  31. source string,
  32. meta map[string]any,
  33. onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
  34. ) (*InstallPluginResponse, error) {
  35. response := &InstallPluginResponse{}
  36. pluginsWaitForInstallation := []plugin_entities.PluginUniqueIdentifier{}
  37. runtimeType := plugin_entities.PluginRuntimeType("")
  38. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  39. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  40. } else if config.Platform == app.PLATFORM_LOCAL {
  41. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  42. } else {
  43. return nil, fmt.Errorf("unsupported platform: %s", config.Platform)
  44. }
  45. task := &models.InstallTask{
  46. Status: models.InstallTaskStatusRunning,
  47. TenantID: tenant_id,
  48. TotalPlugins: len(plugin_unique_identifiers),
  49. CompletedPlugins: 0,
  50. Plugins: []models.InstallTaskPluginStatus{},
  51. }
  52. for i, pluginUniqueIdentifier := range plugin_unique_identifiers {
  53. // fetch plugin declaration first, before installing, we need to ensure pkg is uploaded
  54. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(
  55. pluginUniqueIdentifier,
  56. tenant_id,
  57. runtimeType,
  58. )
  59. if err != nil {
  60. return nil, err
  61. }
  62. // check if plugin is already installed
  63. _, err = db.GetOne[models.Plugin](
  64. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  65. )
  66. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  67. PluginUniqueIdentifier: pluginUniqueIdentifier,
  68. PluginID: pluginUniqueIdentifier.PluginID(),
  69. Status: models.InstallTaskStatusPending,
  70. Icon: pluginDeclaration.Icon,
  71. Labels: pluginDeclaration.Label,
  72. Message: "",
  73. })
  74. if err == nil {
  75. if err := onDone(pluginUniqueIdentifier, pluginDeclaration); err != nil {
  76. return nil, errors.Join(err, errors.New("failed on plugin installation"))
  77. } else {
  78. task.CompletedPlugins++
  79. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  80. task.Plugins[i].Message = "Installed"
  81. }
  82. continue
  83. }
  84. if err != db.ErrDatabaseNotFound {
  85. return nil, err
  86. }
  87. pluginsWaitForInstallation = append(pluginsWaitForInstallation, pluginUniqueIdentifier)
  88. }
  89. if len(pluginsWaitForInstallation) == 0 {
  90. response.AllInstalled = true
  91. response.TaskID = ""
  92. return response, nil
  93. }
  94. err := db.Create(task)
  95. if err != nil {
  96. return nil, err
  97. }
  98. response.TaskID = task.ID
  99. manager := plugin_manager.Manager()
  100. tasks := []func(){}
  101. for _, pluginUniqueIdentifier := range pluginsWaitForInstallation {
  102. // copy the variable to avoid race condition
  103. pluginUniqueIdentifier := pluginUniqueIdentifier
  104. declaration, err := helper.CombinedGetPluginDeclaration(
  105. pluginUniqueIdentifier,
  106. tenant_id,
  107. runtimeType,
  108. )
  109. if err != nil {
  110. return nil, err
  111. }
  112. tasks = append(tasks, func() {
  113. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  114. if err := db.WithTransaction(func(tx *gorm.DB) error {
  115. task, err := db.GetOne[models.InstallTask](
  116. db.WithTransactionContext(tx),
  117. db.Equal("id", task.ID),
  118. db.WLock(), // write lock, multiple tasks can't update the same task
  119. )
  120. if err == db.ErrDatabaseNotFound {
  121. return nil
  122. }
  123. if err != nil {
  124. return err
  125. }
  126. taskPointer := &task
  127. var pluginStatus *models.InstallTaskPluginStatus
  128. for i := range task.Plugins {
  129. if task.Plugins[i].PluginUniqueIdentifier == pluginUniqueIdentifier {
  130. pluginStatus = &task.Plugins[i]
  131. break
  132. }
  133. }
  134. if pluginStatus == nil {
  135. return nil
  136. }
  137. modifier(taskPointer, pluginStatus)
  138. successes := 0
  139. for _, plugin := range taskPointer.Plugins {
  140. if plugin.Status == models.InstallTaskStatusSuccess {
  141. successes++
  142. }
  143. }
  144. // delete the task if all plugins are installed successfully,
  145. // otherwise update the task status
  146. if successes == len(taskPointer.Plugins) {
  147. return db.Delete(taskPointer, tx)
  148. } else {
  149. return db.Update(taskPointer, tx)
  150. }
  151. }); err != nil {
  152. log.Error("failed to update install task status %s", err.Error())
  153. }
  154. }
  155. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  156. plugin.Status = models.InstallTaskStatusRunning
  157. plugin.Message = "Installing"
  158. })
  159. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  160. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  161. var zipDecoder *decoder.ZipPluginDecoder
  162. var pkgFile []byte
  163. pkgFile, err = manager.GetPackage(pluginUniqueIdentifier)
  164. if err != nil {
  165. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  166. task.Status = models.InstallTaskStatusFailed
  167. plugin.Status = models.InstallTaskStatusFailed
  168. plugin.Message = "Failed to read plugin package"
  169. })
  170. return
  171. }
  172. zipDecoder, err = decoder.NewZipPluginDecoder(pkgFile)
  173. if err != nil {
  174. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  175. task.Status = models.InstallTaskStatusFailed
  176. plugin.Status = models.InstallTaskStatusFailed
  177. plugin.Message = err.Error()
  178. })
  179. return
  180. }
  181. stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, meta)
  182. } else if config.Platform == app.PLATFORM_LOCAL {
  183. stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, meta)
  184. } else {
  185. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  186. task.Status = models.InstallTaskStatusFailed
  187. plugin.Status = models.InstallTaskStatusFailed
  188. plugin.Message = "Unsupported platform"
  189. })
  190. return
  191. }
  192. if err != nil {
  193. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  194. task.Status = models.InstallTaskStatusFailed
  195. plugin.Status = models.InstallTaskStatusFailed
  196. plugin.Message = err.Error()
  197. })
  198. return
  199. }
  200. for stream.Next() {
  201. message, err := stream.Read()
  202. if err != nil {
  203. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  204. task.Status = models.InstallTaskStatusFailed
  205. plugin.Status = models.InstallTaskStatusFailed
  206. plugin.Message = err.Error()
  207. })
  208. return
  209. }
  210. if message.Event == plugin_manager.PluginInstallEventError {
  211. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  212. task.Status = models.InstallTaskStatusFailed
  213. plugin.Status = models.InstallTaskStatusFailed
  214. plugin.Message = message.Data
  215. })
  216. return
  217. }
  218. if message.Event == plugin_manager.PluginInstallEventDone {
  219. if err := onDone(pluginUniqueIdentifier, declaration); err != nil {
  220. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  221. task.Status = models.InstallTaskStatusFailed
  222. plugin.Status = models.InstallTaskStatusFailed
  223. plugin.Message = "Failed to create plugin"
  224. })
  225. return
  226. }
  227. }
  228. }
  229. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  230. plugin.Status = models.InstallTaskStatusSuccess
  231. plugin.Message = "Installed"
  232. task.CompletedPlugins++
  233. // check if all plugins are installed
  234. if task.CompletedPlugins == task.TotalPlugins {
  235. task.Status = models.InstallTaskStatusSuccess
  236. }
  237. })
  238. })
  239. }
  240. // submit async tasks
  241. routine.WithMaxRoutine(3, tasks)
  242. return response, nil
  243. }
  244. func InstallPluginFromIdentifiers(
  245. config *app.Config,
  246. tenant_id string,
  247. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  248. source string,
  249. meta map[string]any,
  250. ) *entities.Response {
  251. response, err := InstallPluginRuntimeToTenant(
  252. config,
  253. tenant_id,
  254. plugin_unique_identifiers,
  255. source,
  256. meta,
  257. func(
  258. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  259. declaration *plugin_entities.PluginDeclaration,
  260. ) error {
  261. runtimeType := plugin_entities.PluginRuntimeType("")
  262. switch config.Platform {
  263. case app.PLATFORM_AWS_LAMBDA:
  264. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  265. case app.PLATFORM_LOCAL:
  266. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  267. default:
  268. return fmt.Errorf("unsupported platform: %s", config.Platform)
  269. }
  270. _, _, err := curd.InstallPlugin(
  271. tenant_id,
  272. pluginUniqueIdentifier,
  273. runtimeType,
  274. declaration,
  275. source,
  276. meta,
  277. )
  278. return err
  279. })
  280. if err != nil {
  281. return entities.NewErrorResponse(-500, err.Error())
  282. }
  283. return entities.NewSuccessResponse(response)
  284. }
  285. func UpgradePlugin(
  286. config *app.Config,
  287. tenant_id string,
  288. source string,
  289. meta map[string]any,
  290. original_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  291. new_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  292. ) *entities.Response {
  293. if original_plugin_unique_identifier == new_plugin_unique_identifier {
  294. return entities.NewErrorResponse(-400, "original and new plugin unique identifier are the same")
  295. }
  296. if original_plugin_unique_identifier.PluginID() != new_plugin_unique_identifier.PluginID() {
  297. return entities.NewErrorResponse(-400, "original and new plugin id are different")
  298. }
  299. // uninstall the original plugin
  300. installation, err := db.GetOne[models.PluginInstallation](
  301. db.Equal("tenant_id", tenant_id),
  302. db.Equal("plugin_unique_identifier", original_plugin_unique_identifier.String()),
  303. db.Equal("source", source),
  304. )
  305. if err == db.ErrDatabaseNotFound {
  306. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  307. }
  308. if err != nil {
  309. return entities.NewErrorResponse(-500, err.Error())
  310. }
  311. // install the new plugin runtime
  312. response, err := InstallPluginRuntimeToTenant(
  313. config,
  314. tenant_id,
  315. []plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
  316. source,
  317. meta,
  318. func(
  319. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  320. declaration *plugin_entities.PluginDeclaration,
  321. ) error {
  322. // uninstall the original plugin
  323. upgradeResponse, err := curd.UpgradePlugin(
  324. tenant_id,
  325. original_plugin_unique_identifier,
  326. new_plugin_unique_identifier,
  327. declaration,
  328. plugin_entities.PluginRuntimeType(installation.RuntimeType),
  329. source,
  330. meta,
  331. )
  332. if err != nil {
  333. return err
  334. }
  335. if upgradeResponse.IsOriginalPluginDeleted {
  336. // delete the plugin if no installation left
  337. manager := plugin_manager.Manager()
  338. if string(upgradeResponse.DeletedPlugin.InstallType) == string(
  339. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  340. ) {
  341. err = manager.UninstallFromLocal(
  342. plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier),
  343. )
  344. if err != nil {
  345. return err
  346. }
  347. }
  348. }
  349. return nil
  350. },
  351. )
  352. if err != nil {
  353. return entities.NewErrorResponse(-500, err.Error())
  354. }
  355. return entities.NewSuccessResponse(response)
  356. }
  357. func FetchPluginInstallationTasks(
  358. tenant_id string,
  359. page int,
  360. page_size int,
  361. ) *entities.Response {
  362. tasks, err := db.GetAll[models.InstallTask](
  363. db.Equal("tenant_id", tenant_id),
  364. db.OrderBy("created_at", true),
  365. db.Page(page, page_size),
  366. )
  367. if err != nil {
  368. return entities.NewErrorResponse(-500, err.Error())
  369. }
  370. return entities.NewSuccessResponse(tasks)
  371. }
  372. func FetchPluginInstallationTask(
  373. tenant_id string,
  374. task_id string,
  375. ) *entities.Response {
  376. task, err := db.GetOne[models.InstallTask](
  377. db.Equal("id", task_id),
  378. db.Equal("tenant_id", tenant_id),
  379. )
  380. if err != nil {
  381. return entities.NewErrorResponse(-500, err.Error())
  382. }
  383. return entities.NewSuccessResponse(task)
  384. }
  385. func DeletePluginInstallationTask(
  386. tenant_id string,
  387. task_id string,
  388. ) *entities.Response {
  389. err := db.DeleteByCondition(
  390. models.InstallTask{
  391. Model: models.Model{
  392. ID: task_id,
  393. },
  394. TenantID: tenant_id,
  395. },
  396. )
  397. if err != nil {
  398. return entities.NewErrorResponse(-500, err.Error())
  399. }
  400. return entities.NewSuccessResponse(true)
  401. }
  402. func DeletePluginInstallationItemFromTask(
  403. tenant_id string,
  404. task_id string,
  405. identifier plugin_entities.PluginUniqueIdentifier,
  406. ) *entities.Response {
  407. err := db.WithTransaction(func(tx *gorm.DB) error {
  408. item, err := db.GetOne[models.InstallTask](
  409. db.WithTransactionContext(tx),
  410. db.Equal("id", task_id),
  411. db.Equal("tenant_id", tenant_id),
  412. db.WLock(),
  413. )
  414. if err != nil {
  415. return err
  416. }
  417. plugins := []models.InstallTaskPluginStatus{}
  418. for _, plugin := range item.Plugins {
  419. if plugin.PluginUniqueIdentifier != identifier {
  420. plugins = append(plugins, plugin)
  421. }
  422. }
  423. successes := 0
  424. for _, plugin := range plugins {
  425. if plugin.Status == models.InstallTaskStatusSuccess {
  426. successes++
  427. }
  428. }
  429. if len(plugins) == successes {
  430. // delete the task if all plugins are installed successfully
  431. err = db.Delete(&item, tx)
  432. } else {
  433. item.Plugins = plugins
  434. err = db.Update(&item, tx)
  435. }
  436. return err
  437. })
  438. if err != nil {
  439. return entities.NewErrorResponse(-500, err.Error())
  440. }
  441. return entities.NewSuccessResponse(true)
  442. }
  443. func FetchPluginFromIdentifier(
  444. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  445. ) *entities.Response {
  446. _, err := db.GetOne[models.Plugin](
  447. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  448. )
  449. if err == db.ErrDatabaseNotFound {
  450. return entities.NewSuccessResponse(false)
  451. }
  452. if err != nil {
  453. return entities.NewErrorResponse(-500, err.Error())
  454. }
  455. return entities.NewSuccessResponse(true)
  456. }
  457. func UninstallPlugin(
  458. tenant_id string,
  459. plugin_installation_id string,
  460. ) *entities.Response {
  461. // Check if the plugin exists for the tenant
  462. installation, err := db.GetOne[models.PluginInstallation](
  463. db.Equal("tenant_id", tenant_id),
  464. db.Equal("id", plugin_installation_id),
  465. )
  466. if err == db.ErrDatabaseNotFound {
  467. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  468. }
  469. if err != nil {
  470. return entities.NewErrorResponse(-500, err.Error())
  471. }
  472. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  473. if err != nil {
  474. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  475. }
  476. // Uninstall the plugin
  477. deleteResponse, err := curd.UninstallPlugin(
  478. tenant_id,
  479. pluginUniqueIdentifier,
  480. installation.ID,
  481. )
  482. if err != nil {
  483. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  484. }
  485. if deleteResponse.IsPluginDeleted {
  486. // delete the plugin if no installation left
  487. manager := plugin_manager.Manager()
  488. if deleteResponse.Installation.RuntimeType == string(
  489. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  490. ) {
  491. err = manager.UninstallFromLocal(pluginUniqueIdentifier)
  492. if err != nil {
  493. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  494. }
  495. }
  496. }
  497. return entities.NewSuccessResponse(true)
  498. }