Skip to content

Commit

Permalink
Merge pull request linkedin#102 from markrileybot/master
Browse files Browse the repository at this point in the history
Whitelist support
  • Loading branch information
toddpalino authored Aug 30, 2016
2 parents 11bb2b6 + e766ed6 commit 9097d70
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 7 deletions.
1 change: 1 addition & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type BurrowConfig struct {
PIDFile string `gcfg:"pidfile"`
ClientID string `gcfg:"client-id"`
GroupBlacklist string `gcfg:"group-blacklist"`
GroupWhitelist string `gcfg:"group-whitelist"`
}
Zookeeper struct {
Hosts []string `gcfg:"hostname"`
Expand Down
1 change: 1 addition & 0 deletions config/burrow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ logconfig=config/logging.cfg
pidfile=burrow.pid
client-id=burrow-lagchecker
group-blacklist=^(console-consumer-|python-kafka-consumer-).*$
#group-whitelist=^(my-important-consumer).*$

[zookeeper]
hostname=zkhost01.example.com
Expand Down
2 changes: 1 addition & 1 deletion notify_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (nc *NotifyCenter) refreshConsumerGroups() {
// Check for new groups, mark existing groups true
for _, consumerGroup := range consumerGroups {
// Don't bother adding groups in the blacklist
if (nc.app.Storage.groupBlacklist != nil) && nc.app.Storage.groupBlacklist.MatchString(consumerGroup) {
if !nc.app.Storage.AcceptConsumerGroup(consumerGroup) {
continue
}

Expand Down
30 changes: 27 additions & 3 deletions offsets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type OffsetStorage struct {
requestChannel chan interface{}
offsets map[string]*ClusterOffsets
groupBlacklist *regexp.Regexp
groupWhitelist *regexp.Regexp
}

type BrokerOffset struct {
Expand Down Expand Up @@ -97,6 +98,14 @@ func NewOffsetStorage(app *ApplicationContext) (*OffsetStorage, error) {
storage.groupBlacklist = re
}

if app.Config.General.GroupWhitelist != "" {
re, err := regexp.Compile(app.Config.General.GroupWhitelist)
if err != nil {
return nil, err
}
storage.groupWhitelist = re
}

for cluster, _ := range app.Config.Kafka {
storage.offsets[cluster] = &ClusterOffsets{
broker: make(map[string][]*BrokerOffset),
Expand Down Expand Up @@ -186,9 +195,9 @@ func (storage *OffsetStorage) addConsumerOffset(offset *protocol.PartitionOffset
return
}

// Ignore groups that match our blacklist
if (storage.groupBlacklist != nil) && storage.groupBlacklist.MatchString(offset.Group) {
log.Debugf("Dropped offset (blacklist): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
// Ignore groups that are out of filter bounds
if !storage.AcceptConsumerGroup(offset.Group) {
log.Debugf("Dropped offset (black/white list): cluster=%s topic=%s partition=%v group=%s timestamp=%v offset=%v",
offset.Cluster, offset.Topic, offset.Partition, offset.Group, offset.Timestamp, offset.Offset)
return
}
Expand Down Expand Up @@ -677,3 +686,18 @@ func (storage *OffsetStorage) debugPrintGroup(cluster string, group string) {
}
clusterMap.consumerLock.RUnlock()
}

func (storage *OffsetStorage) AcceptConsumerGroup(group string) bool {
// First check to make sure group is in the whitelist
if (storage.groupWhitelist != nil) && !storage.groupWhitelist.MatchString(group) {
return false;
}

// The group is in the whitelist (or there is not whitelist). Now check the blacklist
if (storage.groupBlacklist != nil) && storage.groupBlacklist.MatchString(group) {
return false;
}

// good to go
return true;
}
4 changes: 2 additions & 2 deletions storm.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func (stormClient *StormClient) refreshConsumerGroups() {

// Check for new groups, mark existing groups true
for _, consumerGroup := range consumerGroups {
// Don't bother adding groups in the blacklist
if (stormClient.app.Storage.groupBlacklist != nil) && stormClient.app.Storage.groupBlacklist.MatchString(consumerGroup) {
// Ignore groups that are out of filter bounds
if !stormClient.app.Storage.AcceptConsumerGroup(consumerGroup) {
continue
}

Expand Down
2 changes: 1 addition & 1 deletion zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (zkClient *ZookeeperClient) refreshConsumerGroups() {
// Check for new groups, mark existing groups true
for _, consumerGroup := range consumerGroups {
// Don't bother adding groups in the blacklist
if (zkClient.app.Storage.groupBlacklist != nil) && zkClient.app.Storage.groupBlacklist.MatchString(consumerGroup) {
if !zkClient.app.Storage.AcceptConsumerGroup(consumerGroup) {
continue
}

Expand Down

0 comments on commit 9097d70

Please sign in to comment.