From 62f3daeaa7263db73570d741e2ca633e683dceb6 Mon Sep 17 00:00:00 2001 From: Dag Sverre Seljebotn Date: Wed, 1 Mar 2023 19:30:11 +0100 Subject: [PATCH 1/2] Experimental flags: wait and stream --- go/api.go | 108 ++++++++++++++++++++++++++++++++++++++++--------- go/api_test.go | 45 ++++++++++----------- 2 files changed, 110 insertions(+), 43 deletions(-) diff --git a/go/api.go b/go/api.go index 039a244..a5aa71c 100644 --- a/go/api.go +++ b/go/api.go @@ -6,15 +6,15 @@ import ( "context" "encoding/json" "fmt" + "github.com/gorilla/mux" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" "io" "net/http" "net/url" "strconv" "strings" - - "github.com/gorilla/mux" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" + "time" ) const ( @@ -56,13 +56,25 @@ type EventReceiver interface { Checkpoint(partitionID int, cursor string) error } +type Options struct { + PageSizeHint int + Wait time.Duration + Stream time.Duration + Headers []string +} + +func (o Options) AllHeaders() Options { + o.Headers = []string{All} + return o +} + // EventFetcher is a generic-based interface providing a contract for fetching events: both for the server side and // client side implementations. type EventFetcher interface { // FetchEvents method accepts array of Cursor's along with an optional page size hint and an EventReceiver. // Pass pageSizeHint = 0 for having the server choose a default / no hint. // Optional `headers` argument specifies headers to be returned, or none, if it's absent. - FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint int, receiver EventReceiver, headers ...string) error + FetchEvents(ctx context.Context, cursors []Cursor, receiver EventReceiver, options Options) error } // API is a generic-based interface that has to be implemented on a server side. @@ -79,17 +91,35 @@ type API interface { type NDJSONEventSerializer struct { encoder *json.Encoder writer io.Writer + flusher http.Flusher } -func NewNDJSONEventSerializer(writer io.Writer) *NDJSONEventSerializer { +type NoopFlusher struct{} + +func (n NoopFlusher) Flush() { +} + +func NewNDJSONEventSerializer(writer io.Writer, flush bool) *NDJSONEventSerializer { + var flusher http.Flusher + if flush { + flusher, _ = writer.(http.Flusher) + } + if flusher == nil { + flusher = NoopFlusher{} + } return &NDJSONEventSerializer{ encoder: json.NewEncoder(writer), writer: writer, + flusher: flusher, } } func (s NDJSONEventSerializer) writeNdJsonLine(item interface{}) error { - return s.encoder.Encode(item) + if err := s.encoder.Encode(item); err != nil { + return err + } + s.flusher.Flush() + return nil } func (s NDJSONEventSerializer) Checkpoint(partitionID int, cursor string) error { @@ -171,6 +201,7 @@ func HandlerWithoutRoute(api API, getLogger func(request *http.Request) logrus.F http.Error(writer, ErrHandshakePartitionCountMissing.Error(), ErrHandshakePartitionCountMissing.Status()) return } + if n, err := strconv.Atoi(query.Get("n")); err != nil { http.Error(writer, err.Error(), http.StatusBadRequest) return @@ -180,18 +211,45 @@ func HandlerWithoutRoute(api API, getLogger func(request *http.Request) logrus.F return } } - var pageSizeHint int + + var options Options if query.Has("pagesizehint") { if x, err := strconv.Atoi(query.Get("pagesizehint")); err != nil { http.Error(writer, err.Error(), http.StatusBadRequest) return } else { - pageSizeHint = x + options.PageSizeHint = x } } - var headers []string + + parseMilliseconds := func(key string) (time.Duration, error) { + if query.Has(key) { + intResult, err := strconv.Atoi(query.Get(key)) + if err != nil { + return 0, err + } + return time.Duration(intResult) * time.Millisecond, nil + } else { + return 0, nil + } + } + + var err error + + options.Wait, err = parseMilliseconds("wait") + if err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + + options.Stream, err = parseMilliseconds("stream") + if err != nil { + http.Error(writer, err.Error(), http.StatusBadRequest) + return + } + if query.Has("headers") { - headers = strings.Split(strings.TrimSuffix(query.Get("headers"), ","), ",") + options.Headers = strings.Split(strings.TrimSuffix(query.Get("headers"), ","), ",") } cursors, err := parseCursors(api.GetPartitionCount(), query) if err != nil { @@ -202,11 +260,13 @@ func HandlerWithoutRoute(api API, getLogger func(request *http.Request) logrus.F WithField("event", api.GetName()). WithField("PartitionCount", api.GetPartitionCount()). WithField("Cursors", cursors). - WithField("PageSizeHint", pageSizeHint). - WithField("Headers", headers) + WithField("PageSizeHint", options.PageSizeHint). + WithField("Headers", options.Headers). + WithField("Wait", options.Wait.Milliseconds()). + WithField("Stream", options.Stream) fields.Info() - serializer := NewNDJSONEventSerializer(writer) - err = api.FetchEvents(request.Context(), cursors, pageSizeHint, serializer, headers...) + serializer := NewNDJSONEventSerializer(writer, options.Stream != 0) + err = api.FetchEvents(request.Context(), cursors, serializer, options) if err != nil { logger.WithField("event", api.GetName()+".fetch_events_error").WithError(err).Info() http.Error(writer, "Internal server error", http.StatusInternalServerError) @@ -307,7 +367,7 @@ type checkpointOrEvent struct { } // FetchEvents is a client-side implementation that queries the server and properly deserializes received data. -func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint int, r EventReceiver, headers ...string) error { +func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, r EventReceiver, options Options) error { if len(cursors) == 0 { return ErrCursorsMissing } @@ -321,17 +381,25 @@ func (c Client) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint q := req.URL.Query() q.Add("n", fmt.Sprintf("%d", c.partitionCount)) - if pageSizeHint != DefaultPageSize { - q.Add("pagesizehint", fmt.Sprintf("%d", pageSizeHint)) + if options.PageSizeHint != DefaultPageSize { + q.Add("pagesizehint", fmt.Sprintf("%d", options.PageSizeHint)) } for _, cursor := range cursors { q.Add(fmt.Sprintf("cursor%d", cursor.PartitionID), fmt.Sprintf("%s", cursor.Cursor)) } - if len(headers) != 0 { - q.Add("headers", strings.Join(headers, ",")) + if len(options.Headers) != 0 { + q.Add("headers", strings.Join(options.Headers, ",")) } req.URL.RawQuery = q.Encode() + if options.Stream != 0 { + q.Add("stream", fmt.Sprintf("%d", options.Stream.Milliseconds())) + } + + if options.Wait != 0 { + q.Add("wait", fmt.Sprintf("%d", options.Wait.Milliseconds())) + } + if err := c.requestProcessor(req); err != nil { return err } diff --git a/go/api_test.go b/go/api_test.go index 963035a..1d1c116 100644 --- a/go/api_test.go +++ b/go/api_test.go @@ -5,14 +5,13 @@ import ( "encoding/json" "errors" "fmt" + "github.com/stretchr/testify/assert" "io" "net/http" "net/http/httptest" "strconv" "testing" - "github.com/stretchr/testify/assert" - "github.com/gorilla/mux" "github.com/sirupsen/logrus" hookstest "github.com/sirupsen/logrus/hooks/test" @@ -74,9 +73,9 @@ func (t TestZeroEventHubAPI) GetPartitionCount() int { return 2 } -func (t TestZeroEventHubAPI) FetchEvents(ctx context.Context, cursors []Cursor, pageSizeHint int, r EventReceiver, headers ...string) error { - if pageSizeHint == DefaultPageSize { - pageSizeHint = 100 +func (t TestZeroEventHubAPI) FetchEvents(ctx context.Context, cursors []Cursor, r EventReceiver, options Options) error { + if options.PageSizeHint == DefaultPageSize { + options.PageSizeHint = 100 } for _, cursor := range cursors { partition, ok := t.partitions[cursor.PartitionID] @@ -103,7 +102,7 @@ func (t TestZeroEventHubAPI) FetchEvents(ctx context.Context, cursors []Cursor, } eventsProcessed := 0 h := make(map[string]string, 1) - for _, header := range headers { + for _, header := range options.Headers { if header == "content-type" { h["content-type"] = "application/json" break @@ -125,7 +124,7 @@ func (t TestZeroEventHubAPI) FetchEvents(ctx context.Context, cursors []Cursor, lastProcessedCursor = event.Cursor eventsProcessed++ } - if eventsProcessed == pageSizeHint { + if eventsProcessed == options.PageSizeHint { break } } @@ -139,7 +138,7 @@ func TestAPI(t *testing.T) { name string partitionCount int - pageSizeHint int + options Options cursors []Cursor expectedEvents int @@ -223,7 +222,7 @@ func TestAPI(t *testing.T) { { name: "pagesizehint 10000, full page", partitionCount: 2, - pageSizeHint: 10000, + options: Options{PageSizeHint: 10000}, cursors: []Cursor{{ PartitionID: 0, Cursor: FirstCursor, @@ -233,7 +232,7 @@ func TestAPI(t *testing.T) { { name: "pagesizehint 10000, half page", partitionCount: 2, - pageSizeHint: 10000, + options: Options{PageSizeHint: 10000}, cursors: []Cursor{{ PartitionID: 0, Cursor: "4999", @@ -245,7 +244,7 @@ func TestAPI(t *testing.T) { t.Run(test.name, func(t *testing.T) { var client EventFetcher = createZehClientWithPartitionCount(server, test.partitionCount) var page EventPageSingleType[TestEvent] - err := client.FetchEvents(context.Background(), test.cursors, test.pageSizeHint, &page) + err := client.FetchEvents(context.Background(), test.cursors, &page, test.options) if err == nil { require.Equal(t, test.expectedEvents, len(page.Events)) } else { @@ -270,7 +269,7 @@ func BenchmarkFeed(b *testing.B) { PartitionID: 1, Cursor: FirstCursor, }, - }, 1, &page) + }, &page, Options{}) require.NoError(b, err) } @@ -307,7 +306,7 @@ func TestJSON(t *testing.T) { PartitionID: 1, Cursor: "9998", }, - }, DefaultPageSize, &page) + }, &page, Options{}) require.NoError(t, err) require.Equal(t, `{"partition":0,"data":{"ID":"00000000-0000-0000-0000-00000000270f","Version":0,"Cursor":9999}} {"partition":0,"cursor":"9999"} @@ -351,7 +350,7 @@ func TestNewLines(t *testing.T) { PartitionID: 1, Cursor: "9999", }, - }, DefaultPageSize, &page1) + }, &page1, Options{}) require.NoError(t, err) require.Equal(t, []TypedEnvelope[TestEvent]{ @@ -383,7 +382,7 @@ func TestNewLines(t *testing.T) { PartitionID: 1, Cursor: "9999", }, - }, DefaultPageSize, &page2) + }, &page2, Options{}) require.NoError(t, err) require.Equal(t, page1, page2) } @@ -398,7 +397,7 @@ func TestRequestProcessor(t *testing.T) { return nil }) var page EventPageSingleType[TestEvent] - err := client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, DefaultPageSize, &page) + err := client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, &page, Options{}) require.NoError(t, err) require.NotNil(t, loggingRoundTripper.requestHeaders) require.Equal(t, "application/json", loggingRoundTripper.requestHeaders.Get("content-type")) @@ -408,20 +407,20 @@ func TestEnvelopeHeaders(t *testing.T) { server := httptest.NewServer(Handler("/feed/v1", nil, NewTestZeroEventHubAPI())) client := createZehClient(server) var page EventPageSingleType[TestEvent] - err := client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, DefaultPageSize, &page, "123") + err := client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, &page, Options{Headers: []string{"123"}}) require.NoError(t, err) require.Len(t, page.Events, 1) require.Len(t, page.Cursors, 1) require.Empty(t, page.Events[0].Headers) page = EventPageSingleType[TestEvent]{} - err = client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, DefaultPageSize, &page, "content-type") + err = client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, &page, Options{Headers: []string{"content-type"}}) require.NoError(t, err) require.Len(t, page.Events, 1) require.Len(t, page.Cursors, 1) require.Len(t, page.Events[0].Headers, 1) require.Equal(t, "application/json", page.Events[0].Headers["content-type"]) page = EventPageSingleType[TestEvent]{} - err = client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, DefaultPageSize, &page, All) + err = client.FetchEvents(context.Background(), []Cursor{{Cursor: LastCursor}}, &page, Options{}.AllHeaders()) require.NoError(t, err) require.Len(t, page.Events, 1) require.Len(t, page.Cursors, 1) @@ -454,8 +453,8 @@ func MockHandler(logger logrus.FieldLogger, api API) http.Handler { return } - serializer := NewNDJSONEventSerializer(writer) - err = api.FetchEvents(request.Context(), cursors, 10, serializer, All) + serializer := NewNDJSONEventSerializer(writer, false) + err = api.FetchEvents(request.Context(), cursors, serializer, Options{Headers: []string{All}, PageSizeHint: 10}) switch err { case err500: http.Error(writer, err.Error(), http.StatusInternalServerError) @@ -481,9 +480,9 @@ func TestMockResponses(t *testing.T) { client := createZehClient(server) var page EventPageSingleType[TestEvent] - err := client.FetchEvents(context.Background(), []Cursor{{Cursor: cursorReturn500}}, DefaultPageSize, &page, All) + err := client.FetchEvents(context.Background(), []Cursor{{Cursor: cursorReturn500}}, &page, Options{}.AllHeaders()) require.EqualError(t, err, "unexpected response body: error when fetching events\n") - err = client.FetchEvents(context.Background(), []Cursor{{Cursor: cursorReturn504}}, DefaultPageSize, &page, All) + err = client.FetchEvents(context.Background(), []Cursor{{Cursor: cursorReturn504}}, &page, Options{}.AllHeaders()) require.EqualError(t, err, "empty response body") // Checking logged entries From 3e115016497d0cccc51b763b47e3846625458757 Mon Sep 17 00:00:00 2001 From: Dag Sverre Seljebotn Date: Fri, 10 Mar 2023 08:23:11 +0100 Subject: [PATCH 2/2] SPEC.md: version 0.2 --- SPEC.md | 117 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 65 insertions(+), 52 deletions(-) diff --git a/SPEC.md b/SPEC.md index ad795a4..73f7533 100644 --- a/SPEC.md +++ b/SPEC.md @@ -1,5 +1,15 @@ # ZeroEventHub spec +## Version + +Specification version: 0.2. + +ZeroEventHub has a limited number of servers and clients at the moment. + +If you are considering to use ZeroEventHub outside of Vipps MobilePay, +please let us know in a GitHub issue, and we will instantly freeze the standard +as 1.0 and handle future upgrade cases in a backwards-compatible manner, +or include a versioning mechanism in the protocol. ## Abstract @@ -124,23 +134,12 @@ Partitions are provided so that event processing can more easily be parallelized both at the Producer and the Consumer. In this example the publisher has documented that 4 partitions is available -- the client cannot change this, but has to document its assumption in the -`n` parameter. Also the client chooses to process 2 of these -parititions in a single request -- presumably there is another thread -processing the remaining 2 partitions in parallel. This method of -maintaining independent cursor allows the consumer flexibility in -advancing all cursors in parallel in one chain of calls, or to orchestrate -multiple chains of calls on different subset of partitions, and to split & -merge partitions at will (up to the limit supported by the producer). - -**WARNING**: If comparing with the same events published on Event Hub, -while the partition keys should be the same, the mapping of partition -keys to partitions will be different. Events will be "re-partitioned" -when published to Event Hub vs. the Event API. - +`n` parameter. ### Authorization & use of consumer identity -Standard service-to-service MSI, out of scope for this spec. +The authorization method is standard service-to-service HTTP; +out of scope for this spec. As part of the authorization the publisher will usually figure out a name/identity of the consumer. The publisher may use this to log access, @@ -176,19 +175,19 @@ or use a more complex setup to "re-partition" on the fly. ### Example request: ``` -GET https://myservice/my-kind-of-entity/feed/v1?cursor0=1000240213123&cursor1=1231231231242&pagesizehint=1000&n=4&headers=ce_tracestate,ce_id +GET https://myservice/my-kind-of-entity/feed?n=4&partition=0&cursor=1000240213123&pagesizehint=1000&headers=tracestate,id ``` - -### Parameters +### Parameters supported by all servers * **n**: Number of partitions the client assumes the server to have, in total. If there is mismatch, the server is free to either emulate the behaviour or return 400 Bad Request. -* **cursorN**: Pass in one cursor for each partition you wish to - consume; where `N` is a number in the range `0...n-1`. - Each `cursor` is an opaque string that should be passed +* **partition**: Which partition to read; an integer in the range `0..n-1`. + +* **cursor**: The cursor for the given partition; `n` is a number + in the range `0...n-1`. Each `cursor` is an opaque string that should be passed back as-is, but is limited to ASCII printable characters. Two special cursors are used; `_first` means to start from the beginning of time, and `_last` starts at an arbitrary point "around now". @@ -203,21 +202,23 @@ GET https://myservice/my-kind-of-entity/feed/v1?cursor0=1000240213123&cursor1=12 sensible default for the dataset. * **headers**: In event transports (such as Event Hub) the headers are - primarily of use to middlewares. With zeroeventhub the consumption is more + primarily of use to middlewares. With ZeroEventHub the consumption is more direct, and therefore headers of events are not returned by default, and the header parameter is used to request which headers one wants. The special value `_all` can be used to request all headers. The parameter is optional and its absence means that no headers will be returned. -See the example above for more detailed description of the interaction of -`n` and `cursorN`. +### Parameters optionally supported + +* **wait**: The number of milliseconds (as an integer) to wait for at least + one event in the response before returning (i.e., longpolling). + This allows clients to open an HTTP request and get back a response + the moment an event happens to minimize + latency for seldom-occuring events. -Consumers are encouraged to pass contiguous ranges of cursors and the -same number of cursors from every thread. I.e., pass -`cursor2=...&cursor3=...`, DO NOT pass `cursor2=...&cursor4=...` (not -contiguous) and DO NOT pass `cursor1=...&cursor2=...` (does not spread -evenly across threads). This recommentation in this spec makes the -pattern predictable in the case that producers can optimize for it. +* **stream**: The HTTP response will be held open for the given number of + milliseconds (as an integer). Events will be written and flushed continuously + as they happen. ### Response @@ -230,13 +231,12 @@ to the streaming format itself. #### Events -An event has the form `{"partition": ..., "headers": {...}, "data": { ... }}`, +An event has the form `{"headers": {...}, "data": { ... }}`, here is an example displayed with whitespace for clarity (newlines must not be present within each event on the wire): ``` { - "partition": 0, "headers": { "header1": "value1", "header2": "value2", @@ -249,22 +249,36 @@ here is an example displayed with whitespace for clarity ``` If `header` is empty -- as is the case when not requesting headers in the request -- -it can be non-present in the struct +it can be non-present in the struct. -#### Checkpoint +*Please note:* Currently all backends will also emit the key `partition` on every +line. This field will however be gone when all clients have upgraded to +the latest version of the spec. -A checkpoint has the form `{"partition": ..., "cursor": ...}`. The client can save this +#### Checkpoints and event ordering + +A checkpoint has the form `{"cursor": ...}`. The client can save this cursor value in order to start the stream at the same point. -Between checkpoints, events are unsorted and may arrive in a different order if the -same/similar request is done again. +In general, events transported over ZeroEventHub should follow the +*partitioned log* event communication model of Kafka, Event Hub, etc; +i.e., events should arrive in-order and should be processed in the +order they are received in the response. + +*However*, there is no guarantee that two requests +from a given `cursor` will produce an identical response each time. +This is because while from the perspective of the API we are consuming +a single partition, the server *could* be emulating this behaviour +from a larger set of real physical partitions, and it may be that +returning data from the underlying partitions happen in a non-deterministic +manner to minimize latency. What is important +is: + +* The client should consider events ordered in the order they arrive in +* Any ordering between events *that matters* should be reproducible + (typically, event ordering will be the same between calls for some + underlying physical partition of events) -Checkpoints are also allowed to come back differently if the same/similar request is done -again. This is because the cursor may really be a composite cursor on several internal -partitions in the service, and a checkpoint be emitted for an advance of any individual -internal partition. The requirement for checkpoints is simply that if a client -persisted all events *before* a checkpoint, and then passes the checkpoint cursor -in on the next call, then it will be able to properly follow the stream of the events. ### Recommendations @@ -314,16 +328,6 @@ processing you need to handle re-processing anyway! But, if there is a real need to simply store a state without reading more events you can always pass `store_cursor=1&pagesizehint=0`... -## Possible future extension: Long-polling - -An extra argument: - -* **wait=NNN** If there are no new events, wait for this many seconds - before returning. - -This allows clients to open an HTTP request and get back a response -the moment an event happens. - ## Future possibilities * The newline-delimited JSON works for other formats more typical for streaming @@ -338,3 +342,12 @@ the moment an event happens. * One could allow subscribing to subsets of the event feed. Such extensions may be standardized or just bolted on by each backend in a manner that fits the event data... + +## Changelog + +* 0.2: Each call can only consume a single partition; + moved from `cursor0=c` to `partition=0&cursor=c`, in order to + simplify the specification and implementations. Described optional + flags `wait` and `stream`. + +* 0.1: Initial release.