Make the number of ramp steps configurable
In #371 we introduced ramp steps so that scaling up stays possible even
though the HPA ignores metric changes smaller than 10%. The problem is
that steps of 10% might not be sufficient for some specific scaling
scenarios.

For example, an application targeting 12 pods via a ScalingSchedule with
a value of 10000 requires an HPA target of 833 (10000/12). With 10 ramp
steps the 90% bucket returns a metric of 9000, and the HPA calculates
9000/833 = 10.8 pods, which rounds to 11 pods. Once the schedule reaches
the point of returning 100%, that final change won't be effective, since
the difference between the current number of pods (11) and the desired
one (12) is less than 10%.

This commit does not try to tackle this problem completely, since the
10% rule is not fixed: it might change among different clusters, and it
also depends on the value given to each ScalingSchedule. Therefore, this
commit makes the number of ramp steps configurable via the
`--scaling-schedule-ramp-steps` config flag, defaulting to 10.
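
To make the arithmetic concrete, here is a minimal standalone Go sketch
of the calculation above (the numbers are the hypothetical ones from
this message; none of this is adapter code):

package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		scheduleValue = 10000.0 // value configured in the ScalingSchedule
		targetPods    = 12.0    // pods the schedule is meant to reach
	)
	hpaTarget := scheduleValue / targetPods // ~833, the HPA metric target

	for _, steps := range []float64{10, 20} {
		lastBucket := (steps - 1) * (scheduleValue / steps) // metric one step before 100%
		desired := math.Ceil(lastBucket / hpaTarget)        // replicas the HPA settles on
		fmt.Printf("steps=%.0f: bucket metric=%.0f -> %.0f pods\n", steps, lastBucket, desired)
	}
	// steps=10: bucket metric=9000 -> 11 pods; the final move from 11 to
	// 12 pods is below the HPA's 10% tolerance and never happens.
	// steps=20: bucket metric=9500 -> 12 pods; the target is reached.
}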

Signed-off-by: Jonathan Juares Beber <jonathanbeber@gmail.com>
jonathanbeber committed Oct 18, 2021
1 parent e04cd10 commit db18d74
Showing 3 changed files with 75 additions and 31 deletions.
49 changes: 29 additions & 20 deletions pkg/collector/scaling_schedule_collector.go
@@ -82,6 +82,7 @@ type ScalingScheduleCollectorPlugin struct {
store Store
now Now
defaultScalingWindow time.Duration
+ rampSteps int
}

// ClusterScalingScheduleCollectorPlugin is a collector plugin for initializing metrics
@@ -90,38 +91,41 @@ type ClusterScalingScheduleCollectorPlugin struct {
store Store
now Now
defaultScalingWindow time.Duration
+ rampSteps int
}

// NewScalingScheduleCollectorPlugin initializes a new ScalingScheduleCollectorPlugin.
- func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ScalingScheduleCollectorPlugin, error) {
+ func NewScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ScalingScheduleCollectorPlugin, error) {
return &ScalingScheduleCollectorPlugin{
store: store,
now: now,
defaultScalingWindow: defaultScalingWindow,
+ rampSteps: rampSteps,
}, nil
}

// NewClusterScalingScheduleCollectorPlugin initializes a new ClusterScalingScheduleCollectorPlugin.
- func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration) (*ClusterScalingScheduleCollectorPlugin, error) {
+ func NewClusterScalingScheduleCollectorPlugin(store Store, now Now, defaultScalingWindow time.Duration, rampSteps int) (*ClusterScalingScheduleCollectorPlugin, error) {
return &ClusterScalingScheduleCollectorPlugin{
store: store,
now: now,
defaultScalingWindow: defaultScalingWindow,
+ rampSteps: rampSteps,
}, nil
}

// NewCollector initializes a new scaling schedule collector from the
// specified HPA. It's the only required method to implement the
// collector.CollectorPlugin interface.
func (c *ScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
- return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
+ return NewScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
}

// NewCollector initializes a new cluster wide scaling schedule
// collector from the specified HPA. It's the only required method to
// implement the collector.CollectorPlugin interface.
func (c *ClusterScalingScheduleCollectorPlugin) NewCollector(hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (Collector, error) {
- return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.now, hpa, config, interval)
+ return NewClusterScalingScheduleCollector(c.store, c.defaultScalingWindow, c.rampSteps, c.now, hpa, config, interval)
}

// ScalingScheduleCollector is a metrics collector for time based
@@ -148,10 +152,11 @@ type scalingScheduleCollector struct {
interval time.Duration
config MetricConfig
defaultScalingWindow time.Duration
+ rampSteps int
}

// NewScalingScheduleCollector initializes a new ScalingScheduleCollector.
- func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
+ func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ScalingScheduleCollector, error) {
return &ScalingScheduleCollector{
scalingScheduleCollector{
store: store,
@@ -162,12 +167,13 @@ func NewScalingScheduleCollector(store Store, defaultScalingWindow time.Duration
interval: interval,
config: *config,
defaultScalingWindow: defaultScalingWindow,
+ rampSteps: rampSteps,
},
}, nil
}

// NewClusterScalingScheduleCollector initializes a new ScalingScheduleCollector.
- func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
+ func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.Duration, rampSteps int, now Now, hpa *autoscalingv2.HorizontalPodAutoscaler, config *MetricConfig, interval time.Duration) (*ClusterScalingScheduleCollector, error) {
return &ClusterScalingScheduleCollector{
scalingScheduleCollector{
store: store,
@@ -178,6 +184,7 @@ func NewClusterScalingScheduleCollector(store Store, defaultScalingWindow time.D
interval: interval,
config: *config,
defaultScalingWindow: defaultScalingWindow,
+ rampSteps: rampSteps,
},
}, nil
}
@@ -196,7 +203,7 @@ func (c *ScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error) {
if !ok {
return nil, ErrNotScalingScheduleFound
}
- return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
+ return calculateMetrics(scalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
}

// GetMetrics is the main implementation for collector.Collector interface
@@ -229,7 +236,7 @@ func (c *ClusterScalingScheduleCollector) GetMetrics() ([]CollectedMetric, error
clusterScalingSchedule = v1.ClusterScalingSchedule(*scalingSchedule)
}

- return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.now(), c.objectReference, c.metric)
+ return calculateMetrics(clusterScalingSchedule.Spec, c.defaultScalingWindow, c.rampSteps, c.now(), c.objectReference, c.metric)
}

// Interval returns the interval at which the collector should run.
@@ -242,7 +249,7 @@ func (c *ClusterScalingScheduleCollector) Interval() time.Duration {
return c.interval
}

- func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
+ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Duration, rampSteps int, now time.Time, objectReference custom_metrics.ObjectReference, metric autoscalingv2.MetricIdentifier) ([]CollectedMetric, error) {
scalingWindowDuration := defaultScalingWindow
if spec.ScalingWindowDurationMinutes != nil {
scalingWindowDuration = time.Duration(*spec.ScalingWindowDurationMinutes) * time.Minute
@@ -285,7 +292,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
parsedStartTime.Nanosecond(),
location,
)
- value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
+ value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
break
}
}
@@ -295,7 +302,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
return nil, ErrInvalidScheduleDate
}

- value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, schedule.Value))
+ value = maxInt64(value, valueForEntry(now, scheduledTime, schedule.Duration(), scalingWindowDuration, rampSteps, schedule.Value))
}
}

@@ -313,7 +320,7 @@ func calculateMetrics(spec v1.ScalingScheduleSpec, defaultScalingWindow time.Dur
}, nil
}

- func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, value int64) int64 {
+ func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.Duration, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
scaleUpStart := startTime.Add(-scalingWindowDuration)
endTime := startTime.Add(entryDuration)
scaleUpEnd := endTime.Add(scalingWindowDuration)
@@ -322,23 +329,25 @@ func valueForEntry(timestamp time.Time, startTime time.Time, entryDuration time.
return value
}
if between(timestamp, scaleUpStart, startTime) {
- return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, value)
+ return scaledValue(timestamp, scaleUpStart, scalingWindowDuration, rampSteps, value)
}
if between(timestamp, endTime, scaleUpEnd) {
- return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, value)
+ return scaledValue(scaleUpEnd, timestamp, scalingWindowDuration, rampSteps, value)
}
return 0
}

- func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, value int64) int64 {
+ // The HPA has a rule to not scale up or down if the change in the
+ // metric is less than 10% (by default) of the current value. We will
+ // use buckets of time using the floor of each as the returned metric.
+ // Any config greater or equal to 10 buckets must guarantee changes
+ // bigger than 10%.
+ func scaledValue(timestamp time.Time, startTime time.Time, scalingWindowDuration time.Duration, rampSteps int, value int64) int64 {
if scalingWindowDuration == 0 {
return 0
}
- // The HPA has a rule to do not scale up or down if the change in
- // the metric is less than 10% of the current value. We will use 10
- // buckets of time using the floor of each. This value might be
- // flexible one day, but for now it's fixed.
- const steps float64 = 10
+ steps := float64(rampSteps)

requiredPercentage := math.Abs(float64(timestamp.Sub(startTime))) / float64(scalingWindowDuration)
return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps))
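
The bucketing above is easy to exercise in isolation. The following
standalone sketch (illustration only, not part of the commit) reproduces
the formula with the values used by the new tests below:

package main

import (
	"fmt"
	"math"
	"time"
)

// ramped mirrors the bucketing in scaledValue above, reduced to the
// elapsed-time-inside-the-scaling-window case.
func ramped(elapsed, window time.Duration, rampSteps int, value int64) int64 {
	if window == 0 {
		return 0
	}
	steps := float64(rampSteps)
	requiredPercentage := math.Abs(float64(elapsed)) / float64(window)
	return int64(math.Floor(requiredPercentage*steps) * (float64(value) / steps))
}

func main() {
	window := time.Minute // the tests' default scaling window
	// 59s into a 60s ramp-up window, i.e. one second before the schedule starts:
	fmt.Println(ramped(59*time.Second, window, 10, 100)) // 90 (default 10 steps)
	fmt.Println(ramped(59*time.Second, window, 5, 100))  // 80 (5 steps)
	fmt.Println(ramped(59*time.Second, window, 20, 100)) // 95 (20 steps)
}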
46 changes: 39 additions & 7 deletions pkg/collector/scaling_schedule_collector_test.go
@@ -15,6 +15,7 @@ import (
const (
hHMMFormat = "15:04"
defaultScalingWindowDuration = 1 * time.Minute
+ defaultRampSteps = 10
)

type schedule struct {
@@ -48,6 +49,7 @@ func TestScalingScheduleCollector(t *testing.T) {
scalingWindowDurationMinutes *int64
expectedValue int64
err error
+ rampSteps int
}{
{
msg: "Return the right value for one time config",
@@ -109,6 +111,31 @@ func TestScalingScheduleCollector(t *testing.T) {
},
expectedValue: 60,
},
+ {
+ msg: "10 steps (default) return 90% of the metric, even 1 second before",
+ schedules: []schedule{
+ {
+ date: nowTime.Add(time.Second * 1).Format(time.RFC3339),
+ kind: "OneTime",
+ duration: 45,
+ value: 100,
+ },
+ },
+ expectedValue: 90,
+ },
+ {
+ msg: "5 steps return 80% of the metric, even 1 second before",
+ schedules: []schedule{
+ {
+ date: nowTime.Add(time.Second * 1).Format(time.RFC3339),
+ kind: "OneTime",
+ duration: 45,
+ value: 100,
+ },
+ },
+ expectedValue: 80,
+ rampSteps: 5,
+ },
{
msg: "Return the scaled value (90) for one time config with a custom scaling window - 30 seconds before starting",
scalingWindowDurationMinutes: &tenMinutes,
@@ -471,17 +498,22 @@ func TestScalingScheduleCollector(t *testing.T) {
scalingScheduleName := "my_scaling_schedule"
namespace := "default"

+ rampSteps := tc.rampSteps
+ if rampSteps == 0 {
+ rampSteps = defaultRampSteps
+ }

schedules := getSchedules(tc.schedules)
store := newMockStore(scalingScheduleName, namespace, tc.scalingWindowDurationMinutes, schedules)
- plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration)
+ plugin, err := NewScalingScheduleCollectorPlugin(store, now, defaultScalingWindowDuration, rampSteps)
require.NoError(t, err)

clusterStore := newClusterMockStore(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
- clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration)
+ clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, now, defaultScalingWindowDuration, rampSteps)
require.NoError(t, err)

clusterStoreFirstRun := newClusterMockStoreFirstRun(scalingScheduleName, tc.scalingWindowDurationMinutes, schedules)
- clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration)
+ clusterPluginFirstRun, err := NewClusterScalingScheduleCollectorPlugin(clusterStoreFirstRun, now, defaultScalingWindowDuration, rampSteps)
require.NoError(t, err)

hpa := makeScalingScheduleHPA(namespace, scalingScheduleName)
@@ -549,14 +581,14 @@ func TestScalingScheduleObjectNotPresentReturnsError(t *testing.T) {
make(map[string]interface{}),
getByKeyFn,
}
- plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+ plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)

clusterStore := mockStore{
make(map[string]interface{}),
getByKeyFn,
}
- clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration)
+ clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(clusterStore, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)

hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
@@ -611,10 +643,10 @@ func TestReturnsErrorWhenStoreDoes(t *testing.T) {
},
}

- plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+ plugin, err := NewScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)

- clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration)
+ clusterPlugin, err := NewClusterScalingScheduleCollectorPlugin(store, time.Now, defaultScalingWindowDuration, defaultRampSteps)
require.NoError(t, err)

hpa := makeScalingScheduleHPA("namespace", "scalingScheduleName")
11 changes: 7 additions & 4 deletions pkg/server/start.go
@@ -128,7 +128,8 @@ func NewCommandStartAdapterServer(stopCh <-chan struct{}) *cobra.Command {
flags.DurationVar(&o.GCInterval, "garbage-collector-interval", 10*time.Minute, "Interval to clean up metrics that are stored in in-memory cache.")
flags.BoolVar(&o.ScalingScheduleMetrics, "scaling-schedule", o.ScalingScheduleMetrics, ""+
"whether to enable time-based ScalingSchedule metrics")
- flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default scale-up/scale-down window duration for scheduled metrics")
+ flags.DurationVar(&o.DefaultScheduledScalingWindow, "scaling-schedule-default-scaling-window", 10*time.Minute, "Default ramp-up and ramp-down window duration for ScalingSchedules")
+ flags.IntVar(&o.RampSteps, "scaling-schedule-ramp-steps", 10, "Number of steps used to ramp up and ramp down ScalingSchedules. It guarantees that the HPA's 10% minimum-change rule will not prevent reaching the maximum scheduled value.")
return cmd
}

@@ -294,7 +295,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
)
go reflector.Run(ctx.Done())

- clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
+ clusterPlugin, err := collector.NewClusterScalingScheduleCollectorPlugin(clusterScalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
if err != nil {
return fmt.Errorf("unable to create ClusterScalingScheduleCollector plugin: %v", err)
}
@@ -303,7 +304,7 @@ func (o AdapterServerOptions) RunCustomMetricsAdapterServer(stopCh <-chan struct
return fmt.Errorf("failed to register ClusterScalingSchedule object collector plugin: %v", err)
}

- plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow)
+ plugin, err := collector.NewScalingScheduleCollectorPlugin(scalingSchedulesStore, time.Now, o.DefaultScheduledScalingWindow, o.RampSteps)
if err != nil {
return fmt.Errorf("unable to create ScalingScheduleCollector plugin: %v", err)
}
@@ -429,6 +430,8 @@ type AdapterServerOptions struct {
GCInterval time.Duration
// Time-based scaling based on the CRDs ScheduleScaling and ClusterScheduleScaling.
ScalingScheduleMetrics bool
- // Default scale-up/scale-down window duration for scheduled metrics
+ // Default ramp-up/ramp-down window duration for scheduled metrics
DefaultScheduledScalingWindow time.Duration
+ // Number of steps utilized during the rampup and rampdown for scheduled metrics
+ RampSteps int
}
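
With the new option wired through, an operator who needs a finer ramp can
raise the step count at startup. For example (binary name and flag values
illustrative; the flags themselves are the ones registered above):

kube-metrics-adapter \
	--scaling-schedule \
	--scaling-schedule-default-scaling-window=10m \
	--scaling-schedule-ramp-steps=20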
