From 2c3bc0689480b12d2f6bb06590d022838578f0a8 Mon Sep 17 00:00:00 2001 From: "tom.russello@altirnao.com" Date: Mon, 5 Feb 2024 10:51:29 +0100 Subject: [PATCH 1/2] fix: data is not appended with committed streams --- server/storage_handler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/storage_handler.go b/server/storage_handler.go index 1503b10ac..88ed3e87d 100644 --- a/server/storage_handler.go +++ b/server/storage_handler.go @@ -517,6 +517,10 @@ func (s *storageWriteServer) appendRows(req *storagepb.AppendRowsRequest, msgDes s.sendErrorMessage(stream, streamName, err) return err } + if err := tx.Commit(); err != nil { + s.sendErrorMessage(stream, streamName, err) + return err + } } else { status.rows = append(status.rows, data...) } From 24420b3035fac8c2b90547033f0fd83dd0233b5b Mon Sep 17 00:00:00 2001 From: Takafumi Hirata Date: Mon, 5 Feb 2024 22:25:31 +0900 Subject: [PATCH 2/2] Added test for Committed Stream --- server/storage_test.go | 368 +++++++++++++++++++++++------------------ 1 file changed, 210 insertions(+), 158 deletions(-) diff --git a/server/storage_test.go b/server/storage_test.go index 45d6dc01b..0187d3db9 100644 --- a/server/storage_test.go +++ b/server/storage_test.go @@ -392,179 +392,233 @@ func validateDatum(t *testing.T, d interface{}) { } func TestStorageWrite(t *testing.T) { - const ( - projectID = "test" - datasetID = "test" - tableID = "sample" - ) - - ctx := context.Background() - bqServer, err := server.New(server.TempStorage) - if err != nil { - t.Fatal(err) - } - if err := bqServer.Load( - server.StructSource( - types.NewProject( - projectID, - types.NewDataset( - datasetID, - types.NewTable( - tableID, - []*types.Column{ - types.NewColumn("bool_col", types.BOOL), - types.NewColumn("bytes_col", types.BYTES), - types.NewColumn("float64_col", types.FLOAT64), - types.NewColumn("int64_col", types.INT64), - types.NewColumn("string_col", types.STRING), - types.NewColumn("date_col", types.DATE), - types.NewColumn("datetime_col", types.DATETIME), - types.NewColumn("geography_col", types.GEOGRAPHY), - types.NewColumn("numeric_col", types.NUMERIC), - types.NewColumn("bignumeric_col", types.BIGNUMERIC), - types.NewColumn("time_col", types.TIME), - types.NewColumn("timestamp_col", types.TIMESTAMP), - types.NewColumn("int64_list", types.INT64, types.ColumnMode(types.RepeatedMode)), - types.NewColumn( - "struct_col", - types.STRUCT, - types.ColumnFields( - types.NewColumn("sub_int_col", types.INT64), + for _, test := range []struct { + name string + streamType storagepb.WriteStream_Type + expectedRowsAfterFirstWrite int + expectedRowsAfterSecondWrite int + expectedRowsAfterThirdWrite int + expectedRowsAfterExplicitCommit int + }{ + { + name: "pending", + streamType: storagepb.WriteStream_PENDING, + expectedRowsAfterFirstWrite: 0, + expectedRowsAfterSecondWrite: 0, + expectedRowsAfterThirdWrite: 0, + expectedRowsAfterExplicitCommit: 6, + }, + { + name: "committed", + streamType: storagepb.WriteStream_COMMITTED, + expectedRowsAfterFirstWrite: 1, + expectedRowsAfterSecondWrite: 4, + expectedRowsAfterThirdWrite: 6, + expectedRowsAfterExplicitCommit: 6, + }, + } { + const ( + projectID = "test" + datasetID = "test" + tableID = "sample" + ) + + ctx := context.Background() + bqServer, err := server.New(server.TempStorage) + if err != nil { + t.Fatal(err) + } + if err := bqServer.Load( + server.StructSource( + types.NewProject( + projectID, + types.NewDataset( + datasetID, + types.NewTable( + tableID, + []*types.Column{ + types.NewColumn("bool_col", types.BOOL), + types.NewColumn("bytes_col", types.BYTES), + types.NewColumn("float64_col", types.FLOAT64), + types.NewColumn("int64_col", types.INT64), + types.NewColumn("string_col", types.STRING), + types.NewColumn("date_col", types.DATE), + types.NewColumn("datetime_col", types.DATETIME), + types.NewColumn("geography_col", types.GEOGRAPHY), + types.NewColumn("numeric_col", types.NUMERIC), + types.NewColumn("bignumeric_col", types.BIGNUMERIC), + types.NewColumn("time_col", types.TIME), + types.NewColumn("timestamp_col", types.TIMESTAMP), + types.NewColumn("int64_list", types.INT64, types.ColumnMode(types.RepeatedMode)), + types.NewColumn( + "struct_col", + types.STRUCT, + types.ColumnFields( + types.NewColumn("sub_int_col", types.INT64), + ), ), - ), - types.NewColumn( - "struct_list", - types.STRUCT, - types.ColumnFields( - types.NewColumn("sub_int_col", types.INT64), + types.NewColumn( + "struct_list", + types.STRUCT, + types.ColumnFields( + types.NewColumn("sub_int_col", types.INT64), + ), + types.ColumnMode(types.RepeatedMode), ), - types.ColumnMode(types.RepeatedMode), - ), - }, - nil, + }, + nil, + ), ), ), ), - ), - ); err != nil { - t.Fatal(err) - } - testServer := bqServer.TestServer() - defer func() { - testServer.Close() - bqServer.Close() - }() - opts, err := testServer.GRPCClientOptions(ctx) - if err != nil { - t.Fatal(err) - } + ); err != nil { + t.Fatal(err) + } + testServer := bqServer.TestServer() + defer func() { + testServer.Close() + bqServer.Close() + }() + opts, err := testServer.GRPCClientOptions(ctx) + if err != nil { + t.Fatal(err) + } - client, err := managedwriter.NewClient(ctx, projectID, opts...) - if err != nil { - t.Fatal(err) - } - defer client.Close() + client, err := managedwriter.NewClient(ctx, projectID, opts...) + if err != nil { + t.Fatal(err) + } + defer client.Close() + t.Run(test.name, func(t *testing.T) { + writeStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{ + Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID), + WriteStream: &storagepb.WriteStream{ + Type: test.streamType, + }, + }) + if err != nil { + t.Fatalf("CreateWriteStream: %v", err) + } + m := &exampleproto.SampleData{} + descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor()) + if err != nil { + t.Fatalf("NormalizeDescriptor: %v", err) + } + managedStream, err := client.NewManagedStream( + ctx, + managedwriter.WithStreamName(writeStream.GetName()), + managedwriter.WithSchemaDescriptor(descriptorProto), + ) + if err != nil { + t.Fatalf("NewManagedStream: %v", err) + } - pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{ - Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID), - WriteStream: &storagepb.WriteStream{ - Type: storagepb.WriteStream_PENDING, - }, - }) - if err != nil { - t.Fatalf("CreateWriteStream: %v", err) - } - m := &exampleproto.SampleData{} - descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor()) - if err != nil { - t.Fatalf("NormalizeDescriptor: %v", err) - } - managedStream, err := client.NewManagedStream( - ctx, - managedwriter.WithStreamName(pendingStream.GetName()), - managedwriter.WithSchemaDescriptor(descriptorProto), - ) - if err != nil { - t.Fatalf("NewManagedStream: %v", err) - } - rows, err := generateExampleMessages(1) - if err != nil { - t.Fatalf("generateExampleMessages: %v", err) - } + bqClient, err := bigquery.NewClient( + ctx, + projectID, + option.WithEndpoint(testServer.URL), + option.WithoutAuthentication(), + ) + if err != nil { + t.Fatal(err) + } + defer bqClient.Close() - var ( - curOffset int64 - results []*managedwriter.AppendResult - ) - result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0)) - if err != nil { - t.Fatalf("AppendRows first call error: %v", err) - } + rows, err := generateExampleMessages(1) + if err != nil { + t.Fatalf("generateExampleMessages: %v", err) + } - results = append(results, result) - curOffset = curOffset + 1 - rows, err = generateExampleMessages(3) - if err != nil { - t.Fatalf("generateExampleMessages: %v", err) - } - result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset)) - if err != nil { - t.Fatalf("AppendRows second call error: %v", err) - } - results = append(results, result) - curOffset = curOffset + 3 - rows, err = generateExampleMessages(2) - if err != nil { - t.Fatalf("generateExampleMessages: %v", err) - } - result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset)) - if err != nil { - t.Fatalf("AppendRows third call error: %v", err) - } - results = append(results, result) + var ( + curOffset int64 + results []*managedwriter.AppendResult + ) + result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0)) + if err != nil { + t.Fatalf("AppendRows first call error: %v", err) + } - for k, v := range results { - recvOffset, err := v.GetResult(ctx) - if err != nil { - t.Fatalf("append %d returned error: %v", k, err) - } - t.Logf("Successfully appended data at offset %d", recvOffset) - } + iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx) + resultRowCount := countRows(t, iter) + if resultRowCount != test.expectedRowsAfterFirstWrite { + t.Fatalf("expected the number of rows after first AppendRows %d but got %d", test.expectedRowsAfterFirstWrite, resultRowCount) + } - rowCount, err := managedStream.Finalize(ctx) - if err != nil { - t.Fatalf("error during Finalize: %v", err) - } + results = append(results, result) + curOffset = curOffset + 1 + rows, err = generateExampleMessages(3) + if err != nil { + t.Fatalf("generateExampleMessages: %v", err) + } + result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset)) + if err != nil { + t.Fatalf("AppendRows second call error: %v", err) + } - t.Logf("Stream %s finalized with %d rows", managedStream.StreamName(), rowCount) + iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx) + resultRowCount = countRows(t, iter) + if resultRowCount != test.expectedRowsAfterSecondWrite { + t.Fatalf("expected the number of rows after second AppendRows %d but got %d", test.expectedRowsAfterSecondWrite, resultRowCount) + } - req := &storagepb.BatchCommitWriteStreamsRequest{ - Parent: managedwriter.TableParentFromStreamName(managedStream.StreamName()), - WriteStreams: []string{managedStream.StreamName()}, - } + results = append(results, result) + curOffset = curOffset + 3 + rows, err = generateExampleMessages(2) + if err != nil { + t.Fatalf("generateExampleMessages: %v", err) + } + result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset)) + if err != nil { + t.Fatalf("AppendRows third call error: %v", err) + } + results = append(results, result) - resp, err := client.BatchCommitWriteStreams(ctx, req) - if err != nil { - t.Fatalf("client.BatchCommit: %v", err) - } - if len(resp.GetStreamErrors()) > 0 { - t.Fatalf("stream errors present: %v", resp.GetStreamErrors()) - } + for k, v := range results { + recvOffset, err := v.GetResult(ctx) + if err != nil { + t.Fatalf("append %d returned error: %v", k, err) + } + t.Logf("Successfully appended data at offset %d", recvOffset) + } - t.Logf("Table data committed at %s", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano)) + iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx) + resultRowCount = countRows(t, iter) + if resultRowCount != test.expectedRowsAfterThirdWrite { + t.Fatalf("expected the number of rows after third AppendRows %d but got %d", test.expectedRowsAfterThirdWrite, resultRowCount) + } - bqClient, err := bigquery.NewClient( - ctx, - projectID, - option.WithEndpoint(testServer.URL), - option.WithoutAuthentication(), - ) - if err != nil { - t.Fatal(err) + rowCount, err := managedStream.Finalize(ctx) + if err != nil { + t.Fatalf("error during Finalize: %v", err) + } + + t.Logf("Stream %s finalized with %d rows", managedStream.StreamName(), rowCount) + + req := &storagepb.BatchCommitWriteStreamsRequest{ + Parent: managedwriter.TableParentFromStreamName(managedStream.StreamName()), + WriteStreams: []string{managedStream.StreamName()}, + } + + resp, err := client.BatchCommitWriteStreams(ctx, req) + if err != nil { + t.Fatalf("client.BatchCommit: %v", err) + } + if len(resp.GetStreamErrors()) > 0 { + t.Fatalf("stream errors present: %v", resp.GetStreamErrors()) + } + + iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx) + resultRowCount = countRows(t, iter) + if resultRowCount != test.expectedRowsAfterExplicitCommit { + t.Fatalf("expected the number of rows after Finalize %d but got %d", test.expectedRowsAfterExplicitCommit, resultRowCount) + } + + t.Logf("Table data committed at %s", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano)) + }) } - defer bqClient.Close() +} - iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx) +func countRows(t *testing.T, iter *bigquery.RowIterator) int { var resultRowCount int for { v := map[string]bigquery.Value{} @@ -576,9 +630,7 @@ func TestStorageWrite(t *testing.T) { } resultRowCount++ } - if resultRowCount != 6 { - t.Fatalf("failed to get table rows. expected 6 but got %d", resultRowCount) - } + return resultRowCount } func generateExampleMessages(numMessages int) ([][]byte, error) {