Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed Nov 8, 2024
1 parent ac3b33d commit da5c8a8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 30 deletions.
53 changes: 27 additions & 26 deletions services/datamanager/builtin/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ type Capture struct {
captureDir string
// maxCaptureFileSize is only stored on Capture so that we can detect when it changs
maxCaptureFileSize int64
mongoMU sync.Mutex
mongo captureMongo
}

// the mongo* struct members are protected by
// mongoMU and are either all nil or all non nil
mongoMU sync.Mutex
mongoClient *mongo.Client
mongoCollection *mongo.Collection
mongoConfig *MongoConfig
type captureMongo struct {
// the struct members are protected by
// mu and are either all nil or all non nil
client *mongo.Client
collection *mongo.Collection
config *MongoConfig
}

type (
Expand Down Expand Up @@ -141,10 +144,10 @@ func (c *Capture) newCollectors(
}

func (c *Capture) mongoSetup(ctx context.Context, newConfig MongoConfig) *mongo.Collection {
oldConfig := c.mongoConfig
if oldConfig != nil && oldConfig.Equal(newConfig) && c.mongoClient != nil {
oldConfig := c.mongo.config
if oldConfig != nil && oldConfig.Equal(newConfig) && c.mongo.client != nil {
// if we have a client & the configs are equal, reuse the existing collection
return c.mongoCollection
return c.mongo.collection
}

// We now know we want a mongo connection and that we either don't have one, or we have one
Expand All @@ -155,18 +158,20 @@ func (c *Capture) mongoSetup(ctx context.Context, newConfig MongoConfig) *mongo.
// Use the SetServerAPIOptions() method to set the Stable API version to 1
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
// Create a new client and connect to the server
client, err := mongo.Connect(ctx, options.Client().ApplyURI(newConfig.URI).SetServerAPIOptions(serverAPI))
client, err := mongo.Connect(ctx, options.Client().ApplyURI(newConfig.ConnectionString).SetServerAPIOptions(serverAPI))
if err != nil {
c.logger.Warn("failed to create mongo connection with mongo_capture_config.uri")
return nil
}
database := defaultIfZeroVal(newConfig.Database, defaultMongoDatabaseName)
collection := defaultIfZeroVal(newConfig.Collection, defaultMongoCollectionName)
c.mongoClient = client
c.mongoCollection = c.mongoClient.Database(database).Collection(collection)
c.mongoConfig = &newConfig
c.mongo = captureMongo{
client: client,
collection: c.mongo.client.Database(database).Collection(collection),
config: &newConfig,
}
c.logger.Info("mongo client created")
return c.mongoCollection
return c.mongo.collection
}

// mongoReconfigure shuts down the collectors when the mongo client is no longer being
Expand All @@ -176,8 +181,8 @@ func (c *Capture) mongoSetup(ctx context.Context, newConfig MongoConfig) *mongo.
func (c *Capture) mongoReconfigure(ctx context.Context, newConfig *MongoConfig) *mongo.Collection {
c.mongoMU.Lock()
defer c.mongoMU.Unlock()
noClient := c.mongoClient == nil
disabled := newConfig == nil || newConfig.URI == ""
noClient := c.mongo.client == nil
disabled := newConfig == nil || newConfig.ConnectionString == ""

if noClient && disabled {
// if we don't have a client and the new config
Expand Down Expand Up @@ -243,24 +248,20 @@ func (c *Capture) Close(ctx context.Context) {
c.closeCollectors()
c.mongoMU.Lock()
defer c.mongoMU.Unlock()
if c.mongoClient != nil {
if c.mongo.client != nil {
c.logger.Info("closing mongo connection")
goutils.UncheckedError(c.mongoClient.Disconnect(ctx))
c.mongoClient = nil
c.mongoCollection = nil
c.mongoConfig = nil
goutils.UncheckedError(c.mongo.client.Disconnect(ctx))
c.mongo = captureMongo{}
}
}

func (c *Capture) closeNoMongoMutex(ctx context.Context) {
c.FlushCollectors()
c.closeCollectors()
if c.mongoClient != nil {
if c.mongo.client != nil {
c.logger.Info("closing mongo connection")
goutils.UncheckedError(c.mongoClient.Disconnect(ctx))
c.mongoClient = nil
c.mongoCollection = nil
c.mongoConfig = nil
goutils.UncheckedError(c.mongo.client.Disconnect(ctx))
c.mongo = captureMongo{}
}
}

Expand Down
8 changes: 4 additions & 4 deletions services/datamanager/builtin/capture/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package capture

// MongoConfig is the optional data capture mongo config.
type MongoConfig struct {
URI string `json:"uri"`
Database string `json:"database"`
Collection string `json:"collection"`
ConnectionString string `json:"connection_string"`
Database string `json:"database"`
Collection string `json:"collection"`
}

// Equal returns true when both MongoConfigs are equal.
func (mc MongoConfig) Equal(o MongoConfig) bool {
return mc.URI == o.URI && mc.Database == o.Database && mc.Collection == o.Collection
return mc.ConnectionString == o.ConnectionString && mc.Database == o.Database && mc.Collection == o.Collection
}

// Config is the capture config.
Expand Down

0 comments on commit da5c8a8

Please sign in to comment.