install_plugin.go 8.6 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. "gorm.io/gorm"
  15. )
  16. func InstallPluginFromIdentifiers(
  17. tenant_id string,
  18. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  19. source string,
  20. meta map[string]any,
  21. ) *entities.Response {
  22. var response struct {
  23. AllInstalled bool `json:"all_installed"`
  24. TaskID string `json:"task_id"`
  25. }
  26. // TODO: create installation task and dispatch to workers
  27. plugins_wait_for_installation := []plugin_entities.PluginUniqueIdentifier{}
  28. task := &models.InstallTask{
  29. Status: models.InstallTaskStatusRunning,
  30. TenantID: tenant_id,
  31. TotalPlugins: len(plugin_unique_identifiers),
  32. CompletedPlugins: 0,
  33. Plugins: []models.InstallTaskPluginStatus{},
  34. }
  35. for i, plugin_unique_identifier := range plugin_unique_identifiers {
  36. // check if plugin is already installed
  37. plugin, err := db.GetOne[models.Plugin](
  38. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  39. )
  40. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  41. PluginUniqueIdentifier: plugin_unique_identifier,
  42. PluginID: plugin_unique_identifier.PluginID(),
  43. Status: models.InstallTaskStatusPending,
  44. Message: "",
  45. })
  46. if err == nil {
  47. // already installed by other tenant
  48. declaration := plugin.Declaration
  49. if _, _, err := curd.InstallPlugin(
  50. tenant_id,
  51. plugin_unique_identifier,
  52. plugin.InstallType,
  53. &declaration,
  54. source,
  55. meta,
  56. ); err != nil {
  57. return entities.NewErrorResponse(-500, err.Error())
  58. }
  59. task.CompletedPlugins++
  60. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  61. task.Plugins[i].Message = "Installed"
  62. continue
  63. }
  64. if err != db.ErrDatabaseNotFound {
  65. return entities.NewErrorResponse(-500, err.Error())
  66. }
  67. plugins_wait_for_installation = append(plugins_wait_for_installation, plugin_unique_identifier)
  68. }
  69. if len(plugins_wait_for_installation) == 0 {
  70. response.AllInstalled = true
  71. response.TaskID = ""
  72. return entities.NewSuccessResponse(response)
  73. }
  74. err := db.Create(task)
  75. if err != nil {
  76. return entities.NewErrorResponse(-500, err.Error())
  77. }
  78. response.TaskID = task.ID
  79. manager := plugin_manager.Manager()
  80. tasks := []func(){}
  81. for _, plugin_unique_identifier := range plugins_wait_for_installation {
  82. tasks = append(tasks, func() {
  83. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  84. if err := db.WithTransaction(func(tx *gorm.DB) error {
  85. task, err := db.GetOne[models.InstallTask](
  86. db.WithTransactionContext(tx),
  87. db.Equal("id", task.ID),
  88. db.WLock(), // write lock, multiple tasks can't update the same task
  89. )
  90. if err != nil {
  91. return err
  92. }
  93. task_pointer := &task
  94. var plugin_status *models.InstallTaskPluginStatus
  95. for i := range task.Plugins {
  96. if task.Plugins[i].PluginUniqueIdentifier == plugin_unique_identifier {
  97. plugin_status = &task.Plugins[i]
  98. break
  99. }
  100. }
  101. if plugin_status == nil {
  102. return errors.New("plugin status not found")
  103. }
  104. modifier(task_pointer, plugin_status)
  105. return db.Update(task_pointer, tx)
  106. }); err != nil {
  107. log.Error("failed to update install task status %s", err.Error())
  108. }
  109. }
  110. pkg, err := manager.GetPackage(plugin_unique_identifier)
  111. if err != nil {
  112. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  113. task.Status = models.InstallTaskStatusFailed
  114. plugin.Status = models.InstallTaskStatusFailed
  115. plugin.Message = err.Error()
  116. })
  117. return
  118. }
  119. decoder, err := decoder.NewZipPluginDecoder(pkg)
  120. if err != nil {
  121. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  122. task.Status = models.InstallTaskStatusFailed
  123. plugin.Status = models.InstallTaskStatusFailed
  124. plugin.Message = err.Error()
  125. })
  126. return
  127. }
  128. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  129. plugin.Status = models.InstallTaskStatusRunning
  130. plugin.Message = "Installing"
  131. })
  132. stream, err := manager.Install(tenant_id, decoder, source, meta)
  133. if err != nil {
  134. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  135. task.Status = models.InstallTaskStatusFailed
  136. plugin.Status = models.InstallTaskStatusFailed
  137. plugin.Message = err.Error()
  138. })
  139. return
  140. }
  141. for stream.Next() {
  142. message, err := stream.Read()
  143. if err != nil {
  144. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  145. task.Status = models.InstallTaskStatusFailed
  146. plugin.Status = models.InstallTaskStatusFailed
  147. plugin.Message = err.Error()
  148. })
  149. return
  150. }
  151. if message.Event == plugin_manager.PluginInstallEventError {
  152. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  153. task.Status = models.InstallTaskStatusFailed
  154. plugin.Status = models.InstallTaskStatusFailed
  155. plugin.Message = message.Data
  156. })
  157. return
  158. }
  159. }
  160. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  161. plugin.Status = models.InstallTaskStatusSuccess
  162. plugin.Message = "Installed"
  163. task.CompletedPlugins++
  164. // check if all plugins are installed
  165. if task.CompletedPlugins == task.TotalPlugins {
  166. task.Status = models.InstallTaskStatusSuccess
  167. }
  168. })
  169. })
  170. }
  171. // submit async tasks
  172. routine.WithMaxRoutine(3, tasks)
  173. return entities.NewSuccessResponse(response)
  174. }
  175. func FetchPluginInstallationTasks(
  176. tenant_id string,
  177. page int,
  178. page_size int,
  179. ) *entities.Response {
  180. tasks, err := db.GetAll[models.InstallTask](
  181. db.Equal("tenant_id", tenant_id),
  182. db.OrderBy("created_at", true),
  183. db.Page(page, page_size),
  184. )
  185. if err != nil {
  186. return entities.NewErrorResponse(-500, err.Error())
  187. }
  188. return entities.NewSuccessResponse(tasks)
  189. }
  190. func FetchPluginInstallationTask(
  191. tenant_id string,
  192. task_id string,
  193. ) *entities.Response {
  194. task, err := db.GetOne[models.InstallTask](
  195. db.Equal("id", task_id),
  196. db.Equal("tenant_id", tenant_id),
  197. )
  198. if err != nil {
  199. return entities.NewErrorResponse(-500, err.Error())
  200. }
  201. return entities.NewSuccessResponse(task)
  202. }
  203. func DeletePluginInstallationTask(
  204. tenant_id string,
  205. task_id string,
  206. ) *entities.Response {
  207. err := db.DeleteByCondition(
  208. models.InstallTask{
  209. Model: models.Model{
  210. ID: task_id,
  211. },
  212. TenantID: tenant_id,
  213. },
  214. )
  215. if err != nil {
  216. return entities.NewErrorResponse(-500, err.Error())
  217. }
  218. return entities.NewSuccessResponse(true)
  219. }
  220. func FetchPluginFromIdentifier(
  221. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  222. ) *entities.Response {
  223. _, err := db.GetOne[models.Plugin](
  224. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  225. )
  226. if err == db.ErrDatabaseNotFound {
  227. return entities.NewSuccessResponse(false)
  228. }
  229. if err != nil {
  230. return entities.NewErrorResponse(-500, err.Error())
  231. }
  232. return entities.NewSuccessResponse(true)
  233. }
  234. func UninstallPlugin(
  235. tenant_id string,
  236. plugin_installation_id string,
  237. ) *entities.Response {
  238. // Check if the plugin exists for the tenant
  239. installation, err := db.GetOne[models.PluginInstallation](
  240. db.Equal("tenant_id", tenant_id),
  241. db.Equal("id", plugin_installation_id),
  242. )
  243. if err == db.ErrDatabaseNotFound {
  244. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  245. }
  246. if err != nil {
  247. return entities.NewErrorResponse(-500, err.Error())
  248. }
  249. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  250. if err != nil {
  251. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  252. }
  253. // Uninstall the plugin
  254. _, err = curd.UninstallPlugin(
  255. tenant_id,
  256. plugin_unique_identifier,
  257. installation.ID,
  258. )
  259. if err != nil {
  260. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  261. }
  262. return entities.NewSuccessResponse(true)
  263. }