Feat/event-based instance replacement - Fix terminateRandomSpotInstance logic in the scheduled run + specify the on-demand price multiplier at the ASG group level (#425)

* Allow flavor to be customised (#359)

* fix typo (#360)

* Fix typo (#361)

* UserData wrappers for CloudFormation helper scripts when using Beanstalk (#366)

* Support custom role for cfn-init in Beanstalk UserData

* Wrappers & refactoring

* Docs

* Docs fixes

* More docs fixes

* Docs fixes

* Yet more docs fixes

* Lambda & Kubernetes config

* AutoSpottingElasticBeanstalk managed policy + Rename beanstalk_cfn_init_role to beanstalk_cfn_wrappers

* Update param description

* Kubernetes config fix

* Rename Beanstalk variable + Move test data

* Add missing permission for AutoSpottingBeanstalk role

* Bring Go to 1.13 (#367)

* merge fixes

* Begin porting the regional CloudFormation stack changes to the main template so it can also run from a StackSet

* Merge fix

* Fix to template

* Multiple fixes to handle Lambda concurrency

Since instances can be launched concurrently or with overlapping timing, we had problems related to multiple Lambdas acting on the same ASG.

* Removed commented code

* Begin working on:

#354 (comment)

* Progress #354

Created Queue in CF template
Begin sending message

* Progress #354

* Progress #354

* Progress #354

* Progress #354

* Progress #354

* Use an inline Python Lambda to increase the ASG

Use it only if the AttachInstances method fails because of a wrong ASG max size.

* Use an inline Python Lambda to increase the ASG

progress

* Use an inline Python Lambda to increase the ASG

progress

* Improvements to event-based instance replacement

- Ported to the StackSet deploy mode.

- Fix to "ScalingActivityInProgress" when the launched on-demand instance is
terminated before going "InService":

	Created the method "waitForInstanceStatus", which waits, up to a maximum
	number of retries (5), until an instance belonging to an ASG is in the
	desired status (InService). The sleep time is 5*retry seconds; see the
	sketch below.
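A minimal sketch of what such a polling loop can look like with the aws-sdk-go AutoScaling client (the exact signature and wiring are assumptions based on the description above, not the code in this commit):

	// Hypothetical sketch: poll until the instance reaches the desired
	// lifecycle state, sleeping 5*retry seconds between attempts.
	func (a *autoScalingGroup) waitForInstanceStatus(instanceID, status string, maxRetry int) error {
		svc := a.region.services.autoScaling
		for retry := 1; retry <= maxRetry; retry++ {
			out, err := svc.DescribeAutoScalingInstances(
				&autoscaling.DescribeAutoScalingInstancesInput{
					InstanceIds: []*string{aws.String(instanceID)},
				})
			if err == nil && len(out.AutoScalingInstances) > 0 &&
				*out.AutoScalingInstances[0].LifecycleState == status {
				return nil
			}
			time.Sleep(time.Duration(5*retry) * time.Second)
		}
		return fmt.Errorf("instance %s never reached status %s", instanceID, status)
	}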

- Fix to multiple problems caused by Lambda concurrency when changing the
ASG MaxSize:

	Created another Lambda (LambdaManageASG) in the same region as the main
	one; its code is Python 3.7, inline in the template. Its concurrency is
	set to one, and its function is to change the ASG MaxSize by the
	specified amount.

	Used a "try-catch" approach: if AttachInstances returns the error code
	"ValidationError" and the string "update the AutoScalingGroup sizes" is
	present in the error message, it means we need to increase the ASG
	MaxSize, so we execute the method changeAutoScalingMaxSize, which
	invokes LambdaManageASG.

	If the invocation returns the error "ErrCodeTooManyRequestsException",
	it means multiple main Lambdas are executing LambdaManageASG, so we
	sleep for a random fraction of a second and retry.

	The method attachSpotInstance now returns an int representing the
	amount of change applied to the ASG MaxSize, so that in
	swapWithGroupMember we can defer a call to changeAutoScalingMaxSize
	that decreases the ASG MaxSize by the previously added amount.

	We use "waitForInstanceStatus" in attachSpotInstance before returning,
	to be sure the spot instance has been attached and is InService before
	we begin terminating the on-demand one.
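A rough sketch of that invoke-and-back-off loop (the payload shape and the variable names are assumptions, not the exact code):

	// Hypothetical sketch: invoke LambdaManageASG, backing off for a random
	// fraction of a second on TooManyRequestsException.
	payload := []byte(`{"asg":"` + a.name + `","variation":"1"}`)
	for retry, maxRetry := 0, 5; retry <= maxRetry; {
		_, err := svc.Invoke(&lambda.InvokeInput{
			FunctionName: aws.String("LambdaManageASG"),
			Payload:      payload,
		})
		if err == nil {
			break
		}
		if aerr, ok := err.(awserr.Error); ok &&
			aerr.Code() == lambda.ErrCodeTooManyRequestsException {
			// another main Lambda holds the single concurrency slot
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
			continue
		}
		retry++
	}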

* Improvements to event-based instance replacement

- Added a logPrefix to better identify Lambda actions in case of concurrent
executions:
	TE = spot termination event
	ST = instance start event
	SC = schedule event

TE and ST are followed by the instance ID; SC is followed by the activation time.

* Improvements to event-based instance replacement

- Fix/improvement to suspending and resuming the termination process:
suspend/resume is now handled by LambdaManageASG too.

When we successfully suspend termination, we add a tag
"autospotting_suspend_process_by" whose value is the instance ID of the
event that triggered the main Lambda. When we try to resume termination,
we first check whether the value of that tag equals the current instance
ID; if it doesn't, another main Lambda, different from the one that
suspended the process, is trying to resume it, and in that case we do not
resume.

Given that LambdaManageASG has a concurrent execution limit of 1, we can
have the following cases (see the sketch after this list):

1)
  *) A main Lambda suspends the process.
  *) No other Lambda suspends it.
  *) The main Lambda resumes the process.
  *) Another main Lambda suspends the process.
  *) ...and so on.

2)
  *) A main Lambda suspends the process.
  *) Before the first one resumes the process, another one suspends it
  (replacing the tag value).
  *) The first Lambda does not resume the process (the tag value differs).
  *) The second Lambda resumes the process.

In the worst case of a Lambda dying before resuming the process, the
process will be resumed after another Lambda suspends and then resumes it.
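A minimal sketch of the resume-side guard, assuming the getTagValue helper added in core/autoscaling_configuration.go (the method name is hypothetical):

	// Only the Lambda whose instance ID is recorded in the tag may resume
	// the termination process; any other Lambda leaves it suspended.
	func (a *autoScalingGroup) mayResumeProcess(instanceID string) bool {
		owner := a.getTagValue("autospotting_suspend_process_by")
		return owner != nil && *owner == instanceID
	}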

* Improvements to event-based instance replacement

- Cosmetic code fix.

* Improvements to event-based instance replacement

- For the rand seed, instead of using time.Now().UnixNano() we build a seed
based on the instance ID that triggered the event.

The seed is built this way: for every character of the instance ID
(starting from the third character, i.e. skipping the "i-" prefix) we take
its rune value and add it to a running sum. We use that sum as a temporary
seed and draw a random digit between 0 and 9. The final seed is the
concatenation of the generated digits.

This way we get an int64 seed that depends on the instance ID and has the
same length.
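A sketch of that construction (the function name is hypothetical):

	// Derive a deterministic seed from the instance ID: one pseudo-random
	// digit per character after the "i-" prefix, then concatenate them.
	func seedFromInstanceID(instanceID string) int64 {
		var sum int64
		var digits strings.Builder
		for _, r := range instanceID[2:] { // start from the third character
			sum += int64(r) // running sum of rune values
			tmp := rand.New(rand.NewSource(sum))
			digits.WriteString(strconv.Itoa(tmp.Intn(10))) // digit between 0 and 9
		}
		seed, _ := strconv.ParseInt(digits.String(), 10, 64)
		return seed
	}

(Uses math/rand, strconv and strings; for a 17-character suffix the resulting 17 digits still fit in an int64.)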

* Improvements to event-based instance replacement

- There is no more need to "revert the attach/detach order when running at
minimum capacity": we defer changeAutoScalingMaxSize and use the same logic
as swapWithGroupMember.

* Improvements to event-based instance replacement

- Use suspendResumeProcess for the scheduled replaceOnDemandInstanceWithSpot
too.

We use it as a trick to avoid rare cases of concurrency between the
scheduled and event Lambdas. Since the Lambda that handles
suspendResumeProcess has a concurrency limit of one, the scheduled and
event Lambdas, if concurrent, will be "time shifted" by a random value.
This way they will not execute attachSpotInstance at the same time.

In the case of the scheduled Lambda we append the character "S" to the
instance ID used for the rand seed, to avoid resuming a process suspended
by an event Lambda.

- Fix in swapWithGroupMember: defer asg.suspendResumeProcess for the
resume, so that it is executed even if swapWithGroupMember returns an error.

* gofmt cosmetic changes

* Fix the rand.Intn parameter so that 9 is also included

* Fix terminateRandomSpotInstanceIfHavingEnough

We need another condition to avoid terminating a valid spot instance when
the ASG has minOnDemand > 0.

If all ASG instances are in the running state, the number of running
on-demand instances equals the on-demand minimum, and the total number of
running instances equals the desired capacity, then there is no need to
terminate a spot instance.

Needs some testing.

* Fix terminateRandomSpotInstanceIfHavingEnough

Fixes.

* Fix terminateRandomSpotInstanceIfHavingEnough

Changed allInstancesRunning to also return the number of running on-demand
instances.

* Specify/override the multiplier for the on-demand price at a group level

Since i.price has already been multiplied by the global multiplier (if one
was specified), we first need to divide it by that value and then multiply
by the multiplier specific to the ASG; see the worked example below.

We need to do this for both scheduled and event actions: for events we act
in the function belongsToEnabledASG, and for the schedule we act in
launchSpotReplacement.

Needs deep testing...
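A worked example of that adjustment, with assumed numbers (raw on-demand price 0.10, global multiplier 0.6, per-group tag 0.8):

	// The stored price was already scaled by the global multiplier, so we
	// un-scale it first, then re-scale it with the group's own value.
	onDemand, global, group := 0.10, 0.6, 0.8
	scaled := onDemand * global          // i.price after the global multiplier: 0.06
	effective := scaled / global * group // effective per-group price: 0.08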

* Specify/override the multiplier for the on-demand price at a group level
- fix

In loadConfOnDemandPriceMultiplier we need to use
a.config.OnDemandPriceMultiplier in place of
a.region.conf.OnDemandPriceMultiplier; this way
a.region.conf.OnDemandPriceMultiplier preserves the original global value.

* Misc fixes to be able to run the Go tests

* Misc fixes to be able to run the Go tests - continued

* Misc fixes to be able to run the Go tests - done for now

* For Test_autoScalingGroup_terminateRandomSpotInstanceIfHavingEnough,
added the condition: "spot capacity is correct, skip termination"

Co-authored-by: Chris <chris.farrenden@domain.com.au>
Co-authored-by: 0x11 <14269809+codenoid@users.noreply.github.com>
Co-authored-by: Jawad <jawad.stouli@gmail.com>
Co-authored-by: Gábor Lipták <gliptak@gmail.com>
5 people committed Apr 3, 2020
1 parent 5e035ee commit 565b644
Showing 8 changed files with 142 additions and 18 deletions.
3 changes: 2 additions & 1 deletion autospotting.go
@@ -115,8 +115,9 @@ func parseCommandLineFlags() {
"Can be overridden on a per-group basis using the tag "+autospotting.OnDemandPercentageTag+
"\n\tIt is ignored if min_on_demand_number is also set.\n")

- flag.Float64Var(&conf.OnDemandPriceMultiplier, "on_demand_price_multiplier", 1.0,
+ flag.Float64Var(&conf.OnDemandPriceMultiplier, "on_demand_price_multiplier", autospotting.DefaultOnDemandPriceMultiplier,
"\n\tMultiplier for the on-demand price. Numbers less than 1.0 are useful for volume discounts.\n"+
"The tag "+autospotting.OnDemandPriceMultiplierTag+" can be used to override this on a group level.\n"+
"\tExample: ./AutoSpotting -on_demand_price_multiplier 0.6 will have the on-demand price "+
"considered at 60% of the actual value.\n")

4 changes: 3 additions & 1 deletion cloudformation/stacks/AutoSpotting/template.yaml
@@ -185,7 +185,9 @@
"Multiplier for the on-demand price. This is useful for volume discounts
or if you want to set your bid price to be lower than the on demand
price to ensure you don't run spot instances instead of your existing
reserved instances."
reserved instances. It is also a global default value that can be overridden on a
per-group basis using the 'autospotting_on_demand_price_multiplier' tag
that can be set on the AutoScaling group."
Type: "Number"
Regions:
Default: "*"
27 changes: 17 additions & 10 deletions core/autoscaling.go
@@ -89,9 +89,16 @@
return nil
}

- if a.allInstancesRunning() && a.instances.count64() < *a.DesiredCapacity {
- logger.Println("Not enough capacity in the group")
- return nil
+ if allInstancesAreRunning, onDemandRunning := a.allInstancesRunning(); allInstancesAreRunning {
+ if a.instances.count64() == *a.DesiredCapacity && onDemandRunning == a.minOnDemand {
+ logger.Println("Currently Spot running equals to the required number, skipping termination")
+ return nil
+ }
+
+ if a.instances.count64() < *a.DesiredCapacity {
+ logger.Println("Not enough capacity in the group")
+ return nil
+ }
}

randomSpot := a.getAnySpotInstance()
@@ -111,9 +118,9 @@
}
}

- func (a *autoScalingGroup) allInstancesRunning() bool {
- _, totalRunning := a.alreadyRunningInstanceCount(false, nil)
- return totalRunning == a.instances.count64()
+ func (a *autoScalingGroup) allInstancesRunning() (bool, int64) {
+ onDemandRunning, totalRunning := a.alreadyRunningInstanceCount(false, nil)
+ return totalRunning == a.instances.count64(), onDemandRunning
}

func (a *autoScalingGroup) calculateHourlySavings() float64 {
@@ -595,7 +602,7 @@
logger.Printf("LambdaManageASG concurrent execution, sleeping for %v", sleepTime)
continue
} else {
logger.Printf("Error invoking LambdaManageASG retrying attempt %s on %s: %v",
logger.Printf("Error invoking LambdaManageASG retrying attempt %d on %d: %v",
retry, maxRetry, err.Error())
retry++
}
@@ -807,7 +814,7 @@ func (a *autoScalingGroup) alreadyRunningInstanceCount(
if !spot {
instanceCategory = "on-demand"
}
- logger.Println(a.name, "Counting already running on demand instances ")
+ logger.Println(a.name, "Counting already running", instanceCategory, "instances")
for inst := range a.instances.instances() {

if *inst.Instance.State.Name == "running" {
@@ -904,7 +911,7 @@

for retry, maxRetry := 0, 5; changed == false; {
if retry > maxRetry {
return fmt.Errorf("Unable to %s process for ASG", action, a.name)
return fmt.Errorf("Unable to %s process for ASG %s", action, a.name)
} else {
_, err := svc.Invoke(
&lambda.InvokeInput{
@@ -922,7 +929,7 @@
logger.Printf("LambdaManageASG concurrent execution, sleeping for %v", sleepTime)
continue
} else {
logger.Printf("Error invoking LambdaManageASG retrying attempt %s on %s: %v",
logger.Printf("Error invoking LambdaManageASG retrying attempt %d on %d: %v",
retry, maxRetry, err.Error())
retry++
}
47 changes: 46 additions & 1 deletion core/autoscaling_configuration.go
@@ -23,6 +23,10 @@ const (
// absolute number.
OnDemandNumberLong = "autospotting_min_on_demand_number"

+ // OnDemandPriceMultiplierTag is the name of a tag that can be defined on a
+ // per-group level for overriding multiplier for the on-demand price.
+ OnDemandPriceMultiplierTag = "autospotting_on_demand_price_multiplier"

// BiddingPolicyTag stores the bidding policy for the spot instance
BiddingPolicyTag = "autospotting_bidding_policy"

@@ -56,6 +60,10 @@
// the spot bid on a per-group level
DefaultBiddingPolicy = "normal"

+ // DefaultOnDemandPriceMultiplier stores the default OnDemand price multiplier
+ // on a per-group level
+ DefaultOnDemandPriceMultiplier = 1.0

// DefaultInstanceTerminationMethod is the default value for the instance termination
// method configuration option
DefaultInstanceTerminationMethod = AutoScalingTerminationMethod
@@ -160,6 +168,21 @@ func (a *autoScalingGroup) loadNumberOnDemand(tagValue *string) (int64, bool) {
return DefaultMinOnDemandValue, false
}

+ func (a *autoScalingGroup) loadOnDemandPriceMultiplier(tagValue *string) (float64, bool) {
+ onDemandPriceMultiplier, err := strconv.ParseFloat(*tagValue, 64)
+
+ if err != nil {
+ logger.Printf("Error with ParseFloat: %s\n", err.Error())
+ return DefaultOnDemandPriceMultiplier, false
+ } else if onDemandPriceMultiplier <= 0 {
+ logger.Printf("Ignoring out of range value : %f\n", onDemandPriceMultiplier)
+ return DefaultOnDemandPriceMultiplier, false
+ }
+
+ logger.Printf("Loaded OnDemandPriceMultiplier value to %f from tag %s\n", onDemandPriceMultiplier, OnDemandPriceMultiplierTag)
+ return onDemandPriceMultiplier, true
+ }

func (a *autoScalingGroup) getTagValue(keyMatch string) *string {
for _, asgTag := range a.Tags {
if *asgTag.Key == keyMatch {
@@ -269,11 +292,30 @@ func (a *autoScalingGroup) loadConfSpotPrice() bool {
return done
}

+ func (a *autoScalingGroup) loadConfOnDemandPriceMultiplier() bool {
+
+ tagValue := a.getTagValue(OnDemandPriceMultiplierTag)
+ if tagValue == nil {
+ return false
+ }
+
+ newValue, done := a.loadOnDemandPriceMultiplier(tagValue)
+ if !done {
+ debug.Println("Couldn't find tag", OnDemandPriceMultiplierTag)
+ return false
+ }
+
+ a.config.OnDemandPriceMultiplier = newValue
+ return done
+ }

// Add configuration of other elements here: prices, whitelisting, etc
func (a *autoScalingGroup) loadConfigFromTags() bool {

resOnDemandConf := a.loadConfOnDemand()

+ resOnDemandPriceMultiplierConf := a.loadConfOnDemandPriceMultiplier()

resSpotConf := a.loadConfSpot()

resSpotPriceConf := a.loadConfSpotPrice()
@@ -285,13 +327,16 @@ func (a *autoScalingGroup) loadConfigFromTags() bool {
if resOnDemandConf {
logger.Println("Found and applied configuration for OnDemand value")
}
+ if resOnDemandPriceMultiplierConf {
+ logger.Println("Found and applied configuration for OnDemand Price Multiplier")
+ }
if resSpotConf {
logger.Println("Found and applied configuration for Spot Bid")
}
if resSpotPriceConf {
logger.Println("Found and applied configuration for Spot Price")
}
- if resOnDemandConf || resSpotConf || resSpotPriceConf {
+ if resOnDemandConf || resOnDemandPriceMultiplierConf || resSpotConf || resSpotPriceConf {
return true
}
return false
58 changes: 54 additions & 4 deletions core/autoscaling_test.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/davecgh/go-spew/spew"
@@ -819,7 +820,16 @@ func TestAttachSpotInstance(t *testing.T) {
regionASG: &region{
name: "regionTest",
services: connections{
- autoScaling: mockASG{aierr: nil},
+ autoScaling: mockASG{
+ aierr: nil,
+ dasio: &autoscaling.DescribeAutoScalingInstancesOutput{
+ AutoScalingInstances: []*autoscaling.InstanceDetails{
+ {
+ LifecycleState: aws.String("InService"),
+ },
+ },
+ },
+ },
ec2: mockEC2{
dio: &ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{
@@ -844,7 +854,10 @@ func TestAttachSpotInstance(t *testing.T) {
regionASG: &region{
name: "regionTest",
services: connections{
- autoScaling: mockASG{aierr: errors.New("attach")},
+ autoScaling: mockASG{
+ aierr: error(awserr.New("ValidationError", "Error", errors.New("attach"))),
+ // aierr: errors.New("attach"),
+ },
ec2: mockEC2{
dio: &ec2.DescribeInstancesOutput{
Reservations: []*ec2.Reservation{
@@ -863,7 +876,7 @@ func TestAttachSpotInstance(t *testing.T) {
},
},
instanceID: "1",
- expected: errors.New("attach"),
+ expected: error(awserr.New("ValidationError", "Error", errors.New("attach"))),
},
}
for _, tt := range tests {
Expand All @@ -877,7 +890,7 @@ func TestAttachSpotInstance(t *testing.T) {
DesiredCapacity: aws.Int64(3),
},
}
- err := a.attachSpotInstance(tt.instanceID, false)
+ _, err := a.attachSpotInstance(tt.instanceID, false)
CheckErrors(t, err, tt.expected)
})
}
@@ -2413,8 +2426,18 @@ func TestReplaceOnDemandInstanceWithSpot(t *testing.T) {
dlho: &autoscaling.DescribeLifecycleHooksOutput{
LifecycleHooks: []*autoscaling.LifecycleHook{},
},
+ dasio: &autoscaling.DescribeAutoScalingInstancesOutput{
+ AutoScalingInstances: []*autoscaling.InstanceDetails{
+ {
+ LifecycleState: aws.String("InService"),
+ },
+ },
+ },
},
ec2: &mockEC2{},
+ lambda: &mockLambda{
+ ierr: nil,
+ },
},
instances: makeInstancesWithCatalog(
instanceMap{
@@ -3642,6 +3665,33 @@
wantErr: false,
},

{name: "spot capacity is correct, skip termination",
group: &autoscaling.Group{
DesiredCapacity: aws.Int64(2),
},
minOnDemand: 1,
instances: makeInstancesWithCatalog(instanceMap{
"i-f00": &instance{
Instance: &ec2.Instance{
InstanceId: aws.String("i-foo0"),
InstanceLifecycle: aws.String("spot"),
State: &ec2.InstanceState{
Name: aws.String("running"),
},
},
},
"i-f01": &instance{
Instance: &ec2.Instance{
InstanceId: aws.String("i-foo1"),
State: &ec2.InstanceState{
Name: aws.String("running"),
},
},
},
}),
wantErr: false,
},

{name: "spot capacity exists in the group, terminating using the default termination method",
group: &autoscaling.Group{
DesiredCapacity: aws.Int64(1),
3 changes: 2 additions & 1 deletion core/instance.go
@@ -244,7 +244,7 @@ func (i *instance) belongsToEnabledASG() bool {
asg.loadConfigFromTags()
asg.loadLaunchConfiguration()
i.asg = &asg
- i.price = i.typeInfo.pricing.onDemand
+ i.price = i.typeInfo.pricing.onDemand / i.region.conf.OnDemandPriceMultiplier * i.asg.config.OnDemandPriceMultiplier
logger.Printf("%s instace %s belongs to enabled ASG %s", i.region.name,
*i.InstanceId, i.asg.name)
return true
@@ -476,6 +476,7 @@
}

func (i *instance) launchSpotReplacement() (*string, error) {
+ i.price = i.typeInfo.pricing.onDemand / i.region.conf.OnDemandPriceMultiplier * i.asg.config.OnDemandPriceMultiplier
instanceTypes, err := i.getCompatibleSpotInstanceTypesListSortedAscendingByPrice(
i.asg.getAllowedInstanceTypes(i),
i.asg.getDisallowedInstanceTypes(i))
3 changes: 3 additions & 0 deletions core/instance_test.go
@@ -2148,6 +2148,9 @@ func Test_instance_createRunInstancesInput(t *testing.T) {
Key: aws.String("launched-for-asg"),
Value: aws.String("mygroup"),
},
+ {
+ Key: aws.String("launched-for-replacing-instance"),
+ },
},
},
},
15 changes: 15 additions & 0 deletions core/mock_test.go
@@ -12,6 +12,8 @@ import (
"github.com/aws/aws-sdk-go/service/cloudformation/cloudformationiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/aws/aws-sdk-go/service/lambda/lambdaiface"
)

func CheckErrors(t *testing.T, err error, expected error) {
@@ -225,3 +227,16 @@ type mockCloudFormation struct {
func (m mockCloudFormation) DescribeStacks(*cloudformation.DescribeStacksInput) (*cloudformation.DescribeStacksOutput, error) {
return m.dso, m.dserr
}

+ // All fields are composed of the abbreviation of their method
+ // This is useful when methods are doing multiple calls to AWS API
+ type mockLambda struct {
+ lambdaiface.LambdaAPI
+ // Invoke
+ io *lambda.InvokeOutput
+ ierr error
+ }
+
+ func (m mockLambda) Invoke(*lambda.InvokeInput) (*lambda.InvokeOutput, error){
+ return m.io, m.ierr
+ }
