install_plugin.go 8.5 KB


  1. package service
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/db"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. "gorm.io/gorm"
  15. )
  16. func InstallPluginFromIdentifiers(
  17. tenant_id string,
  18. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  19. source string,
  20. meta map[string]any,
  21. ) *entities.Response {
  22. var response struct {
  23. AllInstalled bool `json:"all_installed"`
  24. TaskID string `json:"task_id"`
  25. }
  26. // TODO: create installation task and dispatch to workers
  27. plugins_wait_for_installation := []plugin_entities.PluginUniqueIdentifier{}
  28. task := &models.InstallTask{
  29. Status: models.InstallTaskStatusRunning,
  30. TenantID: tenant_id,
  31. TotalPlugins: len(plugin_unique_identifiers),
  32. CompletedPlugins: 0,
  33. Plugins: []models.InstallTaskPluginStatus{},
  34. }
  35. for i, plugin_unique_identifier := range plugin_unique_identifiers {
  36. // check if plugin is already installed
  37. plugin, err := db.GetOne[models.Plugin](
  38. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  39. )
  40. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  41. PluginUniqueIdentifier: plugin_unique_identifier,
  42. PluginID: plugin_unique_identifier.PluginID(),
  43. Status: models.InstallTaskStatusPending,
  44. Message: "",
  45. })
  46. task.TotalPlugins++
  47. if err == nil {
  48. // already installed by other tenant
  49. declaration := plugin.Declaration
  50. if _, _, err := curd.InstallPlugin(
  51. tenant_id,
  52. plugin_unique_identifier,
  53. plugin.InstallType,
  54. &declaration,
  55. source,
  56. meta,
  57. ); err != nil {
  58. return entities.NewErrorResponse(-500, err.Error())
  59. }
  60. task.CompletedPlugins++
  61. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  62. task.Plugins[i].Message = "Installed"
  63. continue
  64. }
  65. if err != db.ErrDatabaseNotFound {
  66. return entities.NewErrorResponse(-500, err.Error())
  67. }
  68. plugins_wait_for_installation = append(plugins_wait_for_installation, plugin_unique_identifier)
  69. }
  70. if len(plugins_wait_for_installation) == 0 {
  71. response.AllInstalled = true
  72. response.TaskID = ""
  73. return entities.NewSuccessResponse(response)
  74. }
  75. err := db.Create(task)
  76. if err != nil {
  77. return entities.NewErrorResponse(-500, err.Error())
  78. }
  79. response.TaskID = task.ID
  80. manager := plugin_manager.Manager()
  81. tasks := []func(){}
  82. for _, plugin_unique_identifier := range plugins_wait_for_installation {
  83. tasks = append(tasks, func() {
  84. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  85. if err := db.WithTransaction(func(tx *gorm.DB) error {
  86. task, err := db.GetOne[models.InstallTask](
  87. db.WithTransactionContext(tx),
  88. db.Equal("id", task.ID),
  89. db.WLock(), // write lock, multiple tasks can't update the same task
  90. )
  91. if err != nil {
  92. return err
  93. }
  94. task_pointer := &task
  95. var plugin_status *models.InstallTaskPluginStatus
  96. for _, plugin := range task.Plugins {
  97. if plugin.PluginUniqueIdentifier == plugin_unique_identifier {
  98. plugin_status = &plugin
  99. }
  100. }
  101. modifier(task_pointer, plugin_status)
  102. return db.Update(task_pointer, tx)
  103. }); err != nil {
  104. log.Error("failed to update install task status %s", err.Error())
  105. }
  106. }
  107. pkg, err := manager.GetPackage(plugin_unique_identifier)
  108. if err != nil {
  109. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  110. task.Status = models.InstallTaskStatusFailed
  111. plugin.Status = models.InstallTaskStatusFailed
  112. plugin.Message = err.Error()
  113. })
  114. return
  115. }
  116. decoder, err := decoder.NewZipPluginDecoder(pkg)
  117. if err != nil {
  118. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  119. task.Status = models.InstallTaskStatusFailed
  120. plugin.Status = models.InstallTaskStatusFailed
  121. plugin.Message = err.Error()
  122. })
  123. return
  124. }
  125. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  126. plugin.Status = models.InstallTaskStatusRunning
  127. plugin.Message = "Installing"
  128. })
  129. stream, err := manager.Install(tenant_id, decoder, source, meta)
  130. if err != nil {
  131. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  132. task.Status = models.InstallTaskStatusFailed
  133. plugin.Status = models.InstallTaskStatusFailed
  134. plugin.Message = err.Error()
  135. })
  136. return
  137. }
  138. for stream.Next() {
  139. message, err := stream.Read()
  140. if err != nil {
  141. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  142. task.Status = models.InstallTaskStatusFailed
  143. plugin.Status = models.InstallTaskStatusFailed
  144. plugin.Message = err.Error()
  145. })
  146. return
  147. }
  148. if message.Event == plugin_manager.PluginInstallEventError {
  149. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  150. task.Status = models.InstallTaskStatusFailed
  151. plugin.Status = models.InstallTaskStatusFailed
  152. plugin.Message = message.Data
  153. })
  154. return
  155. }
  156. }
  157. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  158. plugin.Status = models.InstallTaskStatusSuccess
  159. plugin.Message = "Installed"
  160. task.CompletedPlugins++
  161. // check if all plugins are installed
  162. if task.CompletedPlugins == task.TotalPlugins {
  163. task.Status = models.InstallTaskStatusSuccess
  164. }
  165. })
  166. })
  167. }
  168. // submit async tasks
  169. routine.WithMaxRoutine(3, tasks, func() {
  170. time.AfterFunc(time.Second*5, func() {
  171. // get task
  172. task, err := db.GetOne[models.InstallTask](
  173. db.Equal("id", task.ID),
  174. )
  175. if err != nil {
  176. return
  177. }
  178. if task.CompletedPlugins == task.TotalPlugins {
  179. // delete task if all plugins are installed successfully
  180. db.Delete(&task)
  181. }
  182. })
  183. })
  184. return entities.NewSuccessResponse(response)
  185. }
  186. func FetchPluginInstallationTasks(
  187. tenant_id string,
  188. page int,
  189. page_size int,
  190. ) *entities.Response {
  191. tasks, err := db.GetAll[models.InstallTask](
  192. db.Equal("tenant_id", tenant_id),
  193. db.OrderBy("created_at", true),
  194. db.Page(page, page_size),
  195. )
  196. if err != nil {
  197. return entities.NewErrorResponse(-500, err.Error())
  198. }
  199. return entities.NewSuccessResponse(tasks)
  200. }
  201. func FetchPluginInstallationTask(
  202. tenant_id string,
  203. task_id string,
  204. ) *entities.Response {
  205. task, err := db.GetOne[models.InstallTask](
  206. db.Equal("id", task_id),
  207. db.Equal("tenant_id", tenant_id),
  208. )
  209. if err != nil {
  210. return entities.NewErrorResponse(-500, err.Error())
  211. }
  212. return entities.NewSuccessResponse(task)
  213. }
  214. func FetchPluginFromIdentifier(
  215. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  216. ) *entities.Response {
  217. _, err := db.GetOne[models.Plugin](
  218. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  219. )
  220. if err == db.ErrDatabaseNotFound {
  221. return entities.NewSuccessResponse(false)
  222. }
  223. if err != nil {
  224. return entities.NewErrorResponse(-500, err.Error())
  225. }
  226. return entities.NewSuccessResponse(true)
  227. }
  228. func UninstallPlugin(
  229. tenant_id string,
  230. plugin_installation_id string,
  231. ) *entities.Response {
  232. // Check if the plugin exists for the tenant
  233. installation, err := db.GetOne[models.PluginInstallation](
  234. db.Equal("tenant_id", tenant_id),
  235. db.Equal("id", plugin_installation_id),
  236. )
  237. if err == db.ErrDatabaseNotFound {
  238. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  239. }
  240. if err != nil {
  241. return entities.NewErrorResponse(-500, err.Error())
  242. }
  243. plugin_unique_identifier, err := plugin_entities.NewPluginUniqueIdentifier(installation.PluginUniqueIdentifier)
  244. if err != nil {
  245. return entities.NewErrorResponse(-500, fmt.Sprintf("failed to parse plugin unique identifier: %v", err))
  246. }
  247. // Uninstall the plugin
  248. _, err = curd.UninstallPlugin(
  249. tenant_id,
  250. plugin_unique_identifier,
  251. installation.ID,
  252. )
  253. if err != nil {
  254. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  255. }
  256. return entities.NewSuccessResponse(true)
  257. }