From f856cc4a07fec591dda83c9e4960d6312f8d6748 Mon Sep 17 00:00:00 2001 From: Joel Lubinitsky Date: Thu, 4 Jan 2024 19:35:08 -0500 Subject: [PATCH 1/2] [Go][Parquet] Arrow DATE64 type is coerced into Parquet TIMESTAMP[ms] logical type instead of DATE (32-bit) --- go/parquet/pqarrow/encode_arrow_test.go | 84 +++++++++++++++++++++++++ go/parquet/pqarrow/schema.go | 4 +- go/parquet/pqarrow/schema_test.go | 2 +- 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 95ea644dd8013..1027ec2d760d3 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -125,6 +125,52 @@ func makeDateTimeTypesTable(mem memory.Allocator, expected bool, addFieldMeta bo return array.NewTable(arrsc, cols, int64(len(isValid))) } +func makeDateTypeTable(mem memory.Allocator, expected bool, partialDays bool) arrow.Table { + const ( + millisPerHour int64 = 1000 * 60 * 60 + millisPerDay int64 = millisPerHour * 24 + ) + isValid := []bool{true, true, true, false, true, true} + + var field arrow.Field + if expected { + field = arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date32, Nullable: true} + } else { + field = arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date64, Nullable: true} + } + + field.Metadata = arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"}) + + arrsc := arrow.NewSchema([]arrow.Field{field}, nil) + + d32Values := []arrow.Date32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000} + + d64Values := make([]arrow.Date64, len(d32Values)) + for i := range d64Values { + // Calculate number of milliseconds at date boundary + d64Values[i] = arrow.Date64(int64(d32Values[i]) * millisPerDay) + if partialDays { + // Offset 1 or more hours past the date boundary + hoursIntoDay := int64(i) * millisPerHour + d64Values[i] += arrow.Date64(hoursIntoDay) + } + } + + bldr := array.NewRecordBuilder(mem, arrsc) + defer bldr.Release() + + if expected { + bldr.Field(0).(*array.Date32Builder).AppendValues(d32Values, isValid) + } else { + bldr.Field(0).(*array.Date64Builder).AppendValues(d64Values, isValid) + } + + rec := bldr.NewRecord() + defer rec.Release() + + return array.NewTableFromRecords(arrsc, []arrow.Record{rec}) +} + func TestWriteArrowCols(t *testing.T) { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(t, 0) @@ -831,6 +877,44 @@ func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() { } } +func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(ps.T(), 0) + + toWrite := makeDateTypeTable(mem, false, false) + defer toWrite.Release() + buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) + defer buf.Release() + + reader := ps.createReader(mem, buf.Bytes()) + tbl := ps.readTable(reader) + defer tbl.Release() + + expected := makeDateTypeTable(mem, true, false) + defer expected.Release() + + ps.Truef(array.TableEqual(expected, tbl), "expected table: %s\ngot table: %s", expected, tbl) +} + +func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(ps.T(), 0) + + toWrite := makeDateTypeTable(mem, false, true) + defer toWrite.Release() + buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) + defer buf.Release() + + reader := ps.createReader(mem, buf.Bytes()) + tbl := ps.readTable(reader) + defer tbl.Release() + + expected := makeDateTypeTable(mem, true, true) + defer expected.Release() + + ps.Truef(array.TableEqual(expected, tbl), "expected table: %s\ngot table: %s", expected, tbl) +} + func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(ps.T(), 0) diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index ccb3dc0350ae1..383d47fbaabed 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -326,8 +326,8 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties typ = parquet.Types.Int32 logicalType = schema.DateLogicalType{} case arrow.DATE64: - typ = parquet.Types.Int64 - logicalType = schema.NewTimestampLogicalType(true, schema.TimeUnitMillis) + typ = parquet.Types.Int32 + logicalType = schema.DateLogicalType{} case arrow.TIMESTAMP: typ, logicalType, err = getTimestampMeta(field.Type.(*arrow.TimestampType), props, arrprops) if err != nil { diff --git a/go/parquet/pqarrow/schema_test.go b/go/parquet/pqarrow/schema_test.go index bc33663414075..a3c2c7a4ff60c 100644 --- a/go/parquet/pqarrow/schema_test.go +++ b/go/parquet/pqarrow/schema_test.go @@ -187,7 +187,7 @@ func TestConvertArrowFlatPrimitives(t *testing.T) { arrowFields = append(arrowFields, arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date32, Nullable: false}) parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("date64", parquet.Repetitions.Required, - schema.NewTimestampLogicalType(true, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1))) + schema.DateLogicalType{}, parquet.Types.Int32, 0, -1))) arrowFields = append(arrowFields, arrow.Field{Name: "date64", Type: arrow.FixedWidthTypes.Date64, Nullable: false}) parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("time32", parquet.Repetitions.Required, From c1207dab610fa8efb7cc849c9436019fb9c793fd Mon Sep 17 00:00:00 2001 From: Joel Lubinitsky Date: Thu, 4 Jan 2024 20:02:00 -0500 Subject: [PATCH 2/2] Clearer variable names in test --- go/parquet/pqarrow/encode_arrow_test.go | 32 ++++++++++++------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 1027ec2d760d3..4c900b6e50a7d 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -881,38 +881,38 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(ps.T(), 0) - toWrite := makeDateTypeTable(mem, false, false) - defer toWrite.Release() - buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) + date64InputTable := makeDateTypeTable(mem, false, false) + defer date64InputTable.Release() + buf := writeTableToBuffer(ps.T(), mem, date64InputTable, date64InputTable.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) defer buf.Release() reader := ps.createReader(mem, buf.Bytes()) - tbl := ps.readTable(reader) - defer tbl.Release() + roundTripOutputTable := ps.readTable(reader) + defer roundTripOutputTable.Release() - expected := makeDateTypeTable(mem, true, false) - defer expected.Release() + date32ExpectedOutputTable := makeDateTypeTable(mem, true, false) + defer date32ExpectedOutputTable.Release() - ps.Truef(array.TableEqual(expected, tbl), "expected table: %s\ngot table: %s", expected, tbl) + ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable) } func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() { mem := memory.NewCheckedAllocator(memory.DefaultAllocator) defer mem.AssertSize(ps.T(), 0) - toWrite := makeDateTypeTable(mem, false, true) - defer toWrite.Release() - buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) + date64InputTableNotAlignedToDateBoundary := makeDateTypeTable(mem, false, true) + defer date64InputTableNotAlignedToDateBoundary.Release() + buf := writeTableToBuffer(ps.T(), mem, date64InputTableNotAlignedToDateBoundary, date64InputTableNotAlignedToDateBoundary.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))) defer buf.Release() reader := ps.createReader(mem, buf.Bytes()) - tbl := ps.readTable(reader) - defer tbl.Release() + roundTripOutputTable := ps.readTable(reader) + defer roundTripOutputTable.Release() - expected := makeDateTypeTable(mem, true, true) - defer expected.Release() + date32ExpectedOutputTable := makeDateTypeTable(mem, true, true) + defer date32ExpectedOutputTable.Release() - ps.Truef(array.TableEqual(expected, tbl), "expected table: %s\ngot table: %s", expected, tbl) + ps.Truef(array.TableEqual(date32ExpectedOutputTable, roundTripOutputTable), "expected table: %s\ngot table: %s", date32ExpectedOutputTable, roundTripOutputTable) } func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {