Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resource deletion and adoption for 3 controllers #777

Merged
merged 1 commit into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 67 additions & 35 deletions controllers/collector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -92,95 +93,126 @@ func (r *CollectorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// Install RBAC resources for the filter plugin kubernetes
var rbacObj, saObj, bindingObj client.Object
rbacObj, saObj, bindingObj = operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)
// Set ServiceAccount's owner to this fluentbit
if err := ctrl.SetControllerReference(&co, saObj, r.Scheme); err != nil {
cr, sa, crb := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)
// Deploy Fluent Bit Collector ClusterRole
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, cr, r.mutate(cr, &co)); err != nil {
return ctrl.Result{}, err
}
if err := r.Create(ctx, rbacObj); err != nil && !errors.IsAlreadyExists(err) {
// Deploy Fluent Bit Collector ClusterRoleBinding
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, crb, r.mutate(crb, &co)); err != nil {
return ctrl.Result{}, err
}
if err := r.Create(ctx, saObj); err != nil && !errors.IsAlreadyExists(err) {
return ctrl.Result{}, err
}
if err := r.Create(ctx, bindingObj); err != nil && !errors.IsAlreadyExists(err) {
// Deploy Fluent Bit Collector ServiceAccount
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, sa, r.mutate(sa, &co)); err != nil {
return ctrl.Result{}, err
}

// Deploy Fluent Bit Statefulset
// Deploy Fluent Bit Collector Statefulset
sts := operator.MakefbStatefulset(co)
if err := ctrl.SetControllerReference(&co, sts, r.Scheme); err != nil {
return ctrl.Result{}, err
}

if _, err := controllerutil.CreateOrPatch(ctx, r.Client, sts, r.mutate(sts, co)); err != nil {
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, sts, r.mutate(sts, &co)); err != nil {
return ctrl.Result{}, err
}

// Deploy collector Service
// Deploy Fluent Bit Collector Service
if !co.Spec.DisableService {
svc := operator.MakeCollectorService(co)
if err := ctrl.SetControllerReference(&co, svc, r.Scheme); err != nil {
return ctrl.Result{}, err
}

if _, err := controllerutil.CreateOrPatch(ctx, r.Client, svc, r.mutate(svc, co)); err != nil {
if _, err := controllerutil.CreateOrPatch(ctx, r.Client, svc, r.mutate(svc, &co)); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

func (r *CollectorReconciler) mutate(obj client.Object, co fluentbitv1alpha2.Collector) controllerutil.MutateFn {
func (r *CollectorReconciler) mutate(obj client.Object, co *fluentbitv1alpha2.Collector) controllerutil.MutateFn {
switch o := obj.(type) {
case *rbacv1.ClusterRole:
expected, _, _ := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)

return func() error {
o.Rules = expected.Rules
return nil
}
case *corev1.ServiceAccount:
_, expected, _ := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)

return func() error {
o.Annotations = expected.Annotations
if err := ctrl.SetControllerReference(co, o, r.Scheme); err != nil {
return err
}
return nil
}
case *rbacv1.ClusterRoleBinding:
_, _, expected := operator.MakeRBACObjects(co.Name, co.Namespace, "collector", co.Spec.RBACRules, co.Spec.ServiceAccountAnnotations)

return func() error {
o.RoleRef = expected.RoleRef
o.Subjects = expected.Subjects
return nil
}
case *appsv1.StatefulSet:
expected := operator.MakefbStatefulset(co)
expected := operator.MakefbStatefulset(*co)

return func() error {
o.Labels = expected.Labels
o.Annotations = expected.Annotations
o.Spec = expected.Spec
o.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(&co, o, r.Scheme); err != nil {
if err := ctrl.SetControllerReference(co, o, r.Scheme); err != nil {
return err
}
return nil
}
case *corev1.Service:
expected := operator.MakeCollectorService(co)
expected := operator.MakeCollectorService(*co)

return func() error {
o.Labels = expected.Labels
o.Spec.Selector = expected.Spec.Selector
o.Spec.Ports = expected.Spec.Ports
o.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(&co, o, r.Scheme); err != nil {
if err := ctrl.SetControllerReference(co, o, r.Scheme); err != nil {
return err
}
return nil
}

default:
}

return nil
}

func (r *CollectorReconciler) delete(ctx context.Context, co *fluentbitv1alpha2.Collector) error {
var sa corev1.ServiceAccount
sa := corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: co.Name,
Namespace: co.Namespace,
},
}
if err := r.Delete(ctx, &sa); err != nil && !errors.IsNotFound(err) {
return err
}
// TODO: clusterrole, clusterrolebinding

var svc corev1.Service
if err := r.Delete(ctx, &svc); err != nil && !errors.IsNotFound(err) {
sts := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: co.Name,
Namespace: co.Namespace,
},
}
if err := r.Delete(ctx, &sts); err != nil && !errors.IsNotFound(err) {
return err
}

var sts appsv1.StatefulSet
if err := r.Delete(ctx, &sts); err != nil && !errors.IsNotFound(err) {
svcName := co.Name
if co.Spec.Service.Name != "" {
svcName = co.Spec.Service.Name
}
svc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,
Namespace: co.Namespace,
ksdpmx marked this conversation as resolved.
Show resolved Hide resolved
},
}
if err := r.Delete(ctx, &svc); err != nil && !errors.IsNotFound(err) {
return err
}

