Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support elastic_data_stream #1190

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions apis/fluentd/v1alpha1/plugins/output/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package output
import "github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins"

// Elasticsearch defines the parameters for out_es output plugin
type Elasticsearch struct {
type ElasticsearchCommon struct {
// The hostname of your Elasticsearch node (default: localhost).
Host *string `json:"host,omitempty"`
// The port number of your Elasticsearch node (default: 9200).
Expand All @@ -21,12 +21,6 @@ type Elasticsearch struct {
CloudId *plugins.Secret `json:"cloudId,omitempty"`
// Authenticate towards Elastic Cloud using cloudAuth.
CloudAuth *plugins.Secret `json:"cloudAuth,omitempty"`
// IndexName defines the placeholder syntax of Fluentd plugin API. See https://docs.fluentd.org/configuration/buffer-section.
IndexName *string `json:"indexName,omitempty"`
// If true, Fluentd uses the conventional index name format logstash-%Y.%m.%d (default: false). This option supersedes the index_name option.
LogstashFormat *bool `json:"logstashFormat,omitempty"`
// LogstashPrefix defines the logstash prefix index name to write events when logstash_format is true (default: logstash).
LogstashPrefix *string `json:"logstashPrefix,omitempty"`
// Optional, The login credentials to connect to Elasticsearch
User *plugins.Secret `json:"user,omitempty"`
// Optional, The login credentials to connect to Elasticsearch
Expand All @@ -42,3 +36,21 @@ type Elasticsearch struct {
// Optional, password for ClientKey file
ClientKeyPassword *plugins.Secret `json:"clientKeyPassword,omitempty"`
}

type Elasticsearch struct {
ElasticsearchCommon `json:",inline"`

// IndexName defines the placeholder syntax of Fluentd plugin API. See https://docs.fluentd.org/configuration/buffer-section.
IndexName *string `json:"indexName,omitempty"`
// If true, Fluentd uses the conventional index name format logstash-%Y.%m.%d (default: false). This option supersedes the index_name option.
LogstashFormat *bool `json:"logstashFormat,omitempty"`
// LogstashPrefix defines the logstash prefix index name to write events when logstash_format is true (default: logstash).
LogstashPrefix *string `json:"logstashPrefix,omitempty"`
}

type ElasticsearchDataStream struct {
ElasticsearchCommon `json:",inline"`

// You can specify Elasticsearch data stream name by this parameter. This parameter is mandatory for elasticsearch_data_stream
DataStreamName *string `json:"dataStreamName"`
}
81 changes: 56 additions & 25 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Output struct {
Http *Http `json:"http,omitempty"`
// out_es plugin
Elasticsearch *Elasticsearch `json:"elasticsearch,omitempty"`
// out_es datastreams plugin
ElasticsearchDataStream *ElasticsearchDataStream `json:"elasticsearchDataStream,omitempty"`
// out_opensearch plugin
Opensearch *Opensearch `json:"opensearch,omitempty"`
// out_kafka plugin
Expand Down Expand Up @@ -135,6 +137,11 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
return o.elasticsearchPlugin(ps, loader)
}

if o.ElasticsearchDataStream != nil {
ps.InsertType(string(params.ElasticsearchDataStreamOutputType))
return o.elasticsearchDataStreamPlugin(ps, loader)
}

if o.Opensearch != nil {
ps.InsertType(string(params.OpensearchOutputType))
return o.opensearchPlugin(ps, loader)
Expand Down Expand Up @@ -383,65 +390,75 @@ func (o *Output) httpPlugin(parent *params.PluginStore, loader plugins.SecretLoa
return parent
}

func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if o.Elasticsearch.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*o.Elasticsearch.Host))
func (o *Output) elasticsearchPluginCommon(common *ElasticsearchCommon, parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if common.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*common.Host))
}

