Skip to content

Commit

Permalink
Add support for multiple storm consumers reading from the same kafka …
Browse files Browse the repository at this point in the history
…cluster
  • Loading branch information
Mark Penny committed Nov 21, 2016
1 parent 3726a11 commit 8a7351a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 44 deletions.
14 changes: 9 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type BurrowConfig struct {
Storm map[string]*struct {
Zookeepers []string `gcfg:"zookeeper"`
ZookeeperPort int `gcfg:"zookeeper-port"`
ZookeeperPath string `gcfg:"zookeeper-path"`
ZookeeperPath []string `gcfg:"zookeeper-path"`
}
Tickers struct {
BrokerOffsets int `gcfg:"broker-offsets"`
Expand Down Expand Up @@ -284,11 +284,15 @@ func ValidateConfig(app *ApplicationContext) error {
errs = append(errs, hostlistError)
}
}
if cfg.ZookeeperPath == "" {
errs = append(errs, fmt.Sprintf("Zookeeper path is not specified for cluster %s", cluster))
if len(cfg.ZookeeperPath) == 0 {
errs = append(errs, fmt.Sprintf("No Zookeeper paths specified for cluster %s", cluster))
} else {
if !validateZookeeperPath(cfg.ZookeeperPath) {
errs = append(errs, fmt.Sprintf("Zookeeper path is not valid for cluster %s", cluster))
for _, zkpath := range cfg.ZookeeperPath {
if zkpath == "" {
errs = append(errs, fmt.Sprintf("Zookeeper path is not specified for cluster %s", cluster))
} else if !validateZookeeperPath(zkpath) {
errs = append(errs, fmt.Sprintf("Zookeeper path is not valid for cluster %s", cluster))
}
}
}
}
Expand Down
86 changes: 47 additions & 39 deletions storm.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,34 +108,39 @@ func parseStormSpoutStateJson(stateStr string) (int, string, error) {
}
}

func (stormClient *StormClient) getConsumerGroupPath(consumerGroup string) string {
if "/" == stormClient.app.Config.Storm[stormClient.cluster].ZookeeperPath {
return "/" + consumerGroup
} else {
return stormClient.app.Config.Storm[stormClient.cluster].ZookeeperPath + "/" + consumerGroup
func (stormClient *StormClient) getConsumerGroupPath(consumerGroup string) []string {
consumerGroupPaths := make([]string, len(stormClient.app.Config.Storm[stormClient.cluster].ZookeeperPath))
for index, zkpath := range stormClient.app.Config.Storm[stormClient.cluster].ZookeeperPath {
if "/" == zkpath {
consumerGroupPaths[index] = "/" + consumerGroup
} else {
consumerGroupPaths[index] = zkpath + "/" + consumerGroup
}
}
return consumerGroupPaths
}

func (stormClient *StormClient) getOffsetsForConsumerGroup(consumerGroup string) {
consumerGroupPath := stormClient.getConsumerGroupPath(consumerGroup)
partition_ids, _, err := stormClient.conn.Children(consumerGroupPath)
switch {
case err == nil:
for _, partition_id := range partition_ids {
partition, errConversion := parsePartitionId(partition_id)
switch {
case errConversion == nil:
stormClient.getOffsetsForPartition(consumerGroup, partition, consumerGroupPath+"/"+partition_id)
default:
log.Errorf("Something is very wrong! The partition id %s of consumer group %s in ZK path %s should be a number",
partition_id, consumerGroup, consumerGroupPath)
for _, consumerGroupPath := range stormClient.getConsumerGroupPath(consumerGroup) {
partition_ids, _, err := stormClient.conn.Children(consumerGroupPath)
switch {
case err == nil:
for _, partition_id := range partition_ids {
partition, errConversion := parsePartitionId(partition_id)
switch {
case errConversion == nil:
stormClient.getOffsetsForPartition(consumerGroup, partition, consumerGroupPath+"/"+partition_id)
default:
log.Errorf("Something is very wrong! The partition id %s of consumer group %s in ZK path %s should be a number",
partition_id, consumerGroup, consumerGroupPath)
}
}
case err == zk.ErrNoNode:
// it is OK as the offsets may not be managed by ZK
log.Debugf("This consumer group's offset is not managed by ZK: " + consumerGroup)
default:
log.Warnf("Failed to read data for consumer group %s in ZK path %s. Error: %v", consumerGroup, consumerGroupPath, err)
}
case err == zk.ErrNoNode:
// it is OK as the offsets may not be managed by ZK
log.Debugf("This consumer group's offset is not managed by ZK: " + consumerGroup)
default:
log.Warnf("Failed to read data for consumer group %s in ZK path %s. Error: %v", consumerGroup, consumerGroupPath, err)
}
}

Expand Down Expand Up @@ -170,31 +175,34 @@ func (stormClient *StormClient) refreshConsumerGroups() {
stormClient.stormGroupLock.Lock()
defer stormClient.stormGroupLock.Unlock()

consumerGroups, _, err := stormClient.conn.Children(stormClient.app.Config.Storm[stormClient.cluster].ZookeeperPath)
if err != nil {
// Can't read the consumers path. Bail for now
log.Errorf("Cannot get Storm Kafka consumer group list for cluster %s: %s", stormClient.cluster, err)
return
}

// Mark all existing groups false
for consumerGroup := range stormClient.stormGroupList {
stormClient.stormGroupList[consumerGroup] = false
}

// Check for new groups, mark existing groups true
for _, consumerGroup := range consumerGroups {
// Ignore groups that are out of filter bounds
if !stormClient.app.Storage.AcceptConsumerGroup(consumerGroup) {
continue

for _, stormZookeeperPath := range stormClient.app.Config.Storm[stormClient.cluster].ZookeeperPath {
consumerGroups, _, err := stormClient.conn.Children(stormZookeeperPath)
if err != nil {
// Can't read the consumers path. Bail for now
log.Errorf("Cannot get Storm Kafka consumer group list for cluster %s: %s", stormClient.cluster, err)
return
}

if _, ok := stormClient.stormGroupList[consumerGroup]; !ok {
// Add new consumer group and start it
log.Debugf("Add Storm Kafka consumer group %s to cluster %s", consumerGroup, stormClient.cluster)
go stormClient.startConsumerGroupChecker(consumerGroup)
// Check for new groups, mark existing groups true
for _, consumerGroup := range consumerGroups {
// Ignore groups that are out of filter bounds
if !stormClient.app.Storage.AcceptConsumerGroup(consumerGroup) {
continue
}

if _, ok := stormClient.stormGroupList[consumerGroup]; !ok {
// Add new consumer group and start it
log.Debugf("Add Storm Kafka consumer group %s to cluster %s", consumerGroup, stormClient.cluster)
go stormClient.startConsumerGroupChecker(consumerGroup)
}
stormClient.stormGroupList[consumerGroup] = true
}
stormClient.stormGroupList[consumerGroup] = true
}

// Delete groups that are still false
Expand Down

0 comments on commit 8a7351a

Please sign in to comment.