Skip to content

Commit

Permalink
feat: Make Kafka group.id configurable. Fixes #89 (#464)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Oct 25, 2021
1 parent 782c39a commit 5ec67f6
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 151 deletions.
329 changes: 185 additions & 144 deletions api/v1alpha1/generated.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions api/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions api/v1alpha1/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type KafkaSource struct {
FetchMin *resource.Quantity `json:"fetchMin,omitempty" protobuf:"bytes,3,opt,name=fetchMin"`
// +kubebuilder:default="500ms"
FetchWaitMax *metav1.Duration `json:"fetchWaitMax,omitempty" protobuf:"bytes,4,opt,name=fetchWaitMax"`
// GroupID is the consumer group ID. If not specified, a unique deterministic group ID is generated.
GroupID string `json:"groupId,omitempty" protobuf:"bytes,5,opt,name=groupId"`
}

func (m *KafkaSource) GetAutoOffsetReset() string {
Expand All @@ -26,3 +28,10 @@ func (m *KafkaSource) GetFetchMinBytes() int {
// GetFetchWaitMaxMs returns the FetchWaitMax duration expressed as a whole
// number of milliseconds.
//
// FetchWaitMax is dereferenced without a nil check, so it must be set before
// this method is called.
func (m *KafkaSource) GetFetchWaitMaxMs() int {
	millis := m.FetchWaitMax.Milliseconds()
	return int(millis)
}

// GetGroupID returns the consumer group ID to use for this source.
//
// When the GroupID field is non-empty it is returned as-is; otherwise the
// caller-supplied defaultGroupID is returned.
func (m *KafkaSource) GetGroupID(defaultGroupID string) string {
	groupID := m.GroupID
	if groupID == "" {
		// Nothing configured on the source: fall back to the caller's default.
		groupID = defaultGroupID
	}
	return groupID
}
11 changes: 11 additions & 0 deletions api/v1alpha1/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ func TestKafkaSource_GetFetchWaitMaxMs(t *testing.T) {
s := KafkaSource{FetchWaitMax: &metav1.Duration{Duration: time.Second}}
assert.Equal(t, 1000, s.GetFetchWaitMaxMs())
}

// TestKafkaSource_GetGroupID verifies that GetGroupID returns the supplied
// default when GroupID is unset, and the configured GroupID when it is set.
func TestKafkaSource_GetGroupID(t *testing.T) {
	cases := []struct {
		name     string
		source   *KafkaSource
		expected string
	}{
		{name: "Default", source: &KafkaSource{}, expected: "foo"},
		{name: "Specified", source: &KafkaSource{GroupID: "bar"}, expected: "bar"},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			assert.Equal(t, tc.expected, tc.source.GetGroupID("foo"))
		})
	}
}
9 changes: 9 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3646,6 +3646,11 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If
not specified, a unique deterministic group ID is
generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -10689,6 +10694,10 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If not specified,
a unique deterministic group ID is generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3641,6 +3641,11 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If
not specified, a unique deterministic group ID is
generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/dataflow.argoproj.io_steps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3468,6 +3468,10 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If not specified,
a unique deterministic group ID is generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
9 changes: 9 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3646,6 +3646,11 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If
not specified, a unique deterministic group ID is
generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -10689,6 +10694,10 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If not specified,
a unique deterministic group ID is generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
9 changes: 9 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3646,6 +3646,11 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If
not specified, a unique deterministic group ID is
generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -10689,6 +10694,10 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If not specified,
a unique deterministic group ID is generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
9 changes: 9 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3646,6 +3646,11 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If
not specified, a unique deterministic group ID is
generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down Expand Up @@ -10689,6 +10694,10 @@ spec:
fetchWaitMax:
default: 500ms
type: string
groupId:
description: GroupID is the consumer group ID. If not specified,
a unique deterministic group ID is generated.
type: string
maxMessageBytes:
format: int32
type: integer
Expand Down
9 changes: 6 additions & 3 deletions dsls/python/argo_dataflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,14 @@ def dump(self):


