Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to use new parsers #3069

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
53 changes: 53 additions & 0 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ import (
"strconv"
"strings"

"github.com/go-logr/logr"
"gopkg.in/yaml.v3"
corev1 "k8s.io/api/core/v1"

"github.com/open-telemetry/opentelemetry-operator/internal/components"
"github.com/open-telemetry/opentelemetry-operator/internal/components/exporters"
"github.com/open-telemetry/opentelemetry-operator/internal/components/receivers"
)

type ComponentType int
Expand Down Expand Up @@ -133,6 +139,53 @@ type Config struct {
Service Service `json:"service" yaml:"service"`
}

// getPortsForComponentTypes gets the ports for the given ComponentType(s).
// NOTE FOR REVIEWERS: we could also do this by introducing something in components that is aware of which retriever
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seeking opinions here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this depends on where we want to handle finding the right component config. At this level, we know it exists, as we're iterating over the map. The retriever would have to check and potentially return an error. Is there any other place we need this logic in? If it's just here, I'd keep it as is.

// is being used and pass the right config in where necessary...
func (c *Config) getPortsForComponentTypes(logger logr.Logger, componentType ...ComponentType) ([]corev1.ServicePort, error) {
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
var ports []corev1.ServicePort
enabledComponents := c.GetEnabledComponents()
for _, cType := range componentType {
var retriever components.ParserRetriever
var cfg AnyConfig
switch cType {
case ComponentTypeReceiver:
retriever = receivers.ReceiverFor
cfg = c.Receivers
case ComponentTypeExporter:
retriever = exporters.ParserFor
cfg = c.Exporters
case ComponentTypeProcessor:
break
}
for componentName := range enabledComponents[cType] {
// NOTE FOR REVIEWERS:
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
// I don't love that we pass the component name in three different times here (to get the parser, to set
// the service name, to access in the object)
// Maybe in a followup this can be cleaned up?
parser := retriever(componentName)
if parsedPorts, err := parser.Ports(logger, componentName, cfg.Object[componentName]); err != nil {
return nil, err
} else {
ports = append(ports, parsedPorts...)
}
}
}
return ports, nil
}

func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentTypes(logger, ComponentTypeReceiver)
}

func (c *Config) GetExporterPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentTypes(logger, ComponentTypeExporter)
}

func (c *Config) GetAllPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentTypes(logger, ComponentTypeReceiver, ComponentTypeExporter)
}

// Yaml encodes the current object and returns it as a string.
func (c *Config) Yaml() (string, error) {
var buf bytes.Buffer
Expand Down
161 changes: 161 additions & 0 deletions apis/v1beta1/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import (
"strings"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
go_yaml "gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -385,3 +389,160 @@ func TestConfig_GetEnabledComponents(t *testing.T) {
})
}
}

func TestConfig_GetReceiverPorts(t *testing.T) {
tests := []struct {
name string
file string
want []v1.ServicePort
wantErr bool
}{

{
name: "connectors",
file: "testdata/otelcol-connectors.yaml",
want: nil,
wantErr: true,
},
{
name: "couchbase",
file: "testdata/otelcol-couchbase.yaml",
want: nil, // Couchbase uses a prometheus scraper, no ports should be opened
},
{
name: "demo",
file: "testdata/otelcol-demo.yaml",
want: []v1.ServicePort{
{
Name: "otlp-grpc",
Protocol: "",
AppProtocol: ptr.To("grpc"),
Port: 4317,
TargetPort: intstr.FromInt32(4317),
},
},
},
{
name: "extensions",
file: "testdata/otelcol-extensions.yaml",
want: []v1.ServicePort{
{
Name: "otlp-grpc",
Protocol: "",
AppProtocol: ptr.To("grpc"),
Port: 4317,
TargetPort: intstr.FromInt32(4317),
},
},
},
{
name: "filelog",
file: "testdata/otelcol-filelog.yaml",
want: nil,
},
{
name: "null",
file: "testdata/otelcol-null-values.yaml",
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
collectorYaml, err := os.ReadFile(tt.file)
require.NoError(t, err)

c := &Config{}
err = go_yaml.Unmarshal(collectorYaml, c)
require.NoError(t, err)
ports, err := c.GetReceiverPorts(logr.Discard())
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.Equalf(t, tt.want, ports, "GetReceiverPorts()")
})
}
}

