Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support file system as storage layer in service section of fluenbit #825

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ type FluentBitConfigSpec struct {
Namespace *string `json:"namespace,omitempty"`
}

type Storage struct {
// Select an optional location in the file system to store streams and chunks of data/
Path string `json:"path,omitempty"`
// Configure the synchronization mode used to store the data into the file system
// +kubebuilder:validation:Enum:=normal;full
Sync string `json:"sync,omitempty"`
// Enable the data integrity check when writing and reading data from the filesystem
// +kubebuilder:validation:Enum:=on;off
Checksum string `json:"checksum,omitempty"`
// This option configure a hint of maximum value of memory to use when processing these records
BacklogMemLimit string `json:"backlogMemLimit,omitempty"`
// If the input plugin has enabled filesystem storage type, this property sets the maximum number of Chunks that can be up in memory
MaxChunksUp *int64 `json:"maxChunksUp,omitempty"`
// If http_server option has been enabled in the Service section, this option registers a new endpoint where internal metrics of the storage layer can be consumed
// +kubebuilder:validation:Enum:=on;off
Metrics string `json:"metrics,omitempty"`
// When enabled, irrecoverable chunks will be deleted during runtime, and any other irrecoverable chunk located in the configured storage path directory will be deleted when Fluent-Bit starts.
// +kubebuilder:validation:Enum:=on;off
DeleteIrrecoverableChunks string `json:"deleteIrrecoverableChunks,omitempty"`
}

