install_plugin.go 10 KB


  1. package service
  2. import (
  3. "fmt"
  4. "os"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  16. "gorm.io/gorm"
  17. )
  18. func InstallPluginFromIdentifiers(
  19. config *app.Config,
  20. tenant_id string,
  21. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  22. source string,
  23. meta map[string]any,
  24. ) *entities.Response {
  25. var response struct {
  26. AllInstalled bool `json:"all_installed"`
  27. TaskID string `json:"task_id"`
  28. }
  29. // TODO: create installation task and dispatch to workers
  30. plugins_wait_for_installation := []plugin_entities.PluginUniqueIdentifier{}
  31. task := &models.InstallTask{
  32. Status: models.InstallTaskStatusRunning,
  33. TenantID: tenant_id,
  34. TotalPlugins: len(plugin_unique_identifiers),
  35. CompletedPlugins: 0,
  36. Plugins: []models.InstallTaskPluginStatus{},
  37. }
  38. for i, plugin_unique_identifier := range plugin_unique_identifiers {
  39. // check if plugin is already installed
  40. plugin, err := db.GetOne[models.Plugin](
  41. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  42. )
  43. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  44. PluginUniqueIdentifier: plugin_unique_identifier,
  45. PluginID: plugin_unique_identifier.PluginID(),
  46. Status: models.InstallTaskStatusPending,
  47. Message: "",
  48. })
  49. if err == nil {
  50. // already installed by other tenant
  51. declaration := plugin.Declaration
  52. if _, _, err := curd.InstallPlugin(
  53. tenant_id,
  54. plugin_unique_identifier,
  55. plugin.InstallType,
  56. &declaration,
  57. source,
  58. meta,
  59. ); err != nil {
  60. return entities.NewErrorResponse(-500, err.Error())
  61. }
  62. task.CompletedPlugins++
  63. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  64. task.Plugins[i].Message = "Installed"
  65. continue
  66. }
  67. if err != db.ErrDatabaseNotFound {
  68. return entities.NewErrorResponse(-500, err.Error())
  69. }
  70. plugins_wait_for_installation = append(plugins_wait_for_installation, plugin_unique_identifier)
  71. }
  72. if len(plugins_wait_for_installation) == 0 {
  73. response.AllInstalled = true
  74. response.TaskID = ""
  75. return entities.NewSuccessResponse(response)
  76. }
  77. err := db.Create(task)
  78. if err != nil {
  79. return entities.NewErrorResponse(-500, err.Error())
  80. }
  81. response.TaskID = task.ID
  82. manager := plugin_manager.Manager()
  83. tasks := []func(){}
  84. for _, plugin_unique_identifier := range plugins_wait_for_installation {
  85. tasks = append(tasks, func() {
  86. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  87. if err := db.WithTransaction(func(tx *gorm.DB) error {
  88. task, err := db.GetOne[models.InstallTask](
  89. db.WithTransactionContext(tx),
  90. db.Equal("id", task.ID),
  91. db.WLock(), // write lock, multiple tasks can't update the same task
  92. )
  93. if err == db.ErrDatabaseNotFound {
  94. return nil
  95. }
  96. if err != nil {
  97. return err
  98. }
  99. task_pointer := &task
  100. var plugin_status *models.InstallTaskPluginStatus
  101. for i := range task.Plugins {
  102. if task.Plugins[i].PluginUniqueIdentifier == plugin_unique_identifier {
  103. plugin_status = &task.Plugins[i]
  104. break
  105. }
  106. }
  107. if plugin_status == nil {
  108. return nil
  109. }
  110. modifier(task_pointer, plugin_status)
  111. return db.Update(task_pointer, tx)
  112. }); err != nil {
  113. log.Error("failed to update install task status %s", err.Error())
  114. }
  115. }
  116. pkg_path, err := manager.GetPackagePath(plugin_unique_identifier)
  117. if err != nil {
  118. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  119. task.Status = models.InstallTaskStatusFailed
  120. plugin.Status = models.InstallTaskStatusFailed
  121. plugin.Message = err.Error()
  122. })
  123. return
  124. }
  125. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  126. plugin.Status = models.InstallTaskStatusRunning
  127. plugin.Message = "Installing"
  128. })
  129. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  130. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  131. var zip_decoder *decoder.ZipPluginDecoder
  132. var pkg_file []byte
  133. pkg_file, err = os.ReadFile(pkg_path)
  134. if err != nil {
  135. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  136. task.Status = models.InstallTaskStatusFailed
  137. plugin.Status = models.InstallTaskStatusFailed
  138. plugin.Message = "Failed to read plugin package"
  139. })
  140. return
  141. }
  142. zip_decoder, err = decoder.NewZipPluginDecoder(pkg_file)
  143. if err != nil {
  144. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  145. task.Status = models.InstallTaskStatusFailed
  146. plugin.Status = models.InstallTaskStatusFailed
  147. plugin.Message = err.Error()
  148. })
  149. return
  150. }
  151. stream, err = manager.InstallToAWSFromPkg(tenant_id, zip_decoder, source, meta)
  152. } else if config.Platform == app.PLATFORM_LOCAL {
  153. stream, err = manager.InstallToLocal(tenant_id, pkg_path, plugin_unique_identifier, source, meta)
  154. } else {
  155. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  156. task.Status = models.InstallTaskStatusFailed
  157. plugin.Status = models.InstallTaskStatusFailed
  158. plugin.Message = "Unsupported platform"
  159. })
  160. return
  161. }
  162. if err != nil {
  163. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  164. task.Status = models.InstallTaskStatusFailed
  165. plugin.Status = models.InstallTaskStatusFailed
  166. plugin.Message = err.Error()
  167. })
  168. return
  169. }
  170. for stream.Next() {
  171. message, err := stream.Read()
  172. if err != nil {
  173. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  174. task.Status = models.InstallTaskStatusFailed
  175. plugin.Status = models.InstallTaskStatusFailed
  176. plugin.Message = err.Error()
  177. })
  178. return
  179. }
  180. if message.Event == plugin_manager.PluginInstallEventError {
  181. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  182. task.Status = models.InstallTaskStatusFailed
  183. plugin.Status = models.InstallTaskStatusFailed
  184. plugin.Message = message.Data
  185. })
  186. return
  187. }
  188. }
  189. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  190. plugin.Status = models.InstallTaskStatusSuccess
  191. plugin.Message = "Installed"
  192. task.CompletedPlugins++
  193. // check if all plugins are installed
  194. if task.CompletedPlugins == task.TotalPlugins {
  195. task.Status = models.InstallTaskStatusSuccess
  196. }
  197. })
  198. })
  199. }
  200. // submit async tasks
  201. routine.WithMaxRoutine(3, tasks)
  202. return entities.NewSuccessResponse(response)
  203. }
  204. func FetchPluginInstallationTasks(
  205. tenant_id string,
  206. page int,
  207. page_size int,
  208. ) *entities.Response {
  209. tasks, err := db.GetAll[models.InstallTask](
  210. db.Equal("tenant_id", tenant_id),
  211. db.OrderBy("created_at", true),
  212. db.Page(page, page_size),
  213. )
  214. if err != nil {
  215. return entities.NewErrorResponse(-500, err.Error())
  216. }
  217. return entities.NewSuccessResponse(tasks)
  218. }
  219. func FetchPluginInstallationTask(
  220. tenant_id string,
  221. task_id string,
  222. ) *entities.Response {
  223. task, err := db.GetOne[models.InstallTask](
  224. db.Equal("id", task_id),
  225. db.Equal("tenant_id", tenant_id),
  226. )
  227. if err != nil {
  228. return entities.NewErrorResponse(-500, err.Error())
  229. }
  230. return entities.NewSuccessResponse(task)
  231. }
  232. func DeletePluginInstallationTask(
  233. tenant_id string,
  234. task_id string,
  235. ) *entities.Response {
  236. err := db.DeleteByCondition(
  237. models.InstallTask{
  238. Model: models.Model{
  239. ID: task_id,
  240. },
  241. TenantID: tenant_id,
  242. },
  243. )
  244. if err != nil {
  245. return entities.NewErrorResponse(-500, err.Error())
  246. }
  247. return entities.NewSuccessResponse(true)
  248. }
  249. func DeletePluginInstallationItemFromTask(
  250. tenant_id string,
  251. task_id string,
  252. identifier plugin_entities.PluginUniqueIdentifier,
  253. ) *entities.Response {
  254. item, err := db.GetOne[models.InstallTask](
  255. db.Equal("task_id", task_id),
  256. db.Equal("tenant_id", tenant_id),
  257. )
  258. if err != nil {
  259. return entities.NewErrorResponse(-500, err.Error())
  260. }
  261. plugins := []models.InstallTaskPluginStatus{}
  262. for _, plugin := range item.Plugins {
  263. if plugin.PluginUniqueIdentifier != identifier {
  264. plugins = append(plugins, plugin)
  265. }
  266. }
  267. if len(plugins) == 0 {
  268. err = db.Delete(&item)
  269. } else {
  270. item.Plugins = plugins
  271. err = db.Update(&item)
  272. }
  273. if err != nil {
  274. return entities.NewErrorResponse(-500, err.Error())
  275. }
  276. return entities.NewSuccessResponse(true)
  277. }
  278. func FetchPluginFromIdentifier(
  279. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  280. ) *entities.Response {
  281. _, err := db.GetOne[models.Plugin](
  282. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  283. )
  284. if err == db.ErrDatabaseNotFound {
  285. return entities.NewSuccessResponse(false)
  286. }
  287. if err != nil {
  288. return entities.NewErrorResponse(-500, err.Error())
  289. }
  290. return entities.NewSuccessResponse(true)
  291. }
  292. func UninstallPlugin(
  293. tenant_id string,
  294. plugin_installation_id string,
  295. ) *entities.Response {
  296. // Check if the plugin exists for the tenant
  297. installation, err := db.GetOne[models.PluginInstallation](
  298. db.Equal("tenant_id", tenant_id),
  299. db.Equal("id", plugin_installation_id),
  300. )
  301. if err == db.ErrDatabaseNotFound {
  302. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  303. }
  304. if err != nil {
  305. return entities.NewErrorResponse(-500, err.Error())
  306. }
  307. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  308. if err != nil {
  309. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  310. }
  311. // Uninstall the plugin
  312. _, err = curd.UninstallPlugin(
  313. tenant_id,
  314. plugin_unique_identifier,
  315. installation.ID,
  316. )
  317. if err != nil {
  318. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  319. }
  320. return entities.NewSuccessResponse(true)
  321. }