// install_plugin.go (17 KB)


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  13. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  17. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  18. "gorm.io/gorm"
  19. )
  20. type InstallPluginResponse struct {
  21. AllInstalled bool `json:"all_installed"`
  22. TaskID string `json:"task_id"`
  23. }
  24. type InstallPluginOnDoneHandler func(
  25. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  26. declaration *plugin_entities.PluginDeclaration,
  27. ) error
  28. func InstallPluginRuntimeToTenant(
  29. config *app.Config,
  30. tenant_id string,
  31. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  32. source string,
  33. meta map[string]any,
  34. onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
  35. ) (*InstallPluginResponse, error) {
  36. response := &InstallPluginResponse{}
  37. pluginsWaitForInstallation := []plugin_entities.PluginUniqueIdentifier{}
  38. runtimeType := plugin_entities.PluginRuntimeType("")
  39. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  40. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  41. } else if config.Platform == app.PLATFORM_LOCAL {
  42. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  43. } else {
  44. return nil, fmt.Errorf("unsupported platform: %s", config.Platform)
  45. }
  46. task := &models.InstallTask{
  47. Status: models.InstallTaskStatusRunning,
  48. TenantID: tenant_id,
  49. TotalPlugins: len(plugin_unique_identifiers),
  50. CompletedPlugins: 0,
  51. Plugins: []models.InstallTaskPluginStatus{},
  52. }
  53. for i, pluginUniqueIdentifier := range plugin_unique_identifiers {
  54. // fetch plugin declaration first, before installing, we need to ensure pkg is uploaded
  55. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(
  56. pluginUniqueIdentifier,
  57. tenant_id,
  58. runtimeType,
  59. )
  60. if err != nil {
  61. return nil, err
  62. }
  63. // check if plugin is already installed
  64. _, err = db.GetOne[models.Plugin](
  65. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  66. )
  67. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  68. PluginUniqueIdentifier: pluginUniqueIdentifier,
  69. PluginID: pluginUniqueIdentifier.PluginID(),
  70. Status: models.InstallTaskStatusPending,
  71. Icon: pluginDeclaration.Icon,
  72. Labels: pluginDeclaration.Label,
  73. Message: "",
  74. })
  75. if err == nil {
  76. if err := onDone(pluginUniqueIdentifier, pluginDeclaration); err != nil {
  77. return nil, errors.Join(err, errors.New("failed on plugin installation"))
  78. } else {
  79. task.CompletedPlugins++
  80. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  81. task.Plugins[i].Message = "Installed"
  82. }
  83. continue
  84. }
  85. if err != db.ErrDatabaseNotFound {
  86. return nil, err
  87. }
  88. pluginsWaitForInstallation = append(pluginsWaitForInstallation, pluginUniqueIdentifier)
  89. }
  90. if len(pluginsWaitForInstallation) == 0 {
  91. response.AllInstalled = true
  92. response.TaskID = ""
  93. return response, nil
  94. }
  95. err := db.Create(task)
  96. if err != nil {
  97. return nil, err
  98. }
  99. response.TaskID = task.ID
  100. manager := plugin_manager.Manager()
  101. tasks := []func(){}
  102. for _, pluginUniqueIdentifier := range pluginsWaitForInstallation {
  103. // copy the variable to avoid race condition
  104. pluginUniqueIdentifier := pluginUniqueIdentifier
  105. declaration, err := helper.CombinedGetPluginDeclaration(
  106. pluginUniqueIdentifier,
  107. tenant_id,
  108. runtimeType,
  109. )
  110. if err != nil {
  111. return nil, err
  112. }
  113. tasks = append(tasks, func() {
  114. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  115. if err := db.WithTransaction(func(tx *gorm.DB) error {
  116. task, err := db.GetOne[models.InstallTask](
  117. db.WithTransactionContext(tx),
  118. db.Equal("id", task.ID),
  119. db.WLock(), // write lock, multiple tasks can't update the same task
  120. )
  121. if err == db.ErrDatabaseNotFound {
  122. return nil
  123. }
  124. if err != nil {
  125. return err
  126. }
  127. taskPointer := &task
  128. var pluginStatus *models.InstallTaskPluginStatus
  129. for i := range task.Plugins {
  130. if task.Plugins[i].PluginUniqueIdentifier == pluginUniqueIdentifier {
  131. pluginStatus = &task.Plugins[i]
  132. break
  133. }
  134. }
  135. if pluginStatus == nil {
  136. return nil
  137. }
  138. modifier(taskPointer, pluginStatus)
  139. successes := 0
  140. for _, plugin := range taskPointer.Plugins {
  141. if plugin.Status == models.InstallTaskStatusSuccess {
  142. successes++
  143. }
  144. }
  145. // delete the task if all plugins are installed successfully,
  146. // otherwise update the task status
  147. if successes == len(taskPointer.Plugins) {
  148. return db.Delete(taskPointer, tx)
  149. } else {
  150. return db.Update(taskPointer, tx)
  151. }
  152. }); err != nil {
  153. log.Error("failed to update install task status %s", err.Error())
  154. }
  155. }
  156. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  157. plugin.Status = models.InstallTaskStatusRunning
  158. plugin.Message = "Installing"
  159. })
  160. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  161. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  162. var zipDecoder *decoder.ZipPluginDecoder
  163. var pkgFile []byte
  164. pkgFile, err = manager.GetPackage(pluginUniqueIdentifier)
  165. if err != nil {
  166. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  167. task.Status = models.InstallTaskStatusFailed
  168. plugin.Status = models.InstallTaskStatusFailed
  169. plugin.Message = "Failed to read plugin package"
  170. })
  171. return
  172. }
  173. zipDecoder, err = decoder.NewZipPluginDecoder(pkgFile)
  174. if err != nil {
  175. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  176. task.Status = models.InstallTaskStatusFailed
  177. plugin.Status = models.InstallTaskStatusFailed
  178. plugin.Message = err.Error()
  179. })
  180. return
  181. }
  182. stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, meta)
  183. } else if config.Platform == app.PLATFORM_LOCAL {
  184. stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, meta)
  185. } else {
  186. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  187. task.Status = models.InstallTaskStatusFailed
  188. plugin.Status = models.InstallTaskStatusFailed
  189. plugin.Message = "Unsupported platform"
  190. })
  191. return
  192. }
  193. if err != nil {
  194. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  195. task.Status = models.InstallTaskStatusFailed
  196. plugin.Status = models.InstallTaskStatusFailed
  197. plugin.Message = err.Error()
  198. })
  199. return
  200. }
  201. for stream.Next() {
  202. message, err := stream.Read()
  203. if err != nil {
  204. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  205. task.Status = models.InstallTaskStatusFailed
  206. plugin.Status = models.InstallTaskStatusFailed
  207. plugin.Message = err.Error()
  208. })
  209. return
  210. }
  211. if message.Event == plugin_manager.PluginInstallEventError {
  212. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  213. task.Status = models.InstallTaskStatusFailed
  214. plugin.Status = models.InstallTaskStatusFailed
  215. plugin.Message = message.Data
  216. })
  217. return
  218. }
  219. if message.Event == plugin_manager.PluginInstallEventDone {
  220. if err := onDone(pluginUniqueIdentifier, declaration); err != nil {
  221. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  222. task.Status = models.InstallTaskStatusFailed
  223. plugin.Status = models.InstallTaskStatusFailed
  224. plugin.Message = "Failed to create plugin, perhaps it's already installed"
  225. })
  226. return
  227. }
  228. }
  229. }
  230. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  231. plugin.Status = models.InstallTaskStatusSuccess
  232. plugin.Message = "Installed"
  233. task.CompletedPlugins++
  234. // check if all plugins are installed
  235. if task.CompletedPlugins == task.TotalPlugins {
  236. task.Status = models.InstallTaskStatusSuccess
  237. }
  238. })
  239. })
  240. }
  241. // submit async tasks
  242. routine.WithMaxRoutine(3, tasks)
  243. return response, nil
  244. }
  245. func InstallPluginFromIdentifiers(
  246. config *app.Config,
  247. tenant_id string,
  248. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  249. source string,
  250. meta map[string]any,
  251. ) *entities.Response {
  252. response, err := InstallPluginRuntimeToTenant(
  253. config,
  254. tenant_id,
  255. plugin_unique_identifiers,
  256. source,
  257. meta,
  258. func(
  259. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  260. declaration *plugin_entities.PluginDeclaration,
  261. ) error {
  262. runtimeType := plugin_entities.PluginRuntimeType("")
  263. switch config.Platform {
  264. case app.PLATFORM_AWS_LAMBDA:
  265. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  266. case app.PLATFORM_LOCAL:
  267. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  268. default:
  269. return fmt.Errorf("unsupported platform: %s", config.Platform)
  270. }
  271. _, _, err := curd.InstallPlugin(
  272. tenant_id,
  273. pluginUniqueIdentifier,
  274. runtimeType,
  275. declaration,
  276. source,
  277. meta,
  278. )
  279. return err
  280. },
  281. )
  282. if err != nil {
  283. if errors.Is(err, curd.ErrPluginAlreadyInstalled) {
  284. return exception.BadRequestError(err).ToResponse()
  285. }
  286. return exception.InternalServerError(err).ToResponse()
  287. }
  288. return entities.NewSuccessResponse(response)
  289. }
  290. func UpgradePlugin(
  291. config *app.Config,
  292. tenant_id string,
  293. source string,
  294. meta map[string]any,
  295. original_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  296. new_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  297. ) *entities.Response {
  298. if original_plugin_unique_identifier == new_plugin_unique_identifier {
  299. return exception.BadRequestError(errors.New("original and new plugin unique identifier are the same")).ToResponse()
  300. }
  301. if original_plugin_unique_identifier.PluginID() != new_plugin_unique_identifier.PluginID() {
  302. return exception.BadRequestError(errors.New("original and new plugin id are different")).ToResponse()
  303. }
  304. // uninstall the original plugin
  305. installation, err := db.GetOne[models.PluginInstallation](
  306. db.Equal("tenant_id", tenant_id),
  307. db.Equal("plugin_unique_identifier", original_plugin_unique_identifier.String()),
  308. db.Equal("source", source),
  309. )
  310. if err == db.ErrDatabaseNotFound {
  311. return exception.NotFoundError(errors.New("plugin installation not found for this tenant")).ToResponse()
  312. }
  313. if err != nil {
  314. return exception.InternalServerError(err).ToResponse()
  315. }
  316. // install the new plugin runtime
  317. response, err := InstallPluginRuntimeToTenant(
  318. config,
  319. tenant_id,
  320. []plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
  321. source,
  322. meta,
  323. func(
  324. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  325. declaration *plugin_entities.PluginDeclaration,
  326. ) error {
  327. // uninstall the original plugin
  328. upgradeResponse, err := curd.UpgradePlugin(
  329. tenant_id,
  330. original_plugin_unique_identifier,
  331. new_plugin_unique_identifier,
  332. declaration,
  333. plugin_entities.PluginRuntimeType(installation.RuntimeType),
  334. source,
  335. meta,
  336. )
  337. if err != nil {
  338. return err
  339. }
  340. if upgradeResponse.IsOriginalPluginDeleted {
  341. // delete the plugin if no installation left
  342. manager := plugin_manager.Manager()
  343. if string(upgradeResponse.DeletedPlugin.InstallType) == string(
  344. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  345. ) {
  346. err = manager.UninstallFromLocal(
  347. plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier),
  348. )
  349. if err != nil {
  350. return err
  351. }
  352. }
  353. }
  354. return nil
  355. },
  356. )
  357. if err != nil {
  358. return exception.InternalServerError(err).ToResponse()
  359. }
  360. return entities.NewSuccessResponse(response)
  361. }
  362. func FetchPluginInstallationTasks(
  363. tenant_id string,
  364. page int,
  365. page_size int,
  366. ) *entities.Response {
  367. tasks, err := db.GetAll[models.InstallTask](
  368. db.Equal("tenant_id", tenant_id),
  369. db.OrderBy("created_at", true),
  370. db.Page(page, page_size),
  371. )
  372. if err != nil {
  373. return exception.InternalServerError(err).ToResponse()
  374. }
  375. return entities.NewSuccessResponse(tasks)
  376. }
  377. func FetchPluginInstallationTask(
  378. tenant_id string,
  379. task_id string,
  380. ) *entities.Response {
  381. task, err := db.GetOne[models.InstallTask](
  382. db.Equal("id", task_id),
  383. db.Equal("tenant_id", tenant_id),
  384. )
  385. if err != nil {
  386. return exception.InternalServerError(err).ToResponse()
  387. }
  388. return entities.NewSuccessResponse(task)
  389. }
  390. func DeletePluginInstallationTask(
  391. tenant_id string,
  392. task_id string,
  393. ) *entities.Response {
  394. err := db.DeleteByCondition(
  395. models.InstallTask{
  396. Model: models.Model{
  397. ID: task_id,
  398. },
  399. TenantID: tenant_id,
  400. },
  401. )
  402. if err != nil {
  403. return exception.InternalServerError(err).ToResponse()
  404. }
  405. return entities.NewSuccessResponse(true)
  406. }
  407. func DeleteAllPluginInstallationTasks(
  408. tenant_id string,
  409. ) *entities.Response {
  410. err := db.DeleteByCondition(
  411. models.InstallTask{
  412. TenantID: tenant_id,
  413. },
  414. )
  415. if err != nil {
  416. return exception.InternalServerError(err).ToResponse()
  417. }
  418. return entities.NewSuccessResponse(true)
  419. }
  420. func DeletePluginInstallationItemFromTask(
  421. tenant_id string,
  422. task_id string,
  423. identifier plugin_entities.PluginUniqueIdentifier,
  424. ) *entities.Response {
  425. err := db.WithTransaction(func(tx *gorm.DB) error {
  426. item, err := db.GetOne[models.InstallTask](
  427. db.WithTransactionContext(tx),
  428. db.Equal("id", task_id),
  429. db.Equal("tenant_id", tenant_id),
  430. db.WLock(),
  431. )
  432. if err != nil {
  433. return err
  434. }
  435. plugins := []models.InstallTaskPluginStatus{}
  436. for _, plugin := range item.Plugins {
  437. if plugin.PluginUniqueIdentifier != identifier {
  438. plugins = append(plugins, plugin)
  439. }
  440. }
  441. successes := 0
  442. for _, plugin := range plugins {
  443. if plugin.Status == models.InstallTaskStatusSuccess {
  444. successes++
  445. }
  446. }
  447. if len(plugins) == successes {
  448. // delete the task if all plugins are installed successfully
  449. err = db.Delete(&item, tx)
  450. } else {
  451. item.Plugins = plugins
  452. err = db.Update(&item, tx)
  453. }
  454. return err
  455. })
  456. if err != nil {
  457. return exception.InternalServerError(err).ToResponse()
  458. }
  459. return entities.NewSuccessResponse(true)
  460. }
  461. func FetchPluginFromIdentifier(
  462. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  463. ) *entities.Response {
  464. _, err := db.GetOne[models.Plugin](
  465. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  466. )
  467. if err == db.ErrDatabaseNotFound {
  468. return entities.NewSuccessResponse(false)
  469. }
  470. if err != nil {
  471. return exception.InternalServerError(err).ToResponse()
  472. }
  473. return entities.NewSuccessResponse(true)
  474. }
  475. func UninstallPlugin(
  476. tenant_id string,
  477. plugin_installation_id string,
  478. ) *entities.Response {
  479. // Check if the plugin exists for the tenant
  480. installation, err := db.GetOne[models.PluginInstallation](
  481. db.Equal("tenant_id", tenant_id),
  482. db.Equal("id", plugin_installation_id),
  483. )
  484. if err == db.ErrDatabaseNotFound {
  485. return exception.ErrPluginNotFound().ToResponse()
  486. }
  487. if err != nil {
  488. return exception.InternalServerError(err).ToResponse()
  489. }
  490. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  491. if err != nil {
  492. return exception.PluginUniqueIdentifierError(err).ToResponse()
  493. }
  494. // Uninstall the plugin
  495. deleteResponse, err := curd.UninstallPlugin(
  496. tenant_id,
  497. pluginUniqueIdentifier,
  498. installation.ID,
  499. )
  500. if err != nil {
  501. return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
  502. }
  503. if deleteResponse.IsPluginDeleted {
  504. // delete the plugin if no installation left
  505. manager := plugin_manager.Manager()
  506. if deleteResponse.Installation.RuntimeType == string(
  507. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  508. ) {
  509. err = manager.UninstallFromLocal(pluginUniqueIdentifier)
  510. if err != nil {
  511. return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
  512. }
  513. }
  514. }
  515. return entities.NewSuccessResponse(true)
  516. }