install_plugin.go 15 KB


  1. package service
  2. import (
  3. "fmt"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  6. "github.com/langgenius/dify-plugin-daemon/internal/db"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache/helper"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  16. "gorm.io/gorm"
  17. )
  18. type InstallPluginResponse struct {
  19. AllInstalled bool `json:"all_installed"`
  20. TaskID string `json:"task_id"`
  21. }
  22. type InstallPluginOnDoneHandler func(
  23. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  24. declaration *plugin_entities.PluginDeclaration,
  25. ) error
  26. func InstallPluginRuntimeToTenant(
  27. config *app.Config,
  28. tenant_id string,
  29. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  30. source string,
  31. meta map[string]any,
  32. on_done InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
  33. ) (*InstallPluginResponse, error) {
  34. response := &InstallPluginResponse{}
  35. plugins_wait_for_installation := []plugin_entities.PluginUniqueIdentifier{}
  36. task := &models.InstallTask{
  37. Status: models.InstallTaskStatusRunning,
  38. TenantID: tenant_id,
  39. TotalPlugins: len(plugin_unique_identifiers),
  40. CompletedPlugins: 0,
  41. Plugins: []models.InstallTaskPluginStatus{},
  42. }
  43. for i, plugin_unique_identifier := range plugin_unique_identifiers {
  44. // fetch plugin declaration first, before installing, we need to ensure pkg is uploaded
  45. plugin_declaration, err := helper.CombinedGetPluginDeclaration(plugin_unique_identifier)
  46. if err != nil {
  47. return nil, err
  48. }
  49. // check if plugin is already installed
  50. plugin, err := db.GetOne[models.Plugin](
  51. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  52. )
  53. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  54. PluginUniqueIdentifier: plugin_unique_identifier,
  55. PluginID: plugin_unique_identifier.PluginID(),
  56. Status: models.InstallTaskStatusPending,
  57. Icon: plugin_declaration.Icon,
  58. Labels: plugin_declaration.Label,
  59. Message: "",
  60. })
  61. if err == nil {
  62. // already installed by other tenant
  63. declaration := plugin.Declaration
  64. if _, _, err := curd.InstallPlugin(
  65. tenant_id,
  66. plugin_unique_identifier,
  67. plugin.InstallType,
  68. &declaration,
  69. source,
  70. meta,
  71. ); err != nil {
  72. return nil, err
  73. }
  74. task.CompletedPlugins++
  75. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  76. task.Plugins[i].Message = "Installed"
  77. continue
  78. }
  79. if err != db.ErrDatabaseNotFound {
  80. return nil, err
  81. }
  82. plugins_wait_for_installation = append(plugins_wait_for_installation, plugin_unique_identifier)
  83. }
  84. if len(plugins_wait_for_installation) == 0 {
  85. response.AllInstalled = true
  86. response.TaskID = ""
  87. return response, nil
  88. }
  89. err := db.Create(task)
  90. if err != nil {
  91. return nil, err
  92. }
  93. response.TaskID = task.ID
  94. manager := plugin_manager.Manager()
  95. tasks := []func(){}
  96. for _, plugin_unique_identifier := range plugins_wait_for_installation {
  97. // copy the variable to avoid race condition
  98. plugin_unique_identifier := plugin_unique_identifier
  99. declaration, err := manager.GetDeclaration(plugin_unique_identifier)
  100. if err != nil {
  101. return nil, err
  102. }
  103. tasks = append(tasks, func() {
  104. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  105. if err := db.WithTransaction(func(tx *gorm.DB) error {
  106. task, err := db.GetOne[models.InstallTask](
  107. db.WithTransactionContext(tx),
  108. db.Equal("id", task.ID),
  109. db.WLock(), // write lock, multiple tasks can't update the same task
  110. )
  111. if err == db.ErrDatabaseNotFound {
  112. return nil
  113. }
  114. if err != nil {
  115. return err
  116. }
  117. task_pointer := &task
  118. var plugin_status *models.InstallTaskPluginStatus
  119. for i := range task.Plugins {
  120. if task.Plugins[i].PluginUniqueIdentifier == plugin_unique_identifier {
  121. plugin_status = &task.Plugins[i]
  122. break
  123. }
  124. }
  125. if plugin_status == nil {
  126. return nil
  127. }
  128. modifier(task_pointer, plugin_status)
  129. return db.Update(task_pointer, tx)
  130. }); err != nil {
  131. log.Error("failed to update install task status %s", err.Error())
  132. }
  133. }
  134. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  135. plugin.Status = models.InstallTaskStatusRunning
  136. plugin.Message = "Installing"
  137. })
  138. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  139. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  140. var zip_decoder *decoder.ZipPluginDecoder
  141. var pkg_file []byte
  142. pkg_file, err = manager.GetPackage(plugin_unique_identifier)
  143. if err != nil {
  144. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  145. task.Status = models.InstallTaskStatusFailed
  146. plugin.Status = models.InstallTaskStatusFailed
  147. plugin.Message = "Failed to read plugin package"
  148. })
  149. return
  150. }
  151. zip_decoder, err = decoder.NewZipPluginDecoder(pkg_file)
  152. if err != nil {
  153. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  154. task.Status = models.InstallTaskStatusFailed
  155. plugin.Status = models.InstallTaskStatusFailed
  156. plugin.Message = err.Error()
  157. })
  158. return
  159. }
  160. stream, err = manager.InstallToAWSFromPkg(zip_decoder, source, meta)
  161. } else if config.Platform == app.PLATFORM_LOCAL {
  162. stream, err = manager.InstallToLocal(plugin_unique_identifier, source, meta)
  163. } else {
  164. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  165. task.Status = models.InstallTaskStatusFailed
  166. plugin.Status = models.InstallTaskStatusFailed
  167. plugin.Message = "Unsupported platform"
  168. })
  169. return
  170. }
  171. if err != nil {
  172. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  173. task.Status = models.InstallTaskStatusFailed
  174. plugin.Status = models.InstallTaskStatusFailed
  175. plugin.Message = err.Error()
  176. })
  177. return
  178. }
  179. for stream.Next() {
  180. message, err := stream.Read()
  181. if err != nil {
  182. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  183. task.Status = models.InstallTaskStatusFailed
  184. plugin.Status = models.InstallTaskStatusFailed
  185. plugin.Message = err.Error()
  186. })
  187. return
  188. }
  189. if message.Event == plugin_manager.PluginInstallEventError {
  190. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  191. task.Status = models.InstallTaskStatusFailed
  192. plugin.Status = models.InstallTaskStatusFailed
  193. plugin.Message = message.Data
  194. })
  195. return
  196. }
  197. if message.Event == plugin_manager.PluginInstallEventDone {
  198. if err := on_done(plugin_unique_identifier, declaration); err != nil {
  199. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  200. task.Status = models.InstallTaskStatusFailed
  201. plugin.Status = models.InstallTaskStatusFailed
  202. plugin.Message = "Failed to create plugin"
  203. })
  204. return
  205. }
  206. }
  207. }
  208. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  209. plugin.Status = models.InstallTaskStatusSuccess
  210. plugin.Message = "Installed"
  211. task.CompletedPlugins++
  212. // check if all plugins are installed
  213. if task.CompletedPlugins == task.TotalPlugins {
  214. task.Status = models.InstallTaskStatusSuccess
  215. }
  216. })
  217. })
  218. }
  219. // submit async tasks
  220. routine.WithMaxRoutine(3, tasks)
  221. return response, nil
  222. }
  223. func InstallPluginFromIdentifiers(
  224. config *app.Config,
  225. tenant_id string,
  226. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  227. source string,
  228. meta map[string]any,
  229. ) *entities.Response {
  230. response, err := InstallPluginRuntimeToTenant(config, tenant_id, plugin_unique_identifiers, source, meta, func(
  231. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  232. declaration *plugin_entities.PluginDeclaration,
  233. ) error {
  234. runtime_type := plugin_entities.PluginRuntimeType("")
  235. switch config.Platform {
  236. case app.PLATFORM_AWS_LAMBDA:
  237. runtime_type = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
  238. case app.PLATFORM_LOCAL:
  239. runtime_type = plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  240. default:
  241. return fmt.Errorf("unsupported platform: %s", config.Platform)
  242. }
  243. _, _, err := curd.InstallPlugin(
  244. tenant_id,
  245. plugin_unique_identifier,
  246. runtime_type,
  247. declaration,
  248. source,
  249. meta,
  250. )
  251. return err
  252. })
  253. if err != nil {
  254. return entities.NewErrorResponse(-500, err.Error())
  255. }
  256. return entities.NewSuccessResponse(response)
  257. }
  258. func UpgradePlugin(
  259. config *app.Config,
  260. tenant_id string,
  261. source string,
  262. meta map[string]any,
  263. original_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  264. new_plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  265. ) *entities.Response {
  266. if original_plugin_unique_identifier == new_plugin_unique_identifier {
  267. return entities.NewErrorResponse(-400, "original and new plugin unique identifier are the same")
  268. }
  269. if original_plugin_unique_identifier.PluginID() != new_plugin_unique_identifier.PluginID() {
  270. return entities.NewErrorResponse(-400, "original and new plugin id are different")
  271. }
  272. // uninstall the original plugin
  273. installation, err := db.GetOne[models.PluginInstallation](
  274. db.Equal("tenant_id", tenant_id),
  275. db.Equal("plugin_unique_identifier", original_plugin_unique_identifier.String()),
  276. db.Equal("source", source),
  277. )
  278. if err == db.ErrDatabaseNotFound {
  279. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  280. }
  281. if err != nil {
  282. return entities.NewErrorResponse(-500, err.Error())
  283. }
  284. // install the new plugin runtime
  285. response, err := InstallPluginRuntimeToTenant(
  286. config,
  287. tenant_id,
  288. []plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
  289. source,
  290. meta,
  291. func(
  292. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  293. declaration *plugin_entities.PluginDeclaration,
  294. ) error {
  295. // uninstall the original plugin
  296. err = curd.UpgradePlugin(
  297. tenant_id,
  298. original_plugin_unique_identifier,
  299. new_plugin_unique_identifier,
  300. declaration,
  301. plugin_entities.PluginRuntimeType(installation.RuntimeType),
  302. source,
  303. meta,
  304. )
  305. if err != nil {
  306. return err
  307. }
  308. return nil
  309. },
  310. )
  311. if err != nil {
  312. return entities.NewErrorResponse(-500, err.Error())
  313. }
  314. return entities.NewSuccessResponse(response)
  315. }
  316. func FetchPluginInstallationTasks(
  317. tenant_id string,
  318. page int,
  319. page_size int,
  320. ) *entities.Response {
  321. tasks, err := db.GetAll[models.InstallTask](
  322. db.Equal("tenant_id", tenant_id),
  323. db.OrderBy("created_at", true),
  324. db.Page(page, page_size),
  325. )
  326. if err != nil {
  327. return entities.NewErrorResponse(-500, err.Error())
  328. }
  329. return entities.NewSuccessResponse(tasks)
  330. }
  331. func FetchPluginInstallationTask(
  332. tenant_id string,
  333. task_id string,
  334. ) *entities.Response {
  335. task, err := db.GetOne[models.InstallTask](
  336. db.Equal("id", task_id),
  337. db.Equal("tenant_id", tenant_id),
  338. )
  339. if err != nil {
  340. return entities.NewErrorResponse(-500, err.Error())
  341. }
  342. return entities.NewSuccessResponse(task)
  343. }
  344. func DeletePluginInstallationTask(
  345. tenant_id string,
  346. task_id string,
  347. ) *entities.Response {
  348. err := db.DeleteByCondition(
  349. models.InstallTask{
  350. Model: models.Model{
  351. ID: task_id,
  352. },
  353. TenantID: tenant_id,
  354. },
  355. )
  356. if err != nil {
  357. return entities.NewErrorResponse(-500, err.Error())
  358. }
  359. return entities.NewSuccessResponse(true)
  360. }
  361. func DeletePluginInstallationItemFromTask(
  362. tenant_id string,
  363. task_id string,
  364. identifier plugin_entities.PluginUniqueIdentifier,
  365. ) *entities.Response {
  366. item, err := db.GetOne[models.InstallTask](
  367. db.Equal("task_id", task_id),
  368. db.Equal("tenant_id", tenant_id),
  369. )
  370. if err != nil {
  371. return entities.NewErrorResponse(-500, err.Error())
  372. }
  373. plugins := []models.InstallTaskPluginStatus{}
  374. for _, plugin := range item.Plugins {
  375. if plugin.PluginUniqueIdentifier != identifier {
  376. plugins = append(plugins, plugin)
  377. }
  378. }
  379. if len(plugins) == 0 {
  380. err = db.Delete(&item)
  381. } else {
  382. item.Plugins = plugins
  383. err = db.Update(&item)
  384. }
  385. if err != nil {
  386. return entities.NewErrorResponse(-500, err.Error())
  387. }
  388. return entities.NewSuccessResponse(true)
  389. }
  390. func FetchPluginFromIdentifier(
  391. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  392. ) *entities.Response {
  393. _, err := db.GetOne[models.Plugin](
  394. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  395. )
  396. if err == db.ErrDatabaseNotFound {
  397. return entities.NewSuccessResponse(false)
  398. }
  399. if err != nil {
  400. return entities.NewErrorResponse(-500, err.Error())
  401. }
  402. return entities.NewSuccessResponse(true)
  403. }
  404. func UninstallPlugin(
  405. tenant_id string,
  406. plugin_installation_id string,
  407. ) *entities.Response {
  408. // Check if the plugin exists for the tenant
  409. installation, err := db.GetOne[models.PluginInstallation](
  410. db.Equal("tenant_id", tenant_id),
  411. db.Equal("id", plugin_installation_id),
  412. )
  413. if err == db.ErrDatabaseNotFound {
  414. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  415. }
  416. if err != nil {
  417. return entities.NewErrorResponse(-500, err.Error())
  418. }
  419. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  420. if err != nil {
  421. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  422. }
  423. // Uninstall the plugin
  424. delete_response, err := curd.UninstallPlugin(
  425. tenant_id,
  426. plugin_unique_identifier,
  427. installation.ID,
  428. )
  429. if err != nil {
  430. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  431. }
  432. if delete_response.IsPluginDeleted {
  433. // delete the plugin if no installation left
  434. manager := plugin_manager.Manager()
  435. if delete_response.Installation.RuntimeType == string(
  436. plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL,
  437. ) {
  438. err = manager.UninstallFromLocal(plugin_unique_identifier)
  439. if err != nil {
  440. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  441. }
  442. }
  443. }
  444. return entities.NewSuccessResponse(true)
  445. }