func TestConfig_GetExporterPorts(t *testing.T) {
tests := []struct {
name string
file string
want []v1.ServicePort
wantErr bool
}{

{
name: "connectors",
file: "testdata/otelcol-connectors.yaml",
want: nil,
wantErr: false,
},
{
name: "couchbase",
file: "testdata/otelcol-couchbase.yaml",
want: []v1.ServicePort{
{
Name: "prometheus",
Port: 9123,
},
},
},
{
name: "demo",
file: "testdata/otelcol-demo.yaml",
want: []v1.ServicePort{
{
Name: "prometheus",
Port: 8889,
},
{
Name: "otlp",
Port: 4317,
},
{
Name: "zipkin",
Port: 9411,
},
},
},
{
name: "extensions",
file: "testdata/otelcol-extensions.yaml",
want: []v1.ServicePort{
{
Name: "otlp",
Port: 4317,
},
},
},
{
name: "filelog",
file: "testdata/otelcol-filelog.yaml",
want: nil,
},
{
name: "null",
file: "testdata/otelcol-null-values.yaml",
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
collectorYaml, err := os.ReadFile(tt.file)
require.NoError(t, err)

c := &Config{}
err = go_yaml.Unmarshal(collectorYaml, c)
require.NoError(t, err)
ports, err := c.GetExporterPorts(logr.Discard())
if tt.wantErr {
require.Error(t, err)
return
}
require.NoError(t, err)
assert.ElementsMatchf(t, tt.want, ports, "GetReceiverPorts()")
})
}
}
6 changes: 3 additions & 3 deletions apis/v1beta1/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
)

// TODO: Refactor this logic, centralize it. See: https://github.com/open-telemetry/opentelemetry-operator/issues/2603
type components struct {
type componentDefinitions struct {
receivers []string
processors []string
exporters []string
Expand Down Expand Up @@ -193,9 +193,9 @@ func extractElements(elements map[string]interface{}) []string {
return items
}

func getComponentsFromConfig(yamlContent Config) *components {
func getComponentsFromConfig(yamlContent Config) *componentDefinitions {

info := &components{
info := &componentDefinitions{
receivers: extractElements(yamlContent.Receivers.Object),
exporters: extractElements(yamlContent.Exporters.Object),
}
Expand Down
26 changes: 21 additions & 5 deletions internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,25 @@ func PortFromEndpoint(endpoint string) (int32, error) {
port, err = strconv.ParseInt(cleanedPortStr, 10, 32)

if err != nil {
return 0, err
return UnsetPort, err
}
}

if port == 0 {
return 0, PortNotFoundErr
return UnsetPort, PortNotFoundErr
}

return int32(port), err
}

type ParserRetriever func(string) ComponentPortParser

type ComponentPortParser interface {
// Ports returns the service ports parsed based on the exporter's configuration
Ports(logger logr.Logger, config interface{}) ([]corev1.ServicePort, error)
// Ports returns the service ports parsed based on the component's configuration where name is the component's name
// of the form "name" or "type/name"
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)

// ParserType returns the name of this parser
// ParserType returns the type of this parser
ParserType() string

// ParserName is an internal name for the parser
Expand All @@ -113,3 +116,16 @@ func ConstructServicePort(current *corev1.ServicePort, port int32) corev1.Servic
Protocol: current.Protocol,
}
}

func GetPortsForConfig(logger logr.Logger, config map[string]interface{}, retriever ParserRetriever) ([]corev1.ServicePort, error) {
var ports []corev1.ServicePort
for componentName, componentDef := range config {
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
parser := retriever(componentName)
if parsedPorts, err := parser.Ports(logger, componentName, componentDef); err != nil {
return nil, err
} else {
ports = append(ports, parsedPorts...)
}
}
return ports, nil
}
70 changes: 70 additions & 0 deletions internal/components/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
package components_test

import (
"fmt"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/open-telemetry/opentelemetry-operator/internal/components"
"github.com/open-telemetry/opentelemetry-operator/internal/components/receivers"
)

func TestComponentType(t *testing.T) {
Expand Down Expand Up @@ -65,3 +70,68 @@ func TestReceiverParsePortFromEndpoint(t *testing.T) {
})
}
}

func TestGetPortsForConfig(t *testing.T) {
type args struct {
config map[string]interface{}
retriever components.ParserRetriever
}
tests := []struct {
name string
args args
want []corev1.ServicePort
wantErr assert.ErrorAssertionFunc
}{
{
name: "nothing",
args: args{
config: nil,
retriever: receivers.ReceiverFor,
},
want: nil,
wantErr: assert.NoError,
},
{
name: "bad config",
args: args{
config: map[string]interface{}{
"test": "garbage",
},
retriever: receivers.ReceiverFor,
},
want: nil,
wantErr: assert.Error,
},
{
name: "receivers",
args: args{
config: map[string]interface{}{
"otlp": map[string]interface{}{
"protocols": map[string]interface{}{
"grpc": map[string]interface{}{},
},
},
},
retriever: receivers.ReceiverFor,
},
want: []corev1.ServicePort{
{
Name: "otlp-grpc",
Port: 4317,
TargetPort: intstr.FromInt32(4317),
AppProtocol: &components.GrpcProtocol,
},
},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := components.GetPortsForConfig(logr.Discard(), tt.args.config, tt.args.retriever)
if !tt.wantErr(t, err, fmt.Sprintf("GetPortsForConfig(%v)", tt.args.config)) {
return
}
assert.Equalf(t, tt.want, got, "GetPortsForConfig(%v)", tt.args.config)
})
}
}
Loading
Loading