if o.Elasticsearch.Port != nil {
parent.InsertPairs("port", fmt.Sprint(*o.Elasticsearch.Port))
if common.Port != nil {
parent.InsertPairs("port", fmt.Sprint(*common.Port))
}

if o.Elasticsearch.Hosts != nil {
parent.InsertPairs("hosts", fmt.Sprint(*o.Elasticsearch.Hosts))
if common.Hosts != nil {
parent.InsertPairs("hosts", fmt.Sprint(*common.Hosts))
}

if o.Elasticsearch.User != nil {
user, err := loader.LoadSecret(*o.Elasticsearch.User)
if common.User != nil {
user, err := loader.LoadSecret(*common.User)
if err != nil {
return nil, err
}
parent.InsertPairs("user", user)
}

if o.Elasticsearch.Password != nil {
pwd, err := loader.LoadSecret(*o.Elasticsearch.Password)
if common.Password != nil {
pwd, err := loader.LoadSecret(*common.Password)
if err != nil {
return nil, err
}
parent.InsertPairs("password", pwd)
}

if o.Elasticsearch.SslVerify != nil {
parent.InsertPairs("ssl_verify", fmt.Sprint(*o.Elasticsearch.SslVerify))
if common.SslVerify != nil {
parent.InsertPairs("ssl_verify", fmt.Sprint(*common.SslVerify))
}

if o.Elasticsearch.CAFile != nil {
parent.InsertPairs("ca_file", fmt.Sprint(*o.Elasticsearch.CAFile))
if common.CAFile != nil {
parent.InsertPairs("ca_file", fmt.Sprint(*common.CAFile))
}

if o.Elasticsearch.ClientCert != nil {
parent.InsertPairs("client_cert", fmt.Sprint(*o.Elasticsearch.ClientCert))
if common.ClientCert != nil {
parent.InsertPairs("client_cert", fmt.Sprint(*common.ClientCert))
}

if o.Elasticsearch.ClientKey != nil {
parent.InsertPairs("client_key", fmt.Sprint(*o.Elasticsearch.ClientKey))
if common.ClientKey != nil {
parent.InsertPairs("client_key", fmt.Sprint(*common.ClientKey))
}

if o.Elasticsearch.ClientKeyPassword != nil {
pwd, err := loader.LoadSecret(*o.Elasticsearch.ClientKeyPassword)
if common.ClientKeyPassword != nil {
pwd, err := loader.LoadSecret(*common.ClientKeyPassword)
if err != nil {
return nil, err
}
parent.InsertPairs("client_key_pass", pwd)
}

if o.Elasticsearch.Scheme != nil {
parent.InsertPairs("scheme", fmt.Sprint(*o.Elasticsearch.Scheme))
if common.Scheme != nil {
parent.InsertPairs("scheme", fmt.Sprint(*common.Scheme))
}

if common.Path != nil {
parent.InsertPairs("path", fmt.Sprint(*common.Path))
}

if o.Elasticsearch.Path != nil {
parent.InsertPairs("path", fmt.Sprint(*o.Elasticsearch.Path))
return parent, nil
}

func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {

parent, err := o.elasticsearchPluginCommon(&o.Elasticsearch.ElasticsearchCommon, parent, loader)
if err != nil {
return nil, err
}

if o.Elasticsearch.IndexName != nil {
Expand All @@ -459,6 +476,20 @@ func (o *Output) elasticsearchPlugin(parent *params.PluginStore, loader plugins.
return parent, nil
}

func (o *Output) elasticsearchDataStreamPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {

parent, err := o.elasticsearchPluginCommon(&o.ElasticsearchDataStream.ElasticsearchCommon, parent, loader)
if err != nil {
return nil, err
}

if o.ElasticsearchDataStream.DataStreamName != nil {
parent.InsertPairs("data_stream_name", fmt.Sprint(*o.ElasticsearchDataStream.DataStreamName))
}

return parent, nil
}

func (o *Output) opensearchPlugin(parent *params.PluginStore, loader plugins.SecretLoader) (*params.PluginStore, error) {
if o.Opensearch.Host != nil {
parent.InsertPairs("host", fmt.Sprint(*o.Opensearch.Host))
Expand Down
46 changes: 24 additions & 22 deletions apis/fluentd/v1alpha1/plugins/params/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,18 @@ const (
ParserPlugin PluginName = "parser"
StdoutPlugin PluginName = "stdout"

ReLabelPlugin PluginName = "relabel"
LabelPlugin PluginName = "label"
LabelRouterPlugin PluginName = "label_router"
S3Plugin PluginName = "s3"
KafkaPlugin PluginName = "kafka2"
ElasticsearchPlugin PluginName = "elasticsearch"
OpensearchPlugin PluginName = "opensearch"
MatchPlugin PluginName = "match"
BufferPlugin PluginName = "buffer"
CloudWatchPlugin PluginName = "cloudwatch_logs"
DatadogPlugin PluginName = "datadog"
ReLabelPlugin PluginName = "relabel"
LabelPlugin PluginName = "label"
LabelRouterPlugin PluginName = "label_router"
S3Plugin PluginName = "s3"
KafkaPlugin PluginName = "kafka2"
ElasticsearchPlugin PluginName = "elasticsearch"
ElasticsearchDataStreamPlugin PluginName = "elasticsearch_data_stream"
OpensearchPlugin PluginName = "opensearch"
MatchPlugin PluginName = "match"
BufferPlugin PluginName = "buffer"
CloudWatchPlugin PluginName = "cloudwatch_logs"
DatadogPlugin PluginName = "datadog"

BufferTag string = "tag"
LabelTag string = "tag"
Expand All @@ -60,17 +61,18 @@ const (
StdoutFilterType FilterType = "stdout"

// Enums the supported output types
ForwardOutputType OutputType = "forward"
HttpOutputType OutputType = "http"
StdOutputType OutputType = "stdout"
KafkaOutputType OutputType = "kafka2"
ElasticsearchOutputType OutputType = "elasticsearch"
OpensearchOutputType OutputType = "opensearch"
S3OutputType OutputType = "s3"
LokiOutputType OutputType = "loki"
CloudWatchOutputType OutputType = "cloudwatch_logs"
DatadogOutputType OutputType = "datadog"
CopyOutputType OutputType = "copy"
ForwardOutputType OutputType = "forward"
HttpOutputType OutputType = "http"
StdOutputType OutputType = "stdout"
KafkaOutputType OutputType = "kafka2"
ElasticsearchOutputType OutputType = "elasticsearch"
ElasticsearchDataStreamOutputType OutputType = "elasticsearch_data_stream"
OpensearchOutputType OutputType = "opensearch"
S3OutputType OutputType = "s3"
LokiOutputType OutputType = "loki"
CloudWatchOutputType OutputType = "cloudwatch_logs"
DatadogOutputType OutputType = "datadog"
CopyOutputType OutputType = "copy"
)

var (
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-copy-0
@type copy
copy_mode no_copy
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-copy-1
@type elasticsearch_data_stream
data_stream_name es1-notag-1
host elasticsearch-logging-data.kubesphere-logging-system.svc
password s3cr3tP@ssword
port 9243
scheme https
ssl_verify false
user s3cr3tUsern4me
<buffer>
@type memory
flush_mode immediate
</buffer>
</store>
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-copy-2
@type elasticsearch_data_stream
data_stream_name es1-notag-2
host elasticsearch-logging-data.kubesphere-logging-system.svc
password s3cr3tP@ssword
port 9243
scheme https
ssl_verify false
user s3cr3tUsern4me
<buffer>
@type memory
flush_mode immediate
</buffer>
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-0
@type elasticsearch_data_stream
data_stream_name test-ds
host elasticsearch-logging-data.kubesphere-logging-system.svc
port 9200
</match>
</label>
54 changes: 53 additions & 1 deletion apis/fluentd/v1alpha1/tests/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,66 @@ func Test_ClusterCfgOutput2ES(t *testing.T) {
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
fmt.Println(config)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-es.cfg"))).To(Equal(config))

i++
}
}

func Test_ClusterCfgOutput2ESDataStream(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
clusterFilters := []fluentdv1alpha1.ClusterFilter{}
clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESDataStream}
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-es-data-stream.cfg"))).To(Equal(config))

i++
}
}

func Test_ClusterCfgOutput2CopyESDataStream(t *testing.T) {
g := NewGomegaWithT(t)
sl := NewSecretLoader(logr.Logger{}, esCredentials)

psr := fluentdv1alpha1.NewGlobalPluginResources("main")
psr.CombineGlobalInputsPlugins(sl, Fluentd.Spec.GlobalInputs)

clustercfgRouter, err := psr.BuildCfgRouter(&FluentdClusterFluentdConfig1)
g.Expect(err).NotTo(HaveOccurred())
clusterFilters := []fluentdv1alpha1.ClusterFilter{}
clusterOutputs := []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2CopyESDataStream}
clustercfgResources, _ := psr.PatchAndFilterClusterLevelResources(sl, FluentdClusterFluentdConfig1.GetCfgId(), []fluentdv1alpha1.ClusterInput{}, clusterFilters, clusterOutputs)
err = psr.IdentifyCopyAndPatchOutput(clustercfgResources)
err = psr.WithCfgResources(*clustercfgRouter.Label, clustercfgResources)
g.Expect(err).NotTo(HaveOccurred())

// we should not see any permutations in serialized config
i := 0
for i < maxRuntimes {
config, errs := psr.RenderMainConfig(false)
g.Expect(errs).NotTo(HaveOccurred())
g.Expect(string(getExpectedCfg("./expected/fluentd-cluster-cfg-output-copy-es-data-stream.cfg"))).To(Equal(config))

i++
}
}

func Test_Cfg2OpenSearch(t *testing.T) {
g := NewGomegaWithT(t)
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
Expand Down
Loading
Loading