|
@@ -29,6 +29,7 @@ type InstallPluginResponse struct {
|
|
|
type InstallPluginOnDoneHandler func(
|
|
|
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
|
|
|
declaration *plugin_entities.PluginDeclaration,
|
|
|
+ meta map[string]any,
|
|
|
) error
|
|
|
|
|
|
func InstallPluginRuntimeToTenant(
|
|
@@ -36,7 +37,7 @@ func InstallPluginRuntimeToTenant(
|
|
|
tenant_id string,
|
|
|
plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
|
|
|
source string,
|
|
|
- meta map[string]any,
|
|
|
+ metas []map[string]any,
|
|
|
onDone InstallPluginOnDoneHandler, // since installing plugin is a async task, we need to call it asynchronously
|
|
|
) (*InstallPluginResponse, error) {
|
|
|
response := &InstallPluginResponse{}
|
|
@@ -85,7 +86,7 @@ func InstallPluginRuntimeToTenant(
|
|
|
})
|
|
|
|
|
|
if err == nil {
|
|
|
- if err := onDone(pluginUniqueIdentifier, pluginDeclaration); err != nil {
|
|
|
+ if err := onDone(pluginUniqueIdentifier, pluginDeclaration, metas[i]); err != nil {
|
|
|
return nil, errors.Join(err, errors.New("failed on plugin installation"))
|
|
|
} else {
|
|
|
task.CompletedPlugins++
|
|
@@ -118,7 +119,7 @@ func InstallPluginRuntimeToTenant(
|
|
|
manager := plugin_manager.Manager()
|
|
|
|
|
|
tasks := []func(){}
|
|
|
- for _, pluginUniqueIdentifier := range pluginsWaitForInstallation {
|
|
|
+ for i, pluginUniqueIdentifier := range pluginsWaitForInstallation {
|
|
|
// copy the variable to avoid race condition
|
|
|
pluginUniqueIdentifier := pluginUniqueIdentifier
|
|
|
|
|
@@ -131,6 +132,7 @@ func InstallPluginRuntimeToTenant(
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ i := i
|
|
|
tasks = append(tasks, func() {
|
|
|
updateTaskStatus := func(modifier func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus)) {
|
|
|
if err := db.WithTransaction(func(tx *gorm.DB) error {
|
|
@@ -213,9 +215,9 @@ func InstallPluginRuntimeToTenant(
|
|
|
})
|
|
|
return
|
|
|
}
|
|
|
- stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, meta)
|
|
|
+ stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, metas[i])
|
|
|
} else if config.Platform == app.PLATFORM_LOCAL {
|
|
|
- stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, meta)
|
|
|
+ stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, metas[i])
|
|
|
} else {
|
|
|
updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
|
|
|
task.Status = models.InstallTaskStatusFailed
|
|
@@ -255,7 +257,7 @@ func InstallPluginRuntimeToTenant(
|
|
|
}
|
|
|
|
|
|
if message.Event == plugin_manager.PluginInstallEventDone {
|
|
|
- if err := onDone(pluginUniqueIdentifier, declaration); err != nil {
|
|
|
+ if err := onDone(pluginUniqueIdentifier, declaration, metas[i]); err != nil {
|
|
|
updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
|
|
|
task.Status = models.InstallTaskStatusFailed
|
|
|
plugin.Status = models.InstallTaskStatusFailed
|
|
@@ -280,7 +282,7 @@ func InstallPluginRuntimeToTenant(
|
|
|
}
|
|
|
|
|
|
// submit async tasks
|
|
|
- routine.WithMaxRoutine(3, tasks)
|
|
|
+ routine.WithMaxRoutine(5, tasks)
|
|
|
|
|
|
return response, nil
|
|
|
}
|
|
@@ -290,17 +292,18 @@ func InstallPluginFromIdentifiers(
|
|
|
tenant_id string,
|
|
|
plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
|
|
|
source string,
|
|
|
- meta map[string]any,
|
|
|
+ metas []map[string]any,
|
|
|
) *entities.Response {
|
|
|
response, err := InstallPluginRuntimeToTenant(
|
|
|
config,
|
|
|
tenant_id,
|
|
|
plugin_unique_identifiers,
|
|
|
source,
|
|
|
- meta,
|
|
|
+ metas,
|
|
|
func(
|
|
|
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
|
|
|
declaration *plugin_entities.PluginDeclaration,
|
|
|
+ meta map[string]any,
|
|
|
) error {
|
|
|
runtimeType := plugin_entities.PluginRuntimeType("")
|
|
|
|
|
@@ -371,10 +374,11 @@ func UpgradePlugin(
|
|
|
tenant_id,
|
|
|
[]plugin_entities.PluginUniqueIdentifier{new_plugin_unique_identifier},
|
|
|
source,
|
|
|
- meta,
|
|
|
+ []map[string]any{meta},
|
|
|
func(
|
|
|
pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier,
|
|
|
declaration *plugin_entities.PluginDeclaration,
|
|
|
+ meta map[string]any,
|
|
|
) error {
|
|
|
// uninstall the original plugin
|
|
|
upgradeResponse, err := curd.UpgradePlugin(
|