install_plugin.go 16 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  13. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  18. "gorm.io/gorm"
  19. )
  // InstallPluginResponse is the result handed back to the caller of an
  // installation request.
  type InstallPluginResponse struct {
  	// AllInstalled is set when every requested plugin was already present,
  	// in which case no install task is created.
  	AllInstalled bool `json:"all_installed"`

  	// TaskID identifies the persisted install task tracking the pending
  	// installations; it is empty when AllInstalled is true.
  	TaskID string `json:"task_id"`
  }
  24. type InstallPluginOnDoneHandler func(
  25. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  26. declaration *plugin_entities.PluginDeclaration,
  27. ) error
  28. func InstallPluginRuntimeToTenant(
  29. config *app.Config,
  30. tenant_id string,
  31. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  32. source string,
  33. meta map[string]any,
  34. onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
  35. ) (*InstallPluginResponse, error) {
  36. response := &InstallPluginResponse{}
  37. pluginsWaitForInstallation := []plugin_entities.PluginUniqueIdentifier{}
  38. runtimeType := plugin_entities.PluginRuntimeType("")
  39. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  40. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  41. } else if config.Platform == app.PLATFORM_LOCAL {
  42. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  43. } else {
  44. return nil, fmt.Errorf("unsupported platform: %s", config.Platform)
  45. }
  46. task := &models.InstallTask{
  47. Status: models.InstallTaskStatusRunning,
  48. TenantID: tenant_id,
  49. TotalPlugins: len(plugin_unique_identifiers),
  50. CompletedPlugins: 0,
  51. Plugins: []models.InstallTaskPluginStatus{},
  52. }
  53. for i, pluginUniqueIdentifier := range plugin_unique_identifiers {
  54. // fetch plugin declaration first, before installing, we need to ensure pkg is uploaded
  55. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(
  56. pluginUniqueIdentifier,
  57. tenant_id,
  58. runtimeType,
  59. )
  60. if err != nil {
  61. return nil, err
  62. }
  63. // check if plugin is already installed
  64. _, err = db.GetOne[models.Plugin](
  65. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  66. )
  67. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  68. PluginUniqueIdentifier: pluginUniqueIdentifier,
  69. PluginID: pluginUniqueIdentifier.PluginID(),
  70. Status: models.InstallTaskStatusPending,
  71. Icon: pluginDeclaration.Icon,
  72. Labels: pluginDeclaration.Label,
  73. Message: "",
  74. })
  75. if err == nil {
  76. if err := onDone(pluginUniqueIdentifier, pluginDeclaration); err != nil {
  77. return nil, errors.Join(err, errors.New("failed on plugin installation"))
  78. } else {
  79. task.CompletedPlugins++
  80. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  81. task.Plugins[i].Message = "Installed"
  82. }
  83. continue
  84. }
  85. if err != db.ErrDatabaseNotFound {
  86. return nil, err
  87. }
  88. pluginsWaitForInstallation = append(pluginsWaitForInstallation, pluginUniqueIdentifier)
  89. }
  90. if len(pluginsWaitForInstallation) == 0 {
  91. response.AllInstalled = true
  92. response.TaskID = ""
  93. return response, nil
  94. }
  95. err := db.Create(task)
  96. if err != nil {
  97. return nil, err
  98. }
  99. response.TaskID = task.ID
  100. manager := plugin_manager.Manager()
  101. tasks := []func(){}
  102. for _, pluginUniqueIdentifier := range pluginsWaitForInstallation {
  103. // copy the variable to avoid race condition
  104. pluginUniqueIdentifier := pluginUniqueIdentifier
  105. declaration, err := helper.CombinedGetPluginDeclaration(
  106. pluginUniqueIdentifier,
  107. tenant_id,
  108. runtimeType,
  109. )
  110. if err != nil {
  111. return nil, err
  112. }
  113. tasks = append(tasks, func() {
  114. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  115. if err := db.WithTransaction(func(tx *gorm.DB) error {
  116. task, err := db.GetOne[models.InstallTask](
  117. db.WithTransactionContext(tx),
  118. db.Equal("id", task.ID),
  119. db.WLock(), // write lock, multiple tasks can't update the same task
  120. )
  121. if err == db.ErrDatabaseNotFound {
  122. return nil
  123. }
  124. if err != nil {
  125. return err
  126. }
  127. taskPointer := &task
  128. var pluginStatus *models.InstallTaskPluginStatus
  129. for i := range task.Plugins {
  130. if task.Plugins[i].PluginUniqueIdentifier == pluginUniqueIdentifier {
  131. pluginStatus = &task.Plugins[i]
  132. break
  133. }
  134. }
  135. if pluginStatus == nil {
  136. return nil
  137. }
  138. modifier(taskPointer, pluginStatus)
  139. successes := 0
  140. for _, plugin := range taskPointer.Plugins {
  141. if plugin.Status == models.InstallTaskStatusSuccess {
  142. successes++
  143. }
  144. }
  145. // delete the task if all plugins are installed successfully,
  146. // otherwise update the task status
  147. if successes == len(taskPointer.Plugins) {
  148. return db.Delete(taskPointer, tx)
  149. } else {
  150. return db.Update(taskPointer, tx)
  151. }
  152. }); err != nil {
  153. log.Error("failed to update install task status %s", err.Error())
  154. }
  155. }
  156. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  157. plugin.Status = models.InstallTaskStatusRunning
  158. plugin.Message = "Installing"
  159. })
  160. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  161. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  162. var zipDecoder *decoder.ZipPluginDecoder
  163. var pkgFile []byte
  164. pkgFile, err = manager.GetPackage(pluginUniqueIdentifier)
  165. if err != nil {
  166. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  167. task.Status = models.InstallTaskStatusFailed
  168. plugin.Status = models.InstallTaskStatusFailed
  169. plugin.Message = "Failed to read plugin package"
  170. })
  171. return
  172. }
  173. zipDecoder, err = decoder.NewZipPluginDecoder(pkgFile)
  174. if err != nil {
  175. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  176. task.Status = models.InstallTaskStatusFailed
  177. plugin.Status = models.InstallTaskStatusFailed
  178. plugin.Message = err.Error()
  179. })
  180. return
  181. }
  182. stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, meta)
  183. } else if config.Platform == app.PLATFORM_LOCAL {
  184. stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, meta)
  185. } else {
  186. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  187. task.Status = models.InstallTaskStatusFailed
  188. plugin.Status = models.InstallTaskStatusFailed
  189. plugin.Message = "Unsupported platform"
  190. })
  191. return
  192. }
  193. if err != nil {
  194. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  195. task.Status = models.InstallTaskStatusFailed
  196. plugin.Status = models.InstallTaskStatusFailed
  197. plugin.Message = err.Error()
  198. })
  199. return
  200. }
  201. for stream.Next() {
  202. message, err := stream.Read()
  203. if err != nil {
  204. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  205. task.Status = models.InstallTaskStatusFailed
  206. plugin.Status = models.InstallTaskStatusFailed
  207. plugin.Message = err.Error()
  208. })
  209. return
  210. }
  211. if message.Event == plugin_manager.PluginInstallEventError {
  212. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  213. task.Status = models.InstallTaskStatusFailed
  214. plugin.Status = models.InstallTaskStatusFailed
  215. plugin.Message = message.Data
  216. })
  217. return
  218. }
  219. if message.Event == plugin_manager.PluginInstallEventDone {
  220. if err := onDone(pluginUniqueIdentifier, declaration); err != nil {
  221. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  222. task.Status = models.InstallTaskStatusFailed
  223. plugin.Status = models.InstallTaskStatusFailed
  224. plugin.Message = "Failed to create plugin, perhaps it's already installed"
  225. })
  226. return
  227. }
  228. }
  229. }
  230. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  231. plugin.Status = models.InstallTaskStatusSuccess
  232. plugin.Message = "Installed"
  233. task.CompletedPlugins++
  234. // check if all plugins are installed
  235. if task.CompletedPlugins == task.TotalPlugins {
  236. task.Status = models.InstallTaskStatusSuccess
  237. }
  238. })
  239. })
  240. }
  241. // submit async tasks
  242. routine.WithMaxRoutine(3, tasks)
  243. return response, nil
  244. }
  245. func InstallPluginFromIdentifiers(
  246. config *app.Config,
  247. tenant_id string,
  248. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  249. source string,
  250. meta map[string]any,
  251. ) *entities.Response {
  252. response, err := InstallPluginRuntimeToTenant(
  253. config,
  254. tenant_id,
  255. plugin_unique_identifiers,
  256. source,
  257. meta,
  258. func(
  259. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  260. declaration *plugin_entities.PluginDeclaration,
  261. ) error {
  262. runtimeType := plugin_entities.PluginRuntimeType("")
  263. switch config.Platform {
  264. case app.PLATFORM_AWS_LAMBDA:
  265. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  266. case app.PLATFORM_LOCAL:
  267. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  268. default:
  269. return fmt.Errorf("unsupported platform: %s", config.Platform)
  270. }
  271. _, _, err := curd.InstallPlugin(
  272. tenant_id,
  273. pluginUniqueIdentifier,
  274. runtimeType,
  275. declaration,
  276. source,
  277. meta,
  278. )
  279. return err
  280. })
  281. if err != nil {
  282. return exception.InternalServerError(err).ToResponse()
  283. }
  284. return entities.NewSuccessResponse(response)
  285. }
  286. func UpgradePlugin(
  287. config *app.Config,
  288. tenant_id string,
  289. source string,
  290. meta map[string]any,
  291. original_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  292. new_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  293. ) *entities.Response {
  294. if original_plugin_unique_identifier == new_plugin_unique_identifier {
  295. return exception.BadRequestError(errors.New("original and new plugin unique identifier are the same")).ToResponse()
  296. }
  297. if original_plugin_unique_identifier.PluginID() != new_plugin_unique_identifier.PluginID() {
  298. return exception.BadRequestError(errors.New("original and new plugin id are different")).ToResponse()
  299. }
  300. // uninstall the original plugin
  301. installation, err := db.GetOne[models.PluginInstallation](
  302. db.Equal("tenant_id", tenant_id),
  303. db.Equal("plugin_unique_identifier", original_plugin_unique_identifier.String()),
  304. db.Equal("source", source),
  305. )
  306. if err == db.ErrDatabaseNotFound {
  307. return exception.NotFoundError(errors.New("plugin installation not found for this tenant")).ToResponse()
  308. }
  309. if err != nil {
  310. return exception.InternalServerError(err).ToResponse()
  311. }
  312. // install the new plugin runtime
  313. response, err := InstallPluginRuntimeToTenant(
  314. config,
  315. tenant_id,
  316. []plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
  317. source,
  318. meta,
  319. func(
  320. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  321. declaration *plugin_entities.PluginDeclaration,
  322. ) error {
  323. // uninstall the original plugin
  324. upgradeResponse, err := curd.UpgradePlugin(
  325. tenant_id,
  326. original_plugin_unique_identifier,
  327. new_plugin_unique_identifier,
  328. declaration,
  329. plugin_entities.PluginRuntimeType(installation.RuntimeType),
  330. source,
  331. meta,
  332. )
  333. if err != nil {
  334. return err
  335. }
  336. if upgradeResponse.IsOriginalPluginDeleted {
  337. // delete the plugin if no installation left
  338. manager := plugin_manager.Manager()
  339. if string(upgradeResponse.DeletedPlugin.InstallType) == string(
  340. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  341. ) {
  342. err = manager.UninstallFromLocal(
  343. plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier),
  344. )
  345. if err != nil {
  346. return err
  347. }
  348. }
  349. }
  350. return nil
  351. },
  352. )
  353. if err != nil {
  354. return exception.InternalServerError(err).ToResponse()
  355. }
  356. return entities.NewSuccessResponse(response)
  357. }
  358. func FetchPluginInstallationTasks(
  359. tenant_id string,
  360. page int,
  361. page_size int,
  362. ) *entities.Response {
  363. tasks, err := db.GetAll[models.InstallTask](
  364. db.Equal("tenant_id", tenant_id),
  365. db.OrderBy("created_at", true),
  366. db.Page(page, page_size),
  367. )
  368. if err != nil {
  369. return exception.InternalServerError(err).ToResponse()
  370. }
  371. return entities.NewSuccessResponse(tasks)
  372. }
  373. func FetchPluginInstallationTask(
  374. tenant_id string,
  375. task_id string,
  376. ) *entities.Response {
  377. task, err := db.GetOne[models.InstallTask](
  378. db.Equal("id", task_id),
  379. db.Equal("tenant_id", tenant_id),
  380. )
  381. if err != nil {
  382. return exception.InternalServerError(err).ToResponse()
  383. }
  384. return entities.NewSuccessResponse(task)
  385. }
  386. func DeletePluginInstallationTask(
  387. tenant_id string,
  388. task_id string,
  389. ) *entities.Response {
  390. err := db.DeleteByCondition(
  391. models.InstallTask{
  392. Model: models.Model{
  393. ID: task_id,
  394. },
  395. TenantID: tenant_id,
  396. },
  397. )
  398. if err != nil {
  399. return exception.InternalServerError(err).ToResponse()
  400. }
  401. return entities.NewSuccessResponse(true)
  402. }
  403. func DeleteAllPluginInstallationTasks(
  404. tenant_id string,
  405. ) *entities.Response {
  406. err := db.DeleteByCondition(
  407. models.InstallTask{
  408. TenantID: tenant_id,
  409. },
  410. )
  411. if err != nil {
  412. return exception.InternalServerError(err).ToResponse()
  413. }
  414. return entities.NewSuccessResponse(true)
  415. }
  416. func DeletePluginInstallationItemFromTask(
  417. tenant_id string,
  418. task_id string,
  419. identifier plugin_entities.PluginUniqueIdentifier,
  420. ) *entities.Response {
  421. err := db.WithTransaction(func(tx *gorm.DB) error {
  422. item, err := db.GetOne[models.InstallTask](
  423. db.WithTransactionContext(tx),
  424. db.Equal("id", task_id),
  425. db.Equal("tenant_id", tenant_id),
  426. db.WLock(),
  427. )
  428. if err != nil {
  429. return err
  430. }
  431. plugins := []models.InstallTaskPluginStatus{}
  432. for _, plugin := range item.Plugins {
  433. if plugin.PluginUniqueIdentifier != identifier {
  434. plugins = append(plugins, plugin)
  435. }
  436. }
  437. successes := 0
  438. for _, plugin := range plugins {
  439. if plugin.Status == models.InstallTaskStatusSuccess {
  440. successes++
  441. }
  442. }
  443. if len(plugins) == successes {
  444. // delete the task if all plugins are installed successfully
  445. err = db.Delete(&item, tx)
  446. } else {
  447. item.Plugins = plugins
  448. err = db.Update(&item, tx)
  449. }
  450. return err
  451. })
  452. if err != nil {
  453. return exception.InternalServerError(err).ToResponse()
  454. }
  455. return entities.NewSuccessResponse(true)
  456. }
  457. func FetchPluginFromIdentifier(
  458. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  459. ) *entities.Response {
  460. _, err := db.GetOne[models.Plugin](
  461. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  462. )
  463. if err == db.ErrDatabaseNotFound {
  464. return entities.NewSuccessResponse(false)
  465. }
  466. if err != nil {
  467. return exception.InternalServerError(err).ToResponse()
  468. }
  469. return entities.NewSuccessResponse(true)
  470. }
  471. func UninstallPlugin(
  472. tenant_id string,
  473. plugin_installation_id string,
  474. ) *entities.Response {
  475. // Check if the plugin exists for the tenant
  476. installation, err := db.GetOne[models.PluginInstallation](
  477. db.Equal("tenant_id", tenant_id),
  478. db.Equal("id", plugin_installation_id),
  479. )
  480. if err == db.ErrDatabaseNotFound {
  481. return exception.ErrPluginNotFound().ToResponse()
  482. }
  483. if err != nil {
  484. return exception.InternalServerError(err).ToResponse()
  485. }
  486. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  487. if err != nil {
  488. return exception.PluginUniqueIdentifierError(err).ToResponse()
  489. }
  490. // Uninstall the plugin
  491. deleteResponse, err := curd.UninstallPlugin(
  492. tenant_id,
  493. pluginUniqueIdentifier,
  494. installation.ID,
  495. )
  496. if err != nil {
  497. return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
  498. }
  499. if deleteResponse.IsPluginDeleted {
  500. // delete the plugin if no installation left
  501. manager := plugin_manager.Manager()
  502. if deleteResponse.Installation.RuntimeType == string(
  503. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  504. ) {
  505. err = manager.UninstallFromLocal(pluginUniqueIdentifier)
  506. if err != nil {
  507. return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
  508. }
  509. }
  510. }
  511. return entities.NewSuccessResponse(true)
  512. }