diff --git a/config.go b/config.go index c3116235..87fc1496 100644 --- a/config.go +++ b/config.go @@ -36,6 +36,7 @@ type BurrowConfig struct { PIDFile string `gcfg:"pidfile"` ClientID string `gcfg:"client-id"` GroupBlacklist string `gcfg:"group-blacklist"` + GroupWhitelist string `gcfg:"group-whitelist"` } Zookeeper struct { Hosts []string `gcfg:"hostname"` diff --git a/config/burrow.cfg b/config/burrow.cfg index 831ecd08..1518826a 100644 --- a/config/burrow.cfg +++ b/config/burrow.cfg @@ -4,6 +4,7 @@ logconfig=config/logging.cfg pidfile=burrow.pid client-id=burrow-lagchecker group-blacklist=^(console-consumer-|python-kafka-consumer-).*$ +#group-whitelist=^(my-important-consumer).*$ [zookeeper] hostname=zkhost01.example.com diff --git a/notify_center.go b/notify_center.go index 56eecae0..4c9c5d70 100644 --- a/notify_center.go +++ b/notify_center.go @@ -139,7 +139,7 @@ func (nc *NotifyCenter) refreshConsumerGroups() { // Check for new groups, mark existing groups true for _, consumerGroup := range consumerGroups { // Don't bother adding groups in the blacklist - if (nc.app.Storage.groupBlacklist != nil) && nc.app.Storage.groupBlacklist.MatchString(consumerGroup) { + if !nc.app.Storage.AcceptConsumerGroup(consumerGroup) { continue } diff --git a/offsets_store.go b/offsets_store.go index 99aca25f..380e2b1d 100644 --- a/offsets_store.go +++ b/offsets_store.go @@ -27,6 +27,7 @@ type OffsetStorage struct { requestChannel chan interface{} offsets map[string]*ClusterOffsets groupBlacklist *regexp.Regexp + groupWhitelist *regexp.Regexp } type BrokerOffset struct { @@ -97,6 +98,14 @@ func NewOffsetStorage(app *ApplicationContext) (*OffsetStorage, error) { storage.groupBlacklist = re } + if app.Config.General.GroupWhitelist != "" { + re, err := regexp.Compile(app.Config.General.GroupWhitelist) + if err != nil { + return nil, err + } + storage.groupWhitelist = re + } + for cluster, _ := range app.Config.Kafka { storage.offsets[cluster] = &ClusterOffsets{ broker: make(map[string][]*BrokerOffset), @@ -186,9 +195,9 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset return } - // Ignore groups that match our blacklist - if (storage.groupBlacklist != nil) && storage.groupBlacklist.MatchString(offset.Group) { - log.Debugf("Dropped offset (blacklist): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v", + // Ignore groups that are out of filter bounds + if !storage.AcceptConsumerGroup(offset.Group) { + log.Debugf("Dropped offset (black/white list): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v", offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset) return } @@ -677,3 +686,18 @@ func (storage *OffsetStorage) debugPrintGroup(cluster string, group string) { } clusterMap.consumerLock.RUnlock() } + +func (storage *OffsetStorage) AcceptConsumerGroup(group string) bool { + // First check to make sure group is in the whitelist + if (storage.groupWhitelist != nil) && !storage.groupWhitelist.MatchString(group) { + return false; + } + + // The group is in the whitelist (or there is not whitelist). Now check the blacklist + if (storage.groupBlacklist != nil) && storage.groupBlacklist.MatchString(group) { + return false; + } + + // good to go + return true; +} diff --git a/storm.go b/storm.go index 94c88278..1e179dec 100644 --- a/storm.go +++ b/storm.go @@ -184,8 +184,8 @@ func (stormClient *StormClient) refreshConsumerGroups() { // Check for new groups, mark existing groups true for _, consumerGroup := range consumerGroups { - // Don't bother adding groups in the blacklist - if (stormClient.app.Storage.groupBlacklist != nil) && stormClient.app.Storage.groupBlacklist.MatchString(consumerGroup) { + // Ignore groups that are out of filter bounds + if !stormClient.app.Storage.AcceptConsumerGroup(consumerGroup) { continue } diff --git a/zookeeper.go b/zookeeper.go index d220dde3..ec5a0861 100644 --- a/zookeeper.go +++ b/zookeeper.go @@ -90,7 +90,7 @@ func (zkClient *ZookeeperClient) refreshConsumerGroups() { // Check for new groups, mark existing groups true for _, consumerGroup := range consumerGroups { // Don't bother adding groups in the blacklist - if (zkClient.app.Storage.groupBlacklist != nil) && zkClient.app.Storage.groupBlacklist.MatchString(consumerGroup) { + if !zkClient.app.Storage.AcceptConsumerGroup(consumerGroup) { continue }