diff --git a/manifests/4.5/logforwardings.crd.yaml b/manifests/4.5/logforwardings.crd.yaml index b4876cd4a1..da3b6a8b43 100644 --- a/manifests/4.5/logforwardings.crd.yaml +++ b/manifests/4.5/logforwardings.crd.yaml @@ -29,6 +29,7 @@ spec: enum: - elasticsearch - forward + - syslog name: description: The name of the output type: string diff --git a/pkg/apis/logging/v1alpha1/forwarding_types.go b/pkg/apis/logging/v1alpha1/forwarding_types.go index 21f8d46197..3cbe9e73c9 100644 --- a/pkg/apis/logging/v1alpha1/forwarding_types.go +++ b/pkg/apis/logging/v1alpha1/forwarding_types.go @@ -74,6 +74,9 @@ const ( //OutputTypeForward configures the pipeline to send messages via Fluent's secure forward OutputTypeForward OutputType = "forward" + + //OutputTypeSyslog configures pipeline to send messages to an external syslog server through docebo/fluent-plugin-remote-syslog + OutputTypeSyslog OutputType = "syslog" ) //LogForwardingReason The reason for the current state diff --git a/pkg/generators/factory.go b/pkg/generators/factory.go index c5efd21f6c..7cb93edf58 100644 --- a/pkg/generators/factory.go +++ b/pkg/generators/factory.go @@ -13,13 +13,7 @@ type Generator struct { //New creates an instance of a template engine for a set of templates func New(name string, addFunctions *template.FuncMap, templates ...string) (*Generator, error) { - allFunctions := funcMap - if addFunctions != nil { - for name, f := range *addFunctions { - allFunctions[name] = f - } - } - tmpl := template.New(name).Funcs(funcMap) + tmpl := template.New(name).Funcs(*addFunctions).Funcs(funcMap) var err error for i, s := range templates { tmpl, err = tmpl.Parse(s) diff --git a/pkg/generators/forwarding/fluentd/fluent_conf.go b/pkg/generators/forwarding/fluentd/fluent_conf.go index 1404383233..c4ccb6e0c1 100644 --- a/pkg/generators/forwarding/fluentd/fluent_conf.go +++ b/pkg/generators/forwarding/fluentd/fluent_conf.go @@ -11,6 +11,7 @@ import ( ) var replacer = strings.NewReplacer(" ", "_", "-", "_", ".", "_") +var protocolSeparator = "://" type outputLabelConf struct { Name string @@ -44,17 +45,34 @@ func (conf *outputLabelConf) Template() *template.Template { return conf.TemplateContext } func (conf *outputLabelConf) Host() string { - return strings.Split(conf.Target.Endpoint, ":")[0] + endpoint := stripProtocol(conf.Target.Endpoint) + return strings.Split(endpoint, ":")[0] } func (conf *outputLabelConf) Port() string { - parts := strings.Split(conf.Target.Endpoint, ":") + endpoint := stripProtocol(conf.Target.Endpoint) + parts := strings.Split(endpoint, ":") if len(parts) == 1 { return "9200" } return parts[1] } +func (conf *outputLabelConf) Protocol() string { + endpoint := conf.Target.Endpoint + if index := strings.Index(endpoint, protocolSeparator); index != -1 { + return endpoint[:index] + } + return "" +} + +func stripProtocol(endpoint string) string { + if index := strings.Index(endpoint, protocolSeparator); index != -1 { + endpoint = endpoint[index+len(protocolSeparator):] + } + return endpoint +} + func (conf *outputLabelConf) BufferPath() string { return fmt.Sprintf("/var/lib/fluentd/%s", conf.StoreID()) } diff --git a/pkg/generators/forwarding/fluentd/generators.go b/pkg/generators/forwarding/fluentd/generators.go index e79770b16b..fc16603757 100644 --- a/pkg/generators/forwarding/fluentd/generators.go +++ b/pkg/generators/forwarding/fluentd/generators.go @@ -202,6 +202,9 @@ func (engine *ConfigGenerator) generateOutputLabelBlocks(outputs []logforward.Ou case logforward.OutputTypeForward: storeTemplateName = "forward" outputTemplateName = "outputLabelConfNoCopy" + case logforward.OutputTypeSyslog: + storeTemplateName = "storeSyslog" + outputTemplateName = "outputLabelConfNoRetry" default: logger.Warnf("Pipeline targets include an unrecognized type: %q", output.Type) continue diff --git a/pkg/generators/forwarding/fluentd/output_conf_syslog_test.go b/pkg/generators/forwarding/fluentd/output_conf_syslog_test.go new file mode 100644 index 0000000000..16754f31a6 --- /dev/null +++ b/pkg/generators/forwarding/fluentd/output_conf_syslog_test.go @@ -0,0 +1,108 @@ +package fluentd + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + logging "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1alpha1" + test "github.com/openshift/cluster-logging-operator/test" +) + +var _ = Describe("Generating external syslog server output store config blocks", func() { + + var ( + err error + outputs []logging.OutputSpec + generator *ConfigGenerator + ) + BeforeEach(func() { + generator, err = NewConfigGenerator(false, false) + Expect(err).To(BeNil()) + }) + + Context("based on syslog plugin", func() { + tcpConf := `` + + udpConf := `` + + Context("for protocol-less endpoint", func() { + BeforeEach(func() { + outputs = []logging.OutputSpec{ + { + Type: logging.OutputTypeSyslog, + Name: "syslog-receiver", + Endpoint: "sl.svc.messaging.cluster.local:9654", + }, + } + }) + It("should produce well formed output label config", func() { + results, err := generator.generateOutputLabelBlocks(outputs) + Expect(err).To(BeNil()) + Expect(len(results)).To(Equal(1)) + test.Expect(results[0]).ToEqual(tcpConf) + }) + }) + + Context("for tcp endpoint", func() { + BeforeEach(func() { + outputs = []logging.OutputSpec{ + { + Type: logging.OutputTypeSyslog, + Name: "syslog-receiver", + Endpoint: "tcp://sl.svc.messaging.cluster.local:9654", + }, + } + }) + It("should produce well formed output label config", func() { + results, err := generator.generateOutputLabelBlocks(outputs) + Expect(err).To(BeNil()) + Expect(len(results)).To(Equal(1)) + test.Expect(results[0]).ToEqual(tcpConf) + }) + }) + + Context("for udp endpoint", func() { + BeforeEach(func() { + outputs = []logging.OutputSpec{ + { + Type: logging.OutputTypeSyslog, + Name: "syslog-receiver", + Endpoint: "udp://sl.svc.messaging.cluster.local:9654", + }, + } + }) + It("should produce well formed output label config", func() { + results, err := generator.generateOutputLabelBlocks(outputs) + Expect(err).To(BeNil()) + Expect(len(results)).To(Equal(1)) + test.Expect(results[0]).ToEqual(udpConf) + }) + }) + }) +}) diff --git a/pkg/generators/forwarding/fluentd/syslog_conf.go b/pkg/generators/forwarding/fluentd/syslog_conf.go new file mode 100644 index 0000000000..e5c41c797c --- /dev/null +++ b/pkg/generators/forwarding/fluentd/syslog_conf.go @@ -0,0 +1,8 @@ +package fluentd + +func (conf *outputLabelConf) SyslogPlugin() string { + if protocol := conf.Protocol(); protocol == "udp" { + return "syslog" + } + return "syslog_buffered" +} diff --git a/pkg/generators/forwarding/fluentd/templates.go b/pkg/generators/forwarding/fluentd/templates.go index 6eaa6842dc..a5a22f8bc1 100644 --- a/pkg/generators/forwarding/fluentd/templates.go +++ b/pkg/generators/forwarding/fluentd/templates.go @@ -12,8 +12,10 @@ var templateRegistry = []string{ sourceToPipelineCopyTemplate, outputLabelConfTemplate, outputLabelConfNocopyTemplate, + outputLabelConfNoretryTemplate, storeElasticsearchTemplate, forwardTemplate, + storeSyslogTemplate, } const fluentConfTemplate = `{{- define "fluentConf" }} @@ -484,6 +486,15 @@ const outputLabelConfNocopyTemplate = `{{- define "outputLabelConfNoCopy" }} {{- end}}` +const outputLabelConfNoretryTemplate = `{{- define "outputLabelConfNoRetry" }} + +{{- end}}` + const forwardTemplate = `{{- define "forward" }} # https://docs.fluentd.org/v1.0/articles/in_forward @type forward @@ -573,3 +584,15 @@ const storeElasticsearchTemplate = `{{- define "storeElasticsearch" }} {{- end}}` + +const storeSyslogTemplate = `{{- define "storeSyslog" }} + + @type {{.SyslogPlugin}} + @id {{.StoreID}} + remote_syslog {{.Host}} + port {{.Port}} + hostname ${hostname} + facility user + severity debug + +{{- end}}` diff --git a/pkg/k8shandler/forwarding.go b/pkg/k8shandler/forwarding.go index 9164ebe437..c4db656d51 100644 --- a/pkg/k8shandler/forwarding.go +++ b/pkg/k8shandler/forwarding.go @@ -26,7 +26,7 @@ const ( ) var ( - outputTypes = sets.NewString(string(logforward.OutputTypeElasticsearch), string(logforward.OutputTypeForward)) + outputTypes = sets.NewString(string(logforward.OutputTypeElasticsearch), string(logforward.OutputTypeForward), string(logforward.OutputTypeSyslog)) sourceTypes = sets.NewString(string(logforward.LogSourceTypeApp), string(logforward.LogSourceTypeInfra), string(logforward.LogSourceTypeAudit)) ) diff --git a/test/e2e/logforwarding/syslog/deleteme.go b/test/e2e/logforwarding/syslog/deleteme.go deleted file mode 100644 index 337a6fc2fa..0000000000 --- a/test/e2e/logforwarding/syslog/deleteme.go +++ /dev/null @@ -1,3 +0,0 @@ -package syslog - -// placeholder to make compiler happy diff --git a/test/e2e/logforwarding/syslog/forward_to_syslog_test.go b/test/e2e/logforwarding/syslog/forward_to_syslog_test.go new file mode 100644 index 0000000000..38603f6016 --- /dev/null +++ b/test/e2e/logforwarding/syslog/forward_to_syslog_test.go @@ -0,0 +1,144 @@ +package fluent + +import ( + "fmt" + "runtime" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + apps "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + + logforward "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1alpha1" + "github.com/openshift/cluster-logging-operator/pkg/logger" + "github.com/openshift/cluster-logging-operator/test/helpers" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("LogForwarding", func() { + _, filename, _, _ := runtime.Caller(0) + logger.Infof("Running %s", filename) + var ( + err error + syslogDeployment *apps.Deployment + e2e = helpers.NewE2ETestFramework() + ) + BeforeEach(func() { + if err := e2e.DeployLogGenerator(); err != nil { + logger.Errorf("unable to deploy log generator. E: %s", err.Error()) + } + }) + Describe("when ClusterLogging is configured with 'forwarding' to an external syslog server", func() { + + Context("with the syslog plugin", func() { + + Context("and tcp receiver", func() { + + BeforeEach(func() { + if syslogDeployment, err = e2e.DeploySyslogReceiver(corev1.ProtocolTCP); err != nil { + Fail(fmt.Sprintf("Unable to deploy syslog receiver: %v", err)) + } + cr := helpers.NewClusterLogging(helpers.ComponentTypeCollector) + if err := e2e.CreateClusterLogging(cr); err != nil { + Fail(fmt.Sprintf("Unable to create an instance of cluster logging: %v", err)) + } + forwarding := &logforward.LogForwarding{ + TypeMeta: metav1.TypeMeta{ + Kind: logforward.LogForwardingKind, + APIVersion: logforward.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "instance", + }, + Spec: logforward.ForwardingSpec{ + Outputs: []logforward.OutputSpec{ + logforward.OutputSpec{ + Name: syslogDeployment.ObjectMeta.Name, + Type: logforward.OutputTypeSyslog, + Endpoint: fmt.Sprintf("%s.%s.svc:24224", syslogDeployment.ObjectMeta.Name, syslogDeployment.Namespace), + }, + }, + Pipelines: []logforward.PipelineSpec{ + logforward.PipelineSpec{ + Name: "test-infra", + OutputRefs: []string{syslogDeployment.ObjectMeta.Name}, + SourceType: logforward.LogSourceTypeInfra, + }, + }, + }, + } + if err := e2e.CreateLogForwarding(forwarding); err != nil { + Fail(fmt.Sprintf("Unable to create an instance of logforwarding: %v", err)) + } + components := []helpers.LogComponentType{helpers.ComponentTypeCollector} + for _, component := range components { + if err := e2e.WaitFor(component); err != nil { + Fail(fmt.Sprintf("Failed waiting for component %s to be ready: %v", component, err)) + } + } + }) + + It("should send logs to the forward.Output logstore", func() { + Expect(e2e.LogStore.HasInfraStructureLogs(helpers.DefaultWaitForLogsTimeout)).To(BeTrue(), "Expected to find stored infrastructure logs") + }) + }) + + Context("and udp receiver", func() { + + BeforeEach(func() { + if syslogDeployment, err = e2e.DeploySyslogReceiver(corev1.ProtocolUDP); err != nil { + Fail(fmt.Sprintf("Unable to deploy syslog receiver: %v", err)) + } + cr := helpers.NewClusterLogging(helpers.ComponentTypeCollector) + if err := e2e.CreateClusterLogging(cr); err != nil { + Fail(fmt.Sprintf("Unable to create an instance of cluster logging: %v", err)) + } + forwarding := &logforward.LogForwarding{ + TypeMeta: metav1.TypeMeta{ + Kind: logforward.LogForwardingKind, + APIVersion: logforward.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "instance", + }, + Spec: logforward.ForwardingSpec{ + Outputs: []logforward.OutputSpec{ + logforward.OutputSpec{ + Name: syslogDeployment.ObjectMeta.Name, + Type: logforward.OutputTypeSyslog, + Endpoint: fmt.Sprintf("udp://%s.%s.svc:24224", syslogDeployment.ObjectMeta.Name, syslogDeployment.Namespace), + }, + }, + Pipelines: []logforward.PipelineSpec{ + logforward.PipelineSpec{ + Name: "test-infra", + OutputRefs: []string{syslogDeployment.ObjectMeta.Name}, + SourceType: logforward.LogSourceTypeInfra, + }, + }, + }, + } + if err := e2e.CreateLogForwarding(forwarding); err != nil { + Fail(fmt.Sprintf("Unable to create an instance of logforwarding: %v", err)) + } + components := []helpers.LogComponentType{helpers.ComponentTypeCollector} + for _, component := range components { + if err := e2e.WaitFor(component); err != nil { + Fail(fmt.Sprintf("Failed waiting for component %s to be ready: %v", component, err)) + } + } + }) + + It("should send logs to the forward.Output logstore", func() { + Expect(e2e.LogStore.HasInfraStructureLogs(helpers.DefaultWaitForLogsTimeout)).To(BeTrue(), "Expected to find stored infrastructure logs") + }) + }) + }) + + AfterEach(func() { + e2e.Cleanup() + }) + + }) + +}) diff --git a/test/e2e/logforwarding/syslog/logforwarding_suite_test.go b/test/e2e/logforwarding/syslog/logforwarding_suite_test.go new file mode 100644 index 0000000000..7d044173c3 --- /dev/null +++ b/test/e2e/logforwarding/syslog/logforwarding_suite_test.go @@ -0,0 +1,13 @@ +package fluent + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestLogForwarding(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "LogForwarding Integration E2E Suite - Forward to syslog") +}