Skip to content

Commit

Permalink
apacheGH-39456: [Go][Parquet] Arrow DATE64 Type Coerced to Parquet DA…
Browse files Browse the repository at this point in the history
…TE Logical Type (apache#39460)

### Rationale for this change

Closes: apache#39456 

### What changes are included in this PR?

Update physical and logical type mapping from Arrow to Parquet for DATE64 type

### Are these changes tested?

Yes,
- Update expected schema mapping in existing test
- Tests asserting new behavior
  - Arrow DATE64 will roundtrip -> Parquet -> Arrow as DATE32
  - Arrow DATE64 values _not aligned_ to an exact date boundary will be truncated to the millisecond value at the boundary of the greatest full day on Parquet roundtrip

### Are there any user-facing changes?

Yes, users of `pqarrow.FileWriter` will produce Parquet files containing `DATE` logical type instead of `TIMESTAMP[ms]` when writing Arrow data containing DATE64 field(s). The proposed implementation truncates `int64` values to be divisible by 86400000 rather than validating that this is already the case, as some implementations do. I am happy to add this validation if it would be preferred, but the truncating behavior will likely break fewer existing users.

I'm not sure whether this is technically considered a breaking change to a public API and if/how it should be communicated. Any direction regarding this would be appreciated.

* Closes: apache#39456

Authored-by: Joel Lubinitsky <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
joellubi authored and zanmato1984 committed Feb 28, 2024
1 parent 6455157 commit c33a3e3
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 3 deletions.
84 changes: 84 additions & 0 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,52 @@ func makeDateTimeTypesTable(mem memory.Allocator, expected bool, addFieldMeta bo
return array.NewTable(arrsc, cols, int64(len(isValid)))
}

// makeDateTypeTable builds a one-column, six-row table whose nullable "date"
// field carries the metadata entry PARQUET:field_id=1. The fourth row is null.
//
// When expected is true the column is Date32 and holds the raw day counts;
// partialDays has no effect in that case. When expected is false the column
// is Date64 and each value is the matching day count converted to
// milliseconds. If partialDays is also set, row i is pushed i hours past its
// day boundary (so row 0 stays aligned).
//
// The caller owns the returned table and must Release it.
func makeDateTypeTable(mem memory.Allocator, expected bool, partialDays bool) arrow.Table {
	const (
		msPerHour int64 = 60 * 60 * 1000
		msPerDay  int64 = 24 * msPerHour
	)

	validity := []bool{true, true, true, false, true, true}
	days := []arrow.Date32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}

	// Start from the Date64 (input) flavour and switch to Date32 for the expected output.
	dateField := arrow.Field{
		Name:     "date",
		Type:     arrow.FixedWidthTypes.Date64,
		Nullable: true,
		Metadata: arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"}),
	}
	if expected {
		dateField.Type = arrow.FixedWidthTypes.Date32
	}
	sc := arrow.NewSchema([]arrow.Field{dateField}, nil)

	recBldr := array.NewRecordBuilder(mem, sc)
	defer recBldr.Release()

	if expected {
		recBldr.Field(0).(*array.Date32Builder).AppendValues(days, validity)
	} else {
		millis := make([]arrow.Date64, 0, len(days))
		for idx, day := range days {
			// Milliseconds at the start of the day.
			ms := int64(day) * msPerDay
			if partialDays {
				// Move idx whole hours into the day.
				ms += int64(idx) * msPerHour
			}
			millis = append(millis, arrow.Date64(ms))
		}
		recBldr.Field(0).(*array.Date64Builder).AppendValues(millis, validity)
	}

	record := recBldr.NewRecord()
	defer record.Release()

	return array.NewTableFromRecords(sc, []arrow.Record{record})
}

func TestWriteArrowCols(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
Expand Down Expand Up @@ -831,6 +877,44 @@ func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() {
}
}

// TestDate64ReadWriteTable writes a day-aligned Date64 table through
// writeTableToBuffer, reads it back, and requires the result to equal the
// Date32 table produced by makeDateTypeTable for the same rows.
func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	// Registered first so it runs last, after every Release below.
	defer mem.AssertSize(ps.T(), 0)

	input := makeDateTypeTable(mem, false, false)
	defer input.Release()

	props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
	buf := writeTableToBuffer(ps.T(), mem, input, input.NumRows(), props)
	defer buf.Release()

	got := ps.readTable(ps.createReader(mem, buf.Bytes()))
	defer got.Release()

	want := makeDateTypeTable(mem, true, false)
	defer want.Release()

	equal := array.TableEqual(want, got)
	ps.Truef(equal, "expected table: %s\ngot table: %s", want, got)
}

// TestDate64ReadWriteTableWithPartialDays writes a Date64 table whose values
// are offset from their day boundaries through writeTableToBuffer, reads it
// back, and requires the result to equal the Date32 table produced by
// makeDateTypeTable for the same rows.
func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	// Registered first so it runs last, after every Release below.
	defer mem.AssertSize(ps.T(), 0)

	unalignedInput := makeDateTypeTable(mem, false, true)
	defer unalignedInput.Release()

	props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
	buf := writeTableToBuffer(ps.T(), mem, unalignedInput, unalignedInput.NumRows(), props)
	defer buf.Release()

	got := ps.readTable(ps.createReader(mem, buf.Bytes()))
	defer got.Release()

	want := makeDateTypeTable(mem, true, true)
	defer want.Release()

	equal := array.TableEqual(want, got)
	ps.Truef(equal, "expected table: %s\ngot table: %s", want, got)
}

func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
Expand Down
4 changes: 2 additions & 2 deletions go/parquet/pqarrow/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
typ = parquet.Types.Int32
logicalType = schema.DateLogicalType{}
case arrow.DATE64:
typ = parquet.Types.Int64
logicalType = schema.NewTimestampLogicalType(true, schema.TimeUnitMillis)
typ = parquet.Types.Int32
logicalType = schema.DateLogicalType{}
case arrow.TIMESTAMP:
typ, logicalType, err = getTimestampMeta(field.Type.(*arrow.TimestampType), props, arrprops)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/parquet/pqarrow/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestConvertArrowFlatPrimitives(t *testing.T) {
arrowFields = append(arrowFields, arrow.Field{Name: "date", Type: arrow.FixedWidthTypes.Date32, Nullable: false})

parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("date64", parquet.Repetitions.Required,
schema.NewTimestampLogicalType(true, schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
schema.DateLogicalType{}, parquet.Types.Int32, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "date64", Type: arrow.FixedWidthTypes.Date64, Nullable: false})

parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("time32", parquet.Repetitions.Required,
Expand Down

0 comments on commit c33a3e3

Please sign in to comment.