123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package plugin_manager
- import (
- "time"
- "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
- )
- // InstallToLocal installs a plugin to local
- func (p *PluginManager) InstallToLocal(
- plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
- source string,
- meta map[string]any,
- ) (
- *stream.Stream[PluginInstallResponse], error,
- ) {
- package_file, err := p.packageBucket.Get(plugin_unique_identifier.String())
- if err != nil {
- return nil, err
- }
- err = p.installedBucket.Save(plugin_unique_identifier, package_file)
- if err != nil {
- return nil, err
- }
- runtime, launched_chan, err := p.launchLocal(plugin_unique_identifier)
- if err != nil {
- return nil, err
- }
- response := stream.NewStream[PluginInstallResponse](128)
- routine.Submit(func() {
- defer response.Close()
- ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
- defer ticker.Stop()
- timer := time.NewTimer(time.Second * 240) // timeout after 240 seconds
- defer timer.Stop()
- for {
- select {
- case <-ticker.C:
- // heartbeat
- response.Write(PluginInstallResponse{
- Event: PluginInstallEventInfo,
- Data: "Installing",
- })
- case <-timer.C:
- // timeout
- response.Write(PluginInstallResponse{
- Event: PluginInstallEventInfo,
- Data: "Timeout",
- })
- runtime.Stop()
- return
- case <-launched_chan:
- // launched
- if err != nil {
- response.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: err.Error(),
- })
- runtime.Stop()
- return
- }
- response.Write(PluginInstallResponse{
- Event: PluginInstallEventDone,
- Data: "Installed",
- })
- return
- }
- }
- })
- return response, nil
- }
|