install_to_local.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package plugin_manager
  2. import (
  3. "io"
  4. "os"
  5. "path/filepath"
  6. "time"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  10. )
  11. // InstallToLocal installs a plugin to local
  12. func (p *PluginManager) InstallToLocal(
  13. plugin_path string,
  14. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  15. source string,
  16. meta map[string]any,
  17. ) (
  18. *stream.Stream[PluginInstallResponse], error,
  19. ) {
  20. plugin_file, err := os.Open(plugin_path)
  21. if err != nil {
  22. return nil, err
  23. }
  24. defer plugin_file.Close()
  25. installed_file_path := filepath.Join(p.pluginStoragePath, plugin_unique_identifier.String())
  26. dir_path := filepath.Dir(installed_file_path)
  27. if err := os.MkdirAll(dir_path, 0755); err != nil {
  28. return nil, err
  29. }
  30. installed_file, err := os.Create(installed_file_path)
  31. if err != nil {
  32. return nil, err
  33. }
  34. defer installed_file.Close()
  35. if _, err := io.Copy(installed_file, plugin_file); err != nil {
  36. return nil, err
  37. }
  38. runtime, launched_chan, err := p.launchLocal(installed_file_path)
  39. if err != nil {
  40. return nil, err
  41. }
  42. response := stream.NewStream[PluginInstallResponse](128)
  43. routine.Submit(func() {
  44. defer response.Close()
  45. ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
  46. defer ticker.Stop()
  47. timer := time.NewTimer(time.Second * 240) // timeout after 240 seconds
  48. defer timer.Stop()
  49. for {
  50. select {
  51. case <-ticker.C:
  52. // heartbeat
  53. response.Write(PluginInstallResponse{
  54. Event: PluginInstallEventInfo,
  55. Data: "Installing",
  56. })
  57. case <-timer.C:
  58. // timeout
  59. response.Write(PluginInstallResponse{
  60. Event: PluginInstallEventInfo,
  61. Data: "Timeout",
  62. })
  63. runtime.Stop()
  64. return
  65. case <-launched_chan:
  66. // launched
  67. if err != nil {
  68. response.Write(PluginInstallResponse{
  69. Event: PluginInstallEventError,
  70. Data: err.Error(),
  71. })
  72. runtime.Stop()
  73. return
  74. }
  75. response.Write(PluginInstallResponse{
  76. Event: PluginInstallEventDone,
  77. Data: "Installed",
  78. })
  79. return
  80. }
  81. }
  82. })
  83. return response, nil
  84. }