From 86a8ab2bf50f26a557131076281f8753a727e2c2 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 2 Jul 2018 10:07:17 +0300 Subject: [PATCH 1/2] Revert "(#85) handle STAN connection errors" This reverts commit c7404456643d5a6e7f9cceda50861f3e8774a37d. --- README.md | 6 +----- advisor/advisor.go | 10 ++++------ advisor/advisor_test.go | 24 +++++++----------------- cmd/cmd.go | 26 +++----------------------- connector/connector.go | 27 +++++++++++++-------------- connector/connector_test.go | 3 +-- glide.lock | 22 +++++++++++----------- glide.yaml | 3 ++- replicator/replicator.go | 8 +++----- replicator/worker.go | 18 +++++++----------- 10 files changed, 52 insertions(+), 95 deletions(-) diff --git a/README.md b/README.md index 4a9d4b5..f6fb7d1 100644 --- a/README.md +++ b/README.md @@ -15,10 +15,6 @@ When there are many workers or it's specified to belong to a queue group a Durab First time it connects it attempts to replicate all messages, as the subscription is Durable it will from then on continue where it left off. -## Requirements - -For reliable operation where connection issues get correctly detected NATS Streaming Server version 0.10.0 or newer is required - ## Status This is a pretty new project that is being used in production, however as it is new and developing use with caution. I'd love any feedback you might have - especially design ideas about multi worker order preserving replication! @@ -374,4 +370,4 @@ A Puppet module to install and manage the Stream Replicator can be found on the ## Thanks - \ No newline at end of file + \ No newline at end of file diff --git a/advisor/advisor.go b/advisor/advisor.go index b6e47ac..7564963 100644 --- a/advisor/advisor.go +++ b/advisor/advisor.go @@ -97,7 +97,7 @@ func Configure(tls bool, c *config.TopicConf) error { } // Connect initiates the connection to NATS Streaming -func Connect(ctx context.Context, wg *sync.WaitGroup, reconn chan string) { +func Connect(ctx context.Context, wg *sync.WaitGroup) { mu.Lock() defer mu.Unlock() @@ -106,7 +106,7 @@ func Connect(ctx context.Context, wg *sync.WaitGroup, reconn chan string) { } log.Debug("Starting advisor connection") - connect(ctx, reconn) + connect(ctx) log.Debug("Starting advisor publisher") wg.Add(1) @@ -234,7 +234,7 @@ func newAdvisory(id string, event EventType) AgeAdvisoryV1 { } } -func connect(ctx context.Context, reconn chan string) { +func connect(ctx context.Context) { var c *connector.Connection if conf.Advisory.Cluster == "source" { @@ -245,9 +245,7 @@ func connect(ctx context.Context, reconn chan string) { c = connector.New(name, natstls, connector.Target, conf, log) } - conn = c.Connect(ctx, func(_ stan.Conn, reason error) { - reconn <- fmt.Sprintf("advisory stream disconnected: %s", reason) - }) + conn = c.Connect(ctx) } func publisher(ctx context.Context, wg *sync.WaitGroup) { diff --git a/advisor/advisor_test.go b/advisor/advisor_test.go index a38979c..688a505 100644 --- a/advisor/advisor_test.go +++ b/advisor/advisor_test.go @@ -70,7 +70,7 @@ var _ = Describe("Advisor", func() { defer left.Shutdown() conf.Advisory.Cluster = "source" - connect(ctx, make(chan string, 1)) + connect(ctx) Expect(conn.NatsConn().ConnectedUrl()).To(Equal("nats://localhost:34222")) }) @@ -87,7 +87,7 @@ var _ = Describe("Advisor", func() { defer right.Shutdown() conf.Advisory.Cluster = "target" - connect(ctx, make(chan string, 1)) + connect(ctx) Expect(conn.NatsConn().ConnectedUrl()).To(Equal("nats://localhost:44222")) }) @@ -161,11 +161,9 @@ var _ = Describe("Advisor", func() { 
Expect(seen).To(BeEmpty()) - mu.Lock() seen["old"] = time.Now().Add(-1 * time.Hour) seen["expired"] = time.Now().Add(-3 * time.Hour) seen["new"] = time.Now() - mu.Unlock() Expect(out).To(HaveLen(0)) @@ -173,21 +171,13 @@ var _ = Describe("Advisor", func() { Expect(out).To(HaveLen(2)) - // ordering isnt guaranteed using a map here - // allows us to check both at least got fired - // uniquely, previoulsy this test would not use - // a map and was failing intermitantly - msgs := make(map[string]AgeAdvisoryV1) msg := <-out - msgs[msg.Value] = msg - msg = <-out - msgs[msg.Value] = msg - - Expect(msgs["old"].Value).To(Equal("old")) - Expect(msgs["old"].Event).To(Equal(Timeout)) + Expect(msg.Value).To(Equal("old")) + Expect(msg.Event).To(Equal(Timeout)) - Expect(msgs["expired"].Value).To(Equal("expired")) - Expect(msgs["expired"].Event).To(Equal(Expired)) + msg = <-out + Expect(msg.Value).To(Equal("expired")) + Expect(msg.Event).To(Equal(Expired)) _, found := advised["old"] Expect(found).To(BeTrue()) diff --git a/cmd/cmd.go b/cmd/cmd.go index 14ce32e..7a6c12a 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -34,8 +34,6 @@ var ( enrollIdentity string enrollCA string enrollDir string - - reconn chan string ) func Run() { @@ -57,8 +55,6 @@ func Run() { command := kingpin.MustParse(app.Parse(os.Args[1:])) - reconn = make(chan string, 5) - ctx, cancel = context.WithCancel(context.Background()) defer cancel() @@ -122,7 +118,7 @@ func runReplicate() { os.Exit(1) } - go interruptHandler(wg) + go interruptHandler() writePID(pidfile) @@ -145,31 +141,15 @@ func writePID(pidfile string) { } } -func interruptHandler(wg *sync.WaitGroup) { - defer wg.Done() - +func interruptHandler() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) for { select { - case reason := <-reconn: - logrus.Errorf("Restarting replicator after: %s", reason) - - // stops everything and sleep a bit to give state saves a bit of time etc - cancel() - time.Sleep(1 * time.Second) - - err := syscall.Exec(os.Args[0], os.Args, os.Environ()) - if err != nil { - logrus.Errorf("Could not restart Stream Replicator: %s", err) - } - case sig := <-sigs: logrus.Infof("Shutting down on %s", sig) cancel() - return - case <-ctx.Done(): return } @@ -177,7 +157,7 @@ func interruptHandler(wg *sync.WaitGroup) { } func startReplicator(ctx context.Context, wg *sync.WaitGroup, done chan int, topic *config.TopicConf, topicname string) { - err := rep.Setup(topicname, topic, reconn) + err := rep.Setup(topicname, topic) if err != nil { logrus.Errorf("Could not configure Replicator: %s", err) return diff --git a/connector/connector.go b/connector/connector.go index ca842ac..0fffbe7 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -13,14 +13,13 @@ import ( // Connection holds a connection to NATS Streaming type Connection struct { - url string - log *logrus.Entry - conn stan.Conn - natsConn *nats.Conn - name string - cfg *config.TopicConf - id string - tls bool + url string + log *logrus.Entry + conn stan.Conn + name string + cfg *config.TopicConf + id string + tls bool } // Direction indicates which of the connectors to connect to @@ -52,15 +51,15 @@ func New(name string, tls bool, dir Direction, cfg *config.TopicConf, logger *lo } // Connect connects to the configured stream -func (c *Connection) Connect(ctx context.Context, cb func(stan.Conn, error)) stan.Conn { - c.conn = c.connectSTAN(ctx, c.id, c.name, c.url, cb) +func (c *Connection) Connect(ctx context.Context) stan.Conn { + c.conn = c.connectSTAN(ctx, c.id, 
c.name, c.url) return c.conn } -func (c *Connection) connectSTAN(ctx context.Context, cid string, name string, urls string, cb func(stan.Conn, error)) stan.Conn { - c.natsConn = c.connectNATS(ctx, name, urls) - if c.natsConn == nil { +func (c *Connection) connectSTAN(ctx context.Context, cid string, name string, urls string) stan.Conn { + n := c.connectNATS(ctx, name, urls) + if n == nil { c.log.Errorf("%s NATS connection could not be established, cannot connect to the Stream", name) return nil } @@ -72,7 +71,7 @@ func (c *Connection) connectSTAN(ctx context.Context, cid string, name string, u for { try++ - conn, err = stan.Connect(cid, name, stan.NatsConn(c.natsConn), stan.SetConnectionLostHandler(cb)) + conn, err = stan.Connect(cid, name, stan.NatsConn(n)) if err != nil { c.log.Warnf("%s initial connection to the NATS Streaming broker cluster failed: %s", name, err) diff --git a/connector/connector_test.go b/connector/connector_test.go index b3e3cd6..1cfe268 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -8,7 +8,6 @@ import ( "github.com/choria-io/stream-replicator/config" conntest "github.com/choria-io/stream-replicator/connector/test" - "github.com/nats-io/go-nats-streaming" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" @@ -70,7 +69,7 @@ var _ = Describe("Connector", func() { defer left.Shutdown() c := New("testcon", false, Source, conf, log) - con := c.Connect(ctx, func(_ stan.Conn, reason error) {}) + con := c.Connect(ctx) defer con.Close() }) }) diff --git a/glide.lock b/glide.lock index ad59887..3694ce2 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 283e62924a46cfca27e07204e62171df02d36dc739c881eb421b2fc1c706b5d8 -updated: 2018-06-25T11:35:38.17691+03:00 +hash: f7c6a83b9b77ae4741b048388febf0defb7b1e811f52a257b82eb5982eb64392 +updated: 2018-05-30T10:41:02.980026+02:00 imports: - name: github.com/alecthomas/template version: a0175ee3bccc567396460bf5acd36800cb10c49c @@ -18,7 +18,7 @@ imports: version: 2f1ce7a837dcb8da3ec595b1dac9d0632f0f99e8 repo: https://github.com/boltdb/bolt - name: github.com/choria-io/go-choria - version: bb84cbd37539a2cdbf890c843ef7c6afc54c6964 + version: a3b74b8d7769db8ec227d4cf0a75aaebb28ad2fd subpackages: - build - config @@ -28,7 +28,7 @@ imports: - name: github.com/choria-io/go-confkey version: 634d2c41860498507b0c7eeb2f8a33b1aa6a809d - name: github.com/choria-io/go-security - version: a221c72ee79ab400346d38837d7e8b289857247b + version: 615ff9a3ca0cabc08289e491ef8f57b6a51031a7 subpackages: - filesec - puppetsec @@ -55,7 +55,7 @@ imports: - proto - protoc-gen-gogo/descriptor - name: github.com/golang/protobuf - version: 9eb2c01ac278a5d89ce4b2be68fe4500955d8179 + version: df1d3ca07d2d07bba352d5b73c4313b4e2a6203e subpackages: - proto - name: github.com/hashicorp/go-immutable-radix @@ -97,11 +97,11 @@ imports: - encoders/builtin - util - name: github.com/nats-io/go-nats-streaming - version: e15a53f85e4932540600a16b56f6c4f65f58176f + version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 subpackages: - pb - name: github.com/nats-io/nats-streaming-server - version: 6027b20cd943a9fa598a664e4c8d7514fe684345 + version: 6026da1b7c444bf9f1d1b1d3ed14e435aabf02bc subpackages: - logger - server @@ -109,14 +109,14 @@ imports: - stores - util - name: github.com/nats-io/nuid - version: 3e58d42c9cfe5cd9429f1a21ad8f35cd859ba829 + version: 28b996b57a46dd0c2aa3a3dc7fa8780878331d00 - name: github.com/prometheus/client_golang version: 967789050ba94deca04a5e84cce8ad472ce313c1 subpackages: - prometheus 
- prometheus/promhttp - name: github.com/prometheus/client_model - version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c + version: 6f3806018612930941127f2a7c6c453ba2c527d2 subpackages: - go - name: github.com/prometheus/common @@ -132,7 +132,7 @@ imports: - name: github.com/sirupsen/logrus version: c155da19408a8799da419ed3eeb0cb5db0ad5dbc - name: github.com/tidwall/gjson - version: f123b340873a0084cb27267eddd8ff615115fbff + version: 01f00f129617a6fe98941fb920d6c760241b54d2 - name: github.com/tidwall/match version: 1731857f09b1f38450e2c12409748407822dc6be - name: golang.org/x/crypto @@ -199,7 +199,7 @@ testImports: - html/atom - html/charset - name: golang.org/x/text - version: 5cec4b58c438bd98288aeb248bab2c1840713d21 + version: 5c1cf69b5978e5a34c5f9ba09a83e56acc4b7877 subpackages: - encoding - encoding/charmap diff --git a/glide.yaml b/glide.yaml index 78ee757..fa06aa7 100644 --- a/glide.yaml +++ b/glide.yaml @@ -5,8 +5,9 @@ import: - package: github.com/nats-io/go-nats version: ^1 - package: github.com/nats-io/go-nats-streaming - version: '>= 0.4.0' + version: '*' - package: github.com/prometheus/client_golang + version: '>= 0.9.0-pre1' subpackages: - prometheus - prometheus/promhttp diff --git a/replicator/replicator.go b/replicator/replicator.go index 22f9d19..317d53d 100644 --- a/replicator/replicator.go +++ b/replicator/replicator.go @@ -22,14 +22,12 @@ type Copier struct { Log *logrus.Entry ctx context.Context cancel func() - reconn chan string } // Setup validates the configuration of the copier and sets defaults where possible -func (c *Copier) Setup(name string, topic *config.TopicConf, reconn chan string) error { +func (c *Copier) Setup(name string, topic *config.TopicConf) error { c.config = topic c.tls = config.TLS() - c.reconn = reconn if c.config.Topic == "" { return fmt.Errorf("a topic is required") @@ -90,13 +88,13 @@ func (c *Copier) Run(ctx context.Context, wg *sync.WaitGroup) { return } - advisor.Connect(c.ctx, wg, c.reconn) + advisor.Connect(c.ctx, wg) } for i := 0; i < c.config.Workers; i++ { w := newWorker(i, c.config, c.tls, c.Log) wg.Add(1) - go w.Run(ctx, wg, c.reconn) + go w.Run(ctx, wg) } select { diff --git a/replicator/worker.go b/replicator/worker.go index 88c359c..b3bd6a9 100644 --- a/replicator/worker.go +++ b/replicator/worker.go @@ -35,18 +35,18 @@ func newWorker(i int, config *config.TopicConf, tls bool, log *logrus.Entry) *wo return &w } -func (w *worker) Run(ctx context.Context, wg *sync.WaitGroup, reconn chan string) { +func (w *worker) Run(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - err := w.connect(ctx, reconn) + err := w.connect(ctx) if err != nil { - reconn <- fmt.Sprintf("could not start worker: %s", err) + w.log.Errorf("Could not start worker: %s", err) return } err = w.subscribe() if err != nil { - reconn <- fmt.Sprintf("could not subscribe to source %s: %s", w.config.Topic, err) + w.log.Errorf("Could not subscribe to source %s", w.config.Topic) return } @@ -115,25 +115,21 @@ func (w *worker) subscribe() error { return err } -func (w *worker) connect(ctx context.Context, reconn chan string) error { +func (w *worker) connect(ctx context.Context) error { wg := &sync.WaitGroup{} wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() c := connector.New(w.name, w.tls, connector.Source, w.config, w.log) - w.from = c.Connect(ctx, func(_ stan.Conn, reason error) { - reconn <- fmt.Sprintf("source stream disconnected: %s", reason) - }) + w.from = c.Connect(ctx) }(wg) wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() c := 
connector.New(w.name, w.tls, connector.Target, w.config, w.log) - w.to = c.Connect(ctx, func(_ stan.Conn, reason error) { - reconn <- fmt.Sprintf("target stream disconnected: %s", reason) - }) + w.to = c.Connect(ctx) }(wg) wg.Wait() From 90c69738d58656bd3ecf296dd0c7421004b3f19f Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Mon, 2 Jul 2018 11:43:22 +0300 Subject: [PATCH 2/2] (#85) handle reconnects In the past should the STAN connection be interrupted long enough the whole system would just stop working as on reconnect the server would not recognise us as valid. In the go library version 0.4.0 and the server version 0.10.0 a bidirectional ping was added whereby the client code can get notified on disconnection. The problem is one has to entirely recreate all subscriptions and everything from scratch when this happens. Quite a difficult ask for a program that was never developed with that in mind. To use this we refactor the connector class so it's fully responsible for interacting with STAN rather than the weird uncontained split we had before and now in that class we handle reconnects and resubscribes --- advisor/advisor.go | 19 +++--- connector/connector.go | 121 +++++++++++++++++++++++++++++------- connector/connector_test.go | 8 +-- connector/stats.go | 6 ++ connector/subscription.go | 25 ++++++++ glide.lock | 14 ++--- glide.yaml | 4 +- replicator/worker.go | 28 ++++----- 8 files changed, 167 insertions(+), 58 deletions(-) create mode 100644 connector/subscription.go diff --git a/advisor/advisor.go b/advisor/advisor.go index 7564963..cdcf5ad 100644 --- a/advisor/advisor.go +++ b/advisor/advisor.go @@ -10,10 +10,17 @@ import ( "github.com/choria-io/stream-replicator/backoff" "github.com/choria-io/stream-replicator/config" "github.com/choria-io/stream-replicator/connector" - "github.com/nats-io/go-nats-streaming" + nats "github.com/nats-io/go-nats" "github.com/sirupsen/logrus" ) +type stream interface { + Connect(ctx context.Context) + Publish(subject string, data []byte) error + Close() error + NatsConn() *nats.Conn +} + // AgeAdvisoryV1 defines a message published when a node has not been seen within configured deadlines and when it recovers type AgeAdvisoryV1 struct { Version string `json:"$schema"` @@ -55,7 +62,7 @@ var interval time.Duration var age time.Duration var err error var log *logrus.Entry -var conn stan.Conn +var conn stream var natstls bool var name string @@ -235,17 +242,15 @@ func newAdvisory(id string, event EventType) AgeAdvisoryV1 { } func connect(ctx context.Context) { - var c *connector.Connection - if conf.Advisory.Cluster == "source" { log.Infof("Connection to source to publish advisories") - c = connector.New(name, natstls, connector.Source, conf, log) + conn = connector.New(name, natstls, connector.Source, conf, log) } else { log.Infof("Connection to target to publish advisories") - c = connector.New(name, natstls, connector.Target, conf, log) + conn = connector.New(name, natstls, connector.Target, conf, log) } - conn = c.Connect(ctx) + conn.Connect(ctx) } func publisher(ctx context.Context, wg *sync.WaitGroup) { diff --git a/connector/connector.go b/connector/connector.go index 0fffbe7..81341e2 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -2,6 +2,7 @@ package connector import ( "context" + "sync" "time" "github.com/choria-io/stream-replicator/backoff" @@ -20,6 +21,8 @@ type Connection struct { cfg *config.TopicConf id string tls bool + subs []*subscription + mu *sync.Mutex } // Direction indicates which of the connectors to connect 
to @@ -40,6 +43,8 @@ func New(name string, tls bool, dir Direction, cfg *config.TopicConf, logger *lo id: cfg.TargetID, tls: tls, cfg: cfg, + subs: []*subscription{}, + mu: &sync.Mutex{}, } if dir == Source { @@ -50,40 +55,108 @@ func New(name string, tls bool, dir Direction, cfg *config.TopicConf, logger *lo return &c } +// NatsConn returns the active nats connection +func (c *Connection) NatsConn() *nats.Conn { + return c.conn.NatsConn() +} + // Connect connects to the configured stream -func (c *Connection) Connect(ctx context.Context) stan.Conn { - c.conn = c.connectSTAN(ctx, c.id, c.name, c.url) +func (c *Connection) Connect(ctx context.Context) { + c.mu.Lock() + defer c.mu.Unlock() + + c.connectSTAN(ctx) +} + +// Subscribe subscribes to a subject, if group is empty a normal subscription is done +func (c *Connection) Subscribe(subject string, qgroup string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (err error) { + c.mu.Lock() + defer c.mu.Unlock() + + sub := &subscription{ + subject: subject, + group: qgroup, + cb: cb, + opts: opts, + } + + err = sub.subscribe(c) + + if err == nil { + c.subs = append(c.subs, sub) + } + + return +} + +// Close closes the connection and forgets all subscriptions +func (c *Connection) Close() error { + c.mu.Lock() + defer c.mu.Unlock() + + c.subs = []*subscription{} + + return c.conn.Close() +} + +// Publish publishes data to a specific subject +func (c *Connection) Publish(subject string, data []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + + return c.conn.Publish(subject, data) +} + +func (c *Connection) reconnect(ctx context.Context, reason error) { + c.mu.Lock() + defer c.mu.Unlock() + + streamReconnectCtr.WithLabelValues(c.name, c.cfg.Name).Inc() + + c.log.Errorf("Reconnecting to NATS Stream after disconnection: %s", reason) + + c.connectSTAN(ctx) - return c.conn + c.log.Infof("Resubscribing to %d subscriptions", len(c.subs)) + for _, sub := range c.subs { + err := sub.subscribe(c) + if err != nil { + c.log.Errorf("Could not re-subscribe to %s: %s", sub.subject, err) + } + } } -func (c *Connection) connectSTAN(ctx context.Context, cid string, name string, urls string) stan.Conn { - n := c.connectNATS(ctx, name, urls) +func (c *Connection) connectSTAN(ctx context.Context) { + n := c.connectNATS(ctx) if n == nil { - c.log.Errorf("%s NATS connection could not be established, cannot connect to the Stream", name) - return nil + c.log.Errorf("%s NATS connection could not be established, cannot connect to the Stream", c.name) + return } var err error - var conn stan.Conn try := 0 for { try++ - conn, err = stan.Connect(cid, name, stan.NatsConn(n)) + reconf := func(_ stan.Conn, reason error) { + errorCtr.WithLabelValues(c.name, c.cfg.Name).Inc() + c.reconnect(ctx, reason) + } + + c.conn, err = stan.Connect(c.id, c.name, stan.NatsConn(n), stan.SetConnectionLostHandler(reconf)) if err != nil { - c.log.Warnf("%s initial connection to the NATS Streaming broker cluster failed: %s", name, err) + c.log.Warnf("%s initial connection to the NATS Streaming broker cluster failed: %s", c.name, err) if ctx.Err() != nil { - c.log.Errorf("%s initial connection cancelled due to shut down", name) - return nil + c.log.Errorf("%s initial connection cancelled due to shut down", c.name) + return } - c.log.Infof("%s NATS Stream client failed connection attempt %d", name, try) + c.log.Infof("%s NATS Stream client failed connection attempt %d", c.name, try) if backoff.FiveSec.InterruptableSleep(ctx, try) != nil { - return nil + return } continue @@ -92,13 +165,13 @@ 
func (c *Connection) connectSTAN(ctx context.Context, cid string, name string, u break } - return conn + return } -func (c *Connection) connectNATS(ctx context.Context, name string, urls string) (natsc *nats.Conn) { +func (c *Connection) connectNATS(ctx context.Context) (natsc *nats.Conn) { options := []nats.Option{ nats.MaxReconnects(-1), - nats.Name(name), + nats.Name(c.name), nats.DisconnectHandler(c.disconCb), nats.ReconnectHandler(c.reconCb), nats.ClosedHandler(c.closedCb), @@ -106,7 +179,7 @@ func (c *Connection) connectNATS(ctx context.Context, name string, urls string) } if c.tls { - c.log.Debugf("Configuring TLS on NATS connection to %s", urls) + c.log.Debugf("Configuring TLS on NATS connection to %s", c.url) tlsc, err := c.cfg.SecurityProvider.TLSConfig() if err != nil { c.log.Errorf("Failed to configure TLS: %s", err) @@ -122,17 +195,17 @@ func (c *Connection) connectNATS(ctx context.Context, name string, urls string) for { try++ - natsc, err = nats.Connect(urls, options...) + natsc, err = nats.Connect(c.url, options...) if err != nil { - c.log.Warnf("%s initial connection to the NATS broker cluster (%s) failed: %s", name, urls, err) + c.log.Warnf("%s initial connection to the NATS broker cluster (%s) failed: %s", c.name, c.url, err) if ctx.Err() != nil { - c.log.Errorf("%s initial connection cancelled due to shut down", name) + c.log.Errorf("%s initial connection cancelled due to shut down", c.name) return nil } s := backoff.FiveSec.Duration(try) - c.log.Infof("%s NATS client sleeping %s after failed connection attempt %d", name, s, try) + c.log.Infof("%s NATS client sleeping %s after failed connection attempt %d", c.name, s, try) timer := time.NewTimer(s) @@ -140,12 +213,12 @@ func (c *Connection) connectNATS(ctx context.Context, name string, urls string) case <-timer.C: continue case <-ctx.Done(): - c.log.Errorf("%s initial connection cancelled due to shut down", name) + c.log.Errorf("%s initial connection cancelled due to shut down", c.name) return nil } } - c.log.Infof("%s NATS client connected to %s", name, natsc.ConnectedUrl()) + c.log.Infof("%s NATS client connected to %s", c.name, natsc.ConnectedUrl()) break } diff --git a/connector/connector_test.go b/connector/connector_test.go index 1cfe268..82886e2 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -42,7 +42,7 @@ var _ = Describe("Connector", func() { } }) - var _ = Describe("New", func() { + Describe("New", func() { It("Should configure the correct direction", func() { c := New("testcon", true, Source, conf, log) Expect(c.cfg).To(Equal(conf)) @@ -53,7 +53,7 @@ var _ = Describe("Connector", func() { }) }) - var _ = Describe("Connect", func() { + Describe("Connect", func() { It("Should connect to the stream", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -69,8 +69,8 @@ var _ = Describe("Connector", func() { defer left.Shutdown() c := New("testcon", false, Source, conf, log) - con := c.Connect(ctx) - defer con.Close() + c.Connect(ctx) + defer c.Close() }) }) }) diff --git a/connector/stats.go b/connector/stats.go index 7d6d049..1253c1e 100644 --- a/connector/stats.go +++ b/connector/stats.go @@ -5,6 +5,11 @@ import ( ) var ( + streamReconnectCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "stream_replicator_stream_reconnections", + Help: "Number of times the NATS Stream reconnected", + }, []string{"name", "worker"}) + reconnectCtr = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "stream_replicator_connection_reconnections", Help: 
"Number of times the connector reconnected to the middleware", @@ -25,4 +30,5 @@ func init() { prometheus.MustRegister(reconnectCtr) prometheus.MustRegister(closedCtr) prometheus.MustRegister(errorCtr) + prometheus.MustRegister(streamReconnectCtr) } diff --git a/connector/subscription.go b/connector/subscription.go new file mode 100644 index 0000000..756242c --- /dev/null +++ b/connector/subscription.go @@ -0,0 +1,25 @@ +package connector + +import ( + "github.com/nats-io/go-nats-streaming" +) + +type subscription struct { + subject string + group string + cb stan.MsgHandler + opts []stan.SubscriptionOption + sub stan.Subscription +} + +func (s *subscription) subscribe(c *Connection) (err error) { + if s.group == "" { + c.log.Infof("Subscribing to subject %s", s.subject) + s.sub, err = c.conn.Subscribe(c.cfg.Topic, s.cb, s.opts...) + } else { + c.log.Infof("Subscribing to subject %s in group %s", s.subject, s.group) + s.sub, err = c.conn.QueueSubscribe(c.cfg.Topic, s.group, s.cb, s.opts...) + } + + return +} diff --git a/glide.lock b/glide.lock index 3694ce2..77b3316 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: f7c6a83b9b77ae4741b048388febf0defb7b1e811f52a257b82eb5982eb64392 -updated: 2018-05-30T10:41:02.980026+02:00 +hash: d6cb7b9989283da5b0adf73743f9b3556dfb6b4f4f2680579a3ae9d5107c9059 +updated: 2018-07-02T11:46:08.63013+03:00 imports: - name: github.com/alecthomas/template version: a0175ee3bccc567396460bf5acd36800cb10c49c @@ -18,7 +18,7 @@ imports: version: 2f1ce7a837dcb8da3ec595b1dac9d0632f0f99e8 repo: https://github.com/boltdb/bolt - name: github.com/choria-io/go-choria - version: a3b74b8d7769db8ec227d4cf0a75aaebb28ad2fd + version: 703e9d269cdd54a9839551aea5bfe9e5b481cda7 subpackages: - build - config @@ -28,7 +28,7 @@ imports: - name: github.com/choria-io/go-confkey version: 634d2c41860498507b0c7eeb2f8a33b1aa6a809d - name: github.com/choria-io/go-security - version: 615ff9a3ca0cabc08289e491ef8f57b6a51031a7 + version: a221c72ee79ab400346d38837d7e8b289857247b subpackages: - filesec - puppetsec @@ -97,11 +97,11 @@ imports: - encoders/builtin - util - name: github.com/nats-io/go-nats-streaming - version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 + version: e15a53f85e4932540600a16b56f6c4f65f58176f subpackages: - pb - name: github.com/nats-io/nats-streaming-server - version: 6026da1b7c444bf9f1d1b1d3ed14e435aabf02bc + version: 6027b20cd943a9fa598a664e4c8d7514fe684345 subpackages: - logger - server @@ -132,7 +132,7 @@ imports: - name: github.com/sirupsen/logrus version: c155da19408a8799da419ed3eeb0cb5db0ad5dbc - name: github.com/tidwall/gjson - version: 01f00f129617a6fe98941fb920d6c760241b54d2 + version: f123b340873a0084cb27267eddd8ff615115fbff - name: github.com/tidwall/match version: 1731857f09b1f38450e2c12409748407822dc6be - name: golang.org/x/crypto diff --git a/glide.yaml b/glide.yaml index fa06aa7..35b652f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -5,7 +5,7 @@ import: - package: github.com/nats-io/go-nats version: ^1 - package: github.com/nats-io/go-nats-streaming - version: '*' + version: '^0.4.0' - package: github.com/prometheus/client_golang version: '>= 0.9.0-pre1' subpackages: @@ -18,7 +18,7 @@ import: - package: gopkg.in/alecthomas/kingpin.v2 version: ^2 - package: github.com/nats-io/nats-streaming-server - version: ^0.7.0 + version: ^0.10.0 subpackages: - server - package: github.com/nats-io/gnatsd diff --git a/replicator/worker.go b/replicator/worker.go index b3bd6a9..de2d737 100644 --- a/replicator/worker.go +++ b/replicator/worker.go @@ -13,15 +13,21 @@ 
import ( "github.com/sirupsen/logrus" ) +type stream interface { + Connect(ctx context.Context) + Subscribe(subject string, qgroup string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) error + Publish(subject string, data []byte) error + Close() error +} + type worker struct { name string - from stan.Conn - to stan.Conn + from stream + to stream config *config.TopicConf tls bool log *logrus.Entry - sub stan.Subscription } func newWorker(i int, config *config.TopicConf, tls bool, log *logrus.Entry) *worker { @@ -104,13 +110,7 @@ func (w *worker) subscribe() error { var err error - if w.config.Queued { - w.log.Infof("subscribing to %s in queue group %s", w.config.Topic, w.config.QueueGroup) - w.sub, err = w.from.QueueSubscribe(w.config.Topic, w.config.QueueGroup, w.copyf, opts...) - } else { - w.log.Infof("subscribing to %s", w.config.Topic) - w.sub, err = w.from.Subscribe(w.config.Topic, w.copyf, opts...) - } + err = w.from.Subscribe(w.config.Topic, w.config.QueueGroup, w.copyf, opts...) return err } @@ -121,15 +121,15 @@ func (w *worker) connect(ctx context.Context) error { wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() - c := connector.New(w.name, w.tls, connector.Source, w.config, w.log) - w.from = c.Connect(ctx) + w.from = connector.New(w.name, w.tls, connector.Source, w.config, w.log) + w.from.Connect(ctx) }(wg) wg.Add(1) go func(wg *sync.WaitGroup) { defer wg.Done() - c := connector.New(w.name, w.tls, connector.Target, w.config, w.log) - w.to = c.Connect(ctx) + w.to = connector.New(w.name, w.tls, connector.Target, w.config, w.log) + w.to.Connect(ctx) }(wg) wg.Wait()
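
The reconnect handling introduced in the second patch builds on the bidirectional ping and connection-lost handler that arrived in go-nats-streaming 0.4.0 and NATS Streaming Server 0.10.0: the client is told when its connection is considered dead, and it must then rebuild the STAN connection and every subscription itself. The code below is a minimal, self-contained sketch of that remember-and-replay pattern; the reconnectingConn type, the Pings values and the example subject are illustrative only and are not part of the stream-replicator code base.

// Minimal sketch of the remember-and-replay reconnect pattern used by the
// second patch. All names here are illustrative, not stream-replicator APIs.
package main

import (
	"log"
	"sync"

	nats "github.com/nats-io/go-nats"
	stan "github.com/nats-io/go-nats-streaming"
)

// sub records enough about a subscription to recreate it after a reconnect
type sub struct {
	subject string
	cb      stan.MsgHandler
	opts    []stan.SubscriptionOption
}

type reconnectingConn struct {
	mu        sync.Mutex
	clusterID string
	clientID  string
	url       string
	conn      stan.Conn
	subs      []sub
}

// connect dials NATS and NATS Streaming and installs a connection-lost
// handler that rebuilds the connection and replays recorded subscriptions
func (r *reconnectingConn) connect() error {
	nc, err := nats.Connect(r.url, nats.MaxReconnects(-1))
	if err != nil {
		return err
	}

	r.conn, err = stan.Connect(r.clusterID, r.clientID,
		stan.NatsConn(nc),
		// ping the server every 10 seconds, give up after 5 missed pings
		stan.Pings(10, 5),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("connection lost: %s, reconnecting", reason)

			r.mu.Lock()
			defer r.mu.Unlock()

			// the old connection and all its subscriptions are gone,
			// everything has to be recreated from scratch
			if err := r.connect(); err != nil {
				log.Printf("reconnect failed: %s", err)
				return
			}

			for _, s := range r.subs {
				if _, err := r.conn.Subscribe(s.subject, s.cb, s.opts...); err != nil {
					log.Printf("could not resubscribe to %s: %s", s.subject, err)
				}
			}
		}),
	)

	return err
}

// Subscribe subscribes and records the subscription for later replay
func (r *reconnectingConn) Subscribe(subject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	_, err := r.conn.Subscribe(subject, cb, opts...)
	if err == nil {
		r.subs = append(r.subs, sub{subject: subject, cb: cb, opts: opts})
	}

	return err
}

func main() {
	c := &reconnectingConn{clusterID: "test-cluster", clientID: "example-client", url: nats.DefaultURL}

	if err := c.connect(); err != nil {
		log.Fatal(err)
	}

	err := c.Subscribe("example.subject", func(m *stan.Msg) {
		log.Printf("received: %q", m.Data)
	}, stan.DurableName("example"))
	if err != nil {
		log.Fatal(err)
	}

	select {}
}

The connector package in the patch follows the same shape, but keeps its subscription list behind a mutex, counts reconnections in the stream_replicator_stream_reconnections Prometheus counter, and retries the initial NATS and STAN connections in a backoff loop.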