Skip to content

Commit

Permalink
Merge pull request #1 from totem3/data-is-not-appended-with-committed…
Browse files Browse the repository at this point in the history
…-streams

Added test for Committed Stream
  • Loading branch information
tomrussello authored Feb 6, 2024
2 parents 2c3bc06 + 24420b3 commit 1fa309d
Showing 1 changed file with 210 additions and 158 deletions.
368 changes: 210 additions & 158 deletions server/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,179 +392,233 @@ func validateDatum(t *testing.T, d interface{}) {
}

func TestStorageWrite(t *testing.T) {
const (
projectID = "test"
datasetID = "test"
tableID = "sample"
)

ctx := context.Background()
bqServer, err := server.New(server.TempStorage)
if err != nil {
t.Fatal(err)
}
if err := bqServer.Load(
server.StructSource(
types.NewProject(
projectID,
types.NewDataset(
datasetID,
types.NewTable(
tableID,
[]*types.Column{
types.NewColumn("bool_col", types.BOOL),
types.NewColumn("bytes_col", types.BYTES),
types.NewColumn("float64_col", types.FLOAT64),
types.NewColumn("int64_col", types.INT64),
types.NewColumn("string_col", types.STRING),
types.NewColumn("date_col", types.DATE),
types.NewColumn("datetime_col", types.DATETIME),
types.NewColumn("geography_col", types.GEOGRAPHY),
types.NewColumn("numeric_col", types.NUMERIC),
types.NewColumn("bignumeric_col", types.BIGNUMERIC),
types.NewColumn("time_col", types.TIME),
types.NewColumn("timestamp_col", types.TIMESTAMP),
types.NewColumn("int64_list", types.INT64, types.ColumnMode(types.RepeatedMode)),
types.NewColumn(
"struct_col",
types.STRUCT,
types.ColumnFields(
types.NewColumn("sub_int_col", types.INT64),
for _, test := range []struct {
name string
streamType storagepb.WriteStream_Type
expectedRowsAfterFirstWrite int
expectedRowsAfterSecondWrite int
expectedRowsAfterThirdWrite int
expectedRowsAfterExplicitCommit int
}{
{
name: "pending",
streamType: storagepb.WriteStream_PENDING,
expectedRowsAfterFirstWrite: 0,
expectedRowsAfterSecondWrite: 0,
expectedRowsAfterThirdWrite: 0,
expectedRowsAfterExplicitCommit: 6,
},
{
name: "committed",
streamType: storagepb.WriteStream_COMMITTED,
expectedRowsAfterFirstWrite: 1,
expectedRowsAfterSecondWrite: 4,
expectedRowsAfterThirdWrite: 6,
expectedRowsAfterExplicitCommit: 6,
},
} {
const (
projectID = "test"
datasetID = "test"
tableID = "sample"
)

ctx := context.Background()
bqServer, err := server.New(server.TempStorage)
if err != nil {
t.Fatal(err)
}
if err := bqServer.Load(
server.StructSource(
types.NewProject(
projectID,
types.NewDataset(
datasetID,
types.NewTable(
tableID,
[]*types.Column{
types.NewColumn("bool_col", types.BOOL),
types.NewColumn("bytes_col", types.BYTES),
types.NewColumn("float64_col", types.FLOAT64),
types.NewColumn("int64_col", types.INT64),
types.NewColumn("string_col", types.STRING),
types.NewColumn("date_col", types.DATE),
types.NewColumn("datetime_col", types.DATETIME),
types.NewColumn("geography_col", types.GEOGRAPHY),
types.NewColumn("numeric_col", types.NUMERIC),
types.NewColumn("bignumeric_col", types.BIGNUMERIC),
types.NewColumn("time_col", types.TIME),
types.NewColumn("timestamp_col", types.TIMESTAMP),
types.NewColumn("int64_list", types.INT64, types.ColumnMode(types.RepeatedMode)),
types.NewColumn(
"struct_col",
types.STRUCT,
types.ColumnFields(
types.NewColumn("sub_int_col", types.INT64),
),
),
),
types.NewColumn(
"struct_list",
types.STRUCT,
types.ColumnFields(
types.NewColumn("sub_int_col", types.INT64),
types.NewColumn(
"struct_list",
types.STRUCT,
types.ColumnFields(
types.NewColumn("sub_int_col", types.INT64),
),
types.ColumnMode(types.RepeatedMode),
),
types.ColumnMode(types.RepeatedMode),
),
},
nil,
},
nil,
),
),
),
),
),
); err != nil {
t.Fatal(err)
}
testServer := bqServer.TestServer()
defer func() {
testServer.Close()
bqServer.Close()
}()
opts, err := testServer.GRPCClientOptions(ctx)
if err != nil {
t.Fatal(err)
}
); err != nil {
t.Fatal(err)
}
testServer := bqServer.TestServer()
defer func() {
testServer.Close()
bqServer.Close()
}()
opts, err := testServer.GRPCClientOptions(ctx)
if err != nil {
t.Fatal(err)
}

client, err := managedwriter.NewClient(ctx, projectID, opts...)
if err != nil {
t.Fatal(err)
}
defer client.Close()
client, err := managedwriter.NewClient(ctx, projectID, opts...)
if err != nil {
t.Fatal(err)
}
defer client.Close()
t.Run(test.name, func(t *testing.T) {
writeStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
WriteStream: &storagepb.WriteStream{
Type: test.streamType,
},
})
if err != nil {
t.Fatalf("CreateWriteStream: %v", err)
}
m := &exampleproto.SampleData{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
t.Fatalf("NormalizeDescriptor: %v", err)
}
managedStream, err := client.NewManagedStream(
ctx,
managedwriter.WithStreamName(writeStream.GetName()),
managedwriter.WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}

pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
WriteStream: &storagepb.WriteStream{
Type: storagepb.WriteStream_PENDING,
},
})
if err != nil {
t.Fatalf("CreateWriteStream: %v", err)
}
m := &exampleproto.SampleData{}
descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
if err != nil {
t.Fatalf("NormalizeDescriptor: %v", err)
}
managedStream, err := client.NewManagedStream(
ctx,
managedwriter.WithStreamName(pendingStream.GetName()),
managedwriter.WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
rows, err := generateExampleMessages(1)
if err != nil {
t.Fatalf("generateExampleMessages: %v", err)
}
bqClient, err := bigquery.NewClient(
ctx,
projectID,
option.WithEndpoint(testServer.URL),
option.WithoutAuthentication(),
)
if err != nil {
t.Fatal(err)
}
defer bqClient.Close()

var (
curOffset int64
results []*managedwriter.AppendResult
)
result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
if err != nil {
t.Fatalf("AppendRows first call error: %v", err)
}
rows, err := generateExampleMessages(1)
if err != nil {
t.Fatalf("generateExampleMessages: %v", err)
}

results = append(results, result)
curOffset = curOffset + 1
rows, err = generateExampleMessages(3)
if err != nil {
t.Fatalf("generateExampleMessages: %v", err)
}
result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
if err != nil {
t.Fatalf("AppendRows second call error: %v", err)
}
results = append(results, result)
curOffset = curOffset + 3
rows, err = generateExampleMessages(2)
if err != nil {
t.Fatalf("generateExampleMessages: %v", err)
}
result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
if err != nil {
t.Fatalf("AppendRows third call error: %v", err)
}
results = append(results, result)
var (
curOffset int64
results []*managedwriter.AppendResult
)
result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
if err != nil {
t.Fatalf("AppendRows first call error: %v", err)
}

for k, v := range results {
recvOffset, err := v.GetResult(ctx)
if err != nil {
t.Fatalf("append %d returned error: %v", k, err)
}
t.Logf("Successfully appended data at offset %d", recvOffset)
}
iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
resultRowCount := countRows(t, iter)
if resultRowCount != test.expectedRowsAfterFirstWrite {
t.Fatalf("expected the number of rows after first AppendRows %d but got %d", test.expectedRowsAfterFirstWrite, resultRowCount)
}

rowCount, err := managedStream.Finalize(ctx)
if err != nil {
t.Fatalf("error during Finalize: %v", err)
}
results = append(results, result)
curOffset = curOffset + 1
rows, err = generateExampleMessages(3)
if err != nil {
t.Fatalf("generateExampleMessages: %v", err)
}
result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
if err != nil {
t.Fatalf("AppendRows second call error: %v", err)
}

t.Logf("Stream %s finalized with %d rows", managedStream.StreamName(), rowCount)
iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
resultRowCount = countRows(t, iter)
if resultRowCount != test.expectedRowsAfterSecondWrite {
t.Fatalf("expected the number of rows after second AppendRows %d but got %d", test.expectedRowsAfterSecondWrite, resultRowCount)
}

req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: managedwriter.TableParentFromStreamName(managedStream.StreamName()),
WriteStreams: []string{managedStream.StreamName()},
}
results = append(results, result)
curOffset = curOffset + 3
rows, err = generateExampleMessages(2)
if err != nil {
t.Fatalf("generateExampleMessages: %v", err)
}
result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
if err != nil {
t.Fatalf("AppendRows third call error: %v", err)
}
results = append(results, result)

resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
t.Fatalf("client.BatchCommit: %v", err)
}
if len(resp.GetStreamErrors()) > 0 {
t.Fatalf("stream errors present: %v", resp.GetStreamErrors())
}
for k, v := range results {
recvOffset, err := v.GetResult(ctx)
if err != nil {
t.Fatalf("append %d returned error: %v", k, err)
}
t.Logf("Successfully appended data at offset %d", recvOffset)
}

t.Logf("Table data committed at %s", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))
iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
resultRowCount = countRows(t, iter)
if resultRowCount != test.expectedRowsAfterThirdWrite {
t.Fatalf("expected the number of rows after third AppendRows %d but got %d", test.expectedRowsAfterThirdWrite, resultRowCount)
}

bqClient, err := bigquery.NewClient(
ctx,
projectID,
option.WithEndpoint(testServer.URL),
option.WithoutAuthentication(),
)
if err != nil {
t.Fatal(err)
rowCount, err := managedStream.Finalize(ctx)
if err != nil {
t.Fatalf("error during Finalize: %v", err)
}

t.Logf("Stream %s finalized with %d rows", managedStream.StreamName(), rowCount)

req := &storagepb.BatchCommitWriteStreamsRequest{
Parent: managedwriter.TableParentFromStreamName(managedStream.StreamName()),
WriteStreams: []string{managedStream.StreamName()},
}

resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
t.Fatalf("client.BatchCommit: %v", err)
}
if len(resp.GetStreamErrors()) > 0 {
t.Fatalf("stream errors present: %v", resp.GetStreamErrors())
}

iter = bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
resultRowCount = countRows(t, iter)
if resultRowCount != test.expectedRowsAfterExplicitCommit {
t.Fatalf("expected the number of rows after Finalize %d but got %d", test.expectedRowsAfterExplicitCommit, resultRowCount)
}

t.Logf("Table data committed at %s", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))
})
}
defer bqClient.Close()
}

iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
func countRows(t *testing.T, iter *bigquery.RowIterator) int {
var resultRowCount int
for {
v := map[string]bigquery.Value{}
Expand All @@ -576,9 +630,7 @@ func TestStorageWrite(t *testing.T) {
}
resultRowCount++
}
if resultRowCount != 6 {
t.Fatalf("failed to get table rows. expected 6 but got %d", resultRowCount)
}
return resultRowCount
}

func generateExampleMessages(numMessages int) ([][]byte, error) {
Expand Down

0 comments on commit 1fa309d

Please sign in to comment.