install_plugin.go 9.7 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  8. "github.com/langgenius/dify-plugin-daemon/internal/db"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  13. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  15. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  16. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  17. "gorm.io/gorm"
  18. )
  19. func InstallPluginFromIdentifiers(
  20. config *app.Config,
  21. tenant_id string,
  22. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  23. source string,
  24. meta map[string]any,
  25. ) *entities.Response {
  26. var response struct {
  27. AllInstalled bool `json:"all_installed"`
  28. TaskID string `json:"task_id"`
  29. }
  30. // TODO: create installation task and dispatch to workers
  31. plugins_wait_for_installation := []plugin_entities.PluginUniqueIdentifier{}
  32. task := &models.InstallTask{
  33. Status: models.InstallTaskStatusRunning,
  34. TenantID: tenant_id,
  35. TotalPlugins: len(plugin_unique_identifiers),
  36. CompletedPlugins: 0,
  37. Plugins: []models.InstallTaskPluginStatus{},
  38. }
  39. for i, plugin_unique_identifier := range plugin_unique_identifiers {
  40. // check if plugin is already installed
  41. plugin, err := db.GetOne[models.Plugin](
  42. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  43. )
  44. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  45. PluginUniqueIdentifier: plugin_unique_identifier,
  46. PluginID: plugin_unique_identifier.PluginID(),
  47. Status: models.InstallTaskStatusPending,
  48. Message: "",
  49. })
  50. if err == nil {
  51. // already installed by other tenant
  52. declaration := plugin.Declaration
  53. if _, _, err := curd.InstallPlugin(
  54. tenant_id,
  55. plugin_unique_identifier,
  56. plugin.InstallType,
  57. &declaration,
  58. source,
  59. meta,
  60. ); err != nil {
  61. return entities.NewErrorResponse(-500, err.Error())
  62. }
  63. task.CompletedPlugins++
  64. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  65. task.Plugins[i].Message = "Installed"
  66. continue
  67. }
  68. if err != db.ErrDatabaseNotFound {
  69. return entities.NewErrorResponse(-500, err.Error())
  70. }
  71. plugins_wait_for_installation = append(plugins_wait_for_installation, plugin_unique_identifier)
  72. }
  73. if len(plugins_wait_for_installation) == 0 {
  74. response.AllInstalled = true
  75. response.TaskID = ""
  76. return entities.NewSuccessResponse(response)
  77. }
  78. err := db.Create(task)
  79. if err != nil {
  80. return entities.NewErrorResponse(-500, err.Error())
  81. }
  82. response.TaskID = task.ID
  83. manager := plugin_manager.Manager()
  84. tasks := []func(){}
  85. for _, plugin_unique_identifier := range plugins_wait_for_installation {
  86. tasks = append(tasks, func() {
  87. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  88. if err := db.WithTransaction(func(tx *gorm.DB) error {
  89. task, err := db.GetOne[models.InstallTask](
  90. db.WithTransactionContext(tx),
  91. db.Equal("id", task.ID),
  92. db.WLock(), // write lock, multiple tasks can't update the same task
  93. )
  94. if err != nil {
  95. return err
  96. }
  97. task_pointer := &task
  98. var plugin_status *models.InstallTaskPluginStatus
  99. for i := range task.Plugins {
  100. if task.Plugins[i].PluginUniqueIdentifier == plugin_unique_identifier {
  101. plugin_status = &task.Plugins[i]
  102. break
  103. }
  104. }
  105. if plugin_status == nil {
  106. return errors.New("plugin status not found")
  107. }
  108. modifier(task_pointer, plugin_status)
  109. return db.Update(task_pointer, tx)
  110. }); err != nil {
  111. log.Error("failed to update install task status %s", err.Error())
  112. }
  113. }
  114. pkg_path, err := manager.GetPackagePath(plugin_unique_identifier)
  115. if err != nil {
  116. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  117. task.Status = models.InstallTaskStatusFailed
  118. plugin.Status = models.InstallTaskStatusFailed
  119. plugin.Message = err.Error()
  120. })
  121. return
  122. }
  123. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  124. plugin.Status = models.InstallTaskStatusRunning
  125. plugin.Message = "Installing"
  126. })
  127. var stream *stream.Stream[plugin_manager.PluginInstallResponse]
  128. if config.Platform == app.PLATFORM_AWS_LAMBDA {
  129. var zip_decoder *decoder.ZipPluginDecoder
  130. var pkg_file []byte
  131. pkg_file, err = os.ReadFile(pkg_path)
  132. if err != nil {
  133. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  134. task.Status = models.InstallTaskStatusFailed
  135. plugin.Status = models.InstallTaskStatusFailed
  136. plugin.Message = "Failed to read plugin package"
  137. })
  138. return
  139. }
  140. zip_decoder, err = decoder.NewZipPluginDecoder(pkg_file)
  141. if err != nil {
  142. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  143. task.Status = models.InstallTaskStatusFailed
  144. plugin.Status = models.InstallTaskStatusFailed
  145. plugin.Message = err.Error()
  146. })
  147. return
  148. }
  149. stream, err = manager.InstallToAWSFromPkg(tenant_id, zip_decoder, source, meta)
  150. } else if config.Platform == app.PLATFORM_LOCAL {
  151. stream, err = manager.InstallToLocal(tenant_id, pkg_path, plugin_unique_identifier, source, meta)
  152. } else {
  153. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  154. task.Status = models.InstallTaskStatusFailed
  155. plugin.Status = models.InstallTaskStatusFailed
  156. plugin.Message = "Unsupported platform"
  157. })
  158. return
  159. }
  160. if err != nil {
  161. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  162. task.Status = models.InstallTaskStatusFailed
  163. plugin.Status = models.InstallTaskStatusFailed
  164. plugin.Message = err.Error()
  165. })
  166. return
  167. }
  168. for stream.Next() {
  169. message, err := stream.Read()
  170. if err != nil {
  171. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  172. task.Status = models.InstallTaskStatusFailed
  173. plugin.Status = models.InstallTaskStatusFailed
  174. plugin.Message = err.Error()
  175. })
  176. return
  177. }
  178. if message.Event == plugin_manager.PluginInstallEventError {
  179. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  180. task.Status = models.InstallTaskStatusFailed
  181. plugin.Status = models.InstallTaskStatusFailed
  182. plugin.Message = message.Data
  183. })
  184. return
  185. }
  186. }
  187. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  188. plugin.Status = models.InstallTaskStatusSuccess
  189. plugin.Message = "Installed"
  190. task.CompletedPlugins++
  191. // check if all plugins are installed
  192. if task.CompletedPlugins == task.TotalPlugins {
  193. task.Status = models.InstallTaskStatusSuccess
  194. }
  195. })
  196. })
  197. }
  198. // submit async tasks
  199. routine.WithMaxRoutine(3, tasks)
  200. return entities.NewSuccessResponse(response)
  201. }
  202. func FetchPluginInstallationTasks(
  203. tenant_id string,
  204. page int,
  205. page_size int,
  206. ) *entities.Response {
  207. tasks, err := db.GetAll[models.InstallTask](
  208. db.Equal("tenant_id", tenant_id),
  209. db.OrderBy("created_at", true),
  210. db.Page(page, page_size),
  211. )
  212. if err != nil {
  213. return entities.NewErrorResponse(-500, err.Error())
  214. }
  215. return entities.NewSuccessResponse(tasks)
  216. }
  217. func FetchPluginInstallationTask(
  218. tenant_id string,
  219. task_id string,
  220. ) *entities.Response {
  221. task, err := db.GetOne[models.InstallTask](
  222. db.Equal("id", task_id),
  223. db.Equal("tenant_id", tenant_id),
  224. )
  225. if err != nil {
  226. return entities.NewErrorResponse(-500, err.Error())
  227. }
  228. return entities.NewSuccessResponse(task)
  229. }
  230. func DeletePluginInstallationTask(
  231. tenant_id string,
  232. task_id string,
  233. ) *entities.Response {
  234. err := db.DeleteByCondition(
  235. models.InstallTask{
  236. Model: models.Model{
  237. ID: task_id,
  238. },
  239. TenantID: tenant_id,
  240. },
  241. )
  242. if err != nil {
  243. return entities.NewErrorResponse(-500, err.Error())
  244. }
  245. return entities.NewSuccessResponse(true)
  246. }
  247. func FetchPluginFromIdentifier(
  248. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  249. ) *entities.Response {
  250. _, err := db.GetOne[models.Plugin](
  251. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  252. )
  253. if err == db.ErrDatabaseNotFound {
  254. return entities.NewSuccessResponse(false)
  255. }
  256. if err != nil {
  257. return entities.NewErrorResponse(-500, err.Error())
  258. }
  259. return entities.NewSuccessResponse(true)
  260. }
  261. func UninstallPlugin(
  262. tenant_id string,
  263. plugin_installation_id string,
  264. ) *entities.Response {
  265. // Check if the plugin exists for the tenant
  266. installation, err := db.GetOne[models.PluginInstallation](
  267. db.Equal("tenant_id", tenant_id),
  268. db.Equal("id", plugin_installation_id),
  269. )
  270. if err == db.ErrDatabaseNotFound {
  271. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  272. }
  273. if err != nil {
  274. return entities.NewErrorResponse(-500, err.Error())
  275. }
  276. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  277. if err != nil {
  278. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  279. }
  280. // Uninstall the plugin
  281. _, err = curd.UninstallPlugin(
  282. tenant_id,
  283. plugin_unique_identifier,
  284. installation.ID,
  285. )
  286. if err != nil {
  287. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  288. }
  289. return entities.NewSuccessResponse(true)
  290. }