type Service struct {
// If true go to background on start
Daemon *bool `json:"daemon,omitempty"`
Expand Down Expand Up @@ -80,6 +101,8 @@ type Service struct {
LogLevel string `json:"logLevel,omitempty"`
// Optional 'parsers' config file (can be multiple)
ParsersFile string `json:"parsersFile,omitempty"`
// Configure a global environment for the storage layer in Service. It is recommended to configure the volume and volumeMount separately for this storage. The hostPath type should be used for that Volume in Fluentbit daemon set.
Storage *Storage `json:"storage,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down Expand Up @@ -149,6 +172,29 @@ func (s *Service) Params() *params.KVs {
if s.ParsersFile != "" {
m.Insert("Parsers_File", s.ParsersFile)
}
if s.Storage != nil {
if s.Storage.Path != "" {
m.Insert("storage.path", s.Storage.Path)
}
if s.Storage.Sync != "" {
m.Insert("storage.sync", s.Storage.Sync)
}
if s.Storage.Checksum != "" {
m.Insert("storage.checksum", s.Storage.Checksum)
}
if s.Storage.BacklogMemLimit != "" {
m.Insert("storage.backlog.mem_limit", s.Storage.BacklogMemLimit)
}
if s.Storage.Metrics != "" {
m.Insert("storage.metrics", s.Storage.Metrics)
}
if s.Storage.MaxChunksUp != nil {
m.Insert("storage.max_chunks_up", fmt.Sprint(*s.Storage.MaxChunksUp))
}
if s.Storage.DeleteIrrecoverableChunks != "" {
m.Insert("storage.delete_irrecoverable_chunks", s.Storage.DeleteIrrecoverableChunks)
}
}
return m
}

Expand Down
12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/systemd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ type Systemd struct {
// Remove the leading underscore of the Journald field (key). For example the Journald field _PID becomes the key PID.
// +kubebuilder:validation:Enum:=on;off
StripUnderscores string `json:"stripUnderscores,omitempty"`
// Specify the buffering mechanism to use. It can be memory or filesystem
// +kubebuilder:validation:Enum:=filesystem;memory
StorageType string `json:"storageType,omitempty"`
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
// +kubebuilder:validation:Enum:=on;off
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
}

func (_ *Systemd) Name() string {
Expand Down Expand Up @@ -85,6 +91,12 @@ func (s *Systemd) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if s.StripUnderscores != "" {
kvs.Insert("Strip_Underscores", s.StripUnderscores)
}
if s.StorageType != "" {
kvs.Insert("storage.type", s.StorageType)
}
if s.PauseOnChunksOverlimit != "" {
kvs.Insert("storage.pause_on_chunks_overlimit", s.PauseOnChunksOverlimit)
}

return kvs, nil
}
12 changes: 12 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/input/tail_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type Tail struct {
// This will help to reassembly multiline messages originally split by Docker or CRI
//Specify one or Multiline Parser definition to apply to the content.
MultilineParser string `json:"multilineParser,omitempty"`
// Specify the buffering mechanism to use. It can be memory or filesystem
// +kubebuilder:validation:Enum:=filesystem;memory
StorageType string `json:"storageType,omitempty"`
// Specifies if the input plugin should be paused (stop ingesting new data) when the storage.max_chunks_up value is reached.
// +kubebuilder:validation:Enum:=on;off
PauseOnChunksOverlimit string `json:"pauseOnChunksOverlimit,omitempty"`
}

func (_ *Tail) Name() string {
Expand Down Expand Up @@ -179,5 +185,11 @@ func (t *Tail) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if t.MultilineParser != "" {
kvs.Insert("multiline.parser", t.MultilineParser)
}
if t.StorageType != "" {
kvs.Insert("storage.type", t.StorageType)
}
if t.PauseOnChunksOverlimit != "" {
kvs.Insert("storage.pause_on_chunks_overlimit", t.PauseOnChunksOverlimit)
}
return kvs, nil
}
5 changes: 5 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/open_search_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ type OpenSearch struct {
// Enables dedicated thread(s) for this output. Default value is set since version 1.8.13. For previous versions is 0.
Workers *int32 `json:"Workers,omitempty"`
*plugins.TLS `json:"tls,omitempty"`
// Limit the maximum number of Chunks in the filesystem for the current output logical destination.
TotalLimitSize string `json:"totalLimitSize,omitempty"`
}

// Name implement Section() method
Expand Down Expand Up @@ -215,5 +217,8 @@ func (o *OpenSearch) Params(sl plugins.SecretLoader) (*params.KVs, error) {
}
kvs.Merge(tls)
}
if o.TotalLimitSize != "" {
kvs.Insert("storage.total_limit_size", o.TotalLimitSize)
}
return kvs, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,58 @@ spec:
parsersFile:
description: Optional 'parsers' config file (can be multiple)
type: string
storage:
description: Configure a global environment for the storage layer
in Service. It is recommended to configure the volume and volumeMount
separately for this storage. The hostPath type should be used
for that Volume in Fluentbit daemon set.
properties:
backlogMemLimit:
description: This option configure a hint of maximum value
of memory to use when processing these records
type: string
checksum:
description: Enable the data integrity check when writing
and reading data from the filesystem
enum:
- "on"
- "off"
type: string
deleteIrrecoverableChunks:
description: When enabled, irrecoverable chunks will be deleted
during runtime, and any other irrecoverable chunk located
in the configured storage path directory will be deleted
when Fluent-Bit starts.
enum:
- "on"
- "off"
type: string
maxChunksUp:
description: If the input plugin has enabled filesystem storage
type, this property sets the maximum number of Chunks that
can be up in memory
format: int64
type: integer
metrics:
description: If http_server option has been enabled in the
Service section, this option registers a new endpoint where
internal metrics of the storage layer can be consumed
enum:
- "on"
- "off"
type: string
path:
description: Select an optional location in the file system
to store streams and chunks of data/
type: string
sync:
description: Configure the synchronization mode used to store
the data into the file system
enum:
- normal
- full
type: string
type: object
type: object
type: object
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,28 @@ spec:
not set, the plugin will use default paths to read local-only
logs.
type: string
pauseOnChunksOverlimit:
description: Specifies if the input plugin should be paused (stop
ingesting new data) when the storage.max_chunks_up value is
reached.
enum:
- "on"
- "off"
type: string
readFromTail:
description: Start reading new entries. Skip entries already stored
in Journald.
enum:
- "on"
- "off"
type: string
storageType:
description: Specify the buffering mechanism to use. It can be
memory or filesystem
enum:
- filesystem
- memory
type: string
stripUnderscores:
description: Remove the leading underscore of the Journald field
(key). For example the Journald field _PID becomes the key PID.
Expand Down Expand Up @@ -328,6 +343,14 @@ spec:
file as part of the record. The value assigned becomes the key
in the map.
type: string
pauseOnChunksOverlimit:
description: Specifies if the input plugin should be paused (stop
ingesting new data) when the storage.max_chunks_up value is
reached.
enum:
- "on"
- "off"
type: string
readFromHead:
description: For new discovered files on start (without a database
offset/position), read the content from the head of the file,
Expand All @@ -350,6 +373,13 @@ spec:
behavior and instruct Fluent Bit to skip long lines and continue
processing other lines that fits into the buffer size.
type: boolean
storageType:
description: Specify the buffering mechanism to use. It can be
memory or filesystem
enum:
- filesystem
- memory
type: string
tag:
description: Set a tag (with regex-extract fields) that will be
placed on lines read. E.g. kube.<namespace_name>.<pod_name>.<container_name>
Expand Down
Loading
Loading