install_to_local.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package plugin_manager
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  7. )
  8. // InstallToLocal installs a plugin to local
  9. func (p *PluginManager) InstallToLocal(
  10. plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
  11. source string,
  12. meta map[string]any,
  13. ) (
  14. *stream.Stream[PluginInstallResponse], error,
  15. ) {
  16. packageFile, err := p.packageBucket.Get(plugin_unique_identifier.String())
  17. if err != nil {
  18. return nil, err
  19. }
  20. err = p.installedBucket.Save(plugin_unique_identifier, packageFile)
  21. if err != nil {
  22. return nil, err
  23. }
  24. runtime, launchedChan, errChan, err := p.launchLocal(plugin_unique_identifier)
  25. if err != nil {
  26. return nil, err
  27. }
  28. response := stream.NewStream[PluginInstallResponse](128)
  29. routine.Submit(func() {
  30. defer response.Close()
  31. ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
  32. defer ticker.Stop()
  33. timer := time.NewTimer(time.Second * 240) // timeout after 240 seconds
  34. defer timer.Stop()
  35. for {
  36. select {
  37. case <-ticker.C:
  38. // heartbeat
  39. response.Write(PluginInstallResponse{
  40. Event: PluginInstallEventInfo,
  41. Data: "Installing",
  42. })
  43. case <-timer.C:
  44. // timeout
  45. response.Write(PluginInstallResponse{
  46. Event: PluginInstallEventInfo,
  47. Data: "Timeout",
  48. })
  49. runtime.Stop()
  50. return
  51. case err := <-errChan:
  52. if err != nil {
  53. // if error occurs, stop the plugin
  54. response.Write(PluginInstallResponse{
  55. Event: PluginInstallEventError,
  56. Data: err.Error(),
  57. })
  58. runtime.Stop()
  59. return
  60. }
  61. case <-launchedChan:
  62. response.Write(PluginInstallResponse{
  63. Event: PluginInstallEventDone,
  64. Data: "Installed",
  65. })
  66. return
  67. }
  68. }
  69. })
  70. return response, nil
  71. }