Support storage layer in service section of fluent-bit
Signed-off-by: karan k <karan.k@oracle.com>
karan56625 committed Jul 7, 2023
1 parent 5b16383 commit 0cb2240
Showing 18 changed files with 445 additions and 0 deletions.
46 changes: 46 additions & 0 deletions apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go
@@ -46,6 +46,27 @@ type FluentBitConfigSpec struct {
Namespace *string `json:"namespace,omitempty"`
}

// Storage defines the global options of the storage layer rendered into the [SERVICE] section
type Storage struct {
// Select an optional location in the file system to store streams and chunks of data.
Path string `json:"path,omitempty"`
// Configure the synchronization mode used to store the data into the file system
// +kubebuilder:validation:Enum:=normal;full
Sync string `json:"sync,omitempty"`
// Enable the data integrity check when writing and reading data from the filesystem
// +kubebuilder:validation:Enum:=on;off
Checksum string `json:"checksum,omitempty"`
// This option configures a hint of the maximum amount of memory to use when processing these records
BacklogMemLimit string `json:"backlogMemLimit,omitempty"`
// If the input plugin has enabled filesystem storage type, this property sets the maximum number of Chunks that can be up in memory
MaxChunksUp *int64 `json:"maxChunksUp,omitempty"`
// If the http_server option has been enabled in the Service section, this option registers a new endpoint where internal metrics of the storage layer can be consumed
// +kubebuilder:validation:Enum:=on;off
Metrics string `json:"metrics,omitempty"`
// When enabled, irrecoverable chunks will be deleted during runtime, and any other irrecoverable chunk located in the configured storage path directory will be deleted when Fluent-Bit starts.
// +kubebuilder:validation:Enum:=on;off
DeleteIrrecoverableChunks string `json:"deleteIrrecoverableChunks,omitempty"`
}

type Service struct {
// If true go to background on start
Daemon *bool `json:"daemon,omitempty"`
@@ -80,6 +101,8 @@ type Service struct {
LogLevel string `json:"logLevel,omitempty"`
// Optional 'parsers' config file (can be multiple)
ParsersFile string `json:"parsersFile,omitempty"`
// Configure a global environment for the storage layer in Service
Storage *Storage `json:"storage,omitempty"`
}

// +kubebuilder:object:root=true
@@ -149,6 +172,29 @@ func (s *Service) Params() *params.KVs {
if s.ParsersFile != "" {
m.Insert("Parsers_File", s.ParsersFile)
}
if s.Storage != nil {
if s.Storage.Path != "" {
m.Insert("storage.path", s.Storage.Path)
}
if s.Storage.Sync != "" {
m.Insert("storage.sync", s.Storage.Sync)
}
if s.Storage.Checksum != "" {
m.Insert("storage.checksum", s.Storage.Checksum)
}
if s.Storage.BacklogMemLimit != "" {
m.Insert("storage.backlog.mem_limit", s.Storage.BacklogMemLimit)
}
if s.Storage.Metrics != "" {
m.Insert("storage.metrics", s.Storage.Metrics)
}
if s.Storage.MaxChunksUp != nil {
m.Insert("storage.max_chunks_up", fmt.Sprint(*s.Storage.MaxChunksUp))
}
if s.Storage.DeleteIrrecoverableChunks != "" {
m.Insert("storage.delete_irrecoverable_chunks", s.Storage.DeleteIrrecoverableChunks)
}
}
return m
}
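
For reference, a minimal ClusterFluentBitConfig exercising the new field could look like the sketch below (the values are illustrative, not defaults). Each spec.service.storage field is rendered into the generated [SERVICE] section as the storage.* key noted in the comments.

apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
  name: fluent-bit-config
spec:
  service:
    parsersFile: parsers.conf
    storage:
      path: /fluent-bit/buffer         # storage.path
      sync: normal                     # storage.sync
      checksum: "off"                  # storage.checksum
      backlogMemLimit: 5M              # storage.backlog.mem_limit
      maxChunksUp: 128                 # storage.max_chunks_up
      metrics: "on"                    # storage.metrics
      deleteIrrecoverableChunks: "on"  # storage.delete_irrecoverable_chunks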

12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/systemd_types.go
@@ -44,6 +44,12 @@ type Systemd struct {
// Remove the leading underscore of the Journald field (key). For example the Journald field _PID becomes the key PID.
// +kubebuilder:validation:Enum:=on;off
StripUnderscores string `json:"stripUnderscores,omitempty"`
// Specify the buffering mechanism to use. It can be memory or filesystem
// +kubebuilder:validation:Enum:=filesystem;memory
StorageType string `json:"storageType,omitempty"`
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
// +kubebuilder:validation:Enum:=on;off
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
}

func (_ *Systemd) Name() string {
@@ -85,6 +91,12 @@ func (s *Systemd) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if s.StripUnderscores != "" {
kvs.Insert("Strip_Underscores", s.StripUnderscores)
}
if s.StorageType != "" {
kvs.Insert("storage.type", s.StorageType)
}
if s.PauseOnChunksOverlimit != "" {
kvs.Insert("storage.pause_on_chunks_overlimit", s.PauseOnChunksOverlimit)
}

return kvs, nil
}
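
As a rough usage sketch (assuming the operator's ClusterInput kind that wraps this systemd plugin; names and values are illustrative), the new fields sit alongside the existing systemd options and render to storage.type and storage.pause_on_chunks_overlimit in the generated [INPUT] section:

apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
  name: systemd
  labels:
    fluentbit.fluent.io/enabled: "true"
spec:
  systemd:
    path: /var/log/journal
    storageType: filesystem       # storage.type
    pauseOnChunksOverlimit: "on"  # storage.pause_on_chunks_overlimit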
12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/tail_types.go
@@ -93,6 +93,12 @@ type Tail struct {
// This will help to reassemble multiline messages originally split by Docker or CRI
// Specify one or multiple Multiline Parser definitions to apply to the content.
MultilineParser string `json:"multilineParser,omitempty"`
// Specify the buffering mechanism to use. It can be memory or filesystem
// +kubebuilder:validation:Enum:=filesystem;memory
StorageType string `json:"storageType,omitempty"`
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
// +kubebuilder:validation:Enum:=on;off
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
}

func (_ *Tail) Name() string {
@@ -179,5 +185,11 @@ func (t *Tail) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if t.MultilineParser != "" {
kvs.Insert("multiline.parser", t.MultilineParser)
}
if t.StorageType != "" {
kvs.Insert("storage.type", t.StorageType)
}
if t.PauseOnChunksOverlimit != "" {
kvs.Insert("storage.pause_on_chunks_overlimit", t.PauseOnChunksOverlimit)
}
return kvs, nil
}
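
A similar hedged sketch for the tail input (again assuming the ClusterInput kind; values are illustrative): with storageType set to filesystem, chunks spill over to the storage.path configured in the Service section, and pauseOnChunksOverlimit decides whether ingestion pauses once storage.max_chunks_up chunks are held in memory.

apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
  name: tail
  labels:
    fluentbit.fluent.io/enabled: "true"
spec:
  tail:
    tag: kube.*
    path: /var/log/containers/*.log
    storageType: filesystem        # storage.type
    pauseOnChunksOverlimit: "off"  # storage.pause_on_chunks_overlimit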
5 changes: 5 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/open_search_types.go
@@ -94,6 +94,8 @@ type OpenSearch struct {
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
Workers *int32 `json:"Workers,omitempty"`
*plugins.TLS `json:"tls,omitempty"`
// Limit the maximum disk space (e.g. 5M) used to buffer chunks in the filesystem for the current output logical destination.
TotalLimitSize string `json:"totalLimitSize,omitempty"`
}

// Name implement Section() method
@@ -215,5 +217,8 @@ func (o *OpenSearch) Params(sl plugins.SecretLoader) (*params.KVs, error) {
}
kvs.Merge(tls)
}
if o.TotalLimitSize != "" {
kvs.Insert("storage.total_limit_size", o.TotalLimitSize)
}
return kvs, nil
}
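
On the output side, a hedged sketch (assuming the operator's ClusterOutput kind and its opensearch section; the host is a placeholder): when inputs buffer to the filesystem, storage.total_limit_size caps how much data may queue on disk for this destination.

apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: opensearch
  labels:
    fluentbit.fluent.io/enabled: "true"
spec:
  match: kube.*
  opensearch:
    host: opensearch-master.example.svc  # placeholder
    port: 9200
    totalLimitSize: 5M                   # storage.total_limit_size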
@@ -297,6 +297,56 @@ spec:
parsersFile:
description: Optional 'parsers' config file (can be multiple)
type: string
storage:
description: Configure a global environment for the storage layer
in Service
properties:
backlogMemLimit:
description: This option configures a hint of the maximum amount
of memory to use when processing these records
type: string
checksum:
description: Enable the data integrity check when writing
and reading data from the filesystem
enum:
- "on"
- "off"
type: string
deleteIrrecoverableChunks:
description: When enabled, irrecoverable chunks will be deleted
during runtime, and any other irrecoverable chunk located
in the configured storage path directory will be deleted
when Fluent-Bit starts.
enum:
- "on"
- "off"
type: string
maxChunksUp:
description: If the input plugin has enabled filesystem storage
type, this property sets the maximum number of Chunks that
can be up in memory
format: int64
type: integer
metrics:
description: If the http_server option has been enabled in the
Service section, this option registers a new endpoint where
internal metrics of the storage layer can be consumed
enum:
- "on"
- "off"
type: string
path:
description: Select an optional location in the file system
to store streams and chunks of data.
type: string
sync:
description: Configure the synchronization mode used to store
the data into the file system
enum:
- normal
- full
type: string
type: object
type: object
type: object
type: object
@@ -176,13 +176,28 @@ spec:
not set, the plugin will use default paths to read local-only
logs.
type: string
pauseOnChunksOverlimit:
description: Specifies if the input plugin should be paused (stop
ingesting new data) when the storage.max_chunks_up value is
reached.
enum:
- "on"
- "off"
type: string
readFromTail:
description: Start reading new entries. Skip entries already stored
in Journald.
enum:
- "on"
- "off"
type: string
storageType:
description: Specify the buffering mechanism to use. It can be
memory or filesystem
enum:
- filesystem
- memory
type: string
stripUnderscores:
description: Remove the leading underscore of the Journald field
(key). For example the Journald field _PID becomes the key PID.
@@ -328,6 +343,14 @@ spec:
file as part of the record. The value assigned becomes the key
in the map.
type: string
pauseOnChunksOverlimit:
description: Specifies if the input plugin should be paused (stop
ingesting new data) when the storage.max_chunks_up value is
reached.
enum:
- "on"
- "off"
type: string
readFromHead:
description: For new discovered files on start (without a database
offset/position), read the content from the head of the file,
@@ -350,6 +373,13 @@ spec:
behavior and instruct Fluent Bit to skip long lines and continue
processing other lines that fits into the buffer size.
type: boolean
storageType:
description: Specify the buffering mechanism to use. It can be
memory or filesystem
enum:
- filesystem
- memory
type: string
tag:
description: Set a tag (with regex-extract fields) that will be
placed on lines read. E.g. kube.<namespace_name>.<pod_name>.<container_name>
@@ -1846,6 +1846,10 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum disk space (e.g. 5M) used to buffer chunks
in the filesystem for the current output logical destination.
type: string
traceError:
description: When enabled print the elasticsearch API calls to
stdout when elasticsearch returns an error
@@ -1846,6 +1846,10 @@ spec:
description: Hostname to be used for TLS SNI extension
type: string
type: object
totalLimitSize:
description: Limit the maximum disk space (e.g. 5M) used to buffer chunks
in the filesystem for the current output logical destination.
type: string
traceError:
description: When enabled print the elasticsearch API calls to
stdout when elasticsearch returns an error
@@ -25,6 +25,10 @@ spec:
{{- toYaml .Values.fluentbit.input.systemd.systemdFilter.filters | nindent 6 }}
{{- end }}
{{- end }}
storageType: {{ .Values.fluentbit.input.systemd.storageType }}
{{- if eq .Values.fluentbit.input.systemd.storageType "filesystem" }}
pauseOnChunksOverlimit: {{ .Values.fluentbit.input.systemd.pauseOnChunksOverlimit | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
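
With the chart's default values further below (storageType: memory), this template renders only the storageType line and omits pauseOnChunksOverlimit. An illustrative rendering with the values switched to filesystem would contain:

    storageType: filesystem
    pauseOnChunksOverlimit: "on"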
@@ -25,6 +25,10 @@ spec:
skipLongLines: {{ .Values.fluentbit.input.tail.skipLongLines }}
db: /fluent-bit/tail/pos.db
dbSync: Normal
storageType: {{ .Values.fluentbit.input.tail.storageType }}
{{- if eq .Values.fluentbit.input.tail.storageType "filesystem" }}
pauseOnChunksOverlimit: {{ .Values.fluentbit.input.tail.pauseOnChunksOverlimit | quote }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
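
The tail template follows the same pattern; with the shipped defaults (storageType: memory, pauseOnChunksOverlimit: "off") the rendered tail input contains only:

    storageType: memory

and the pause line is skipped because the eq ... "filesystem" guard is false.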
@@ -9,6 +9,10 @@ metadata:
spec:
service:
parsersFile: parsers.conf
{{- if .Values.fluentbit.service.storage }}
storage:
{{ toYaml .Values.fluentbit.service.storage | indent 6 }}
{{- end }}
inputSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
6 changes: 6 additions & 0 deletions charts/fluent-operator/values.yaml
@@ -160,6 +160,8 @@ fluentbit:
path: "/var/log/containers/*.log"
skipLongLines: true
readFromHead: false
storageType: memory
pauseOnChunksOverlimit: "off"
systemd:
enable: true
systemdFilter:
@@ -168,6 +170,8 @@
path: "/var/log/journal"
includeKubelet: true
stripUnderscores: "off"
storageType: memory
pauseOnChunksOverlimit: "off"
nodeExporterMetrics: {}
# uncomment below nodeExporterMetrics section if you want to collect node exporter metrics
# nodeExporterMetrics:
@@ -213,6 +217,8 @@ fluentbit:
# You can configure the opensearch-related configuration here
stdout:
enable: false
service:
storage: {}
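
The empty map means no storage block is rendered by default. To enable filesystem buffering end to end, a user could set something like the following in values.yaml (illustrative values, not recommendations):

fluentbit:
  input:
    tail:
      storageType: filesystem
      pauseOnChunksOverlimit: "on"
  service:
    storage:
      path: /fluent-bit/buffer
      sync: normal
      backlogMemLimit: 5M
      maxChunksUp: 128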

# Configure the default filters in FluentBit.
# The `filter` will filter and parse the collected log information and output the logs into a uniform format. You can choose whether to turn this on or not.
50 changes: 50 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_clusterfluentbitconfigs.yaml
@@ -297,6 +297,56 @@ spec:
parsersFile:
description: Optional 'parsers' config file (can be multiple)
type: string
storage:
description: Configure a global environment for the storage layer
in Service
properties:
backlogMemLimit:
description: This option configures a hint of the maximum amount
of memory to use when processing these records
type: string
checksum:
description: Enable the data integrity check when writing
and reading data from the filesystem
enum:
- "on"
- "off"
type: string
deleteIrrecoverableChunks:
description: When enabled, irrecoverable chunks will be deleted
during runtime, and any other irrecoverable chunk located
in the configured storage path directory will be deleted
when Fluent-Bit starts.
enum:
- "on"
- "off"
type: string
maxChunksUp:
description: If the input plugin has enabled filesystem storage
type, this property sets the maximum number of Chunks that
can be up in memory
format: int64
type: integer
metrics:
description: If the http_server option has been enabled in the
Service section, this option registers a new endpoint where
internal metrics of the storage layer can be consumed
enum:
- "on"
- "off"
type: string
path:
description: Select an optional location in the file system
to store streams and chunks of data.
type: string
sync:
description: Configure the synchronization mode used to store
the data into the file system
enum:
- normal
- full
type: string
type: object
type: object
type: object
type: object