install_plugin.go 9.5 KB


  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "mime/multipart"
  7. "time"
  8. "github.com/gin-gonic/gin"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
  12. "github.com/langgenius/dify-plugin-daemon/internal/db"
  13. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  14. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  15. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  16. "github.com/langgenius/dify-plugin-daemon/internal/types/models"
  17. "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
  18. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  19. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  20. "gorm.io/gorm"
  21. )
  22. func UploadPluginFromPkg(
  23. config *app.Config,
  24. c *gin.Context,
  25. tenant_id string,
  26. dify_pkg_file multipart.File,
  27. verify_signature bool,
  28. ) *entities.Response {
  29. plugin_file, err := io.ReadAll(dify_pkg_file)
  30. if err != nil {
  31. return entities.NewErrorResponse(-500, err.Error())
  32. }
  33. decoder, err := decoder.NewZipPluginDecoder(plugin_file)
  34. if err != nil {
  35. return entities.NewErrorResponse(-500, err.Error())
  36. }
  37. if config.ForceVerifyingSignature || verify_signature {
  38. err := verifier.VerifyPlugin(decoder)
  39. if err != nil {
  40. return entities.NewErrorResponse(-500, errors.Join(err, errors.New(
  41. "plugin verification has been enabled, and the plugin you want to install has a bad signature",
  42. )).Error())
  43. }
  44. }
  45. manifest, err := decoder.Manifest()
  46. if err != nil {
  47. return entities.NewErrorResponse(-500, err.Error())
  48. }
  49. return entities.NewSuccessResponse(manifest.Identity())
  50. }
  51. func InstallPluginFromIdentifiers(
  52. tenant_id string,
  53. plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
  54. source string,
  55. meta map[string]any,
  56. ) *entities.Response {
  57. var response struct {
  58. AllInstalled bool `json:"all_installed"`
  59. TaskID string `json:"task_id"`
  60. }
  61. // TODO: create installation task and dispatch to workers
  62. plugins_wait_for_installation := []plugin_entities.PluginUniqueIdentifier{}
  63. task := &models.InstallTask{
  64. Status: models.InstallTaskStatusRunning,
  65. TotalPlugins: len(plugins_wait_for_installation),
  66. CompletedPlugins: 0,
  67. Plugins: []models.InstallTaskPluginStatus{},
  68. }
  69. for i, plugin_unique_identifier := range plugin_unique_identifiers {
  70. // check if plugin is already installed
  71. plugin, err := db.GetOne[models.Plugin](
  72. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  73. )
  74. task.Plugins = append(task.Plugins, models.InstallTaskPluginStatus{
  75. PluginUniqueIdentifier: plugin_unique_identifier,
  76. PluginID: plugin_unique_identifier.PluginID(),
  77. Status: models.InstallTaskStatusPending,
  78. Message: "",
  79. })
  80. task.TotalPlugins++
  81. if err == nil {
  82. // already installed by other tenant
  83. declaration := plugin.Declaration
  84. if _, _, err := curd.InstallPlugin(
  85. tenant_id,
  86. plugin_unique_identifier,
  87. plugin.InstallType,
  88. &declaration,
  89. source,
  90. meta,
  91. ); err != nil {
  92. return entities.NewErrorResponse(-500, err.Error())
  93. }
  94. task.CompletedPlugins++
  95. task.Plugins[i].Status = models.InstallTaskStatusSuccess
  96. task.Plugins[i].Message = "Installed"
  97. continue
  98. }
  99. if err != db.ErrDatabaseNotFound {
  100. return entities.NewErrorResponse(-500, err.Error())
  101. }
  102. plugins_wait_for_installation = append(plugins_wait_for_installation, plugin_unique_identifier)
  103. }
  104. if len(plugins_wait_for_installation) == 0 {
  105. response.AllInstalled = true
  106. response.TaskID = ""
  107. return entities.NewSuccessResponse(response)
  108. }
  109. err := db.Create(task)
  110. if err != nil {
  111. return entities.NewErrorResponse(-500, err.Error())
  112. }
  113. response.TaskID = task.ID
  114. manager := plugin_manager.Manager()
  115. tasks := []func(){}
  116. for _, plugin_unique_identifier := range plugins_wait_for_installation {
  117. tasks = append(tasks, func() {
  118. updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
  119. if err := db.WithTransaction(func(tx *gorm.DB) error {
  120. task, err := db.GetOne[models.InstallTask](
  121. db.WithTransactionContext(tx),
  122. db.Equal("id", task.ID),
  123. db.WLock(), // write lock, multiple tasks can't update the same task
  124. )
  125. if err != nil {
  126. return err
  127. }
  128. task_pointer := &task
  129. var plugin_status *models.InstallTaskPluginStatus
  130. for _, plugin := range task.Plugins {
  131. if plugin.PluginUniqueIdentifier == plugin_unique_identifier {
  132. plugin_status = &plugin
  133. }
  134. }
  135. modifier(task_pointer, plugin_status)
  136. return db.Update(task_pointer, tx)
  137. }); err != nil {
  138. log.Error("failed to update install task status %s", err.Error())
  139. }
  140. }
  141. pkg, err := manager.GetPackage(plugin_unique_identifier)
  142. if err != nil {
  143. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  144. task.Status = models.InstallTaskStatusFailed
  145. plugin.Status = models.InstallTaskStatusFailed
  146. plugin.Message = err.Error()
  147. })
  148. return
  149. }
  150. decoder, err := decoder.NewZipPluginDecoder(pkg)
  151. if err != nil {
  152. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  153. task.Status = models.InstallTaskStatusFailed
  154. plugin.Status = models.InstallTaskStatusFailed
  155. plugin.Message = err.Error()
  156. })
  157. return
  158. }
  159. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  160. plugin.Status = models.InstallTaskStatusRunning
  161. plugin.Message = "Installing"
  162. })
  163. stream, err := manager.Install(tenant_id, decoder, source, meta)
  164. if err != nil {
  165. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  166. task.Status = models.InstallTaskStatusFailed
  167. plugin.Status = models.InstallTaskStatusFailed
  168. plugin.Message = err.Error()
  169. })
  170. return
  171. }
  172. for stream.Next() {
  173. message, err := stream.Read()
  174. if err != nil {
  175. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  176. task.Status = models.InstallTaskStatusFailed
  177. plugin.Status = models.InstallTaskStatusFailed
  178. plugin.Message = err.Error()
  179. })
  180. return
  181. }
  182. if message.Event == plugin_manager.PluginInstallEventError {
  183. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  184. task.Status = models.InstallTaskStatusFailed
  185. plugin.Status = models.InstallTaskStatusFailed
  186. plugin.Message = message.Data
  187. })
  188. return
  189. }
  190. }
  191. updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
  192. plugin.Status = models.InstallTaskStatusSuccess
  193. plugin.Message = "Installed"
  194. task.CompletedPlugins++
  195. // check if all plugins are installed
  196. if task.CompletedPlugins == task.TotalPlugins {
  197. task.Status = models.InstallTaskStatusSuccess
  198. }
  199. })
  200. })
  201. }
  202. // submit async tasks
  203. routine.WithMaxRoutine(3, tasks, func() {
  204. time.AfterFunc(time.Second*5, func() {
  205. // get task
  206. task, err := db.GetOne[models.InstallTask](
  207. db.Equal("id", task.ID),
  208. )
  209. if err != nil {
  210. return
  211. }
  212. if task.CompletedPlugins == task.TotalPlugins {
  213. // delete task if all plugins are installed successfully
  214. db.Delete(&task)
  215. }
  216. })
  217. })
  218. return entities.NewSuccessResponse(response)
  219. }
  220. func FetchPluginInstallationTasks(
  221. tenant_id string,
  222. page int,
  223. page_size int,
  224. ) *entities.Response {
  225. tasks, err := db.GetAll[models.InstallTask](
  226. db.Equal("tenant_id", tenant_id),
  227. db.OrderBy("created_at", true),
  228. db.Page(page, page_size),
  229. )
  230. if err != nil {
  231. return entities.NewErrorResponse(-500, err.Error())
  232. }
  233. return entities.NewSuccessResponse(tasks)
  234. }
  235. func FetchPluginInstallationTask(
  236. tenant_id string,
  237. task_id string,
  238. ) *entities.Response {
  239. task, err := db.GetOne[models.InstallTask](
  240. db.Equal("id", task_id),
  241. db.Equal("tenant_id", tenant_id),
  242. )
  243. if err != nil {
  244. return entities.NewErrorResponse(-500, err.Error())
  245. }
  246. return entities.NewSuccessResponse(task)
  247. }
  248. func FetchPluginManifest(
  249. tenant_id string,
  250. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  251. ) *entities.Response {
  252. return nil
  253. }
  254. func FetchPluginFromIdentifier(
  255. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  256. ) *entities.Response {
  257. _, err := db.GetOne[models.Plugin](
  258. db.Equal("plugin_unique_identifier", plugin_unique_identifier.String()),
  259. )
  260. if err == db.ErrDatabaseNotFound {
  261. return entities.NewSuccessResponse(false)
  262. }
  263. if err != nil {
  264. return entities.NewErrorResponse(-500, err.Error())
  265. }
  266. return entities.NewSuccessResponse(true)
  267. }
  268. func UninstallPlugin(
  269. tenant_id string,
  270. plugin_installation_id string,
  271. ) *entities.Response {
  272. // Check if the plugin exists for the tenant
  273. installation, err := db.GetOne[models.PluginInstallation](
  274. db.Equal("tenant_id", tenant_id),
  275. db.Equal("id", plugin_installation_id),
  276. )
  277. if err == db.ErrDatabaseNotFound {
  278. return entities.NewErrorResponse(-404, "Plugin installation not found for this tenant")
  279. }
  280. if err != nil {
  281. return entities.NewErrorResponse(-500, err.Error())
  282. }
  283. // Uninstall the plugin
  284. _, err = curd.UninstallPlugin(
  285. tenant_id,
  286. plugin_entities.PluginUniqueIdentifier(installation.PluginUniqueIdentifier),
  287. installation.ID,
  288. )
  289. if err != nil {
  290. return entities.NewErrorResponse(-500, fmt.Sprintf("Failed to uninstall plugin: %s", err.Error()))
  291. }
  292. return entities.NewSuccessResponse(true)
  293. }