Merge pull request #94 from ripienaar/85.2
(#85) handle reconnects
ripienaar authored Jul 2, 2018
2 parents a7c3208 + 90c6973 commit c8354e6
Showing 12 changed files with 198 additions and 132 deletions.
6 changes: 1 addition & 5 deletions README.md
@@ -15,10 +15,6 @@ When there are many workers or it's specified to belong to a queue group a Durab

The first time it connects it attempts to replicate all messages; as the subscription is Durable, it will from then on continue where it left off.
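A minimal sketch of the durable-subscription behaviour described above, using the `github.com/nats-io/go-nats-streaming` client the replicator builds on; the cluster ID, client ID, subject, and durable name below are placeholders rather than the replicator's real configuration:

```go
package main

import (
	"log"

	stan "github.com/nats-io/go-nats-streaming"
)

func main() {
	// Placeholder cluster/client IDs and URL; not the replicator's config.
	sc, err := stan.Connect("example-cluster", "example-client",
		stan.NatsURL("nats://localhost:4222"))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	// A durable subscription: on first creation it receives everything that
	// is available, and on later connections the server resumes delivery
	// from the last acknowledged position.
	_, err = sc.Subscribe("example.subject", func(m *stan.Msg) {
		log.Printf("would replicate sequence %d", m.Sequence)
	},
		stan.DurableName("example-durable"),
		stan.DeliverAllAvailable())
	if err != nil {
		log.Fatal(err)
	}

	select {} // block so the subscription stays alive in this sketch
}
```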

## Requirements

For reliable operation where connection issues get correctly detected, NATS Streaming Server version 0.10.0 or newer is required.
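The detection this requirement (which the commit drops from the README) refers to relies on client/server pings, which is why a 0.10.0+ server is needed. A hedged sketch of enabling it in a client; the ping interval, miss count, and handler below are illustrative only:

```go
package main

import (
	"log"

	stan "github.com/nats-io/go-nats-streaming"
)

func main() {
	// Illustrative values only: ping every 5 seconds and treat 3 missed
	// pongs as a lost connection. This detection needs a 0.10.0+ server.
	sc, err := stan.Connect("example-cluster", "example-client",
		stan.NatsURL("nats://localhost:4222"),
		stan.Pings(5, 3),
		stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
			log.Printf("connection lost: %s", reason)
			// a real client would reconnect or restart its work here
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer sc.Close()

	select {} // keep running so a lost connection can actually be observed
}
```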

## Status

This is a pretty new project that is being used in production; however, as it is new and still developing, use it with caution. I'd love any feedback you might have - especially design ideas about multi-worker order-preserving replication!
@@ -374,4 +370,4 @@ A Puppet module to install and manage the Stream Replicator can be found on the

## Thanks

<a href="https://packagecloud.io/"><img src="https://packagecloud.io/images/packagecloud-badge.png" width="158"></a>
<img src="https://packagecloud.io/images/packagecloud-badge.png" width="158">
27 changes: 15 additions & 12 deletions advisor/advisor.go
@@ -10,10 +10,17 @@ import (
"github.com/choria-io/stream-replicator/backoff"
"github.com/choria-io/stream-replicator/config"
"github.com/choria-io/stream-replicator/connector"
"github.com/nats-io/go-nats-streaming"
nats "github.com/nats-io/go-nats"
"github.com/sirupsen/logrus"
)

type stream interface {
Connect(ctx context.Context)
Publish(subject string, data []byte) error
Close() error
NatsConn() *nats.Conn
}

// AgeAdvisoryV1 defines a message published when a node has not been seen within configured deadlines and when it recovers
type AgeAdvisoryV1 struct {
Version string `json:"$schema"`
@@ -55,7 +62,7 @@ var interval time.Duration
var age time.Duration
var err error
var log *logrus.Entry
var conn stan.Conn
var conn stream
var natstls bool
var name string

@@ -97,7 +104,7 @@ func Configure(tls bool, c *config.TopicConf) error {
}

// Connect initiates the connection to NATS Streaming
func Connect(ctx context.Context, wg *sync.WaitGroup, reconn chan string) {
func Connect(ctx context.Context, wg *sync.WaitGroup) {
mu.Lock()
defer mu.Unlock()

@@ -106,7 +113,7 @@ func Connect(ctx context.Context, wg *sync.WaitGroup, reconn chan string) {
}

log.Debug("Starting advisor connection")
connect(ctx, reconn)
connect(ctx)

log.Debug("Starting advisor publisher")
wg.Add(1)
@@ -234,20 +241,16 @@ func newAdvisory(id string, event EventType) AgeAdvisoryV1 {
}
}

func connect(ctx context.Context, reconn chan string) {
var c *connector.Connection

func connect(ctx context.Context) {
if conf.Advisory.Cluster == "source" {
log.Infof("Connection to source to publish advisories")
c = connector.New(name, natstls, connector.Source, conf, log)
conn = connector.New(name, natstls, connector.Source, conf, log)
} else {
log.Infof("Connection to target to publish advisories")
c = connector.New(name, natstls, connector.Target, conf, log)
conn = connector.New(name, natstls, connector.Target, conf, log)
}

conn = c.Connect(ctx, func(_ stan.Conn, reason error) {
reconn <- fmt.Sprintf("advisory stream disconnected: %s", reason)
})
conn.Connect(ctx)
}

func publisher(ctx context.Context, wg *sync.WaitGroup) {
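The advisor.go hunks above swap the concrete `stan.Conn` for a small `stream` interface, so the advisor no longer needs to know how reconnects are handled. A hedged sketch of a test double that satisfies that interface; the stub type and its behaviour are illustrative and not part of this commit:

```go
package advisorstub

import (
	"context"

	nats "github.com/nats-io/go-nats"
)

// stream mirrors the interface introduced in advisor/advisor.go above.
type stream interface {
	Connect(ctx context.Context)
	Publish(subject string, data []byte) error
	Close() error
	NatsConn() *nats.Conn
}

// stubStream is an illustrative in-memory double for tests.
type stubStream struct {
	published map[string][][]byte
}

// Connect does nothing; a real connector blocks here until it is connected.
func (s *stubStream) Connect(ctx context.Context) {}

// Publish records the message instead of sending it to NATS Streaming.
func (s *stubStream) Publish(subject string, data []byte) error {
	if s.published == nil {
		s.published = make(map[string][][]byte)
	}
	s.published[subject] = append(s.published[subject], data)
	return nil
}

// Close has nothing to tear down in the stub.
func (s *stubStream) Close() error { return nil }

// NatsConn would expose the underlying core NATS connection; the stub has none.
func (s *stubStream) NatsConn() *nats.Conn { return nil }

// compile-time check that the stub satisfies the interface
var _ stream = (*stubStream)(nil)
```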
24 changes: 7 additions & 17 deletions advisor/advisor_test.go
@@ -70,7 +70,7 @@ var _ = Describe("Advisor", func() {
defer left.Shutdown()

conf.Advisory.Cluster = "source"
connect(ctx, make(chan string, 1))
connect(ctx)

Expect(conn.NatsConn().ConnectedUrl()).To(Equal("nats://localhost:34222"))
})
@@ -87,7 +87,7 @@ var _ = Describe("Advisor", func() {
defer right.Shutdown()

conf.Advisory.Cluster = "target"
connect(ctx, make(chan string, 1))
connect(ctx)

Expect(conn.NatsConn().ConnectedUrl()).To(Equal("nats://localhost:44222"))
})
@@ -161,33 +161,23 @@ var _ = Describe("Advisor", func() {

Expect(seen).To(BeEmpty())

mu.Lock()
seen["old"] = time.Now().Add(-1 * time.Hour)
seen["expired"] = time.Now().Add(-3 * time.Hour)
seen["new"] = time.Now()
mu.Unlock()

Expect(out).To(HaveLen(0))

advise()

Expect(out).To(HaveLen(2))

// ordering isn't guaranteed; using a map here
// allows us to check that both events fired
// uniquely. Previously this test did not use
// a map and was failing intermittently
msgs := make(map[string]AgeAdvisoryV1)
msg := <-out
msgs[msg.Value] = msg
msg = <-out
msgs[msg.Value] = msg

Expect(msgs["old"].Value).To(Equal("old"))
Expect(msgs["old"].Event).To(Equal(Timeout))
Expect(msg.Value).To(Equal("old"))
Expect(msg.Event).To(Equal(Timeout))

Expect(msgs["expired"].Value).To(Equal("expired"))
Expect(msgs["expired"].Event).To(Equal(Expired))
msg = <-out
Expect(msg.Value).To(Equal("expired"))
Expect(msg.Event).To(Equal(Expired))

_, found := advised["old"]
Expect(found).To(BeTrue())
26 changes: 3 additions & 23 deletions cmd/cmd.go
@@ -34,8 +34,6 @@ var (
enrollIdentity string
enrollCA string
enrollDir string

reconn chan string
)

func Run() {
@@ -57,8 +55,6 @@ func Run() {

command := kingpin.MustParse(app.Parse(os.Args[1:]))

reconn = make(chan string, 5)

ctx, cancel = context.WithCancel(context.Background())
defer cancel()

@@ -122,7 +118,7 @@ func runReplicate() {
os.Exit(1)
}

go interruptHandler(wg)
go interruptHandler()

writePID(pidfile)

@@ -145,39 +141,23 @@ func writePID(pidfile string) {
}
}

func interruptHandler(wg *sync.WaitGroup) {
defer wg.Done()

func interruptHandler() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

for {
select {
case reason := <-reconn:
logrus.Errorf("Restarting replicator after: %s", reason)

// stops everything and sleep a bit to give state saves a bit of time etc
cancel()
time.Sleep(1 * time.Second)

err := syscall.Exec(os.Args[0], os.Args, os.Environ())
if err != nil {
logrus.Errorf("Could not restart Stream Replicator: %s", err)
}

case sig := <-sigs:
logrus.Infof("Shutting down on %s", sig)
cancel()
return

case <-ctx.Done():
return
}
}
}

func startReplicator(ctx context.Context, wg *sync.WaitGroup, done chan int, topic *config.TopicConf, topicname string) {
err := rep.Setup(topicname, topic, reconn)
err := rep.Setup(topicname, topic)
if err != nil {
logrus.Errorf("Could not configure Replicator: %s", err)
return
(diffs for the remaining changed files not loaded)
