From f68e980ff4fc9c3b466d174918c39c2b10dca34f Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Tue, 14 May 2019 10:27:31 +0200 Subject: [PATCH] (#102) fix setting target specific TLS --- README.md | 2 ++ config/config.go | 29 ++++------------------------- config/topicconf.go | 33 +++++++++++++++++++++++++++++++++ puppet/types/topic.pp | 4 +++- replicator/replicator.go | 2 +- replicator/worker.go | 16 ++++++++++++++-- 6 files changed, 57 insertions(+), 29 deletions(-) create mode 100644 config/topicconf.go diff --git a/README.md b/README.md index f6fb7d1..005f26b 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,8 @@ SSL is supported on the network connections, 2 modes of configuration exist - Pu The examples below will show a top level `tls` key, you can also put it at a individual topic level if needed. +Here we show a root level `tls` key, but you can also add per topic configuration in case you have different CA's on each end. Additionally topics support keys `disable_target_tls` and `disable_source_tls` to selectively allow plain text connections on only one side of the topic bridge. + ### Puppet Compatible If you are a Puppet user you might want to re-use the Puppet CA, a sample SSL configuration can be seen here: diff --git a/config/config.go b/config/config.go index a012fdf..a2a7d56 100644 --- a/config/config.go +++ b/config/config.go 
@@ -21,27 +21,6 @@ type replications struct { SecurityProvider security.Provider } -// TopicConf is the configuration for a specific topic -type TopicConf struct { - Topic string `json:"topic"` - SourceURL string `json:"source_url"` - SourceID string `json:"source_cluster_id"` - TargetURL string `json:"target_url"` - TargetID string `json:"target_cluster_id"` - Workers int `json:"workers"` - Queued bool `json:"queued"` - QueueGroup string `json:"queue_group"` - Inspect string `json:"inspect"` - UpdateFlag string `json:"update_flag"` - MinAge string `json:"age"` - Name string `json:"name"` - MonitorPort int `json:"monitor"` - Advisory *AdvisoryConf `json:"advisory"` - TLS *TLSConf `json:"tls"` - - SecurityProvider security.Provider `json:"-"` -} - // AdvisoryConf configures an advisory target type AdvisoryConf struct { Target string `json:"target"` @@ -85,12 +64,12 @@ func Load(file string) error { for _, t := range config.Topics { t.SecurityProvider = config.SecurityProvider - if t.TLS == nil { - t.TLS = config.TLS + if t.TLSc == nil { + t.TLSc = config.TLS } - if t.TLS != nil { - t.SecurityProvider, err = t.TLS.SecurityProvider() + if t.TLSc != nil { + t.SecurityProvider, err = t.TLSc.SecurityProvider() if err != nil { return fmt.Errorf("could not configure topic %s SSL: %s", t.Name, err) } diff --git a/config/topicconf.go b/config/topicconf.go new file mode 100644 index 0000000..468498f --- /dev/null +++ b/config/topicconf.go 
@@ -0,0 +1,33 @@
+package config
+
+import (
+ security "github.com/choria-io/go-security"
+)
+
+// TopicConf is the configuration for a specific topic
+type TopicConf struct {
+ Topic string `json:"topic"`
+ SourceURL string `json:"source_url"`
+ SourceID string `json:"source_cluster_id"`
+ TargetURL string `json:"target_url"`
+ TargetID string `json:"target_cluster_id"`
+ Workers int `json:"workers"`
+ Queued bool `json:"queued"`
+ QueueGroup string `json:"queue_group"`
+ Inspect string `json:"inspect"`
+ UpdateFlag string `json:"update_flag"`
+ MinAge string `json:"age"`
+ Name string `json:"name"`
+ MonitorPort int `json:"monitor"`
+ Advisory *AdvisoryConf `json:"advisory"`
+ TLSc *TLSConf `json:"tls"`
+ DisableTargetTLS bool `json:"disable_target_tls"`
+ DisableSourceTLS bool `json:"disable_source_tls"`
+
+ SecurityProvider security.Provider `json:"-"`
+}
+
+// TLS determines if the topic has a TLS configuration set
+func (t *TopicConf) TLS() bool {
+ return t.TLSc == nil
+}
diff --git a/puppet/types/topic.pp b/puppet/types/topic.pp
index ecb4f7d..6bcb5c2 100644
--- a/puppet/types/topic.pp
+++ b/puppet/types/topic.pp
@@ -12,5 +12,7 @@
 monitor => Optional[Integer],
 name => Optional[String],
 advisory => Optional[Stream_replicator::Advisory],
- tls => Optional[Variant[Stream_replicator::FileSSL, Stream_replicator::PuppetSSL]]
+ tls => Optional[Variant[Stream_replicator::FileSSL, Stream_replicator::PuppetSSL]],
+ disable_target_tls => Optional[Boolean],
+ disable_source_tls => Optional[Boolean],
 }]
diff --git a/replicator/replicator.go b/replicator/replicator.go
index 317d53d..228c46f 100644
--- a/replicator/replicator.go
+++ b/replicator/replicator.go
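Review bot: `TLS()` looks inverted: its comment says it "determines if the topic has a TLS configuration set", yet it returns `t.TLSc == nil`, which is true exactly when no `tls` block is configured. Because `replicator.Setup` in this same patch computes `c.tls = config.TLS() || topic.TLS()`, a topic with only a per-topic `tls` block would (if `config.TLS()` is false with no global block) end up with `tls` false and connect in plaintext, while a topic with no TLS configuration at all would be forced to `tls` true — the opposite of what the commit title describes. Change it to `return t.TLSc != nil`, and extend the comment to say that after `config.Load` copies the global block into `TLSc` this also reports true for topics inheriting the global `tls` key. A small unit test pinning the nil and non-nil cases would have caught this.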
@@ -27,7 +27,7 @@ type Copier struct { // Setup validates the configuration of the copier and sets defaults where possible func (c *Copier) Setup(name string, topic *config.TopicConf) error { c.config = topic - c.tls = config.TLS() + c.tls = config.TLS() || topic.TLS() if c.config.Topic == "" { return fmt.Errorf("a topic is required") diff --git a/replicator/worker.go b/replicator/worker.go index de2d737..793c938 100644 --- a/replicator/worker.go +++ b/replicator/worker.go @@ -121,14 +121,26 @@ func (w *worker) connect(ctx context.Context) error { wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() - w.from = connector.New(w.name, w.tls, connector.Source, w.config, w.log) + + tls := w.tls + if w.config.DisableSourceTLS { + tls = false + } + + w.from = connector.New(w.name, tls, connector.Source, w.config, w.log) w.from.Connect(ctx) }(wg) wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() - w.to = connector.New(w.name, w.tls, connector.Target, w.config, w.log) + + tls := w.tls + if w.config.DisableTargetTLS { + tls = false + } + + w.to = connector.New(w.name, tls, connector.Target, w.config, w.log) w.to.Connect(ctx) }(wg)
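Review bot: The new `c.tls = config.TLS() || topic.TLS()` line deserves a why-comment, because its meaning depends on `topic.TLS()` in `config/topicconf.go` and on the `disable_source_tls`/`disable_target_tls` flags that `worker.connect` applies later, none of which is visible here. Suggested: `// TLS is on when a tls block exists globally or on the topic; worker.connect may still drop it per leg via disable_source_tls / disable_target_tls.` With `||`, a per-topic block can add TLS but presumably never remove the global setting, which is worth stating so nobody tries to opt out by omitting `tls`. Setup's body continues outside the hunk.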
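Review bot: The plaintext downgrade in `connect` is silent: when `w.config.DisableSourceTLS` or `w.config.DisableTargetTLS` forces `tls = false`, nothing is logged, so an operator reading the logs cannot tell that one leg of the bridge is unencrypted. Log at warning level just before each `connector.New` call, e.g. `w.log.Warnf("source connection for %s will not use TLS (disable_source_tls is set)", w.name)` and the mirror for the target; `w.log` is handed to the connector here but its API is not visible, so adapt the call. Consider also reporting the case where the flag is set while `w.tls` is already false, since that usually means a misconfigured topic rather than an intended downgrade. The `connect` function starts before and continues past this hunk.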