install_to_local.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package plugin_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  7. )
  8. // InstallToLocal installs a plugin to local
  9. func (p *PluginManager) InstallToLocal(
  10. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  11. source string,
  12. meta map[string]any,
  13. ) (
  14. *stream.Stream[PluginInstallResponse], error,
  15. ) {
  16. packageFile, err := p.packageBucket.Get(plugin_unique_identifier.String())
  17. if err != nil {
  18. return nil, err
  19. }
  20. err = p.installedBucket.Save(plugin_unique_identifier, packageFile)
  21. if err != nil {
  22. return nil, err
  23. }
  24. runtime, launchedChan, errChan, err := p.launchLocal(plugin_unique_identifier)
  25. if err != nil {
  26. return nil, err
  27. }
  28. response := stream.NewStream[PluginInstallResponse](128)
  29. routine.Submit(map[string]string{
  30. "module": "plugin_manager",
  31. "function": "InstallToLocal",
  32. }, func() {
  33. defer response.Close()
  34. ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
  35. defer ticker.Stop()
  36. timer := time.NewTimer(time.Second * 240) // timeout after 240 seconds
  37. defer timer.Stop()
  38. for {
  39. select {
  40. case <-ticker.C:
  41. // heartbeat
  42. response.Write(PluginInstallResponse{
  43. Event: PluginInstallEventInfo,
  44. Data: "Installing",
  45. })
  46. case <-timer.C:
  47. // timeout
  48. response.Write(PluginInstallResponse{
  49. Event: PluginInstallEventInfo,
  50. Data: "Timeout",
  51. })
  52. runtime.Stop()
  53. return
  54. case err := <-errChan:
  55. if err != nil {
  56. // if error occurs, stop the plugin
  57. response.Write(PluginInstallResponse{
  58. Event: PluginInstallEventError,
  59. Data: err.Error(),
  60. })
  61. runtime.Stop()
  62. return
  63. }
  64. case <-launchedChan:
  65. response.Write(PluginInstallResponse{
  66. Event: PluginInstallEventDone,
  67. Data: "Installed",
  68. })
  69. return
  70. }
  71. }
  72. })
  73. return response, nil
  74. }