From 9c7b7b9d7d0e111c2839d750c753e115c639623e Mon Sep 17 00:00:00 2001 From: Gregory Fast Date: Thu, 27 Jun 2024 18:28:46 -0400 Subject: [PATCH 1/5] OpAMP Bridge: Applier support for OTel CRD v1beta1 API Version **Description**: Updates the opamp bridge's config applier interface and client to support opentelemetry.io/v1beta1, explicitly removing support for applying OpenTelemetryCollector configurations of the opentelemetry.io/v1alph1 version **Link to tracking Issue(s)**: https://github.com/open-telemetry/opentelemetry-operator/issues/2985 **Testing**: Update Documentation: n/a --- cmd/operator-opamp-bridge/agent/agent.go | 4 +- cmd/operator-opamp-bridge/agent/agent_test.go | 53 ++-- .../agent/testdata/basic.yaml | 3 +- .../agent/testdata/invalid.yaml | 2 +- .../agent/testdata/updated.yaml | 3 +- cmd/operator-opamp-bridge/operator/client.go | 249 ++++++++++-------- .../operator/client_test.go | 64 ++++- .../operator/testdata/collector-v1alpha1.yaml | 31 +++ .../operator/testdata/collector.yaml | 4 +- .../operator/testdata/invalid-collector.yaml | 2 +- .../testdata/reporting-collector.yaml | 4 +- .../testdata/unmanaged-collector.yaml | 4 +- .../operator/testdata/updated-collector.yaml | 4 +- 13 files changed, 276 insertions(+), 151 deletions(-) create mode 100644 cmd/operator-opamp-bridge/operator/testdata/collector-v1alpha1.yaml diff --git a/cmd/operator-opamp-bridge/agent/agent.go b/cmd/operator-opamp-bridge/agent/agent.go index 792c2d23de..f6fdb33cde 100644 --- a/cmd/operator-opamp-bridge/agent/agent.go +++ b/cmd/operator-opamp-bridge/agent/agent.go @@ -30,7 +30,7 @@ import ( "k8s.io/utils/clock" "sigs.k8s.io/yaml" - "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/config" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/metrics" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/operator" @@ -136,7 +136,7 @@ func (agent *Agent) generateCollectorPoolHealth() (map[string]*protobufs.Compone } // getCollectorSelector destructures the collectors scale selector if present, if uses the labelmap from the operator. -func (agent *Agent) getCollectorSelector(col v1alpha1.OpenTelemetryCollector) map[string]string { +func (agent *Agent) getCollectorSelector(col v1beta1.OpenTelemetryCollector) map[string]string { if len(col.Status.Scale.Selector) > 0 { selMap := map[string]string{} for _, kvPair := range strings.Split(col.Status.Scale.Selector, ",") { diff --git a/cmd/operator-opamp-bridge/agent/agent_test.go b/cmd/operator-opamp-bridge/agent/agent_test.go index 75d533eae1..3ab67cba70 100644 --- a/cmd/operator-opamp-bridge/agent/agent_test.go +++ b/cmd/operator-opamp-bridge/agent/agent_test.go @@ -39,6 +39,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/config" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/operator" ) @@ -147,15 +148,15 @@ type mockOpampClient struct { settings types.StartSettings } -func (m *mockOpampClient) SetCustomCapabilities(customCapabilities *protobufs.CustomCapabilities) error { +func (m *mockOpampClient) SetCustomCapabilities(_ *protobufs.CustomCapabilities) error { return nil } -func (m *mockOpampClient) SendCustomMessage(message *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { +func (m *mockOpampClient) SendCustomMessage(_ *protobufs.CustomMessage) (messageSendingChannel chan struct{}, err error) { return nil, nil } -func (m *mockOpampClient) RequestConnectionSettings(request *protobufs.ConnectionSettingsRequest) error { +func (m *mockOpampClient) RequestConnectionSettings(_ *protobufs.ConnectionSettingsRequest) error { return nil } @@ -201,6 +202,7 @@ func (m *mockOpampClient) SetPackageStatuses(_ *protobufs.PackageStatuses) error func getFakeApplier(t *testing.T, conf *config.Config, lists ...runtimeClient.ObjectList) *operator.Client { schemeBuilder := runtime.NewSchemeBuilder(func(s *runtime.Scheme) error { s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.OpenTelemetryCollector{}, &v1alpha1.OpenTelemetryCollectorList{}) + s.AddKnownTypes(v1beta1.GroupVersion, &v1beta1.OpenTelemetryCollector{}, &v1beta1.OpenTelemetryCollectorList{}) s.AddKnownTypes(v1.SchemeGroupVersion, &v1.Pod{}, &v1.PodList{}) metav1.AddToGroupVersion(s, v1alpha1.GroupVersion) return nil @@ -414,14 +416,17 @@ func TestAgent_getHealth(t *testing.T) { agent.clock = fakeClock err := agent.Start() defer agent.Shutdown() + require.NoError(t, err, "should be able to start agent") if len(tt.args.configs) > 0 { - require.True(t, len(tt.args.configs) == len(tt.want), "must have an equal amount of configs and checks.") + require.Len(t, tt.args.configs, len(tt.want), "must have an equal amount of configs and checks.") } else { require.Len(t, tt.want, 1, "must have exactly one want if no config is supplied.") require.Equal(t, tt.want[0], agent.getHealth()) } + for i, configMap := range tt.args.configs { + var data *types.MessageData data, err := getMessageDataFromConfigFile(configMap) require.NoError(t, err, "should be able to load data") agent.onMessage(tt.args.ctx, data) @@ -495,7 +500,8 @@ func TestAgent_onMessage(t *testing.T) { "name: " + testCollectorName, "namespace: " + testNamespace, "send_batch_size: 10000", - "receivers: [otlp]", + "receivers:", + "- otlp", "status:", }, }, @@ -523,7 +529,8 @@ func TestAgent_onMessage(t *testing.T) { "name: " + testCollectorName, "namespace: " + testNamespace, "send_batch_size: 10000", - "receivers: [otlp]", + "receivers:", + "- otlp", "status:", }, }, @@ -549,7 +556,7 @@ func TestAgent_onMessage(t *testing.T) { status: &protobufs.RemoteConfigStatus{ LastRemoteConfigHash: []byte(invalidYamlConfigHash), Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, - ErrorMessage: "error converting YAML to JSON: yaml: line 23: could not find expected ':'", + ErrorMessage: "failed to unmarshal config into v1beta1 API Version: error converting YAML to JSON: yaml: line 23: could not find expected ':'", }, }, }, @@ -571,7 +578,8 @@ func TestAgent_onMessage(t *testing.T) { "name: " + testCollectorName, "namespace: " + testNamespace, "send_batch_size: 10000", - "receivers: [otlp]", + "receivers:", + "- otlp", "status:", }, }, @@ -656,7 +664,9 @@ func TestAgent_onMessage(t *testing.T) { "name: " + testCollectorName, "namespace: " + testNamespace, "send_batch_size: 10000", - "processors: [memory_limiter, batch]", + "processors:", + "- memory_limiter", + "- batch", "replicas: 3", "status:", }, @@ -709,7 +719,7 @@ func TestAgent_onMessage(t *testing.T) { nextStatus: &protobufs.RemoteConfigStatus{ LastRemoteConfigHash: []byte(invalidYamlConfigHash), // The new hash should be of the bad config Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED, - ErrorMessage: "error converting YAML to JSON: yaml: line 23: could not find expected ':'", + ErrorMessage: "failed to unmarshal config into v1beta1 API Version: error converting YAML to JSON: yaml: line 23: could not find expected ':'", }, }, }, @@ -757,7 +767,9 @@ func TestAgent_onMessage(t *testing.T) { "name: " + otherCollectorName, "namespace: " + testNamespace, "send_batch_size: 10000", - "processors: [memory_limiter, batch]", + "processors:", + "- memory_limiter", + "- batch", "status:", }, }, @@ -805,14 +817,17 @@ func TestAgent_onMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { mockClient := &mockOpampClient{} + conf := config.NewConfig(logr.Discard()) loadErr := config.LoadFromFile(conf, tt.fields.configFile) require.NoError(t, loadErr, "should be able to load config") + applier := getFakeApplier(t, conf) agent := NewAgent(l, applier, conf, mockClient) err := agent.Start() defer agent.Shutdown() require.NoError(t, err, "should be able to start agent") + data, err := getMessageDataFromConfigFile(tt.args.configFile) require.NoError(t, err, "should be able to load data") agent.onMessage(tt.args.ctx, data) @@ -824,17 +839,20 @@ func TestAgent_onMessage(t *testing.T) { } assert.NotNilf(t, effectiveConfig.ConfigMap.GetConfigMap(), "configmap should have data") for colNameNamespace, expectedContents := range tt.want.contents { - assert.Contains(t, effectiveConfig.ConfigMap.GetConfigMap(), colNameNamespace) + configFileMap := effectiveConfig.ConfigMap.GetConfigMap() + require.Contains(t, configFileMap, colNameNamespace) + configFileString := string(configFileMap[colNameNamespace].GetBody()) for _, content := range expectedContents { - asString := string(effectiveConfig.ConfigMap.GetConfigMap()[colNameNamespace].GetBody()) - assert.Contains(t, asString, content) + assert.Contains(t, configFileString, content, "config should contain %s", content) } } assert.Equal(t, tt.want.status, mockClient.lastStatus) + if tt.args.nextConfigFile == nil { // Nothing left to do! return } + nextData, err := getMessageDataFromConfigFile(tt.args.nextConfigFile) require.NoError(t, err, "should be able to load updated data") agent.onMessage(tt.args.ctx, nextData) @@ -843,10 +861,11 @@ func TestAgent_onMessage(t *testing.T) { assert.Equal(t, nextEffectiveConfig, mockClient.lastEffectiveConfig, "client's config should be updated") assert.NotNilf(t, nextEffectiveConfig.ConfigMap.GetConfigMap(), "configmap should have updated data") for colNameNamespace, expectedContents := range tt.want.nextContents { - assert.Contains(t, nextEffectiveConfig.ConfigMap.GetConfigMap(), colNameNamespace) + configFileMap := nextEffectiveConfig.ConfigMap.GetConfigMap() + require.Contains(t, configFileMap, colNameNamespace) + configFileString := string(configFileMap[colNameNamespace].GetBody()) for _, content := range expectedContents { - asString := string(nextEffectiveConfig.ConfigMap.GetConfigMap()[colNameNamespace].GetBody()) - assert.Contains(t, asString, content) + assert.Contains(t, configFileString, content) } } assert.Equal(t, tt.want.nextStatus, mockClient.lastStatus) diff --git a/cmd/operator-opamp-bridge/agent/testdata/basic.yaml b/cmd/operator-opamp-bridge/agent/testdata/basic.yaml index 410a328edd..89432eea51 100644 --- a/cmd/operator-opamp-bridge/agent/testdata/basic.yaml +++ b/cmd/operator-opamp-bridge/agent/testdata/basic.yaml @@ -1,10 +1,11 @@ +apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector metadata: name: simplest labels: "opentelemetry.io/opamp-managed": "true" spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/agent/testdata/invalid.yaml b/cmd/operator-opamp-bridge/agent/testdata/invalid.yaml index 2a060581f4..75969bc50e 100644 --- a/cmd/operator-opamp-bridge/agent/testdata/invalid.yaml +++ b/cmd/operator-opamp-bridge/agent/testdata/invalid.yaml @@ -4,7 +4,7 @@ metadata: labels: "opentelemetry.io/opamp-managed": "true" spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/agent/testdata/updated.yaml b/cmd/operator-opamp-bridge/agent/testdata/updated.yaml index 53ce4af492..a83f6517fc 100644 --- a/cmd/operator-opamp-bridge/agent/testdata/updated.yaml +++ b/cmd/operator-opamp-bridge/agent/testdata/updated.yaml @@ -1,10 +1,11 @@ +apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector metadata: name: simplest labels: "opentelemetry.io/opamp-managed": "test-bridge" spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/operator/client.go b/cmd/operator-opamp-bridge/operator/client.go index 899d3bb636..72010b5ff3 100644 --- a/cmd/operator-opamp-bridge/operator/client.go +++ b/cmd/operator-opamp-bridge/operator/client.go @@ -28,7 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" - "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" ) const ( @@ -43,17 +43,17 @@ type ConfigApplier interface { // Apply receives a name and namespace to apply an OpenTelemetryCollector CRD that is contained in the configmap. Apply(name string, namespace string, configmap *protobufs.AgentConfigFile) error + // Delete attempts to delete an OpenTelemetryCollector object given a name and namespace. + Delete(name string, namespace string) error + + // ListInstances retrieves all OpenTelemetryCollector CRDs created by the operator-opamp-bridge agent. + ListInstances() ([]v1beta1.OpenTelemetryCollector, error) + // GetInstance retrieves an OpenTelemetryCollector CRD given a name and namespace. - GetInstance(name string, namespace string) (*v1alpha1.OpenTelemetryCollector, error) + GetInstance(name string, namespace string) (*v1beta1.OpenTelemetryCollector, error) // GetCollectorPods retrieves all pods that match the given collector's selector labels and namespace. GetCollectorPods(selectorLabels map[string]string, namespace string) (*v1.PodList, error) - - // ListInstances retrieves all OpenTelemetryCollector CRDs created by the operator-opamp-bridge agent. - ListInstances() ([]v1alpha1.OpenTelemetryCollector, error) - - // Delete attempts to delete an OpenTelemetryCollector object given a name and namespace. - Delete(name string, namespace string) error } type Client struct { @@ -76,20 +76,111 @@ func NewClient(name string, log logr.Logger, c client.Client, componentsAllowed } } -func (c Client) labelSetContainsLabel(instance *v1alpha1.OpenTelemetryCollector, label, value string) bool { - if instance == nil || instance.GetLabels() == nil { - return false +func (c Client) Apply(name string, namespace string, configmap *protobufs.AgentConfigFile) error { + c.log.Info("Received new config", "name", name, "namespace", namespace) + + if len(configmap.Body) == 0 { + return errors.NewBadRequest("invalid config to apply: config is empty") + } + + var collector v1beta1.OpenTelemetryCollector + err := yaml.Unmarshal(configmap.Body, &collector) + if err != nil { + return errors.NewBadRequest(fmt.Sprintf("failed to unmarshal config into v1beta1 API Version: %v", err)) } - if labels := instance.GetLabels(); labels != nil && strings.EqualFold(labels[label], value) { - return true + + err = c.validateComponents(&collector.Spec.Config) + if err != nil { + return err } - return false + + ctx := context.Background() + updatedCollector := collector.DeepCopy() + instance, err := c.GetInstance(name, namespace) + if err != nil { + return err + } + + err = c.validateLabels(instance) + if err != nil { + return err + } + err = c.validateLabels(updatedCollector) + if err != nil { + return err + } + + if instance == nil { + return c.create(ctx, name, namespace, updatedCollector) + } + return c.update(ctx, instance, updatedCollector) } -func (c Client) create(ctx context.Context, name string, namespace string, collector *v1alpha1.OpenTelemetryCollector) error { +func (c Client) validateComponents(collectorConfig *v1beta1.Config) error { + if c.componentsAllowed == nil || len(c.componentsAllowed) == 0 { + return nil + } + + configuredComponents := map[string]map[string]interface{}{ + "receivers": collectorConfig.Receivers.Object, + "processors": collectorConfig.Processors.Object, + "exporters": collectorConfig.Exporters.Object, + } + + var invalidComponents []string + for component, componentMap := range configuredComponents { + if _, ok := c.componentsAllowed[component]; !ok { + invalidComponents = append(invalidComponents, component) + continue + } + for componentName := range componentMap { + if _, ok := c.componentsAllowed[component][componentName]; !ok { + invalidComponents = append(invalidComponents, fmt.Sprintf("%s.%s", component, componentName)) + } + } + } + + if len(invalidComponents) > 0 { + return errors.NewBadRequest(fmt.Sprintf("Items in config are not allowed: %v", invalidComponents)) + } + + return nil +} + +func (c Client) validateLabels(collector *v1beta1.OpenTelemetryCollector) error { + if collector == nil { + return nil + } + + resourceLabels := collector.GetLabels() + + // If either the received collector resource has labels indicating it should only report and is not managed, + // disallow applying the new collector config + if labelSetContainsLabel(resourceLabels, ReportingLabelKey, "true") { + return errors.NewBadRequest(fmt.Sprintf("cannot modify a collector with `%s: true`", ReportingLabelKey)) + } + + // If either the collector doesn't have the managed label set to true, it should disallow applying the new collector + // config + if !labelSetContainsLabel(resourceLabels, ManagedLabelKey, "true") && + !labelSetContainsLabel(resourceLabels, ManagedLabelKey, c.name) { + return errors.NewBadRequest(fmt.Sprintf("cannot modify a collector that doesn't have `%s: true | ` set", ManagedLabelKey)) + } + + return nil +} + +func labelSetContainsLabel(resourceLabelSet map[string]string, label, value string) bool { + if len(resourceLabelSet) == 0 { + return false + } + return strings.EqualFold(resourceLabelSet[label], value) +} + +func (c Client) create(ctx context.Context, name string, namespace string, collector *v1beta1.OpenTelemetryCollector) error { // Set the defaults collector.TypeMeta.Kind = CollectorResource - collector.TypeMeta.APIVersion = v1alpha1.GroupVersion.String() + collector.TypeMeta.APIVersion = v1beta1.GroupVersion.String() collector.ObjectMeta.Name = name collector.ObjectMeta.Namespace = namespace @@ -97,61 +188,22 @@ func (c Client) create(ctx context.Context, name string, namespace string, colle collector.ObjectMeta.Labels = map[string]string{} } collector.ObjectMeta.Labels[ResourceIdentifierKey] = ResourceIdentifierValue + c.log.Info("Creating collector") return c.k8sClient.Create(ctx, collector) } -func (c Client) update(ctx context.Context, old *v1alpha1.OpenTelemetryCollector, new *v1alpha1.OpenTelemetryCollector) error { +func (c Client) update(ctx context.Context, old *v1beta1.OpenTelemetryCollector, new *v1beta1.OpenTelemetryCollector) error { new.ObjectMeta = old.ObjectMeta new.TypeMeta = old.TypeMeta + c.log.Info("Updating collector") return c.k8sClient.Update(ctx, new) } -func (c Client) Apply(name string, namespace string, configmap *protobufs.AgentConfigFile) error { - c.log.Info("Received new config", "name", name, "namespace", namespace) - var collector v1alpha1.OpenTelemetryCollector - err := yaml.Unmarshal(configmap.Body, &collector) - if err != nil { - return err - } - if len(collector.Spec.Config) == 0 { - return errors.NewBadRequest("Must supply valid configuration") - } - reasons, validateErr := c.validate(collector.Spec) - if validateErr != nil { - return validateErr - } - if len(reasons) > 0 { - return errors.NewBadRequest(fmt.Sprintf("Items in config are not allowed: %v", reasons)) - } - updatedCollector := collector.DeepCopy() - ctx := context.Background() - instance, err := c.GetInstance(name, namespace) - if err != nil { - return err - } - // If either the received collector or the collector being created has reporting set to true, it should be denied - if c.labelSetContainsLabel(instance, ReportingLabelKey, "true") || - c.labelSetContainsLabel(updatedCollector, ReportingLabelKey, "true") { - return errors.NewBadRequest("cannot modify a collector with `opentelemetry.io/opamp-reporting: true`") - } - // If either the received collector or the collector doesn't have the managed label set to true, it should be denied - if !c.labelSetContainsLabel(instance, ManagedLabelKey, "true") && - !c.labelSetContainsLabel(instance, ManagedLabelKey, c.name) && - !c.labelSetContainsLabel(updatedCollector, ManagedLabelKey, "true") && - !c.labelSetContainsLabel(updatedCollector, ManagedLabelKey, c.name) { - return errors.NewBadRequest("cannot modify a collector that doesn't have `opentelemetry.io/opamp-managed: true | ` set") - } - if instance == nil { - return c.create(ctx, name, namespace, updatedCollector) - } - return c.update(ctx, instance, updatedCollector) -} - func (c Client) Delete(name string, namespace string) error { ctx := context.Background() - result := v1alpha1.OpenTelemetryCollector{} + result := v1beta1.OpenTelemetryCollector{} err := c.k8sClient.Get(ctx, client.ObjectKey{ Namespace: namespace, Name: name, @@ -165,36 +217,51 @@ func (c Client) Delete(name string, namespace string) error { return c.k8sClient.Delete(ctx, &result) } -func (c Client) ListInstances() ([]v1alpha1.OpenTelemetryCollector, error) { +func (c Client) ListInstances() ([]v1beta1.OpenTelemetryCollector, error) { ctx := context.Background() - result := v1alpha1.OpenTelemetryCollectorList{} + + var instances []v1beta1.OpenTelemetryCollector + labelSelector := labels.NewSelector() requirement, err := labels.NewRequirement(ManagedLabelKey, selection.In, []string{c.name, "true"}) if err != nil { return nil, err } - err = c.k8sClient.List(ctx, &result, client.MatchingLabelsSelector{Selector: labelSelector.Add(*requirement)}) - if err != nil { - return nil, err + managedCollectorLabelSelector := client.MatchingLabelsSelector{Selector: labelSelector.Add(*requirement)} + reportingCollectorLabelMatcher := client.MatchingLabels{ReportingLabelKey: "true"} + + managedCollectors, listErr := c.listCollectors(ctx, managedCollectorLabelSelector) + if listErr != nil { + return nil, listErr } - reportingCollectors := v1alpha1.OpenTelemetryCollectorList{} - err = c.k8sClient.List(ctx, &reportingCollectors, client.MatchingLabels{ - ReportingLabelKey: "true", - }) - if err != nil { - return nil, err + instances = append(instances, managedCollectors.Items...) + + reportingCollectors, listErr := c.listCollectors(ctx, reportingCollectorLabelMatcher) + if listErr != nil { + return nil, listErr } - items := append(result.Items, reportingCollectors.Items...) - for i := range items { - items[i].SetManagedFields(nil) + instances = append(instances, reportingCollectors.Items...) + + for i := range instances { + instances[i].SetManagedFields(nil) } - return items, nil + return instances, nil } -func (c Client) GetInstance(name string, namespace string) (*v1alpha1.OpenTelemetryCollector, error) { +func (c Client) listCollectors(ctx context.Context, opts ...client.ListOption) (v1beta1.OpenTelemetryCollectorList, error) { + collectorsList := v1beta1.OpenTelemetryCollectorList{} + err := c.k8sClient.List(ctx, &collectorsList, opts...) + if err != nil { + return v1beta1.OpenTelemetryCollectorList{}, err + } + return collectorsList, nil +} + +func (c Client) GetInstance(name string, namespace string) (*v1beta1.OpenTelemetryCollector, error) { ctx := context.Background() - result := v1alpha1.OpenTelemetryCollector{} + result := v1beta1.OpenTelemetryCollector{} + err := c.k8sClient.Get(ctx, client.ObjectKey{ Namespace: namespace, Name: name, @@ -214,33 +281,3 @@ func (c Client) GetCollectorPods(selectorLabels map[string]string, namespace str err := c.k8sClient.List(ctx, podList, client.MatchingLabels(selectorLabels), client.InNamespace(namespace)) return podList, err } - -func (c Client) validate(spec v1alpha1.OpenTelemetryCollectorSpec) ([]string, error) { - // Do not use this feature if it's not specified - if c.componentsAllowed == nil || len(c.componentsAllowed) == 0 { - return nil, nil - } - collectorConfig := make(map[string]map[string]interface{}) - err := yaml.Unmarshal([]byte(spec.Config), &collectorConfig) - if err != nil { - return nil, err - } - var invalidComponents []string - for component, componentMap := range collectorConfig { - if component == "service" { - // We don't care about what's in the service pipelines. - // Only components declared in the configuration can be used in the service pipeline. - continue - } - if _, ok := c.componentsAllowed[component]; !ok { - invalidComponents = append(invalidComponents, component) - continue - } - for componentName := range componentMap { - if _, ok := c.componentsAllowed[component][componentName]; !ok { - invalidComponents = append(invalidComponents, fmt.Sprintf("%s.%s", component, componentName)) - } - } - } - return invalidComponents, nil -} diff --git a/cmd/operator-opamp-bridge/operator/client_test.go b/cmd/operator-opamp-bridge/operator/client_test.go index a5f000b877..ed8e0ada07 100644 --- a/cmd/operator-opamp-bridge/operator/client_test.go +++ b/cmd/operator-opamp-bridge/operator/client_test.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/yaml" "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" ) var ( @@ -45,6 +46,7 @@ const ( func getFakeClient(t *testing.T, lists ...client.ObjectList) client.WithWatch { schemeBuilder := runtime.NewSchemeBuilder(func(s *runtime.Scheme) error { s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.OpenTelemetryCollector{}, &v1alpha1.OpenTelemetryCollectorList{}) + s.AddKnownTypes(v1beta1.GroupVersion, &v1beta1.OpenTelemetryCollector{}, &v1beta1.OpenTelemetryCollectorList{}) s.AddKnownTypes(v1.SchemeGroupVersion, &v1.Pod{}, &v1.PodList{}) metav1.AddToGroupVersion(s, v1alpha1.GroupVersion) return nil @@ -96,7 +98,7 @@ func TestClient_Apply(t *testing.T) { config: "", }, wantErr: true, - errContains: "Must supply valid configuration", + errContains: "invalid config to apply: config is empty", }, { name: "create reporting-only", @@ -118,6 +120,16 @@ func TestClient_Apply(t *testing.T) { wantErr: true, errContains: "opentelemetry.io/opamp-managed", }, + { + name: "cannot apply v1alpha1 Collector config", + args: args{ + name: "test", + namespace: "opentelemetry", + file: "testdata/collector-v1alpha1.yaml", + }, + wantErr: true, + errContains: "failed to unmarshal config into v1beta1 API Version", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -144,7 +156,7 @@ func TestClient_Apply(t *testing.T) { } } -func Test_collectorUpdate(t *testing.T) { +func TestClient_ApplyUpdate(t *testing.T) { name := "test" namespace := "testing" fakeClient := getFakeClient(t) @@ -153,19 +165,24 @@ func Test_collectorUpdate(t *testing.T) { // Load reporting-only collector reportingColConfig, err := loadConfig("testdata/reporting-collector.yaml") require.NoError(t, err, "Should be no error on loading test configuration") - var reportingCol v1alpha1.OpenTelemetryCollector + + var reportingCol v1beta1.OpenTelemetryCollector err = yaml.Unmarshal(reportingColConfig, &reportingCol) require.NoError(t, err, "Should be no error on unmarshal") + reportingCol.TypeMeta.Kind = CollectorResource - reportingCol.TypeMeta.APIVersion = v1alpha1.GroupVersion.String() + reportingCol.TypeMeta.APIVersion = v1beta1.GroupVersion.String() reportingCol.ObjectMeta.Name = "simplest" reportingCol.ObjectMeta.Namespace = namespace + err = fakeClient.Create(context.Background(), &reportingCol) require.NoError(t, err, "Should be able to make reporting col") + allInstances, err := c.ListInstances() require.NoError(t, err, "Should be able to list all collectors") require.Len(t, allInstances, 1) + // Create managed collector colConfig, err := loadConfig("testdata/collector.yaml") require.NoError(t, err, "Should be no error on loading test configuration") configmap := &protobufs.AgentConfigFile{ @@ -176,15 +193,26 @@ func Test_collectorUpdate(t *testing.T) { err = c.Apply(name, namespace, configmap) require.NoError(t, err, "Should apply base config") - // Get the newly created collector + // Confirm there are now two collector instances, reporting and managed + allInstances, err = c.ListInstances() + require.NoError(t, err, "Should be able to list all collectors") + require.Len(t, allInstances, 2, "Should be two collector instances") + + // Get the newly created collector instance instance, err := c.GetInstance(name, namespace) require.NoError(t, err, "Should be able to get the newly created instance") - assert.Contains(t, instance.Spec.Config, "processors: []") - // Try updating with an invalid one + require.NotNil(t, instance, "Should be able to get the newly created instance") + require.Len(t, instance.Spec.Config.Service.Pipelines, 1, "Should have a single pipeline") + require.Contains(t, instance.Spec.Config.Service.Pipelines, "traces", "Should have a traces pipeline") + originalTracesPipeline := instance.Spec.Config.Service.Pipelines["traces"] + require.NotNil(t, originalTracesPipeline, "Should have a traces pipeline") + require.Empty(t, originalTracesPipeline.Processors, "Should have the no processors configured for the traces pipeline") + + // Try updating with an invalid configuration configmap.Body = []byte("empty, invalid!") err = c.Apply(name, namespace, configmap) - assert.Error(t, err, "Should be unable to update") + assert.Error(t, err, "Should be unable to update with invalid config") // Update successfully with a valid configuration newColConfig, err := loadConfig("testdata/updated-collector.yaml") @@ -198,8 +226,13 @@ func Test_collectorUpdate(t *testing.T) { // Get the updated collector updatedInstance, err := c.GetInstance(name, namespace) - require.NoError(t, err, "Should be able to get the updated instance") - assert.Contains(t, updatedInstance.Spec.Config, "processors: [memory_limiter, batch]") + require.NoError(t, err, "Should be able to get the updated instance without error") + require.NotNil(t, updatedInstance, "Should be able to get the newly created instance") + require.Len(t, updatedInstance.Spec.Config.Service.Pipelines, 1, "Should have a single pipeline") + require.Contains(t, updatedInstance.Spec.Config.Service.Pipelines, "traces", "Should have a traces pipeline") + newTracesPipeline := updatedInstance.Spec.Config.Service.Pipelines["traces"] + require.NotNil(t, newTracesPipeline, "Should have a traces pipeline") + require.Equal(t, []string{"memory_limiter", "batch"}, newTracesPipeline.Processors, "Should have the memory_limiter and batch processors") allInstances, err = c.ListInstances() require.NoError(t, err, "Should be able to list all collectors") @@ -208,7 +241,7 @@ func Test_collectorUpdate(t *testing.T) { assert.Contains(t, allInstances, *updatedInstance) } -func Test_collectorDelete(t *testing.T) { +func TestClient_Delete(t *testing.T) { name := "test" namespace := "testing" fakeClient := getFakeClient(t) @@ -225,8 +258,11 @@ func Test_collectorDelete(t *testing.T) { // Get the newly created collector instance, err := c.GetInstance(name, namespace) - require.NoError(t, err, "Should be able to get the newly created instance") - assert.Contains(t, instance.Spec.Config, "processors: []") + require.NoError(t, err, "Should be able to get the newly created instance without error") + require.NotNil(t, instance, "Should be able to get the newly created instance") + require.NotNil(t, instance.Spec.Config.Processors, "Should have processor") + require.Contains(t, instance.Spec.Config.Processors.Object, "batch", "Should have the batch processor") + require.Len(t, instance.Spec.Config.Service.Pipelines, 1, "Should have a pipeline") // Delete it err = c.Delete(name, namespace) @@ -235,7 +271,7 @@ func Test_collectorDelete(t *testing.T) { // Check there's nothing left allInstances, err := c.ListInstances() require.NoError(t, err, "Should be able to list all collectors") - assert.Len(t, allInstances, 0) + require.Empty(t, allInstances, "Should be empty after deletion") } func loadConfig(file string) ([]byte, error) { diff --git a/cmd/operator-opamp-bridge/operator/testdata/collector-v1alpha1.yaml b/cmd/operator-opamp-bridge/operator/testdata/collector-v1alpha1.yaml new file mode 100644 index 0000000000..c77c64c488 --- /dev/null +++ b/cmd/operator-opamp-bridge/operator/testdata/collector-v1alpha1.yaml @@ -0,0 +1,31 @@ +apiVersion: opentelemetry.io/v1alpha1 +kind: OpenTelemetryCollector +metadata: + name: simplest + labels: + opentelemetry.io/opamp-managed: "true" +spec: + config: | + receivers: + otlp: + protocols: + grpc: + http: + processors: + memory_limiter: + check_interval: 1s + limit_percentage: 75 + spike_limit_percentage: 15 + batch: + send_batch_size: 10000 + timeout: 10s + + exporters: + debug: + + service: + pipelines: + traces: + receivers: [otlp] + processors: [] + exporters: [debug] diff --git a/cmd/operator-opamp-bridge/operator/testdata/collector.yaml b/cmd/operator-opamp-bridge/operator/testdata/collector.yaml index c77c64c488..16cb21cc28 100644 --- a/cmd/operator-opamp-bridge/operator/testdata/collector.yaml +++ b/cmd/operator-opamp-bridge/operator/testdata/collector.yaml @@ -1,11 +1,11 @@ -apiVersion: opentelemetry.io/v1alpha1 +apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector metadata: name: simplest labels: opentelemetry.io/opamp-managed: "true" spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/operator/testdata/invalid-collector.yaml b/cmd/operator-opamp-bridge/operator/testdata/invalid-collector.yaml index f64ebcedf4..4d165d9745 100644 --- a/cmd/operator-opamp-bridge/operator/testdata/invalid-collector.yaml +++ b/cmd/operator-opamp-bridge/operator/testdata/invalid-collector.yaml @@ -2,7 +2,7 @@ kind: OpenTelemetryCollector metadata: name: simplest spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/operator/testdata/reporting-collector.yaml b/cmd/operator-opamp-bridge/operator/testdata/reporting-collector.yaml index 3b0a925464..32cdc3899f 100644 --- a/cmd/operator-opamp-bridge/operator/testdata/reporting-collector.yaml +++ b/cmd/operator-opamp-bridge/operator/testdata/reporting-collector.yaml @@ -1,11 +1,11 @@ -apiVersion: opentelemetry.io/v1alpha1 +apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector metadata: name: simplest labels: "opentelemetry.io/opamp-reporting": "true" spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/operator/testdata/unmanaged-collector.yaml b/cmd/operator-opamp-bridge/operator/testdata/unmanaged-collector.yaml index 48b793f98d..1c941b9832 100644 --- a/cmd/operator-opamp-bridge/operator/testdata/unmanaged-collector.yaml +++ b/cmd/operator-opamp-bridge/operator/testdata/unmanaged-collector.yaml @@ -1,11 +1,11 @@ -apiVersion: opentelemetry.io/v1alpha1 +apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector metadata: name: simplest labels: "opentelemetry.io/opamp-managed": "false" spec: - config: | + config: receivers: otlp: protocols: diff --git a/cmd/operator-opamp-bridge/operator/testdata/updated-collector.yaml b/cmd/operator-opamp-bridge/operator/testdata/updated-collector.yaml index 81a919b28d..e0ede2376a 100644 --- a/cmd/operator-opamp-bridge/operator/testdata/updated-collector.yaml +++ b/cmd/operator-opamp-bridge/operator/testdata/updated-collector.yaml @@ -1,11 +1,11 @@ -apiVersion: opentelemetry.io/v1alpha1 +apiVersion: opentelemetry.io/v1beta1 kind: OpenTelemetryCollector metadata: name: simplest labels: opentelemetry.io/opamp-managed: "true" spec: - config: | + config: receivers: otlp: protocols: From 6ff74f38df57c497eb871afccf4f97f76d0b065e Mon Sep 17 00:00:00 2001 From: Gregory Fast Date: Mon, 1 Jul 2024 12:08:19 -0400 Subject: [PATCH 2/5] Add chloggen entry --- .chloggen/opamp-bridge-v1beta1-support.yaml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100755 .chloggen/opamp-bridge-v1beta1-support.yaml diff --git a/.chloggen/opamp-bridge-v1beta1-support.yaml b/.chloggen/opamp-bridge-v1beta1-support.yaml new file mode 100755 index 0000000000..90dfc73781 --- /dev/null +++ b/.chloggen/opamp-bridge-v1beta1-support.yaml @@ -0,0 +1,20 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: opamp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds support for v1beta1 OpenTelemetry Collector API in the OpAMP Bridge + +# One or more tracking issues related to the change +issues: [2985] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This change adds support for the OpAMP Bridge to manage and apply OpenTelemetry Collectors using the v1beta1 API in + the OpAMP Bridge. This change removes support for applying OpenTelemetry Collectors using the v1alpha1 API version. + The v1beta1 API is the latest version of the OpenTelemetry Collector API and is the recommended version for new + deployments. \ No newline at end of file From 6f0a732a53056a632eafccb7e6fe16591779d1e4 Mon Sep 17 00:00:00 2001 From: Gregory Fast Date: Mon, 1 Jul 2024 15:12:18 -0400 Subject: [PATCH 3/5] Simplify list instances code --- cmd/operator-opamp-bridge/operator/client.go | 25 +++++++------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/cmd/operator-opamp-bridge/operator/client.go b/cmd/operator-opamp-bridge/operator/client.go index 72010b5ff3..1033e40cff 100644 --- a/cmd/operator-opamp-bridge/operator/client.go +++ b/cmd/operator-opamp-bridge/operator/client.go @@ -228,17 +228,19 @@ func (c Client) ListInstances() ([]v1beta1.OpenTelemetryCollector, error) { return nil, err } managedCollectorLabelSelector := client.MatchingLabelsSelector{Selector: labelSelector.Add(*requirement)} - reportingCollectorLabelMatcher := client.MatchingLabels{ReportingLabelKey: "true"} - managedCollectors, listErr := c.listCollectors(ctx, managedCollectorLabelSelector) - if listErr != nil { - return nil, listErr + managedCollectors := v1beta1.OpenTelemetryCollectorList{} + err = c.k8sClient.List(ctx, &managedCollectors, managedCollectorLabelSelector) + if err != nil { + return nil, err } instances = append(instances, managedCollectors.Items...) - reportingCollectors, listErr := c.listCollectors(ctx, reportingCollectorLabelMatcher) - if listErr != nil { - return nil, listErr + reportingCollectorLabelMatcher := client.MatchingLabels{ReportingLabelKey: "true"} + reportingCollectors := v1beta1.OpenTelemetryCollectorList{} + err = c.k8sClient.List(ctx, &reportingCollectors, reportingCollectorLabelMatcher) + if err != nil { + return nil, err } instances = append(instances, reportingCollectors.Items...) @@ -249,15 +251,6 @@ func (c Client) ListInstances() ([]v1beta1.OpenTelemetryCollector, error) { return instances, nil } -func (c Client) listCollectors(ctx context.Context, opts ...client.ListOption) (v1beta1.OpenTelemetryCollectorList, error) { - collectorsList := v1beta1.OpenTelemetryCollectorList{} - err := c.k8sClient.List(ctx, &collectorsList, opts...) - if err != nil { - return v1beta1.OpenTelemetryCollectorList{}, err - } - return collectorsList, nil -} - func (c Client) GetInstance(name string, namespace string) (*v1beta1.OpenTelemetryCollector, error) { ctx := context.Background() result := v1beta1.OpenTelemetryCollector{} From bf071257ac1db92907dfca8f1a28b94992c16efb Mon Sep 17 00:00:00 2001 From: Gregory Fast Date: Wed, 3 Jul 2024 09:49:57 -0400 Subject: [PATCH 4/5] Register v1beta1 types --- cmd/operator-opamp-bridge/config/config.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/operator-opamp-bridge/config/config.go b/cmd/operator-opamp-bridge/config/config.go index 7f4eaab4d5..cd83f043f1 100644 --- a/cmd/operator-opamp-bridge/config/config.go +++ b/cmd/operator-opamp-bridge/config/config.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1" + "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" "github.com/open-telemetry/opentelemetry-operator/cmd/operator-opamp-bridge/logger" ) @@ -56,6 +57,7 @@ var ( func registerKnownTypes(s *k8sruntime.Scheme) error { s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.OpenTelemetryCollector{}, &v1alpha1.OpenTelemetryCollectorList{}) + s.AddKnownTypes(v1beta1.GroupVersion, &v1beta1.OpenTelemetryCollector{}, &v1beta1.OpenTelemetryCollectorList{}) metav1.AddToGroupVersion(s, v1alpha1.GroupVersion) return nil } From 8003feaa486ba0b63a0f25d1ffd644b9dfdbf29b Mon Sep 17 00:00:00 2001 From: Gregory Fast Date: Wed, 3 Jul 2024 09:54:39 -0400 Subject: [PATCH 5/5] Register v1beta1 group version --- cmd/operator-opamp-bridge/config/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/operator-opamp-bridge/config/config.go b/cmd/operator-opamp-bridge/config/config.go index cd83f043f1..13a6661e0e 100644 --- a/cmd/operator-opamp-bridge/config/config.go +++ b/cmd/operator-opamp-bridge/config/config.go @@ -59,6 +59,7 @@ func registerKnownTypes(s *k8sruntime.Scheme) error { s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.OpenTelemetryCollector{}, &v1alpha1.OpenTelemetryCollectorList{}) s.AddKnownTypes(v1beta1.GroupVersion, &v1beta1.OpenTelemetryCollector{}, &v1beta1.OpenTelemetryCollectorList{}) metav1.AddToGroupVersion(s, v1alpha1.GroupVersion) + metav1.AddToGroupVersion(s, v1beta1.GroupVersion) return nil }