Expand Down
74 changes: 50 additions & 24 deletions controllers/fluent_controller_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package controllers

import (
"context"
rbacv1 "k8s.io/api/rbac/v1"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -64,33 +66,34 @@ func (r *FluentdReconciler) handleFinalizer(ctx context.Context, instance *fluen
}

func (r *FluentdReconciler) delete(ctx context.Context, fd *fluentdv1alpha1.Fluentd) error {
var sa corev1.ServiceAccount
err := r.Get(ctx, client.ObjectKey{Namespace: fd.Namespace, Name: fd.Name}, &sa)
if err == nil {
if err := r.Delete(ctx, &sa); err != nil && !errors.IsNotFound(err) {
return err
}
} else if !errors.IsNotFound(err) {
sa := corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: fd.Name,
Namespace: fd.Namespace,
},
}
if err := r.Delete(ctx, &sa); err != nil && !errors.IsNotFound(err) {
return err
}
// TODO: clusterrole, clusterrolebinding

var svc corev1.Service
err = r.Get(ctx, client.ObjectKey{Namespace: fd.Namespace, Name: fd.Name}, &svc)
if err == nil {
if err := r.Delete(ctx, &svc); err != nil && !errors.IsNotFound(err) {
return err
}
} else if !errors.IsNotFound(err) {
sts := appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: fd.Name,
Namespace: fd.Namespace,
},
}
if err := r.Delete(ctx, &sts); err != nil && !errors.IsNotFound(err) {
return err
}

var sts appsv1.StatefulSet
err = r.Get(ctx, client.ObjectKey{Namespace: fd.Namespace, Name: fd.Name}, &sts)
if err == nil {
if err := r.Delete(ctx, &sts); err != nil && !errors.IsNotFound(err) {
return err
}
} else if !errors.IsNotFound(err) {
svc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fd.Name,
Namespace: fd.Namespace,
},
}
if err := r.Delete(ctx, &svc); err != nil && !errors.IsNotFound(err) {
return err
}

Expand All @@ -99,13 +102,38 @@ func (r *FluentdReconciler) delete(ctx context.Context, fd *fluentdv1alpha1.Flue

func (r *FluentdReconciler) mutate(obj client.Object, fd *fluentdv1alpha1.Fluentd) controllerutil.MutateFn {
switch o := obj.(type) {
case *rbacv1.ClusterRole:
expected, _, _ := operator.MakeRBACObjects(fd.Name, fd.Namespace, "fluentd", fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)

return func() error {
o.Rules = expected.Rules
return nil
}
case *corev1.ServiceAccount:
_, expected, _ := operator.MakeRBACObjects(fd.Name, fd.Namespace, "fluentd", fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)

return func() error {
o.Labels = expected.Labels
o.Annotations = expected.Annotations
if err := ctrl.SetControllerReference(fd, o, r.Scheme); err != nil {
return err
}
return nil
}
case *rbacv1.ClusterRoleBinding:
_, _, expected := operator.MakeRBACObjects(fd.Name, fd.Namespace, "fluentd", fd.Spec.RBACRules, fd.Spec.ServiceAccountAnnotations)

return func() error {
o.RoleRef = expected.RoleRef
o.Subjects = expected.Subjects
return nil
}
case *appsv1.StatefulSet:
expected := operator.MakeStatefulset(*fd)

return func() error {
o.Labels = expected.Labels
o.Spec = expected.Spec
o.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(fd, o, r.Scheme); err != nil {
return err
}
Expand All @@ -118,13 +146,11 @@ func (r *FluentdReconciler) mutate(obj client.Object, fd *fluentdv1alpha1.Fluent
o.Labels = expected.Labels
o.Spec.Selector = expected.Spec.Selector
o.Spec.Ports = expected.Spec.Ports
o.SetOwnerReferences(nil)
if err := ctrl.SetControllerReference(fd, o, r.Scheme); err != nil {
return err
}
return nil
}

default:
}

Expand Down
Loading
Loading