install_plugin.go 17 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  16. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  17. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  18. "github.com/langgenius/dify-plugin-daemon/pkg/plugin_packager/decoder"
  19. "gorm.io/gorm"
  20. )
  21. type InstallPluginResponse struct {
  22. AllInstalled bool `json:"all_installed"`
  23. TaskID string `json:"task_id"`
  24. }
  25. type InstallPluginOnDoneHandler func(
  26. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  27. declaration *plugin_entities.PluginDeclaration,
  28. meta map[string]any,
  29. ) error
  30. func InstallPluginRuntimeToTenant(
  31. config *app.Config,
  32. tenant_id string,
  33. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  34. source string,
  35. metas []map[string]any,
  36. onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
  37. ) (*InstallPluginResponse, error) {
  38. response := &InstallPluginResponse{}
  39. pluginsWaitForInstallation := []plugin_entities.PluginUniqueIdentifier{}
  40. runtimeType := plugin_entities.PluginRuntimeType("")
  41. if config.Platform == app.PLATFORM_SERVERLESS {
  42. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_SERVERLESS
  43. } else if config.Platform == app.PLATFORM_LOCAL {
  44. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  45. } else {
  46. return nil, fmt.Errorf("unsupported platform: %s", config.Platform)
  47. }
  48. task := &models.InstallTask{
  49. Status: models.InstallTaskStatusRunning,
  50. TenantID: tenant_id,
  51. TotalPlugins: len(plugin_unique_identifiers),
  52. CompletedPlugins: 0,
  53. Plugins: []models.InstallTaskPluginStatus{},
  54. }
  55. for i, pluginUniqueIdentifier := range plugin_unique_identifiers {
  56. // fetch plugin declaration first, before installing, we need to ensure pkg is uploaded
  57. pluginDeclaration, err := helper.CombinedGetPluginDeclaration(
  58. pluginUniqueIdentifier,
  59. tenant_id,
  60. runtimeType,
  61. )
  62. if err != nil {
  63. return nil, err
  64. }
  65. // check if plugin is already installed
  66. _, err = db.GetOne[models.Plugin](
  67. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  68. )
  69. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  70. PluginUniqueIdentifier: pluginUniqueIdentifier,
  71. PluginID: pluginUniqueIdentifier.PluginID(),
  72. Status: models.InstallTaskStatusPending,
  73. Icon: pluginDeclaration.Icon,
  74. Labels: pluginDeclaration.Label,
  75. Message: "",
  76. })
  77. if err == nil {
  78. if err := onDone(pluginUniqueIdentifier, pluginDeclaration, metas[i]); err != nil {
  79. return nil, errors.Join(err, errors.New("failed on plugin installation"))
  80. } else {
  81. task.CompletedPlugins++
  82. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  83. task.Plugins[i].Message = "Installed"
  84. }
  85. continue
  86. }
  87. if err != db.ErrDatabaseNotFound {
  88. return nil, err
  89. }
  90. pluginsWaitForInstallation = append(pluginsWaitForInstallation, pluginUniqueIdentifier)
  91. }
  92. if len(pluginsWaitForInstallation) == 0 {
  93. response.AllInstalled = true
  94. response.TaskID = ""
  95. return response, nil
  96. }
  97. err := db.Create(task)
  98. if err != nil {
  99. return nil, err
  100. }
  101. response.TaskID = task.ID
  102. manager := plugin_manager.Manager()
  103. tasks := []func(){}
  104. for i, pluginUniqueIdentifier := range pluginsWaitForInstallation {
  105. // copy the variable to avoid race condition
  106. pluginUniqueIdentifier := pluginUniqueIdentifier
  107. declaration, err := helper.CombinedGetPluginDeclaration(
  108. pluginUniqueIdentifier,
  109. tenant_id,
  110. runtimeType,
  111. )
  112. if err != nil {
  113. return nil, err
  114. }
  115. i := i
  116. tasks = append(tasks, func() {
  117. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  118. if err := db.WithTransaction(func(tx *gorm.DB) error {
  119. task, err := db.GetOne[models.InstallTask](
  120. db.WithTransactionContext(tx),
  121. db.Equal("id", task.ID),
  122. db.WLock(), // write lock, multiple tasks can't update the same task
  123. )
  124. if err == db.ErrDatabaseNotFound {
  125. return nil
  126. }
  127. if err != nil {
  128. return err
  129. }
  130. taskPointer := &task
  131. var pluginStatus *models.InstallTaskPluginStatus
  132. for i := range task.Plugins {
  133. if task.Plugins[i].PluginUniqueIdentifier == pluginUniqueIdentifier {
  134. pluginStatus = &task.Plugins[i]
  135. break
  136. }
  137. }
  138. if pluginStatus == nil {
  139. return nil
  140. }
  141. modifier(taskPointer, pluginStatus)
  142. successes := 0
  143. for _, plugin := range taskPointer.Plugins {
  144. if plugin.Status == models.InstallTaskStatusSuccess {
  145. successes++
  146. }
  147. }
  148. if successes == len(taskPointer.Plugins) {
  149. // update status
  150. taskPointer.Status = models.InstallTaskStatusSuccess
  151. // delete the task after 120 seconds without transaction
  152. time.AfterFunc(120*time.Second, func() {
  153. db.Delete(taskPointer)
  154. })
  155. }
  156. return db.Update(taskPointer, tx)
  157. }); err != nil {
  158. log.Error("failed to update install task status %s", err.Error())
  159. }
  160. }
  161. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  162. plugin.Status = models.InstallTaskStatusRunning
  163. plugin.Message = "Installing"
  164. })
  165. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  166. if config.Platform == app.PLATFORM_SERVERLESS {
  167. var zipDecoder *decoder.ZipPluginDecoder
  168. var pkgFile []byte
  169. pkgFile, err = manager.GetPackage(pluginUniqueIdentifier)
  170. if err != nil {
  171. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  172. task.Status = models.InstallTaskStatusFailed
  173. plugin.Status = models.InstallTaskStatusFailed
  174. plugin.Message = "Failed to read plugin package"
  175. })
  176. return
  177. }
  178. zipDecoder, err = decoder.NewZipPluginDecoder(pkgFile)
  179. if err != nil {
  180. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  181. task.Status = models.InstallTaskStatusFailed
  182. plugin.Status = models.InstallTaskStatusFailed
  183. plugin.Message = err.Error()
  184. })
  185. return
  186. }
  187. stream, err = manager.InstallToAWSFromPkg(pkgFile, zipDecoder, source, metas[i])
  188. } else if config.Platform == app.PLATFORM_LOCAL {
  189. stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, metas[i])
  190. } else {
  191. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  192. task.Status = models.InstallTaskStatusFailed
  193. plugin.Status = models.InstallTaskStatusFailed
  194. plugin.Message = "Unsupported platform"
  195. })
  196. return
  197. }
  198. if err != nil {
  199. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  200. task.Status = models.InstallTaskStatusFailed
  201. plugin.Status = models.InstallTaskStatusFailed
  202. plugin.Message = err.Error()
  203. })
  204. return
  205. }
  206. for stream.Next() {
  207. message, err := stream.Read()
  208. if err != nil {
  209. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  210. task.Status = models.InstallTaskStatusFailed
  211. plugin.Status = models.InstallTaskStatusFailed
  212. plugin.Message = err.Error()
  213. })
  214. return
  215. }
  216. if message.Event == plugin_manager.PluginInstallEventError {
  217. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  218. task.Status = models.InstallTaskStatusFailed
  219. plugin.Status = models.InstallTaskStatusFailed
  220. plugin.Message = message.Data
  221. })
  222. return
  223. }
  224. if message.Event == plugin_manager.PluginInstallEventDone {
  225. if err := onDone(pluginUniqueIdentifier, declaration, metas[i]); err != nil {
  226. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  227. task.Status = models.InstallTaskStatusFailed
  228. plugin.Status = models.InstallTaskStatusFailed
  229. plugin.Message = "Failed to create plugin, perhaps it's already installed"
  230. })
  231. return
  232. }
  233. }
  234. }
  235. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  236. plugin.Status = models.InstallTaskStatusSuccess
  237. plugin.Message = "Installed"
  238. task.CompletedPlugins++
  239. // check if all plugins are installed
  240. if task.CompletedPlugins == task.TotalPlugins {
  241. task.Status = models.InstallTaskStatusSuccess
  242. }
  243. })
  244. })
  245. }
  246. // submit async tasks
  247. routine.WithMaxRoutine(5, tasks)
  248. return response, nil
  249. }
  250. func InstallPluginFromIdentifiers(
  251. config *app.Config,
  252. tenant_id string,
  253. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  254. source string,
  255. metas []map[string]any,
  256. ) *entities.Response {
  257. response, err := InstallPluginRuntimeToTenant(
  258. config,
  259. tenant_id,
  260. plugin_unique_identifiers,
  261. source,
  262. metas,
  263. func(
  264. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  265. declaration *plugin_entities.PluginDeclaration,
  266. meta map[string]any,
  267. ) error {
  268. runtimeType := plugin_entities.PluginRuntimeType("")
  269. switch config.Platform {
  270. case app.PLATFORM_SERVERLESS:
  271. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_SERVERLESS
  272. case app.PLATFORM_LOCAL:
  273. runtimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  274. default:
  275. return fmt.Errorf("unsupported platform: %s", config.Platform)
  276. }
  277. _, _, err := curd.InstallPlugin(
  278. tenant_id,
  279. pluginUniqueIdentifier,
  280. runtimeType,
  281. declaration,
  282. source,
  283. meta,
  284. )
  285. return err
  286. },
  287. )
  288. if err != nil {
  289. if errors.Is(err, curd.ErrPluginAlreadyInstalled) {
  290. return exception.BadRequestError(err).ToResponse()
  291. }
  292. return exception.InternalServerError(err).ToResponse()
  293. }
  294. return entities.NewSuccessResponse(response)
  295. }
  296. func UpgradePlugin(
  297. config *app.Config,
  298. tenant_id string,
  299. source string,
  300. meta map[string]any,
  301. original_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  302. new_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  303. ) *entities.Response {
  304. if original_plugin_unique_identifier == new_plugin_unique_identifier {
  305. return exception.BadRequestError(errors.New("original and new plugin unique identifier are the same")).ToResponse()
  306. }
  307. if original_plugin_unique_identifier.PluginID() != new_plugin_unique_identifier.PluginID() {
  308. return exception.BadRequestError(errors.New("original and new plugin id are different")).ToResponse()
  309. }
  310. // uninstall the original plugin
  311. installation, err := db.GetOne[models.PluginInstallation](
  312. db.Equal("tenant_id", tenant_id),
  313. db.Equal("plugin_unique_identifier", original_plugin_unique_identifier.String()),
  314. db.Equal("source", source),
  315. )
  316. if err == db.ErrDatabaseNotFound {
  317. return exception.NotFoundError(errors.New("plugin installation not found for this tenant")).ToResponse()
  318. }
  319. if err != nil {
  320. return exception.InternalServerError(err).ToResponse()
  321. }
  322. // install the new plugin runtime
  323. response, err := InstallPluginRuntimeToTenant(
  324. config,
  325. tenant_id,
  326. []plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
  327. source,
  328. []map[string]any{meta},
  329. func(
  330. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  331. declaration *plugin_entities.PluginDeclaration,
  332. meta map[string]any,
  333. ) error {
  334. // uninstall the original plugin
  335. upgradeResponse, err := curd.UpgradePlugin(
  336. tenant_id,
  337. original_plugin_unique_identifier,
  338. new_plugin_unique_identifier,
  339. declaration,
  340. plugin_entities.PluginRuntimeType(installation.RuntimeType),
  341. source,
  342. meta,
  343. )
  344. if err != nil {
  345. return err
  346. }
  347. if upgradeResponse.IsOriginalPluginDeleted {
  348. // delete the plugin if no installation left
  349. manager := plugin_manager.Manager()
  350. if string(upgradeResponse.DeletedPlugin.InstallType) == string(
  351. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  352. ) {
  353. err = manager.UninstallFromLocal(
  354. plugin_entities.PluginUniqueIdentifier(upgradeResponse.DeletedPlugin.PluginUniqueIdentifier),
  355. )
  356. if err != nil {
  357. return err
  358. }
  359. }
  360. }
  361. return nil
  362. },
  363. )
  364. if err != nil {
  365. return exception.InternalServerError(err).ToResponse()
  366. }
  367. return entities.NewSuccessResponse(response)
  368. }
  369. func FetchPluginInstallationTasks(
  370. tenant_id string,
  371. page int,
  372. page_size int,
  373. ) *entities.Response {
  374. tasks, err := db.GetAll[models.InstallTask](
  375. db.Equal("tenant_id", tenant_id),
  376. db.OrderBy("created_at", true),
  377. db.Page(page, page_size),
  378. )
  379. if err != nil {
  380. return exception.InternalServerError(err).ToResponse()
  381. }
  382. return entities.NewSuccessResponse(tasks)
  383. }
  384. func FetchPluginInstallationTask(
  385. tenant_id string,
  386. task_id string,
  387. ) *entities.Response {
  388. task, err := db.GetOne[models.InstallTask](
  389. db.Equal("id", task_id),
  390. db.Equal("tenant_id", tenant_id),
  391. )
  392. if err != nil {
  393. return exception.InternalServerError(err).ToResponse()
  394. }
  395. return entities.NewSuccessResponse(task)
  396. }
  397. func DeletePluginInstallationTask(
  398. tenant_id string,
  399. task_id string,
  400. ) *entities.Response {
  401. err := db.DeleteByCondition(
  402. models.InstallTask{
  403. Model: models.Model{
  404. ID: task_id,
  405. },
  406. TenantID: tenant_id,
  407. },
  408. )
  409. if err != nil {
  410. return exception.InternalServerError(err).ToResponse()
  411. }
  412. return entities.NewSuccessResponse(true)
  413. }
  414. func DeleteAllPluginInstallationTasks(
  415. tenant_id string,
  416. ) *entities.Response {
  417. err := db.DeleteByCondition(
  418. models.InstallTask{
  419. TenantID: tenant_id,
  420. },
  421. )
  422. if err != nil {
  423. return exception.InternalServerError(err).ToResponse()
  424. }
  425. return entities.NewSuccessResponse(true)
  426. }
  427. func DeletePluginInstallationItemFromTask(
  428. tenant_id string,
  429. task_id string,
  430. identifier plugin_entities.PluginUniqueIdentifier,
  431. ) *entities.Response {
  432. err := db.WithTransaction(func(tx *gorm.DB) error {
  433. item, err := db.GetOne[models.InstallTask](
  434. db.WithTransactionContext(tx),
  435. db.Equal("id", task_id),
  436. db.Equal("tenant_id", tenant_id),
  437. db.WLock(),
  438. )
  439. if err != nil {
  440. return err
  441. }
  442. plugins := []models.InstallTaskPluginStatus{}
  443. for _, plugin := range item.Plugins {
  444. if plugin.PluginUniqueIdentifier != identifier {
  445. plugins = append(plugins, plugin)
  446. }
  447. }
  448. successes := 0
  449. for _, plugin := range plugins {
  450. if plugin.Status == models.InstallTaskStatusSuccess {
  451. successes++
  452. }
  453. }
  454. if len(plugins) == successes {
  455. // delete the task if all plugins are installed successfully
  456. err = db.Delete(&item, tx)
  457. } else {
  458. item.Plugins = plugins
  459. err = db.Update(&item, tx)
  460. }
  461. return err
  462. })
  463. if err != nil {
  464. return exception.InternalServerError(err).ToResponse()
  465. }
  466. return entities.NewSuccessResponse(true)
  467. }
  468. func FetchPluginFromIdentifier(
  469. pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
  470. ) *entities.Response {
  471. _, err := db.GetOne[models.Plugin](
  472. db.Equal("plugin_unique_identifier", pluginUniqueIdentifier.String()),
  473. )
  474. if err == db.ErrDatabaseNotFound {
  475. return entities.NewSuccessResponse(false)
  476. }
  477. if err != nil {
  478. return exception.InternalServerError(err).ToResponse()
  479. }
  480. return entities.NewSuccessResponse(true)
  481. }
  482. func UninstallPlugin(
  483. tenant_id string,
  484. plugin_installation_id string,
  485. ) *entities.Response {
  486. // Check if the plugin exists for the tenant
  487. installation, err := db.GetOne[models.PluginInstallation](
  488. db.Equal("tenant_id", tenant_id),
  489. db.Equal("id", plugin_installation_id),
  490. )
  491. if err == db.ErrDatabaseNotFound {
  492. return exception.ErrPluginNotFound().ToResponse()
  493. }
  494. if err != nil {
  495. return exception.InternalServerError(err).ToResponse()
  496. }
  497. pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  498. if err != nil {
  499. return exception.UniqueIdentifierError(err).ToResponse()
  500. }
  501. // Uninstall the plugin
  502. deleteResponse, err := curd.UninstallPlugin(
  503. tenant_id,
  504. pluginUniqueIdentifier,
  505. installation.ID,
  506. )
  507. if err != nil {
  508. return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
  509. }
  510. if deleteResponse.IsPluginDeleted {
  511. // delete the plugin if no installation left
  512. manager := plugin_manager.Manager()
  513. if deleteResponse.Installation.RuntimeType == string(
  514. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  515. ) {
  516. err = manager.UninstallFromLocal(pluginUniqueIdentifier)
  517. if err != nil {
  518. return exception.InternalServerError(fmt.Errorf("failed to uninstall plugin: %s", err.Error())).ToResponse()
  519. }
  520. }
  521. }
  522. return entities.NewSuccessResponse(true)
  523. }