Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #103 from ripienaar/102
Browse files Browse the repository at this point in the history
(#102) fix setting target specific TLS
  • Loading branch information
ripienaar authored May 14, 2019
2 parents 047dcb3 + f68e980 commit 8438822
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 29 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ SSL is supported on the network connections, 2 modes of configuration exist - Pu

The examples below will show a top level `tls` key, you can also put it at a individual topic level if needed.

Here we show a root level `tls` key, but you can also add per topic configuration in case you have different CA's on each end. Additionally topics support keys `disable_target_tls` and `disable_source_tls` to selectively allow plain text connections on only one side of the topic bridge.

### Puppet Compatible

If you are a Puppet user you might want to re-use the Puppet CA, a sample SSL configuration can be seen here:
Expand Down
29 changes: 4 additions & 25 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,6 @@ type replications struct {
SecurityProvider security.Provider
}

// TopicConf is the configuration for a specific topic
type TopicConf struct {
Topic string `json:"topic"`
SourceURL string `json:"source_url"`
SourceID string `json:"source_cluster_id"`
TargetURL string `json:"target_url"`
TargetID string `json:"target_cluster_id"`
Workers int `json:"workers"`
Queued bool `json:"queued"`
QueueGroup string `json:"queue_group"`
Inspect string `json:"inspect"`
UpdateFlag string `json:"update_flag"`
MinAge string `json:"age"`
Name string `json:"name"`
MonitorPort int `json:"monitor"`
Advisory *AdvisoryConf `json:"advisory"`
TLS *TLSConf `json:"tls"`

SecurityProvider security.Provider `json:"-"`
}

// AdvisoryConf configures an advisory target
type AdvisoryConf struct {
Target string `json:"target"`
Expand Down Expand Up @@ -85,12 +64,12 @@ func Load(file string) error {
for _, t := range config.Topics {
t.SecurityProvider = config.SecurityProvider

if t.TLS == nil {
t.TLS = config.TLS
if t.TLSc == nil {
t.TLSc = config.TLS
}

if t.TLS != nil {
t.SecurityProvider, err = t.TLS.SecurityProvider()
if t.TLSc != nil {
t.SecurityProvider, err = t.TLSc.SecurityProvider()
if err != nil {
return fmt.Errorf("could not configure topic %s SSL: %s", t.Name, err)
}
Expand Down
33 changes: 33 additions & 0 deletions config/topicconf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package config

import (
security "github.com/choria-io/go-security"
)

// TopicConf is the configuration for a specific topic
type TopicConf struct {
Topic string `json:"topic"`
SourceURL string `json:"source_url"`
SourceID string `json:"source_cluster_id"`
TargetURL string `json:"target_url"`
TargetID string `json:"target_cluster_id"`
Workers int `json:"workers"`
Queued bool `json:"queued"`
QueueGroup string `json:"queue_group"`
Inspect string `json:"inspect"`
UpdateFlag string `json:"update_flag"`
MinAge string `json:"age"`
Name string `json:"name"`
MonitorPort int `json:"monitor"`
Advisory *AdvisoryConf `json:"advisory"`
TLSc *TLSConf `json:"tls"`
DisableTargetTLS bool `json:"disable_target_tls"`
DisableSourceTLS bool `json:"disable_source_tls"`

SecurityProvider security.Provider `json:"-"`
}

// TLS determines if the topic has a TLS configuration set
func (t *TopicConf) TLS() bool {
return t.TLSc == nil
}
4 changes: 3 additions & 1 deletion puppet/types/topic.pp
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@
monitor => Optional[Integer],
name => Optional[String],
advisory => Optional[Stream_replicator::Advisory],
tls => Optional[Variant[Stream_replicator::FileSSL, Stream_replicator::PuppetSSL]]
tls => Optional[Variant[Stream_replicator::FileSSL, Stream_replicator::PuppetSSL]],
disable_target_tls => Optional[Boolean],
disable_source_tls => Optional[Boolean],
}]
2 changes: 1 addition & 1 deletion replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Copier struct {
// Setup validates the configuration of the copier and sets defaults where possible
func (c *Copier) Setup(name string, topic *config.TopicConf) error {
c.config = topic
c.tls = config.TLS()
c.tls = config.TLS() || topic.TLS()

if c.config.Topic == "" {
return fmt.Errorf("a topic is required")
Expand Down
16 changes: 14 additions & 2 deletions replicator/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,26 @@ func (w *worker) connect(ctx context.Context) error {
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
w.from = connector.New(w.name, w.tls, connector.Source, w.config, w.log)

tls := w.tls
if w.config.DisableSourceTLS {
tls = false
}

w.from = connector.New(w.name, tls, connector.Source, w.config, w.log)
w.from.Connect(ctx)
}(wg)

wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
w.to = connector.New(w.name, w.tls, connector.Target, w.config, w.log)

tls := w.tls
if w.config.DisableTargetTLS {
tls = false
}

w.to = connector.New(w.name, tls, connector.Target, w.config, w.log)
w.to.Connect(ctx)
}(wg)

Expand Down

0 comments on commit 8438822

Please sign in to comment.