Extract script runner to a separate type; fix work with env. variables (#132)

Signed-off-by: Oleg Atamanenko <oleg.atamanenko@gmail.com>
uthark committed Nov 11, 2020
1 parent 3641d61 commit 40b0873
Showing 6 changed files with 364 additions and 243 deletions.
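Only two of the six changed files appear in this excerpt; the new file that actually hosts the extracted script runner is not shown. Based on the constructor and call sites visible in the diffs below (NewScriptRunner, PreDrain, PostDrain, PostWait, PostTerminate, drainNode), a minimal sketch of what the extracted type might look like follows. The import paths, field names, and method bodies are assumptions, not the real implementation:

```go
// Hypothetical sketch only: the shape of the extracted ScriptRunner, inferred
// from the constructor and call sites visible in the diffs below.
// Import paths, field names, and method bodies are assumptions, not the
// contents of the real script-runner file.
package controllers

import (
	"os/exec"

	"github.com/go-logr/logr" // logger type assumed from NewScriptRunner(log2.NullLogger{})

	upgrademgrv1alpha1 "github.com/keikoproj/upgrade-manager/api/v1alpha1" // assumed module path
)

// ScriptRunner owns the shell and kubectl invocations that previously lived
// as methods on RollingUpgradeReconciler.
type ScriptRunner struct {
	Log         logr.Logger
	KubectlPath string // previously the KubeCtlBinary constant, /usr/local/bin/kubectl
	ShellPath   string // previously the ShellBinary constant, /bin/sh
}

// NewScriptRunner matches the constructor used in events_test.go below.
func NewScriptRunner(logger logr.Logger) ScriptRunner {
	return ScriptRunner{
		Log:         logger,
		KubectlPath: "/usr/local/bin/kubectl",
		ShellPath:   "/bin/sh",
	}
}

// runScript executes a script through the shell and returns its combined output.
func (s ScriptRunner) runScript(script string, ruObj *upgrademgrv1alpha1.RollingUpgrade) (string, error) {
	s.Log.Info("running script", "rollingUpgrade", ruObj.Name, "script", script)
	out, err := exec.Command(s.ShellPath, "-c", script).CombinedOutput()
	return string(out), err
}

// PreDrain runs the spec's PreDrain script, if one is configured. PostDrain,
// PostWait, and PostTerminate would follow the same pattern for their hooks;
// instanceID and nodeName are presumably exposed to the script as environment
// variables (see the note near the removed loadEnvironmentVariables helper
// further down).
func (s ScriptRunner) PreDrain(instanceID, nodeName string, ruObj *upgrademgrv1alpha1.RollingUpgrade) error {
	if ruObj.Spec.PreDrain.Script == "" {
		return nil
	}
	_, err := s.runScript(ruObj.Spec.PreDrain.Script, ruObj)
	return err
}
```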
2 changes: 1 addition & 1 deletion controllers/events_test.go
@@ -19,7 +19,6 @@ func Test_createK8sV1Event(t *testing.T) {

mgr, err := buildManager()
g.Expect(err).NotTo(gomega.HaveOccurred())
c = mgr.GetClient()

rcRollingUpgrade := &RollingUpgradeReconciler{
Client: mgr.GetClient(),
@@ -31,6 +30,7 @@ func Test_createK8sV1Event(t *testing.T) {
ruObjNameToASG: sync.Map{},
ClusterState: NewClusterState(),
CacheConfig: cache.NewConfig(0*time.Second, 0, 0),
ScriptRunner: NewScriptRunner(log2.NullLogger{}),
}

event := rcRollingUpgrade.createK8sV1Event(ruObj, EventReasonRUStarted, EventLevelNormal, map[string]string{})
167 changes: 34 additions & 133 deletions controllers/rollingupgrade_controller.go
@@ -18,8 +18,6 @@ package controllers
import (
"context"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"sync"
@@ -70,11 +68,6 @@ const (
instanceIDKey = "INSTANCE_ID"
instanceNameKey = "INSTANCE_NAME"

// KubeCtlBinary is the path to the kubectl executable
KubeCtlBinary = "/usr/local/bin/kubectl"
// ShellBinary is the path to the shell executable
ShellBinary = "/bin/sh"

// InService is a state of an instance
InService = "InService"
)
@@ -108,6 +101,7 @@ type RollingUpgradeReconciler struct {
ClusterState ClusterState
maxParallel int
CacheConfig *cache.Config
ScriptRunner ScriptRunner
}

func (r *RollingUpgradeReconciler) SetMaxParallel(max int) {
@@ -117,87 +111,33 @@ func (r *RollingUpgradeReconciler) SetMaxParallel(max int) {
}
}

func (r *RollingUpgradeReconciler) runScript(script string, background bool, ruObj *upgrademgrv1alpha1.RollingUpgrade) (string, error) {
r.info(ruObj, "Running script", "script", script)
if background {
r.info(ruObj, "Running script in background. Logs not available.")
err := exec.Command(ShellBinary, "-c", script).Run()
if err != nil {
r.info(ruObj, fmt.Sprintf("Script finished with error: %s", err))
}

return "", nil
}

out, err := exec.Command(ShellBinary, "-c", script).CombinedOutput()
if err != nil {
r.error(ruObj, err, "Script finished", "output", string(out))
} else {
r.info(ruObj, "Script finished", "output", string(out))
}
return string(out), err
}

func (r *RollingUpgradeReconciler) preDrainHelper(ruObj *upgrademgrv1alpha1.RollingUpgrade) error {
if ruObj.Spec.PreDrain.Script != "" {
script := ruObj.Spec.PreDrain.Script
_, err := r.runScript(script, false, ruObj)
if err != nil {
msg := "Failed to run preDrain script"
r.error(ruObj, err, msg)
return errors.Wrap(err, msg)
}
}
return nil
func (r *RollingUpgradeReconciler) preDrainHelper(instanceID, nodeName string, ruObj *upgrademgrv1alpha1.RollingUpgrade) error {
return r.ScriptRunner.PreDrain(instanceID, nodeName, ruObj)
}

// Operates on any scripts that were provided after the draining of the node.
// kubeCtlCall is provided as an argument to decouple the method from the actual kubectl call
func (r *RollingUpgradeReconciler) postDrainHelper(ruObj *upgrademgrv1alpha1.RollingUpgrade, nodeName string, kubeCtlCall string) error {
if ruObj.Spec.PostDrain.Script != "" {
_, err := r.runScript(ruObj.Spec.PostDrain.Script, false, ruObj)
if err != nil {
msg := "Failed to run postDrain script: "
r.error(ruObj, err, msg)
result := errors.Wrap(err, msg)

r.info(ruObj, "Uncordoning the node %s since it failed to run postDrain Script", "nodeName", nodeName)
_, err = r.runScript(kubeCtlCall+" uncordon "+nodeName, false, ruObj)
if err != nil {
r.error(ruObj, err, "Failed to uncordon", "nodeName", nodeName)
}
return result
}
func (r *RollingUpgradeReconciler) postDrainHelper(instanceID, nodeName string, ruObj *upgrademgrv1alpha1.RollingUpgrade) error {
err := r.ScriptRunner.PostDrain(instanceID, nodeName, ruObj)
if err != nil {
return err
}

r.info(ruObj, "Waiting for postDrainDelay", "postDrainDelay", ruObj.Spec.PostDrainDelaySeconds)
time.Sleep(time.Duration(ruObj.Spec.PostDrainDelaySeconds) * time.Second)

if ruObj.Spec.PostDrain.PostWaitScript != "" {
_, err := r.runScript(ruObj.Spec.PostDrain.PostWaitScript, false, ruObj)
if err != nil {
msg := "Failed to run postDrainWait script: " + err.Error()
r.error(ruObj, err, msg)
result := errors.Wrap(err, msg)

r.info(ruObj, "Uncordoning the node %s since it failed to run postDrain Script", "nodeName", nodeName)
_, err = r.runScript(kubeCtlCall+" uncordon "+nodeName, false, ruObj)
if err != nil {
r.error(ruObj, err, "Failed to uncordon", "nodeName", nodeName)
}
return result
}
}
return nil
return r.ScriptRunner.PostWait(instanceID, nodeName, ruObj)

}

// DrainNode runs "kubectl drain" on the given node
// kubeCtlCall is provided as an argument to decouple the method from the actual kubectl call
func (r *RollingUpgradeReconciler) DrainNode(ruObj *upgrademgrv1alpha1.RollingUpgrade,
nodeName string,
kubeCtlCall string,
instanceID string,
drainTimeout int) error {
// Running kubectl drain node.
err := r.preDrainHelper(ruObj)
err := r.preDrainHelper(instanceID, nodeName, ruObj)
if err != nil {
return errors.New(ruObj.Name + ": Predrain script failed: " + err.Error())
}
@@ -218,7 +158,7 @@ func (r *RollingUpgradeReconciler) DrainNode(ruObj *upgrademgrv1alpha1.RollingUp
}

r.info(ruObj, "Invoking kubectl drain for the node", "nodeName", nodeName)
go r.CallKubectlDrain(ctx, nodeName, kubeCtlCall, ruObj, errChan)
go r.CallKubectlDrain(nodeName, ruObj, errChan)

// Listening to signals from the CallKubectlDrain go routine
select {
@@ -232,24 +172,21 @@ func (r *RollingUpgradeReconciler) DrainNode(ruObj *upgrademgrv1alpha1.RollingUp
r.info(ruObj, "Kubectl drain completed for node", "nodeName", nodeName)
}

return r.postDrainHelper(ruObj, nodeName, kubeCtlCall)
return r.postDrainHelper(instanceID, nodeName, ruObj)
}

// CallKubectlDrain runs the "kubectl drain" for a given node
// Node will be terminated even if pod eviction is not completed when the drain timeout is exceeded
func (r *RollingUpgradeReconciler) CallKubectlDrain(ctx context.Context, nodeName, kubeCtlCall string, ruObj *upgrademgrv1alpha1.RollingUpgrade, errChan chan error) {
func (r *RollingUpgradeReconciler) CallKubectlDrain(nodeName string, ruObj *upgrademgrv1alpha1.RollingUpgrade, errChan chan error) {

// kops behavior implements the same behavior by using these flags when draining nodes
// https://github.com/kubernetes/kops/blob/7a629c77431dda02d02aadf00beb0bed87518cbf/pkg/instancegroups/instancegroups.go lines 337-340
script := fmt.Sprintf("%s drain %s --ignore-daemonsets=true --delete-local-data=true --force --grace-period=-1", kubeCtlCall, nodeName)
out, err := r.runScript(script, false, ruObj)
out, err := r.ScriptRunner.drainNode(nodeName, ruObj)
if err != nil {
if strings.HasPrefix(out, "Error from server (NotFound)") {
r.error(ruObj, err, "Not executing postDrainHelper. Node not found.", "output", out)
errChan <- nil
return
}
errChan <- errors.New(ruObj.Name + " :Failed to drain: " + err.Error())
errChan <- errors.New(ruObj.Name + ": Failed to drain: " + err.Error())
return
}
errChan <- nil
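The drain invocation itself now lives on the ScriptRunner as drainNode, returning the command output so the NotFound check above keeps working. A sketch of how the relocated method might look, reusing the kops-style flags from the removed line; again only an assumption about the real script-runner file (it also needs the "fmt" import):

```go
// Hypothetical continuation of the ScriptRunner sketch above: drainNode builds
// the same kubectl drain command the reconciler used to assemble inline.
func (s ScriptRunner) drainNode(nodeName string, ruObj *upgrademgrv1alpha1.RollingUpgrade) (string, error) {
	script := fmt.Sprintf(
		"%s drain %s --ignore-daemonsets=true --delete-local-data=true --force --grace-period=-1",
		s.KubectlPath, nodeName,
	)
	return s.runScript(script, ruObj)
}
```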
@@ -392,7 +329,7 @@ func (r *RollingUpgradeReconciler) SetStandby(ruObj *upgrademgrv1alpha1.RollingU
}

// TerminateNode actually terminates the given node.
func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string) error {
func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, nodeName string) error {

input := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instanceID),
@@ -427,19 +364,7 @@ func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.Rolli
r.info(ruObj, "Instance terminated.", "instanceID", instanceID)
r.info(ruObj, "starting post termination sleep", "instanceID", instanceID, "nodeIntervalSeconds", ruObj.Spec.NodeIntervalSeconds)
time.Sleep(time.Duration(ruObj.Spec.NodeIntervalSeconds) * time.Second)
if ruObj.Spec.PostTerminate.Script != "" {
out, err := r.runScript(ruObj.Spec.PostTerminate.Script, false, ruObj)
if err != nil {
if strings.HasPrefix(out, "Error from server (NotFound)") {
r.error(ruObj, err, "Node not found when running postTerminate. Ignoring ...", "output", out, "instanceID", instanceID)
return nil
}
msg := "Failed to run postTerminate script"
r.error(ruObj, err, msg, "instanceID", instanceID)
return errors.Wrap(err, msg)
}
}
return nil
return r.ScriptRunner.PostTerminate(instanceID, nodeName, ruObj)
}

func (r *RollingUpgradeReconciler) getNodeName(i *autoscaling.Instance, nodeList *corev1.NodeList, ruObj *upgrademgrv1alpha1.RollingUpgrade) string {
@@ -502,21 +427,6 @@ func (r *RollingUpgradeReconciler) populateNodeList(ruObj *upgrademgrv1alpha1.Ro
return nil
}

// Loads specific environment variables for scripts to use
// on a given rollingUpgrade and autoscaling instance
func loadEnvironmentVariables(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, nodeName string) error {
if err := os.Setenv(asgNameKey, ruObj.Spec.AsgName); err != nil {
return errors.New(ruObj.Name + ": Could not load " + asgNameKey + ": " + err.Error())
}
if err := os.Setenv(instanceIDKey, instanceID); err != nil {
return errors.New(ruObj.Name + ": Could not load " + instanceIDKey + ": " + err.Error())
}
if err := os.Setenv(instanceNameKey, nodeName); err != nil {
return errors.New(ruObj.Name + ": Could not load " + instanceNameKey + ": " + err.Error())
}
return nil
}
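The helper removed just above set ASG_NAME, INSTANCE_ID, and INSTANCE_NAME with os.Setenv, which is process-wide state and therefore races once UpdateInstance goroutines replace several nodes in parallel. Presumably this is the "fix work with env. variables" part of the commit: pass the values per invocation instead. The sketch below continues the hypothetical ScriptRunner above (it needs the "os" and "os/exec" imports) and shows one way to do that; it is an assumption, not the actual implementation:

```go
// Hypothetical sketch: per-invocation environment variables instead of
// process-wide os.Setenv, so concurrent node replacements cannot overwrite
// each other's ASG_NAME / INSTANCE_ID / INSTANCE_NAME values.
func (s ScriptRunner) runScriptWithEnv(script, asgName, instanceID, nodeName string) (string, error) {
	cmd := exec.Command(s.ShellPath, "-c", script)
	// Inherit the controller's environment and append the per-instance values.
	cmd.Env = append(os.Environ(),
		"ASG_NAME="+asgName,
		"INSTANCE_ID="+instanceID,
		"INSTANCE_NAME="+nodeName,
	)
	out, err := cmd.CombinedOutput()
	return string(out), err
}
```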

func (r *RollingUpgradeReconciler) getInProgressInstances(instances []*autoscaling.Instance) ([]*autoscaling.Instance, error) {
var inProgressInstances []*autoscaling.Instance
taggedInstances, err := getTaggedInstances(EC2StateTagKey, "in-progress", r.EC2Client)
@@ -531,7 +441,8 @@ func (r *RollingUpgradeReconciler) getInProgressInstances(instances []*autoscali
return inProgressInstances, nil
}

func (r *RollingUpgradeReconciler) runRestack(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, KubeCtlCall string) (int, error) {
func (r *RollingUpgradeReconciler) runRestack(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade) (int, error) {

asg, err := r.GetAutoScalingGroup(ruObj.Name)
if err != nil {
return 0, fmt.Errorf("Unable to load ASG with name: %s", ruObj.Name)
@@ -586,7 +497,7 @@ func (r *RollingUpgradeReconciler) runRestack(ctx *context.Context, ruObj *upgra
}

// update the instances
err := r.UpdateInstances(ctx, ruObj, instances, launchDefinition, KubeCtlCall)
err := r.UpdateInstances(ctx, ruObj, instances, launchDefinition)
processedInstances += len(instances)
if err != nil {
return processedInstances, err
@@ -701,7 +612,7 @@ func (r *RollingUpgradeReconciler) Process(ctx *context.Context,
}

// Run the restack that actually performs the rolling update.
nodesProcessed, err := r.runRestack(ctx, ruObj, KubeCtlBinary)
nodesProcessed, err := r.runRestack(ctx, ruObj)
if err != nil {
r.error(ruObj, err, "Failed to runRestack")
r.finishExecution(StatusError, nodesProcessed, ctx, ruObj)
@@ -942,8 +853,7 @@ func NewUpdateInstancesError(instanceUpdateErrors []error) *UpdateInstancesError
func (r *RollingUpgradeReconciler) UpdateInstances(ctx *context.Context,
ruObj *upgrademgrv1alpha1.RollingUpgrade,
instances []*autoscaling.Instance,
launchDefinition *launchDefinition,
KubeCtlCall string) error {
launchDefinition *launchDefinition) error {

totalNodes := len(instances)
if totalNodes == 0 {
@@ -960,7 +870,7 @@ func (r *RollingUpgradeReconciler) UpdateInstances(ctx *context.Context,
"strategy": string(ruObj.Spec.Strategy.Type),
"msg": fmt.Sprintf("Started Updating Instance %s, in AZ: %s", *instance.InstanceId, *instance.AvailabilityZone),
})
go r.UpdateInstance(ctx, ruObj, instance, launchDefinition, KubeCtlCall, ch)
go r.UpdateInstance(ctx, ruObj, instance, launchDefinition, ch)
}

// wait for upgrades to complete
@@ -997,8 +907,7 @@ func (r *RollingUpgradeReconciler) UpdateInstances(ctx *context.Context,
func (r *RollingUpgradeReconciler) UpdateInstanceEager(
ruObj *upgrademgrv1alpha1.RollingUpgrade,
nodeName,
targetInstanceID,
KubeCtlCall string,
targetInstanceID string,
ch chan error) {

// Set instance to standby
@@ -1023,27 +932,27 @@ func (r *RollingUpgradeReconciler) UpdateInstanceEager(
}

// Drain and wait for draining node.
r.DrainTerminate(ruObj, nodeName, targetInstanceID, KubeCtlCall, ch)
r.DrainTerminate(ruObj, nodeName, targetInstanceID, ch)

}

func (r *RollingUpgradeReconciler) DrainTerminate(
ruObj *upgrademgrv1alpha1.RollingUpgrade,
nodeName,
targetInstanceID,
KubeCtlCall string,
targetInstanceID string,
ch chan error) {

// Drain and wait for draining node.
if nodeName != "" {
err := r.DrainNode(ruObj, nodeName, KubeCtlCall, ruObj.Spec.Strategy.DrainTimeout)
err := r.DrainNode(ruObj, nodeName, targetInstanceID, ruObj.Spec.Strategy.DrainTimeout)
if err != nil && !ruObj.Spec.IgnoreDrainFailures {
ch <- err
return
}
}

// Terminate instance.
err := r.TerminateNode(ruObj, targetInstanceID)
err := r.TerminateNode(ruObj, targetInstanceID, nodeName)
if err != nil {
ch <- err
return
@@ -1055,7 +964,6 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context,
ruObj *upgrademgrv1alpha1.RollingUpgrade,
i *autoscaling.Instance,
launchDefinition *launchDefinition,
KubeCtlCall string,
ch chan error) {
targetInstanceID := aws.StringValue(i.InstanceId)
// If an instance was marked as "in-progress" in ClusterState, it has to be marked
@@ -1084,15 +992,8 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context,

nodeName := r.getNodeName(i, r.NodeList, ruObj)

// Load the environment variables for scripts to run
err := loadEnvironmentVariables(ruObj, targetInstanceID, nodeName)
if err != nil {
ch <- err
return
}

// set the EC2 tag indicating the state to in-progress
err = r.setStateTag(ruObj, targetInstanceID, "in-progress")
err := r.setStateTag(ruObj, targetInstanceID, "in-progress")
if err != nil {
ch <- err
return
Expand All @@ -1101,10 +1002,10 @@ func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context,
mode := ruObj.Spec.Strategy.Mode.String()
if strings.ToLower(mode) == upgrademgrv1alpha1.UpdateStrategyModeEager.String() {
r.info(ruObj, "starting replacement with eager mode", "mode", mode)
r.UpdateInstanceEager(ruObj, nodeName, targetInstanceID, KubeCtlCall, ch)
r.UpdateInstanceEager(ruObj, nodeName, targetInstanceID, ch)
} else if strings.ToLower(mode) == upgrademgrv1alpha1.UpdateStrategyModeLazy.String() {
r.info(ruObj, "starting replacement with lazy mode", "mode", mode)
r.DrainTerminate(ruObj, nodeName, targetInstanceID, KubeCtlCall, ch)
r.DrainTerminate(ruObj, nodeName, targetInstanceID, ch)
}

unjoined, err := r.WaitForTermination(ruObj, nodeName, r.generatedClient.CoreV1().Nodes())