|
@@ -28,6 +28,10 @@ func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginLifetime) error
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("registering plugin %s", identity.String())
|
|
|
+ }
|
|
|
+
|
|
|
if c.plugins.Exists(identity.String()) {
|
|
|
return errors.New("plugin has been registered")
|
|
|
}
|
|
@@ -57,7 +61,9 @@ func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginLifetime) error
|
|
|
}
|
|
|
c.pluginLock.Unlock()
|
|
|
|
|
|
- log.Info("start to schedule plugin %s", identity)
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("start to schedule plugin %s", identity)
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|
|
@@ -83,6 +89,10 @@ func (c *Cluster) getScanPluginsByIdKey(plugin_id string) string {
|
|
|
// as for the plugin has exited, normally, it will be removed automatically
|
|
|
// but once a plugin is not removed, it will be gc by the master node
|
|
|
func (c *Cluster) schedulePlugins() error {
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("scheduling %d plugins", c.plugins.Len())
|
|
|
+ }
|
|
|
+
|
|
|
c.notifyPluginSchedule()
|
|
|
defer c.notifyPluginScheduleCompleted()
|
|
|
|
|
@@ -90,15 +100,26 @@ func (c *Cluster) schedulePlugins() error {
|
|
|
if time.Since(value.lastScheduledAt) < PLUGIN_SCHEDULER_INTERVAL {
|
|
|
return true
|
|
|
}
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("scheduling plugin %s", key)
|
|
|
+ }
|
|
|
// do plugin state update
|
|
|
err := c.doPluginStateUpdate(value)
|
|
|
if err != nil {
|
|
|
log.Error("failed to update plugin state: %s", err.Error())
|
|
|
}
|
|
|
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("scheduled plugin %s", key)
|
|
|
+ }
|
|
|
+
|
|
|
return true
|
|
|
})
|
|
|
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("scheduled %d plugins", c.plugins.Len())
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -115,6 +136,10 @@ func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("updating plugin state %s", identity.String())
|
|
|
+ }
|
|
|
+
|
|
|
scheduleState := &pluginState{
|
|
|
Identity: identity.String(),
|
|
|
PluginRuntimeState: state,
|
|
@@ -124,12 +149,18 @@ func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
|
|
|
|
|
|
// check if the plugin has been removed
|
|
|
if !c.plugins.Exists(identity.String()) {
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("removing plugin state %s due no longer exists", identity.String())
|
|
|
+ }
|
|
|
// remove state
|
|
|
err = c.removePluginState(c.id, hashIdentity)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
} else {
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("updating plugin state %s", identity.String())
|
|
|
+ }
|
|
|
// update plugin state
|
|
|
scheduleState.ScheduledAt = &[]time.Time{time.Now()}[0]
|
|
|
err = cache.SetMapOneField(PLUGIN_STATE_MAP_KEY, stateKey, scheduleState)
|
|
@@ -137,6 +168,9 @@ func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
|
|
|
return err
|
|
|
}
|
|
|
lifetime.lifetime.UpdateScheduledAt(*scheduleState.ScheduledAt)
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("updated plugin state %s", identity.String())
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
lifetime.lastScheduledAt = time.Now()
|
|
@@ -145,12 +179,17 @@ func (c *Cluster) doPluginStateUpdate(lifetime *pluginLifeTime) error {
|
|
|
}
|
|
|
|
|
|
func (c *Cluster) removePluginState(nodeId string, hashed_identity string) error {
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("removing plugin state %s", hashed_identity)
|
|
|
+ }
|
|
|
err := cache.DelMapField(PLUGIN_STATE_MAP_KEY, c.getPluginStateKey(nodeId, hashed_identity))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- log.Info("plugin %s has been removed from node %s", hashed_identity, c.id)
|
|
|
+ if c.showLog {
|
|
|
+ log.Info("plugin %s has been removed from node %s", hashed_identity, c.id)
|
|
|
+ }
|
|
|
|
|
|
return nil
|
|
|
}
|