Skip to content

Commit

Permalink
Further improvements
Browse files Browse the repository at this point in the history
Only handle the "running" events of new instances because the instance
information wasn't filled while the instance was in pending state.

Also run spot attachment in parallel with detachment of the on-demand
instance in order to avoid race conditions with the group's process that
enforces the desired capacity like when doing multiple instances in parallel
for new groups or multiple instance launches.
  • Loading branch information
cristim committed Jul 31, 2019
1 parent 9719e17 commit cceb686
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 57 deletions.
11 changes: 5 additions & 6 deletions cloudformation/stacks/AutoSpotting/regional_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Description: >
"Implements support for triggering the main AutoSpotting Lambda function on
regional events such as instance launches or imminent spot terminations that
can only be detected from wthin a given region"
can only be detected from within a given region"
Parameters:
AutoSpottingLambdaARN:
Description: "The ARN of the main AutoSpotting Lambda function"
Expand Down Expand Up @@ -96,7 +96,7 @@
Fn::GetAtt:
- "EventHandler"
- "Arn"
InstancePendingLambdaPermission:
InstanceRunningLambdaPermission:
Type: "AWS::Lambda::Permission"
Properties:
Action: "lambda:InvokeFunction"
Expand All @@ -105,9 +105,9 @@
Principal: "events.amazonaws.com"
SourceArn:
Fn::GetAtt:
- "InstancePendingEventRule"
- "InstanceRunningEventRule"
- "Arn"
InstancePendingEventRule:
InstanceRunningEventRule:
Type: "AWS::Events::Rule"
Properties:
Description: >
Expand All @@ -119,12 +119,11 @@
- "aws.ec2"
detail:
state:
- "pending"
- "running"
State: "ENABLED"
Targets:
-
Id: "InstancePendingEventGenerator"
Id: "InstanceRunningEventGenerator"
Arn:
Fn::GetAtt:
- "EventHandler"
Expand Down
4 changes: 2 additions & 2 deletions core/autoscaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ func (a *autoScalingGroup) enableForInstanceLaunchEventHandling() {
func (a *autoScalingGroup) isEnabledForEventBasedInstanceReplacement() bool {
if time.Now().Sub(*a.CreatedTime) < time.Hour {
logger.Println("ASG %s is newer than an hour, enabling it for event-based "+
"instance replacement", a.name)
a.enableForInstanceLaunchEventHandling()
"instance replacement", a.name)
a.enableForInstanceLaunchEventHandling()
return true
}

Expand Down
45 changes: 19 additions & 26 deletions core/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,25 +507,6 @@ func (i *instance) launchSpotReplacement() (*string, error) {
return nil, err
}

// // replaceWithSpotAndTerminate replaces an on-demand instance with a compatible
// // spot instance then immediately terminates it. This is supposed to be called
// // against a recently launched on-demand instance, while it's still in the
// // pending state.
// func (i *instance) replaceWithSpotAndTerminate() error {
// spotinstanceID, err := i.launchSpotReplacement()
// if err != nil {
// logger.Println("Couldn't launch spot replacement")
// return err
// }
// i.asg.attachSpotInstance(*spotinstanceID, true)

// err = i.asg.detachAndTerminateOnDemandInstance(i.InstanceId, true)
// if err != nil {
// logger.Println("Couldn't detach and terminate instance", i.InstanceId)
// }
// return err
// }

func (i *instance) getPricetoBid(
baseOnDemandPrice float64, currentSpotPrice float64) float64 {

Expand Down Expand Up @@ -843,12 +824,24 @@ func (i *instance) swapWithGroupMember() (*instance, error) {
defer asg.setAutoScalingMaxSize(max)
}

if err := asg.attachSpotInstance(*i.InstanceId, true); err != nil {
logger.Printf("Spot instance %s couldn't be attached to the group %s, terminating it...",
*i.InstanceId, asg.name)
i.terminate()
return nil, fmt.Errorf("couldn't attach spot instance %s ", *i.InstanceId)
}
// We use this to capture the error message from the closure below that is
// executed asynchronously.
c := make(chan error)

// Attach the new spot instance in parallel with the termination of the
// running on-demand instance in order to avoid increasing the capacity enough
// for the ASG to notice it and start terminating instances. The ASGs were
// sometimes seen to just terminates newly launched spot instances so the
// group was unintentionally kept with on-demand capacity.
go func() {
if err := asg.attachSpotInstance(*i.InstanceId, true); err != nil {
logger.Printf("Spot instance %s couldn't be attached to the group %s, terminating it...",
*i.InstanceId, asg.name)
i.terminate()
c <- fmt.Errorf("couldn't attach spot instance %s ", *i.InstanceId)
}
c <- nil
}()

if err := asg.terminateInstanceInAutoScalingGroup(odInstanceID, true, true); err != nil {
logger.Printf("On-demand instance %s couldn't be terminated, re-trying...",
Expand All @@ -857,7 +850,7 @@ func (i *instance) swapWithGroupMember() (*instance, error) {
*odInstanceID)
}

return odInstance, nil
return odInstance, <-c
}

// returns an instance ID as *string, set to nil if we need to wait for the next
Expand Down
52 changes: 29 additions & 23 deletions core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,39 +260,45 @@ func (a *AutoSpotting) handleNewInstanceLaunch(regionName string, instanceID str
logger.Printf("%s Found instance %s in state %s",
i.region.name, *i.InstanceId, *i.State.Name)

if state == "pending" && i.belongsToEnabledASG() && i.shouldBeReplacedWithSpot() {
logger.Printf("%s instance %s is in pending state, belongs to an enabled ASG "+
"and should be replaced with spot, attempting to launch spot replacement", i.region.name, *i.InstanceId)
if state != "running" {
logger.Printf("%s Instance %s is not in the running state",
i.region.name, *i.InstanceId)
return errors.New("instance not in running state")
}

if i.belongsToEnabledASG() && i.shouldBeReplacedWithSpot() {
logger.Printf("%s instance %s belongs to an enabled ASG and should be "+
"replaced with spot, attempting to launch spot replacement",
i.region.name, *i.InstanceId)
if _, err := i.launchSpotReplacement(); err != nil {
logger.Printf("%s Couldn't launch spot replacement for %s",
i.region.name, *i.InstanceId)
return err
}
} else {
logger.Printf("%s skipping instance %s: either not in pending state (%s), doesn't "+
"belong to an enabled ASG or should not be replaced with spot, ",
i.region.name, *i.InstanceId, *i.State.Name)
logger.Printf("%s skipping instance %s: either doesn't belong to an "+
"enabled ASG or should not be replaced with spot, ",
i.region.name, *i.InstanceId)
debug.Printf("%#v", i)
}

if state == "running" {
logger.Printf("%s Found instance %s in running state, checking if it's a spot instance "+
"that should be attached to any ASG", i.region.name, *i.InstanceId)
unattached := i.isUnattachedSpotInstanceLaunchedForAnEnabledASG()
if !unattached {
logger.Printf("%s Found instance %s is already attached to an ASG, skipping it",
i.region.name, *i.InstanceId)
return nil
}

logger.Printf("%s Found instance %s is not yet attached to its ASG, "+
"attempting to swap it against a running on-demand instance",
logger.Printf("%s Checking if %s is a spot instance that should be "+
"attached to any ASG", i.region.name, *i.InstanceId)
unattached := i.isUnattachedSpotInstanceLaunchedForAnEnabledASG()
if !unattached {
logger.Printf("%s Instance %s is already attached to an ASG, skipping it",
i.region.name, *i.InstanceId)
return nil
}

if _, err := i.swapWithGroupMember(); err != nil {
logger.Printf("%s, couldn't perform spot replacement of %s ",
i.region.name, *i.InstanceId)
return err
}
logger.Printf("%s Found instance %s is not yet attached to its ASG, "+
"attempting to swap it against a running on-demand instance",
i.region.name, *i.InstanceId)

if _, err := i.swapWithGroupMember(); err != nil {
logger.Printf("%s, couldn't perform spot replacement of %s ",
i.region.name, *i.InstanceId)
return err
}

return nil
Expand Down

0 comments on commit cceb686

Please sign in to comment.