Getting Checkpoint Offset Information #221

Open
mplachter opened this issue Jul 8, 2021 · 6 comments

@mplachter

mplachter commented Jul 8, 2021

Expected Behavior

Hi Team.

I'm wondering whether there is an API, like the one in the Kafka API, that returns the current offset of every consumer in a consumer group for a given topic, alongside the latest offset on each partition that consumer reads.

I'm trying to find a way to extract data that shows whether a particular consumer in a consumer group is falling behind (back pressure) on a given topic or topics.

Actual Behavior

In Kafka, for example, you can fetch offsets by sending an OffsetFetchRequest, which returns the committed offsets of a given consumer group for the given partitions.

https://github.com/Shopify/sarama/blob/master/broker.go#L372-L383
https://github.com/Shopify/sarama/blob/master/offset_fetch_request.go#L3-L8

Environment

  • OS: Linux
  • Go version: 1.16
  • Version of Library: 3
@devigned
Member

devigned commented Jul 8, 2021

I don't think this API exists for EventHubs, but I know it's been asked for in the past. Currently, this repo offers something called the Event Processor Host, which will balance consumers within a group and persist the checkpoints to Azure Storage. I think you could read those checkpoints to determine the current position of a consumer in the consumer group.
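
As a rough illustration of reading a persisted checkpoint back, here is a minimal sketch that decodes one from JSON. The `storedCheckpoint` type, the `parseCheckpoint` helper, and the JSON field names are all assumptions made for this example; check them against what the checkpointer actually writes to Azure Storage before relying on them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// storedCheckpoint is a hypothetical layout for a persisted checkpoint.
// The JSON field names are assumptions, not a documented format.
type storedCheckpoint struct {
	PartitionID string `json:"partitionID"`
	Checkpoint  struct {
		Offset         string `json:"offset"`
		SequenceNumber int64  `json:"sequenceNumber"`
	} `json:"checkpoint"`
}

// parseCheckpoint decodes the raw bytes of one checkpoint document.
func parseCheckpoint(data []byte) (storedCheckpoint, error) {
	var sc storedCheckpoint
	if err := json.Unmarshal(data, &sc); err != nil {
		return storedCheckpoint{}, fmt.Errorf("decode checkpoint: %w", err)
	}
	return sc, nil
}

func main() {
	// Sample payload made up for illustration only.
	blob := []byte(`{"partitionID":"0","checkpoint":{"offset":"4096","sequenceNumber":1200}}`)
	sc, err := parseCheckpoint(blob)
	if err != nil {
		panic(err)
	}
	fmt.Printf("partition %s checkpointed at sequence %d (offset %s)\n",
		sc.PartitionID, sc.Checkpoint.SequenceNumber, sc.Checkpoint.Offset)
}
```

Downloading the blob itself is left out; the bytes passed to `parseCheckpoint` would come from whatever storage client you already use.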

To determine the current offset for a given partition, I believe you should be able to use:

azure-event-hubs-go/hub.go

Lines 523 to 546 in 0eb7b61

// GetPartitionInformation fetches runtime information about a specific partition from the Event Hub management node
func (h *Hub) GetPartitionInformation(ctx context.Context, partitionID string) (*HubPartitionRuntimeInformation, error) {
	span, ctx := h.startSpanFromContext(ctx, "eh.Hub.GetPartitionInformation")
	defer span.End()
	client := newClient(h.namespace, h.name)
	c, err := h.namespace.newConnection()
	if err != nil {
		tab.For(ctx).Error(err)
		return nil, err
	}
	defer func() {
		if err := c.Close(); err != nil {
			tab.For(ctx).Error(err)
		}
	}()
	info, err := client.GetHubPartitionRuntimeInformation(ctx, c, partitionID)
	if err != nil {
		return nil, err
	}
	return info, nil
}

It will have the last enqueued offset and other related partition information.

	// HubPartitionRuntimeInformation provides management node information about a given Event Hub partition
	HubPartitionRuntimeInformation struct {
		HubPath                 string    `mapstructure:"name"`
		PartitionID             string    `mapstructure:"partition"`
		BeginningSequenceNumber int64     `mapstructure:"begin_sequence_number"`
		LastSequenceNumber      int64     `mapstructure:"last_enqueued_sequence_number"`
		LastEnqueuedOffset      string    `mapstructure:"last_enqueued_offset"`
		LastEnqueuedTimeUtc     time.Time `mapstructure:"last_enqueued_time_utc"`
	}
)
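
To make the back-pressure idea concrete, the following sketch estimates per-partition lag by subtracting a consumer's checkpointed sequence number from the partition's last enqueued sequence number. The `partitionInfo` type only mirrors two fields of `HubPartitionRuntimeInformation`; the `checkpoint` type, the `partitionLag` helper, and the sample numbers are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// partitionInfo mirrors the two fields of HubPartitionRuntimeInformation used here.
type partitionInfo struct {
	PartitionID        string
	LastSequenceNumber int64
}

// checkpoint is a hypothetical stand-in for a consumer's persisted position.
type checkpoint struct {
	PartitionID    string
	SequenceNumber int64
}

// partitionLag returns how many events sit between the checkpoint and the
// last enqueued event. It clamps at zero in case the checkpoint is newer
// than the partition snapshot.
func partitionLag(info partitionInfo, cp checkpoint) int64 {
	lag := info.LastSequenceNumber - cp.SequenceNumber
	if lag < 0 {
		return 0
	}
	return lag
}

func main() {
	// Sample values made up for illustration only.
	infos := []partitionInfo{{"0", 1500}, {"1", 980}}
	cps := map[string]checkpoint{"0": {"0", 1200}, "1": {"1", 980}}
	for _, info := range infos {
		fmt.Printf("partition %s lag: %d\n",
			info.PartitionID, partitionLag(info, cps[info.PartitionID]))
	}
}
```

In practice the `partitionInfo` values would be filled from `GetPartitionInformation`, and the checkpoints from wherever the consumer persists them. A lag that keeps growing between samples suggests the consumer is not keeping up.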

@mplachter
Author

mplachter commented Jul 8, 2021

Thanks David.

So it seems that Event Hubs doesn't actually keep track of the offsets per consumer group per partition? Tracking seems to be offloaded to some type of checkpointer, the popular one being the Blob Storage checkpointer?

Interestingly enough when you use the Sarama Client over the Kafka API for EventHubs the management of the Offsets using CheckPointers goes away and this seems to be offloaded somewhere magically in the background... Which leads me to believe there should be a way to read the information out of it somehow.

@devigned
Member

devigned commented Jul 8, 2021

> Interestingly enough when you use the Sarama Client over the Kafka API for EventHubs the management of the Offsets using CheckPointers goes away and this seems to be offloaded somewhere magically in the background... Which leads me to believe there should be a way to read the information out of it somehow.

I'm 90% sure something exists for the Kafka API on EH since most clients wouldn't work without it. It's such a key feature for partitioned consumers. I'm not sure it's exposed for regular EventHubs, but I really wish it was. It would make building partitioned consumers so much easier.

Perhaps, someone from that team could comment. @jhendrixMSFT, do you know who the right person to pull in from EH would be?

@jhendrixMSFT
Member

Adding @richardpark-msft as he'll be working on EH in the future.

@mplachter
Author

So the biggest thing is that when I try to get a list of groups back from the broker (i.e. the Azure Event Hub URI), it returns a nil map of groups...

Which makes me think that the Kafka API is not mapping consumer groups back to Event Hubs correctly.

@mplachter
Author

mplachter commented Jul 8, 2021

Here is an example using Sarama:

	c, err := sarama.NewClient([]string{"localhost:9092"}, getConfig())
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// Refresh the cluster metadata
	if err = c.RefreshMetadata(); err != nil {
		panic(err)
	}

	for _, broker := range c.Brokers() {
		// Open a connection to the broker
		if err = broker.Open(getConfig()); err != nil {
			panic(err)
		}

		// Listing the groups returns a nil map of consumer groups when connected to Event Hubs
		groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
		if err != nil {
			panic(err)
		}
		fmt.Println(groups.Groups)

		// Close the connection before moving on to the next broker
		broker.Close()
	}
