diff --git a/pkg/collector/scaling_schedule_collector.go b/pkg/collector/scaling_schedule_collector.go
index 3dba87b0..ae87c4f1 100644
--- a/pkg/collector/scaling_schedule_collector.go
+++ b/pkg/collector/scaling_schedule_collector.go
@@ -82,6 +82,7 @@ type ScalingScheduleCollectorPlugin struct {
 	store                Store
 	now                  Now
 	defaultScalingWindow time.Duration
+	rampSteps            int
 }
 
 // ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
@@ -90,23 +91,26 @@ type ClusterScalingScheduleCollectorPlugin struct {
 	store                Store
 	now                  Now
 	defaultScalingWindow time.Duration
+	rampSteps            int
 }
 
 // NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin.
-func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ScalingScheduleCollectorPlugin, error) {
+func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ScalingScheduleCollectorPlugin, error) {
 	return &ScalingScheduleCollectorPlugin{
 		store:                store,
 		now:                  now,
 		defaultScalingWindow: defaultScalingWindow,
+		rampSteps:            rampSteps,
 	}, nil
 }
 
 // NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin.
-func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ClusterScalingScheduleCollectorPlugin, error) {
+func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ClusterScalingScheduleCollectorPlugin, error) {
 	return &ClusterScalingScheduleCollectorPlugin{
 		store:                store,
 		now:                  now,
 		defaultScalingWindow: defaultScalingWindow,
+		rampSteps:            rampSteps,
 	}, nil
 }
 
@@ -114,14 +118,14 @@ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScali
 // specified HPA. It's the only required method to implement the
 // collector.CollectorPlugin interface.
 func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
-	return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
+	return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
 }
 
 // NewCollector initializes a new cluster wide scaling schedule
 // collector from the specified HPA. It's the only required method to
 // implement the collector.CollectorPlugin interface.
 func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
-	return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
+	return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
 }
 
 // ScalingScheduleCollector is a metrics collector for time based
@@ -148,10 +152,11 @@ type scalingScheduleCollector struct {
 	interval             time.Duration
 	config               MetricConfig
 	defaultScalingWindow time.Duration
+	rampSteps            int
 }
 
 // NewScalingScheduleCollector initializes a new ScalingScheduleCollector.
-func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
+func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
 	return &ScalingScheduleCollector{
 		scalingScheduleCollector{
 			store:                store,
@@ -162,12 +167,13 @@ func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration
 			interval:             interval,
 			config:               *config,
 			defaultScalingWindow: defaultScalingWindow,
+			rampSteps:            rampSteps,
 		},
 	}, nil
 }
 
 // NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector.
-func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
+func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
 	return &ClusterScalingScheduleCollector{
 		scalingScheduleCollector{
 			store:                store,
@@ -178,6 +184,7 @@ func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.D
 			interval:             interval,
 			config:               *config,
 			defaultScalingWindow: defaultScalingWindow,
+			rampSteps:            rampSteps,
 		},
 	}, nil
 }
@@ -196,7 +203,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) {
 	if !ok {
 		return nil, ErrNotScalingScheduleFound
 	}
-	return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
+	return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
 }
 
 // GetMetrics is the main implementation for collector.Collector interface
@@ -229,7 +236,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error
 		clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule)
 	}
 
-	return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
+	return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
 }
 
 // Interval returns the interval at which the collector should run.
@@ -242,7 +249,7 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration {
 	return c.interval
 }
 
-func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
+func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, rampSteps int, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
 	scalingWindowDuration := defaultScalingWindow
 	if spec.ScalingWindowDurationMinutes != nil {
 		scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute
@@ -285,7 +292,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
 						parsedStartTime.Nanosecond(),
 						location,
 					)
-					value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
+					value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
 					break
 				}
 			}
@@ -295,7 +302,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
 				return nil, ErrInvalidScheduleDate
 			}
 
			value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
+			value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
 		}
 	}
 
@@ -313,7 +320,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
 	}, nil
 }
 
-func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, value int64) int64 {
+func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
 	scaleUpStart := startTime.Add(-scalingWindowDuration)
 	endTime := startTime.Add(entryDuration)
 	scaleUpEnd := endTime.Add(scalingWindowDuration)
@@ -322,23 +329,25 @@ func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.
 		return value
 	}
 	if between(timestamp, scaleUpStart, startTime) {
-		return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, value)
+		return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, rampSteps, value)
 	}
 	if between(timestamp, endTime, scaleUpEnd) {
-		return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, value)
+		return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, rampSteps, value)
 	}
 	return 0
 }
 
-func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, value int64) int64 {
+// The HPA has a rule to not scale up or down if the change in the
+// metric is less than 10% (by default) of the current value. We
+// return the metric in buckets of time, using the floor of each
+// bucket, so configs of 10 or fewer steps keep every change at or
+// above that 10% threshold.
+func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
 	if scalingWindowDuration == 0 {
 		return 0
 	}
-	// The HPA has a rule to do not scale up or down if the change in
-	// the metric is less than 10% of the current value. We will use 10
-	// buckets of time using the floor of each. This value might be
-	// flexible one day, but for now it's fixed.
-	const steps float64 = 10
+
+	steps := float64(rampSteps)
 
 	requiredPercentage := math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration)
 	return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps))
diff --git a/pkg/collector/scaling_schedule_collector_test.go b/pkg/collector/scaling_schedule_collector_test.go
index 305ebf66..aa9b0edf 100644
--- a/pkg/collector/scaling_schedule_collector_test.go
+++ b/pkg/collector/scaling_schedule_collector_test.go
@@ -15,6 +15,7 @@ import (
 const (
 	hHMMFormat                   = "15:04"
 	defaultScalingWindowDuration = 1 * time.Minute
+	defaultRampSteps             = 10
 )
 
 type schedule struct {
@@ -48,6 +49,7 @@ func TestScalingScheduleCollector(t *testing.T) {
 		scalingWindowDurationMinutes *int64
 		expectedValue                int64
 		err                          error
+		rampSteps                    int
 	}{
 		{
 			msg: "Return the right value for one time config",
@@ -109,6 +111,31 @@ func TestScalingScheduleCollector(t *testing.T) {
 			},
 			expectedValue: 60,
 		},
+		{
+			msg: "10 steps (default) return 90% of the metric, even 1 second before",
+			schedules: []schedule{
+				{
+					date:     nowTime.Add(time.Second * 1).Format(time.RFC3339),
+					kind:     "OneTime",
+					duration: 45,
+					value:    100,
+				},
+			},
+			expectedValue: 90,
+		},
+		{
+			msg: "5 steps return 80% of the metric, even 1 second before",
+			schedules: []schedule{
+				{
+					date:     nowTime.Add(time.Second * 1).Format(time.RFC3339),
+					kind:     "OneTime",
+					duration: 45,
+					value:    100,
+				},
+			},
+			expectedValue: 80,
+			rampSteps:     5,
+		},
 		{
 			msg:                          "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds before starting",
 			scalingWindowDurationMinutes: &tenMinutes,
@@ -471,17 +498,22 @@ func TestScalingScheduleCollector(t *testing.T) {
 			scalingScheduleName := "my_scaling_schedule"
 			namespace := "default"
 
+			rampSteps := tc.rampSteps
+			if rampSteps == 0 {
+				rampSteps = defaultRampSteps
+			}
+
 			schedules := getSchedules(tc.schedules)
 			store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules)
-			plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration)
+			plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration, rampSteps)
 			require.NoError(t, err)
 
 			clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
-			clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration)
+			clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration, rampSteps)
 			require.NoError(t, err)
 
 			clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
-			clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration)
+			clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration, rampSteps)
 			require.NoError(t, err)
 
 			hpa := makeScalingScheduleHPA(namespace, scalingScheduleName)
@@ -549,14 +581,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) {
 		make(map[string]interface{}),
 		getByKeyFn,
 	}
-	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
 	clusterStore := mockStore{
 		make(map[string]interface{}),
 		getByKeyFn,
 	}
-	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration)
+	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
 	hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@@ -611,10 +643,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) {
 		},
 	}
 
-	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+	plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
-	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+	clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
 	require.NoError(t, err)
 
 	hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
diff --git a/pkg/server/start.go b/pkg/server/start.go
index 1b9de872..dd11287a 100644
--- a/pkg/server/start.go
+++ b/pkg/server/start.go
@@ -128,7 +128,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
 	flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
 	flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
 		"whether to enable time-based ScalingSchedule metrics")
-	flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default scale-up/scale-down window duration for scheduled metrics")
+	flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default ramp-up and ramp-down window duration for ScalingSchedules")
+	flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to ramp up and ramp down ScalingSchedules. It's used to guarantee that the HPA's 10% minimum change rule does not prevent the metric from reaching its maximum value.")
 	return cmd
 }
 
@@ -294,7 +295,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
 	)
 	go reflector.Run(ctx.Done())
 
-	clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
+	clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
 	if err != nil {
 		return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
 	}
@@ -303,7 +304,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
 		return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
 	}
 
-	plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
+	plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
 	if err != nil {
 		return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
 	}
@@ -429,6 +430,8 @@ type AdapterServerOptions struct {
 	GCInterval time.Duration
 	// Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
 	ScalingScheduleMetrics bool
-	// Default scale-up/scale-down window duration for scheduled metrics
+	// Default ramp-up/ramp-down window duration for scheduled metrics
 	DefaultScheduledScalingWindow time.Duration
+	// Number of steps used during the ramp-up and ramp-down for scheduled metrics
+	RampSteps int
 }
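
For reference (not part of the patch): a minimal standalone sketch of the bucketed ramp arithmetic that scaledValue now performs, using a hypothetical helper name. With the values from the tests above (scaling window of one minute, schedule value 100, one second before the start time) it prints 90 for the default 10 steps and 80 for 5 steps, matching the two new test cases.

package main

import (
	"fmt"
	"math"
	"time"
)

// scaledRampValue mirrors the floor-bucket computation from scaledValue:
// the elapsed fraction of the scaling window is floored into rampSteps
// buckets, each worth value/rampSteps of the scheduled metric.
func scaledRampValue(elapsed, window time.Duration, rampSteps int, value int64) int64 {
	if window == 0 {
		return 0
	}
	steps := float64(rampSteps)
	fraction := math.Abs(float64(elapsed)) / float64(window)
	return int64(math.Floor(fraction*steps) * (float64(value) / steps))
}

func main() {
	window := time.Minute       // defaultScalingWindowDuration in the tests
	elapsed := 59 * time.Second // one second before the schedule starts

	fmt.Println(scaledRampValue(elapsed, window, 10, 100)) // 90
	fmt.Println(scaledRampValue(elapsed, window, 5, 100))  // 80
}

The default of 10 here matches the default of the new scaling-schedule-ramp-steps flag.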
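A rough sanity check of the step count against the HPA tolerance mentioned in the new comment, assuming the HPA's default 10% tolerance: during ramp-up the smallest relative change between consecutive buckets is 1/(steps-1), right before the full value is reached, so it only stays above 10% for configurations of 10 or fewer steps. A standalone sketch that prints this arithmetic (it does not exercise the HPA itself):

package main

import "fmt"

func main() {
	// Assumed HPA default tolerance: changes of 10% or less are ignored.
	tolerance := 0.10
	for _, steps := range []int{5, 10, 11, 20} {
		// Smallest relative change on the ramp-up: from steps-1 buckets to steps buckets.
		minChange := 1.0 / float64(steps-1)
		fmt.Printf("steps=%-2d smallest change=%.1f%% triggers scaling=%v\n",
			steps, minChange*100, minChange > tolerance)
	}
}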