install_to_local.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package plugin_manager
  2. import (
  3. "io"
  4. "os"
  5. "path/filepath"
  6. "time"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  10. )
  11. // InstallToLocal installs a plugin to local
  12. func (p *PluginManager) InstallToLocal(
  13. tenant_id string,
  14. plugin_path string,
  15. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  16. source string,
  17. meta map[string]any,
  18. ) (
  19. *stream.Stream[PluginInstallResponse], error,
  20. ) {
  21. plugin_file, err := os.Open(plugin_path)
  22. if err != nil {
  23. return nil, err
  24. }
  25. defer plugin_file.Close()
  26. installed_file_path := filepath.Join(p.pluginStoragePath, plugin_unique_identifier.String())
  27. installed_file, err := os.Create(installed_file_path)
  28. if err != nil {
  29. return nil, err
  30. }
  31. defer installed_file.Close()
  32. if _, err := io.Copy(installed_file, plugin_file); err != nil {
  33. return nil, err
  34. }
  35. runtime, err := p.launchLocal(installed_file_path)
  36. if err != nil {
  37. return nil, err
  38. }
  39. response := stream.NewStream[PluginInstallResponse](128)
  40. routine.Submit(func() {
  41. defer response.Close()
  42. ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
  43. defer ticker.Stop()
  44. timer := time.NewTimer(time.Second * 240) // timeout after 240 seconds
  45. defer timer.Stop()
  46. for {
  47. select {
  48. case <-ticker.C:
  49. // heartbeat
  50. response.Write(PluginInstallResponse{
  51. Event: PluginInstallEventInfo,
  52. Data: "Installing",
  53. })
  54. case <-timer.C:
  55. // timeout
  56. response.Write(PluginInstallResponse{
  57. Event: PluginInstallEventInfo,
  58. Data: "Timeout",
  59. })
  60. runtime.Stop()
  61. return
  62. case err := <-runtime.WaitLaunched():
  63. // launched
  64. if err != nil {
  65. response.Write(PluginInstallResponse{
  66. Event: PluginInstallEventError,
  67. Data: err.Error(),
  68. })
  69. runtime.Stop()
  70. return
  71. }
  72. response.Write(PluginInstallResponse{
  73. Event: PluginInstallEventDone,
  74. Data: "Installed",
  75. })
  76. return
  77. }
  78. }
  79. })
  80. return response, nil
  81. }