class KafkaSource(Source):
def __init__(self, topic, name=None, retry=None, startOffset=None, fetchMin=None, fetchWaitMax=None):
def __init__(self, topic, name=None, retry=None, startOffset=None, fetchMin=None, fetchWaitMax=None, groupId=None):
super().__init__(name=name, retry=retry)
assert topic
self._topic = topic
self._startOffset = startOffset
self._fetchMin = fetchMin
self._fetchWaitMax = fetchWaitMax
self._groupId = groupId

def dump(self):
x = super().dump()
Expand All @@ -617,6 +618,8 @@ def dump(self):
y["fetchMin"] = self._fetchMin
if self._fetchWaitMax:
y["fetchWaitMax"] = self._fetchWaitMax
if self._groupId:
y["groupId"] = self._groupId
x['kafka'] = y
return x

Expand Down Expand Up @@ -655,9 +658,9 @@ def http(name=None, retry=None, serviceName=None):
return HTTPSource(name=name, serviceName=serviceName, retry=retry)


def kafka(topic=None, name=None, retry=None, startOffset=None, fetchMin=None, fetchWaitMax=None):
def kafka(topic=None, name=None, retry=None, startOffset=None, fetchMin=None, fetchWaitMax=None, groupId=None):
return KafkaSource(topic, name=name, retry=retry, startOffset=startOffset, fetchMin=fetchMin,
fetchWaitMax=fetchWaitMax)
fetchWaitMax=fetchWaitMax, groupId=groupId)


def stan(subject=None, name=None, retry=None):
Expand Down
2 changes: 1 addition & 1 deletion examples/301-kafka-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
""")
.annotate("dataflow.argoproj.io/test", "true")
.step(
(kafka('input-topic')
(kafka('input-topic', groupId='my-group')
.cat()
.kafka('output-topic', a_sync=True)
))
Expand Down
1 change: 1 addition & 0 deletions examples/301-kafka-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ spec:
topic: output-topic
sources:
- kafka:
groupId: my-group
topic: input-topic
3 changes: 2 additions & 1 deletion runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ const (
pendingUnavailable = math.MinInt32
)

func New(ctx context.Context, secretInterface corev1.SecretInterface, consumerGroupID, sourceName, sourceURN string, replica int, x dfv1.KafkaSource, process source.Process) (source.Interface, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, cluster, namespace, pipelineName, stepName, sourceName, sourceURN string, replica int, x dfv1.KafkaSource, process source.Process) (source.Interface, error) {
logger := sharedutil.NewLogger().WithValues("source", sourceName)
config, err := sharedkafka.GetConfig(ctx, secretInterface, x.KafkaConfig)
if err != nil {
return nil, err
}
config["go.logs.channel.enable"] = true
consumerGroupID := x.GetGroupID(sharedutil.GetSourceUID(cluster, namespace, pipelineName, stepName, sourceName))
config["group.id"] = consumerGroupID
config["group.instance.id"] = fmt.Sprintf("%s/%d", consumerGroupID, replica)
config["heartbeat.interval.ms"] = 3 * seconds
Expand Down
3 changes: 1 addition & 2 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
sources[sourceName] = y
}
} else if x := s.Kafka; x != nil {
groupID := sharedutil.GetSourceUID(cluster, namespace, pipelineName, stepName, sourceName)
if y, err := kafkasource.New(ctx, secretInterface, groupID, sourceName, sourceURN, replica, *x, processWithRetry); err != nil {
if y, err := kafkasource.New(ctx, secretInterface, cluster, namespace, pipelineName, stepName, sourceName, sourceURN, replica, *x, processWithRetry); err != nil {
return err
} else {
sources[sourceName] = y
Expand Down

0 comments on commit 5ec67f6

Please sign in to comment.