From f0b7413d28f6aeb4b5d4478e202965ff604d0566 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 26 Sep 2023 18:13:39 +0200 Subject: [PATCH] WIP --- ci/scripts/integration_arrow.sh | 9 +- csharp/src/Apache.Arrow/ArrowBuffer.cs | 3 +- csharp/src/Apache.Arrow/C/CArrowArray.cs | 15 +- .../src/Apache.Arrow/C/CArrowArrayExporter.cs | 52 +- .../Apache.Arrow/C/CArrowSchemaImporter.cs | 9 +- .../Memory/ExportedAllocationOwner.cs | 20 + .../Apache.Arrow.IntegrationTest.csproj | 3 +- .../CDataInterface.cs | 79 +++ .../IntegrationCommand.cs | 606 +---------------- .../Apache.Arrow.IntegrationTest/JsonFile.cs | 638 +++++++++++++++++- .../CDataInterfacePythonTests.cs | 4 +- dev/archery/archery/integration/runner.py | 11 +- dev/archery/archery/integration/tester.py | 6 +- dev/archery/archery/integration/tester_cpp.py | 6 +- .../archery/integration/tester_csharp.py | 133 +++- dev/archery/archery/integration/tester_go.py | 6 +- 16 files changed, 951 insertions(+), 649 deletions(-) create mode 100644 csharp/test/Apache.Arrow.IntegrationTest/CDataInterface.cs diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index a165f8027bf8f..bd89a4b6e5545 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -23,6 +23,11 @@ arrow_dir=${1} gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration pip install -e $arrow_dir/dev/archery[integration] +# For C# C Data Interface testing +pip install pythonnet + +# Get more detailed context on crashes +export PYTHONFAULTHANDLER=1 # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1 time archery integration \ @@ -31,8 +36,8 @@ time archery integration \ --run-flight \ --with-cpp=1 \ --with-csharp=1 \ - --with-java=1 \ - --with-js=1 \ + --with-java=0 \ + --with-js=0 \ --with-go=1 \ --gold-dirs=$gold_dir/0.14.1 \ --gold-dirs=$gold_dir/0.17.1 \ diff --git a/csharp/src/Apache.Arrow/ArrowBuffer.cs b/csharp/src/Apache.Arrow/ArrowBuffer.cs index dbd97fc3aec9e..ef98bdc853b88 100644 --- a/csharp/src/Apache.Arrow/ArrowBuffer.cs +++ b/csharp/src/Apache.Arrow/ArrowBuffer.cs @@ -75,8 +75,9 @@ public void Dispose() internal bool TryExport(ExportedAllocationOwner newOwner, out IntPtr ptr) { - if (_memoryOwner == null && IsEmpty) + if (IsEmpty) { + // _memoryOwner could be anything (for example null or a NullMemoryOwner), but it doesn't matter here ptr = IntPtr.Zero; return true; } diff --git a/csharp/src/Apache.Arrow/C/CArrowArray.cs b/csharp/src/Apache.Arrow/C/CArrowArray.cs index fc609f10fdfa5..e5d63090fe158 100644 --- a/csharp/src/Apache.Arrow/C/CArrowArray.cs +++ b/csharp/src/Apache.Arrow/C/CArrowArray.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Diagnostics; using System.Runtime.InteropServices; namespace Apache.Arrow.C @@ -68,16 +69,24 @@ public unsafe struct CArrowArray /// public static void Free(CArrowArray* array) { - if (array->release != default) - { + CallReleaseFunc(array); + Marshal.FreeHGlobal((IntPtr)array); + } + + /// + /// Call the array's release func, if set. + /// + public static void CallReleaseFunc(CArrowArray* array) { + if (array->release != default) { // Call release if not already called. #if NET5_0_OR_GREATER array->release(array); #else Marshal.GetDelegateForFunctionPointer(array->release)(array); #endif + Debug.Assert(array->release == default, + "Calling the CArrowArray release func should have set it to NULL"); } - Marshal.FreeHGlobal((IntPtr)array); } } } diff --git a/csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs b/csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs index 16aaa3874b370..592fbc019abce 100644 --- a/csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs +++ b/csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs @@ -15,6 +15,7 @@ using System; +using System.Diagnostics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using Apache.Arrow.Memory; @@ -59,8 +60,6 @@ public static unsafe void ExportArray(IArrowArray array, CArrowArray* cArray) try { ConvertArray(allocationOwner, array.Data, cArray); - cArray->release = ReleaseArrayPtr; - cArray->private_data = FromDisposable(allocationOwner); allocationOwner = null; } finally @@ -102,8 +101,6 @@ public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowArray* cArr try { ConvertRecordBatch(allocationOwner, batch, cArray); - cArray->release = ReleaseArrayPtr; - cArray->private_data = FromDisposable(allocationOwner); allocationOwner = null; } finally @@ -118,7 +115,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr cArray->offset = array.Offset; cArray->null_count = array.NullCount; cArray->release = ReleaseArrayPtr; - cArray->private_data = null; + cArray->private_data = MakePrivateData(sharedOwner); cArray->n_buffers = array.Buffers?.Length ?? 0; cArray->buffers = null; @@ -131,7 +128,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr IntPtr ptr; if (!buffer.TryExport(sharedOwner, out ptr)) { - throw new NotSupportedException($"An ArrowArray of type {array.DataType.TypeId} could not be exported"); + throw new NotSupportedException($"An ArrowArray of type {array.DataType.TypeId} could not be exported: failed on buffer #{i}"); } cArray->buffers[i] = (byte*)ptr; } @@ -144,7 +141,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr cArray->children = (CArrowArray**)sharedOwner.Allocate(IntPtr.Size * array.Children.Length); for (int i = 0; i < array.Children.Length; i++) { - cArray->children[i] = CArrowArray.Create(); + cArray->children[i] = MakeArray(sharedOwner); ConvertArray(sharedOwner, array.Children[i], cArray->children[i]); } } @@ -152,7 +149,7 @@ private unsafe static void ConvertArray(ExportedAllocationOwner sharedOwner, Arr cArray->dictionary = null; if (array.Dictionary != null) { - cArray->dictionary = CArrowArray.Create(); + cArray->dictionary = MakeArray(sharedOwner); ConvertArray(sharedOwner, array.Dictionary, cArray->dictionary); } } @@ -163,20 +160,24 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne cArray->offset = 0; cArray->null_count = 0; cArray->release = ReleaseArrayPtr; - cArray->private_data = null; + cArray->private_data = MakePrivateData(sharedOwner); cArray->n_buffers = 1; cArray->buffers = (byte**)sharedOwner.Allocate(IntPtr.Size); cArray->n_children = batch.ColumnCount; cArray->children = null; + // XXX sharing the same ExportedAllocationOwner for all columns + // and child arrays makes memory tracking inflexible. + // If the consumer keeps only a single record batch column, + // the entire record batch memory is nevertheless kept alive. if (cArray->n_children > 0) { cArray->children = (CArrowArray**)sharedOwner.Allocate(IntPtr.Size * batch.ColumnCount); int i = 0; foreach (IArrowArray child in batch.Arrays) { - cArray->children[i] = CArrowArray.Create(); + cArray->children[i] = MakeArray(sharedOwner); ConvertArray(sharedOwner, child.Data, cArray->children[i]); i++; } @@ -190,26 +191,43 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne #endif private unsafe static void ReleaseArray(CArrowArray* cArray) { - Dispose(&cArray->private_data); + for (long i = 0; i < cArray->n_children; i++) + { + CArrowArray.CallReleaseFunc(cArray->children[i]); + } + if (cArray->dictionary != null) { + CArrowArray.CallReleaseFunc(cArray->dictionary); + } + DisposePrivateData(&cArray->private_data); cArray->release = default; } - private unsafe static void* FromDisposable(IDisposable disposable) + private unsafe static CArrowArray* MakeArray(ExportedAllocationOwner sharedOwner) + { + var array = (CArrowArray*)sharedOwner.Allocate(sizeof(CArrowArray)); + *array = default; + return array; + } + + private unsafe static void* MakePrivateData(ExportedAllocationOwner sharedOwner) { - GCHandle gch = GCHandle.Alloc(disposable); + GCHandle gch = GCHandle.Alloc(sharedOwner); + sharedOwner.IncRef(); return (void*)GCHandle.ToIntPtr(gch); } - private unsafe static void Dispose(void** ptr) + private unsafe static void DisposePrivateData(void** ptr) { - GCHandle gch = GCHandle.FromIntPtr((IntPtr)(*ptr)); + GCHandle gch = GCHandle.FromIntPtr((IntPtr) (*ptr)); if (!gch.IsAllocated) { return; } - ((IDisposable)gch.Target).Dispose(); + // We can't call IDisposable.Dispose() here as we create multiple + // GCHandles to the same object. Instead, refcounting ensures + // timely memory deallocation when all GCHandles are freed. + ((ExportedAllocationOwner) gch.Target).DecRef(); gch.Free(); - *ptr = null; } } } diff --git a/csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs b/csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs index 42c8cdd5ef548..cf48c18646dc9 100644 --- a/csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs +++ b/csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs @@ -37,7 +37,7 @@ public static class CArrowSchemaImporter /// Typically, you will allocate an uninitialized CArrowSchema pointer, /// pass that to external function, and then use this method to import /// the result. - /// + /// /// /// CArrowSchema* importedPtr = CArrowSchema.Create(); /// foreign_export_function(importedPtr); @@ -62,7 +62,7 @@ public static unsafe ArrowType ImportType(CArrowSchema* ptr) /// Typically, you will allocate an uninitialized CArrowSchema pointer, /// pass that to external function, and then use this method to import /// the result. - /// + /// /// /// CArrowSchema* importedPtr = CArrowSchema.Create(); /// foreign_export_function(importedPtr); @@ -87,7 +87,7 @@ public static unsafe Field ImportField(CArrowSchema* ptr) /// Typically, you will allocate an uninitialized CArrowSchema pointer, /// pass that to external function, and then use this method to import /// the result. - /// + /// /// /// CArrowSchema* importedPtr = CArrowSchema.Create(); /// foreign_export_function(importedPtr); @@ -241,6 +241,9 @@ public ArrowType GetAsType() }; string timezone = format.Substring(format.IndexOf(':') + 1); + if (timezone.Length == 0) { + timezone = null; + } return new TimestampType(timeUnit, timezone); } diff --git a/csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs b/csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs index e872dc5425e06..cad634ef2477d 100644 --- a/csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs +++ b/csharp/src/Apache.Arrow/Memory/ExportedAllocationOwner.cs @@ -14,8 +14,10 @@ // limitations under the License. using System; +using System.Diagnostics; using System.Collections.Generic; using System.Runtime.InteropServices; +using System.Threading; namespace Apache.Arrow.Memory { @@ -23,6 +25,8 @@ internal sealed class ExportedAllocationOwner : INativeAllocationOwner, IDisposa { private readonly List _pointers = new List(); private int _allocationSize; + private long _referenceCount; + private bool _disposed; ~ExportedAllocationOwner() { @@ -47,8 +51,23 @@ public void Release(IntPtr ptr, int offset, int length) throw new InvalidOperationException(); } + public void IncRef() + { + Interlocked.Increment(ref _referenceCount); + } + + public void DecRef() + { + if (Interlocked.Decrement(ref _referenceCount) == 0) { + Dispose(); + } + } + public void Dispose() { + if (_disposed) { + return; + } for (int i = 0; i < _pointers.Count; i++) { if (_pointers[i] != IntPtr.Zero) @@ -59,6 +78,7 @@ public void Dispose() } GC.RemoveMemoryPressure(_allocationSize); GC.SuppressFinalize(this); + _disposed = true; } } } diff --git a/csharp/test/Apache.Arrow.IntegrationTest/Apache.Arrow.IntegrationTest.csproj b/csharp/test/Apache.Arrow.IntegrationTest/Apache.Arrow.IntegrationTest.csproj index a6c635a79a45f..cb7f7ae896ee2 100644 --- a/csharp/test/Apache.Arrow.IntegrationTest/Apache.Arrow.IntegrationTest.csproj +++ b/csharp/test/Apache.Arrow.IntegrationTest/Apache.Arrow.IntegrationTest.csproj @@ -3,6 +3,7 @@ Exe + true net7.0 @@ -13,4 +14,4 @@ - \ No newline at end of file + diff --git a/csharp/test/Apache.Arrow.IntegrationTest/CDataInterface.cs b/csharp/test/Apache.Arrow.IntegrationTest/CDataInterface.cs new file mode 100644 index 0000000000000..d6168d2f479e6 --- /dev/null +++ b/csharp/test/Apache.Arrow.IntegrationTest/CDataInterface.cs @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Data; +using System.Diagnostics; +using System.IO; +using Apache.Arrow.C; +using Apache.Arrow.Arrays; +using Apache.Arrow.Types; + +namespace Apache.Arrow.IntegrationTest +{ + /// + /// Bridge for C Data Interface integration testing. + /// These methods are called from the Python integration testing + /// harness provided by Archery. + /// + public static class CDataInterface + { + // Archery uses the `pythonnet` library (*) to invoke .Net DLLs. + // `pythonnet` is only able to marshal simple types such as int and + // str, which is why we provide trivial wrappers around other APIs. + // + // (*) https://pythonnet.github.io/ + + public static void Initialize() + { + // Allow debugging using Debug.WriteLine() + Trace.Listeners.Add(new ConsoleTraceListener()); + } + + public static unsafe Schema ImportSchema(long ptr) + { + return CArrowSchemaImporter.ImportSchema((CArrowSchema*) ptr); + } + + public static unsafe void ExportSchema(Schema schema, long ptr) + { + CArrowSchemaExporter.ExportSchema(schema, (CArrowSchema*) ptr); + } + + public static unsafe RecordBatch ImportRecordBatch(long ptr, Schema schema) + { + return CArrowArrayImporter.ImportRecordBatch((CArrowArray*) ptr, schema); + } + + public static unsafe void ExportRecordBatch(RecordBatch batch, long ptr) + { + CArrowArrayExporter.ExportRecordBatch(batch, (CArrowArray*) ptr); + } + + public static JsonFile ParseJsonFile(String jsonPath) + { + return JsonFile.Parse(new FileInfo(jsonPath)); + } + + public static long GetAllocatedBytes() + { + GC.Collect(); + // XXX this doesn't seem to give stable and reliable measurements + var gcInfo = GC.GetGCMemoryInfo(); + return gcInfo.PromotedBytes; + } + } +} diff --git a/csharp/test/Apache.Arrow.IntegrationTest/IntegrationCommand.cs b/csharp/test/Apache.Arrow.IntegrationTest/IntegrationCommand.cs index 1e76ee505a516..d19d19f1ce7c1 100644 --- a/csharp/test/Apache.Arrow.IntegrationTest/IntegrationCommand.cs +++ b/csharp/test/Apache.Arrow.IntegrationTest/IntegrationCommand.cs @@ -72,7 +72,7 @@ private async Task Validate() return -1; } - Schema jsonFileSchema = CreateSchema(jsonFile.Schema); + Schema jsonFileSchema = jsonFile.Schema.ToArrow(); Schema arrowFileSchema = reader.Schema; SchemaComparer.Compare(jsonFileSchema, arrowFileSchema); @@ -80,7 +80,7 @@ private async Task Validate() for (int i = 0; i < batchCount; i++) { RecordBatch arrowFileRecordBatch = reader.ReadNextRecordBatch(); - RecordBatch jsonFileRecordBatch = CreateRecordBatch(jsonFileSchema, jsonFile.Batches[i]); + RecordBatch jsonFileRecordBatch = jsonFile.Batches[i].ToArrow(jsonFileSchema); ArrowReaderVerifier.CompareBatches(jsonFileRecordBatch, arrowFileRecordBatch, strictCompare: false); } @@ -98,7 +98,7 @@ private async Task Validate() private async Task JsonToArrow() { JsonFile jsonFile = await ParseJsonFile(); - Schema schema = CreateSchema(jsonFile.Schema); + Schema schema = jsonFile.Schema.ToArrow(); using (FileStream fs = ArrowFileInfo.Create()) { @@ -107,7 +107,7 @@ private async Task JsonToArrow() foreach (var jsonRecordBatch in jsonFile.Batches) { - RecordBatch batch = CreateRecordBatch(schema, jsonRecordBatch); + RecordBatch batch = jsonRecordBatch.ToArrow(schema); await writer.WriteRecordBatchAsync(batch); } await writer.WriteEndAsync(); @@ -117,595 +117,6 @@ private async Task JsonToArrow() return 0; } - private RecordBatch CreateRecordBatch(Schema schema, JsonRecordBatch jsonRecordBatch) - { - if (schema.FieldsList.Count != jsonRecordBatch.Columns.Count) - { - throw new NotSupportedException($"jsonRecordBatch.Columns.Count '{jsonRecordBatch.Columns.Count}' doesn't match schema field count '{schema.FieldsList.Count}'"); - } - - List arrays = new List(jsonRecordBatch.Columns.Count); - for (int i = 0; i < jsonRecordBatch.Columns.Count; i++) - { - JsonFieldData data = jsonRecordBatch.Columns[i]; - Field field = schema.FieldsList[i]; - ArrayCreator creator = new ArrayCreator(data); - field.DataType.Accept(creator); - arrays.Add(creator.Array); - } - - return new RecordBatch(schema, arrays, jsonRecordBatch.Count); - } - - private static Schema CreateSchema(JsonSchema jsonSchema) - { - Schema.Builder builder = new Schema.Builder(); - for (int i = 0; i < jsonSchema.Fields.Count; i++) - { - builder.Field(f => CreateField(f, jsonSchema.Fields[i])); - } - return builder.Build(); - } - - private static void CreateField(Field.Builder builder, JsonField jsonField) - { - Field[] children = null; - if (jsonField.Children?.Count > 0) - { - children = new Field[jsonField.Children.Count]; - for (int i = 0; i < jsonField.Children.Count; i++) - { - Field.Builder field = new Field.Builder(); - CreateField(field, jsonField.Children[i]); - children[i] = field.Build(); - } - } - - builder.Name(jsonField.Name) - .DataType(ToArrowType(jsonField.Type, children)) - .Nullable(jsonField.Nullable); - - if (jsonField.Metadata != null) - { - builder.Metadata(jsonField.Metadata); - } - } - - private static IArrowType ToArrowType(JsonArrowType type, Field[] children) - { - return type.Name switch - { - "bool" => BooleanType.Default, - "int" => ToIntArrowType(type), - "floatingpoint" => ToFloatingPointArrowType(type), - "decimal" => ToDecimalArrowType(type), - "binary" => BinaryType.Default, - "utf8" => StringType.Default, - "fixedsizebinary" => new FixedSizeBinaryType(type.ByteWidth), - "date" => ToDateArrowType(type), - "time" => ToTimeArrowType(type), - "timestamp" => ToTimestampArrowType(type), - "list" => ToListArrowType(type, children), - "fixedsizelist" => ToFixedSizeListArrowType(type, children), - "struct" => ToStructArrowType(type, children), - "union" => ToUnionArrowType(type, children), - "null" => NullType.Default, - _ => throw new NotSupportedException($"JsonArrowType not supported: {type.Name}") - }; - } - - private static IArrowType ToIntArrowType(JsonArrowType type) - { - return (type.BitWidth, type.IsSigned) switch - { - (8, true) => Int8Type.Default, - (8, false) => UInt8Type.Default, - (16, true) => Int16Type.Default, - (16, false) => UInt16Type.Default, - (32, true) => Int32Type.Default, - (32, false) => UInt32Type.Default, - (64, true) => Int64Type.Default, - (64, false) => UInt64Type.Default, - _ => throw new NotSupportedException($"Int type not supported: {type.BitWidth}, {type.IsSigned}") - }; - } - - private static IArrowType ToFloatingPointArrowType(JsonArrowType type) - { - return type.FloatingPointPrecision switch - { - "SINGLE" => FloatType.Default, - "DOUBLE" => DoubleType.Default, - _ => throw new NotSupportedException($"FloatingPoint type not supported: {type.FloatingPointPrecision}") - }; - } - - private static IArrowType ToDecimalArrowType(JsonArrowType type) - { - return type.BitWidth switch - { - 256 => new Decimal256Type(type.DecimalPrecision, type.Scale), - _ => new Decimal128Type(type.DecimalPrecision, type.Scale), - }; - } - - private static IArrowType ToDateArrowType(JsonArrowType type) - { - return type.Unit switch - { - "DAY" => Date32Type.Default, - "MILLISECOND" => Date64Type.Default, - _ => throw new NotSupportedException($"Date type not supported: {type.Unit}") - }; - } - - private static IArrowType ToTimeArrowType(JsonArrowType type) - { - return (type.Unit, type.BitWidth) switch - { - ("SECOND", 32) => new Time32Type(TimeUnit.Second), - ("SECOND", 64) => new Time64Type(TimeUnit.Second), - ("MILLISECOND", 32) => new Time32Type(TimeUnit.Millisecond), - ("MILLISECOND", 64) => new Time64Type(TimeUnit.Millisecond), - ("MICROSECOND", 32) => new Time32Type(TimeUnit.Microsecond), - ("MICROSECOND", 64) => new Time64Type(TimeUnit.Microsecond), - ("NANOSECOND", 32) => new Time32Type(TimeUnit.Nanosecond), - ("NANOSECOND", 64) => new Time64Type(TimeUnit.Nanosecond), - _ => throw new NotSupportedException($"Time type not supported: {type.Unit}, {type.BitWidth}") - }; - } - - private static IArrowType ToTimestampArrowType(JsonArrowType type) - { - return type.Unit switch - { - "SECOND" => new TimestampType(TimeUnit.Second, type.Timezone), - "MILLISECOND" => new TimestampType(TimeUnit.Millisecond, type.Timezone), - "MICROSECOND" => new TimestampType(TimeUnit.Microsecond, type.Timezone), - "NANOSECOND" => new TimestampType(TimeUnit.Nanosecond, type.Timezone), - _ => throw new NotSupportedException($"Time type not supported: {type.Unit}, {type.BitWidth}") - }; - } - - private static IArrowType ToListArrowType(JsonArrowType type, Field[] children) - { - return new ListType(children[0]); - } - - private static IArrowType ToFixedSizeListArrowType(JsonArrowType type, Field[] children) - { - return new FixedSizeListType(children[0], type.ListSize); - } - - private static IArrowType ToStructArrowType(JsonArrowType type, Field[] children) - { - return new StructType(children); - } - - private static IArrowType ToUnionArrowType(JsonArrowType type, Field[] children) - { - UnionMode mode = type.Mode switch - { - "SPARSE" => UnionMode.Sparse, - "DENSE" => UnionMode.Dense, - _ => throw new NotSupportedException($"Union mode not supported: {type.Mode}"), - }; - return new UnionType(children, type.TypeIds, mode); - } - - private class ArrayCreator : - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor, - IArrowTypeVisitor - { - private JsonFieldData JsonFieldData { get; set; } - public IArrowArray Array { get; private set; } - - public ArrayCreator(JsonFieldData jsonFieldData) - { - JsonFieldData = jsonFieldData; - } - - public void Visit(BooleanType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - ArrowBuffer.BitmapBuilder valueBuilder = new ArrowBuffer.BitmapBuilder(validityBuffer.Length); - - var json = JsonFieldData.Data.GetRawText(); - bool[] values = JsonSerializer.Deserialize(json); - - foreach (bool value in values) - { - valueBuilder.Append(value); - } - ArrowBuffer valueBuffer = valueBuilder.Build(); - - Array = new BooleanArray( - valueBuffer, validityBuffer, - JsonFieldData.Count, nullCount, 0); - } - - public void Visit(Int8Type type) => GenerateArray((v, n, c, nc, o) => new Int8Array(v, n, c, nc, o)); - public void Visit(Int16Type type) => GenerateArray((v, n, c, nc, o) => new Int16Array(v, n, c, nc, o)); - public void Visit(Int32Type type) => GenerateArray((v, n, c, nc, o) => new Int32Array(v, n, c, nc, o)); - public void Visit(Int64Type type) => GenerateLongArray((v, n, c, nc, o) => new Int64Array(v, n, c, nc, o), s => long.Parse(s)); - public void Visit(UInt8Type type) => GenerateArray((v, n, c, nc, o) => new UInt8Array(v, n, c, nc, o)); - public void Visit(UInt16Type type) => GenerateArray((v, n, c, nc, o) => new UInt16Array(v, n, c, nc, o)); - public void Visit(UInt32Type type) => GenerateArray((v, n, c, nc, o) => new UInt32Array(v, n, c, nc, o)); - public void Visit(UInt64Type type) => GenerateLongArray((v, n, c, nc, o) => new UInt64Array(v, n, c, nc, o), s => ulong.Parse(s)); - public void Visit(FloatType type) => GenerateArray((v, n, c, nc, o) => new FloatArray(v, n, c, nc, o)); - public void Visit(DoubleType type) => GenerateArray((v, n, c, nc, o) => new DoubleArray(v, n, c, nc, o)); - public void Visit(Time32Type type) => GenerateArray((v, n, c, nc, o) => new Time32Array(type, v, n, c, nc, o)); - public void Visit(Time64Type type) => GenerateLongArray((v, n, c, nc, o) => new Time64Array(type, v, n, c, nc, o), s => long.Parse(s)); - - public void Visit(Decimal128Type type) - { - Array = new Decimal128Array(GetDecimalArrayData(type)); - } - - public void Visit(Decimal256Type type) - { - Array = new Decimal256Array(GetDecimalArrayData(type)); - } - - public void Visit(NullType type) - { - Array = new NullArray(JsonFieldData.Count); - } - - private ArrayData GetDecimalArrayData(FixedSizeBinaryType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json, s_options); - - Span buffer = stackalloc byte[type.ByteWidth]; - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); - foreach (string value in values) - { - buffer.Fill(0); - - BigInteger bigInteger = BigInteger.Parse(value); - if (!bigInteger.TryWriteBytes(buffer, out int bytesWritten, false, !BitConverter.IsLittleEndian)) - { - throw new InvalidDataException($"Decimal data was too big to fit into {type.BitWidth} bits."); - } - - if (bigInteger.Sign == -1) - { - buffer.Slice(bytesWritten).Fill(255); - } - - valueBuilder.Append(buffer); - } - ArrowBuffer valueBuffer = valueBuilder.Build(default); - - return new ArrayData(type, JsonFieldData.Count, nullCount, 0, new[] { validityBuffer, valueBuffer }); - } - - public void Visit(Date32Type type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); - var json = JsonFieldData.Data.GetRawText(); - int[] values = JsonSerializer.Deserialize(json, s_options); - - foreach (int value in values) - { - valueBuilder.Append(value); - } - ArrowBuffer valueBuffer = valueBuilder.Build(); - - Array = new Date32Array( - valueBuffer, validityBuffer, - JsonFieldData.Count, nullCount, 0); - } - - public void Visit(Date64Type type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json, s_options); - - foreach (string value in values) - { - valueBuilder.Append(long.Parse(value)); - } - ArrowBuffer valueBuffer = valueBuilder.Build(); - - Array = new Date64Array( - valueBuffer, validityBuffer, - JsonFieldData.Count, nullCount, 0); - } - - public void Visit(TimestampType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json, s_options); - - foreach (string value in values) - { - valueBuilder.Append(long.Parse(value)); - } - ArrowBuffer valueBuffer = valueBuilder.Build(); - - Array = new TimestampArray( - type, valueBuffer, validityBuffer, - JsonFieldData.Count, nullCount, 0); - } - - public void Visit(StringType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - ArrowBuffer offsetBuffer = GetOffsetBuffer(); - - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json, s_options); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); - foreach (string value in values) - { - valueBuilder.Append(Encoding.UTF8.GetBytes(value)); - } - ArrowBuffer valueBuffer = valueBuilder.Build(default); - - Array = new StringArray(JsonFieldData.Count, offsetBuffer, valueBuffer, validityBuffer, nullCount); - } - - public void Visit(BinaryType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - ArrowBuffer offsetBuffer = GetOffsetBuffer(); - - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json, s_options); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); - foreach (string value in values) - { - valueBuilder.Append(ConvertHexStringToByteArray(value)); - } - ArrowBuffer valueBuffer = valueBuilder.Build(default); - - ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, new[] { validityBuffer, offsetBuffer, valueBuffer }); - Array = new BinaryArray(arrayData); - } - - public void Visit(FixedSizeBinaryType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json, s_options); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); - foreach (string value in values) - { - valueBuilder.Append(ConvertHexStringToByteArray(value)); - } - ArrowBuffer valueBuffer = valueBuilder.Build(default); - - ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, new[] { validityBuffer, valueBuffer }); - Array = new FixedSizeBinaryArray(arrayData); - } - - public void Visit(ListType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - ArrowBuffer offsetBuffer = GetOffsetBuffer(); - - var data = JsonFieldData; - JsonFieldData = data.Children[0]; - type.ValueDataType.Accept(this); - JsonFieldData = data; - - ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, - new[] { validityBuffer, offsetBuffer }, new[] { Array.Data }); - Array = new ListArray(arrayData); - } - - public void Visit(FixedSizeListType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - var data = JsonFieldData; - JsonFieldData = data.Children[0]; - type.ValueDataType.Accept(this); - JsonFieldData = data; - - ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, - new[] { validityBuffer }, new[] { Array.Data }); - Array = new FixedSizeListArray(arrayData); - } - - public void Visit(StructType type) - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - ArrayData[] children = new ArrayData[type.Fields.Count]; - - var data = JsonFieldData; - for (int i = 0; i < children.Length; i++) - { - JsonFieldData = data.Children[i]; - type.Fields[i].DataType.Accept(this); - children[i] = Array.Data; - } - JsonFieldData = data; - - ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, - new[] { validityBuffer }, children); - Array = new StructArray(arrayData); - } - - public void Visit(UnionType type) - { - ArrowBuffer[] buffers; - if (type.Mode == UnionMode.Dense) - { - buffers = new ArrowBuffer[2]; - buffers[1] = GetOffsetBuffer(); - } - else - { - buffers = new ArrowBuffer[1]; - } - buffers[0] = GetTypeIdBuffer(); - - ArrayData[] children = GetChildren(type); - - int nullCount = 0; - ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, buffers, children); - Array = UnionArray.Create(arrayData); - } - - private ArrayData[] GetChildren(NestedType type) - { - ArrayData[] children = new ArrayData[type.Fields.Count]; - - var data = JsonFieldData; - for (int i = 0; i < children.Length; i++) - { - JsonFieldData = data.Children[i]; - type.Fields[i].DataType.Accept(this); - children[i] = Array.Data; - } - JsonFieldData = data; - - return children; - } - - private static byte[] ConvertHexStringToByteArray(string hexString) - { - byte[] data = new byte[hexString.Length / 2]; - for (int index = 0; index < data.Length; index++) - { - data[index] = byte.Parse(hexString.AsSpan(index * 2, 2), NumberStyles.HexNumber, CultureInfo.InvariantCulture); - } - - return data; - } - - private static readonly JsonSerializerOptions s_options = new JsonSerializerOptions() - { - Converters = - { - new ByteArrayConverter() - } - }; - - private void GenerateArray(Func createArray) - where TArray : PrimitiveArray - where T : struct - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); - var json = JsonFieldData.Data.GetRawText(); - T[] values = JsonSerializer.Deserialize(json, s_options); - - foreach (T value in values) - { - valueBuilder.Append(value); - } - ArrowBuffer valueBuffer = valueBuilder.Build(); - - Array = createArray( - valueBuffer, validityBuffer, - JsonFieldData.Count, nullCount, 0); - } - - private void GenerateLongArray(Func createArray, Func parse) - where TArray : PrimitiveArray - where T : struct - { - ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); - - ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); - var json = JsonFieldData.Data.GetRawText(); - string[] values = JsonSerializer.Deserialize(json); - - foreach (string value in values) - { - valueBuilder.Append(parse(value)); - } - ArrowBuffer valueBuffer = valueBuilder.Build(); - - Array = createArray( - valueBuffer, validityBuffer, - JsonFieldData.Count, nullCount, 0); - } - - private ArrowBuffer GetOffsetBuffer() - { - if (JsonFieldData.Count == 0) { return ArrowBuffer.Empty; } - ArrowBuffer.Builder valueOffsets = new ArrowBuffer.Builder(JsonFieldData.Offset.Length); - valueOffsets.AppendRange(JsonFieldData.Offset); - return valueOffsets.Build(default); - } - - private ArrowBuffer GetTypeIdBuffer() - { - ArrowBuffer.Builder typeIds = new ArrowBuffer.Builder(JsonFieldData.TypeId.Length); - for (int i = 0; i < JsonFieldData.TypeId.Length; i++) - { - typeIds.Append(checked((byte)JsonFieldData.TypeId[i])); - } - return typeIds.Build(default); - } - - private ArrowBuffer GetValidityBuffer(out int nullCount) - { - if (JsonFieldData.Validity == null) - { - nullCount = 0; - return ArrowBuffer.Empty; - } - - ArrowBuffer.BitmapBuilder validityBuilder = new ArrowBuffer.BitmapBuilder(JsonFieldData.Validity.Length); - validityBuilder.AppendRange(JsonFieldData.Validity); - - nullCount = validityBuilder.UnsetBitCount; - return validityBuilder.Build(); - } - - public void Visit(IArrowType type) - { - throw new NotImplementedException($"{type.Name} not implemented"); - } - } - private async Task StreamToFile() { using ArrowStreamReader reader = new ArrowStreamReader(Console.OpenStandardInput()); @@ -752,14 +163,7 @@ private async Task FileToStream() private async ValueTask ParseJsonFile() { - using var fileStream = JsonFileInfo.OpenRead(); - JsonSerializerOptions options = new JsonSerializerOptions() - { - PropertyNamingPolicy = JsonFileNamingPolicy.Instance, - }; - options.Converters.Add(new ValidityConverter()); - - return await JsonSerializer.DeserializeAsync(fileStream, options); + return await JsonFile.ParseAsync(JsonFileInfo); } } } diff --git a/csharp/test/Apache.Arrow.IntegrationTest/JsonFile.cs b/csharp/test/Apache.Arrow.IntegrationTest/JsonFile.cs index 112eeabcb9931..20a3203d95d01 100644 --- a/csharp/test/Apache.Arrow.IntegrationTest/JsonFile.cs +++ b/csharp/test/Apache.Arrow.IntegrationTest/JsonFile.cs @@ -15,8 +15,16 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.IO; +using System.Numerics; +using System.Text; using System.Text.Json; using System.Text.Json.Serialization; +using System.Threading.Tasks; +using Apache.Arrow.Arrays; +using Apache.Arrow.Types; namespace Apache.Arrow.IntegrationTest { @@ -25,12 +33,199 @@ public class JsonFile public JsonSchema Schema { get; set; } public List Batches { get; set; } //public List Dictionaries {get;set;} + + public static async ValueTask ParseAsync(FileInfo fileInfo) + { + using var fileStream = fileInfo.OpenRead(); + var options = GetJsonOptions(); + return await JsonSerializer.DeserializeAsync(fileStream, options); + } + + public static JsonFile Parse(FileInfo fileInfo) + { + using var fileStream = fileInfo.OpenRead(); + var options = GetJsonOptions(); + return JsonSerializer.Deserialize(fileStream, options); + } + + private static JsonSerializerOptions GetJsonOptions() + { + JsonSerializerOptions options = new JsonSerializerOptions() + { + PropertyNamingPolicy = JsonFileNamingPolicy.Instance, + }; + options.Converters.Add(new ValidityConverter()); + return options; + } } public class JsonSchema { public List Fields { get; set; } public JsonMetadata Metadata { get; set; } + + /// + /// Decode this JSON schema as a Schema instance. + /// + public Schema ToArrow() { + return CreateSchema(this); + } + + private static Schema CreateSchema(JsonSchema jsonSchema) + { + Schema.Builder builder = new Schema.Builder(); + for (int i = 0; i < jsonSchema.Fields.Count; i++) + { + builder.Field(f => CreateField(f, jsonSchema.Fields[i])); + } + return builder.Build(); + } + + private static void CreateField(Field.Builder builder, JsonField jsonField) + { + Field[] children = null; + if (jsonField.Children?.Count > 0) + { + children = new Field[jsonField.Children.Count]; + for (int i = 0; i < jsonField.Children.Count; i++) + { + Field.Builder field = new Field.Builder(); + CreateField(field, jsonField.Children[i]); + children[i] = field.Build(); + } + } + + builder.Name(jsonField.Name) + .DataType(ToArrowType(jsonField.Type, children)) + .Nullable(jsonField.Nullable); + + if (jsonField.Metadata != null) + { + builder.Metadata(jsonField.Metadata); + } + } + + private static IArrowType ToArrowType(JsonArrowType type, Field[] children) + { + return type.Name switch + { + "bool" => BooleanType.Default, + "int" => ToIntArrowType(type), + "floatingpoint" => ToFloatingPointArrowType(type), + "decimal" => ToDecimalArrowType(type), + "binary" => BinaryType.Default, + "utf8" => StringType.Default, + "fixedsizebinary" => new FixedSizeBinaryType(type.ByteWidth), + "date" => ToDateArrowType(type), + "time" => ToTimeArrowType(type), + "timestamp" => ToTimestampArrowType(type), + "list" => ToListArrowType(type, children), + "fixedsizelist" => ToFixedSizeListArrowType(type, children), + "struct" => ToStructArrowType(type, children), + "union" => ToUnionArrowType(type, children), + "null" => NullType.Default, + _ => throw new NotSupportedException($"JsonArrowType not supported: {type.Name}") + }; + } + + private static IArrowType ToIntArrowType(JsonArrowType type) + { + return (type.BitWidth, type.IsSigned) switch + { + (8, true) => Int8Type.Default, + (8, false) => UInt8Type.Default, + (16, true) => Int16Type.Default, + (16, false) => UInt16Type.Default, + (32, true) => Int32Type.Default, + (32, false) => UInt32Type.Default, + (64, true) => Int64Type.Default, + (64, false) => UInt64Type.Default, + _ => throw new NotSupportedException($"Int type not supported: {type.BitWidth}, {type.IsSigned}") + }; + } + + private static IArrowType ToFloatingPointArrowType(JsonArrowType type) + { + return type.FloatingPointPrecision switch + { + "SINGLE" => FloatType.Default, + "DOUBLE" => DoubleType.Default, + _ => throw new NotSupportedException($"FloatingPoint type not supported: {type.FloatingPointPrecision}") + }; + } + + private static IArrowType ToDecimalArrowType(JsonArrowType type) + { + return type.BitWidth switch + { + 256 => new Decimal256Type(type.DecimalPrecision, type.Scale), + _ => new Decimal128Type(type.DecimalPrecision, type.Scale), + }; + } + + private static IArrowType ToDateArrowType(JsonArrowType type) + { + return type.Unit switch + { + "DAY" => Date32Type.Default, + "MILLISECOND" => Date64Type.Default, + _ => throw new NotSupportedException($"Date type not supported: {type.Unit}") + }; + } + + private static IArrowType ToTimeArrowType(JsonArrowType type) + { + return (type.Unit, type.BitWidth) switch + { + ("SECOND", 32) => new Time32Type(TimeUnit.Second), + ("SECOND", 64) => new Time64Type(TimeUnit.Second), + ("MILLISECOND", 32) => new Time32Type(TimeUnit.Millisecond), + ("MILLISECOND", 64) => new Time64Type(TimeUnit.Millisecond), + ("MICROSECOND", 32) => new Time32Type(TimeUnit.Microsecond), + ("MICROSECOND", 64) => new Time64Type(TimeUnit.Microsecond), + ("NANOSECOND", 32) => new Time32Type(TimeUnit.Nanosecond), + ("NANOSECOND", 64) => new Time64Type(TimeUnit.Nanosecond), + _ => throw new NotSupportedException($"Time type not supported: {type.Unit}, {type.BitWidth}") + }; + } + + private static IArrowType ToTimestampArrowType(JsonArrowType type) + { + return type.Unit switch + { + "SECOND" => new TimestampType(TimeUnit.Second, type.Timezone), + "MILLISECOND" => new TimestampType(TimeUnit.Millisecond, type.Timezone), + "MICROSECOND" => new TimestampType(TimeUnit.Microsecond, type.Timezone), + "NANOSECOND" => new TimestampType(TimeUnit.Nanosecond, type.Timezone), + _ => throw new NotSupportedException($"Time type not supported: {type.Unit}, {type.BitWidth}") + }; + } + + private static IArrowType ToListArrowType(JsonArrowType type, Field[] children) + { + return new ListType(children[0]); + } + + private static IArrowType ToFixedSizeListArrowType(JsonArrowType type, Field[] children) + { + return new FixedSizeListType(children[0], type.ListSize); + } + + private static IArrowType ToStructArrowType(JsonArrowType type, Field[] children) + { + return new StructType(children); + } + + private static IArrowType ToUnionArrowType(JsonArrowType type, Field[] children) + { + UnionMode mode = type.Mode switch + { + "SPARSE" => UnionMode.Sparse, + "DENSE" => UnionMode.Dense, + _ => throw new NotSupportedException($"Union mode not supported: {type.Mode}"), + }; + return new UnionType(children, type.TypeIds, mode); + } } public class JsonField @@ -60,7 +255,7 @@ public class JsonArrowType public int DecimalPrecision => ExtensionData["precision"].GetInt32(); public int Scale { get; set; } - // date and time fields + // date and time fields public string Unit { get; set; } // timestamp fields public string Timezone { get; set; } @@ -74,7 +269,7 @@ public class JsonArrowType // union fields public string Mode { get; set; } public int[] TypeIds { get; set; } - + [JsonExtensionData] public Dictionary ExtensionData { get; set; } } @@ -94,6 +289,445 @@ public class JsonRecordBatch { public int Count { get; set; } public List Columns { get; set; } + + /// + /// Decode this JSON record batch as a RecordBatch instance. + /// + public RecordBatch ToArrow(Schema schema) { + return CreateRecordBatch(schema, this); + } + + private RecordBatch CreateRecordBatch(Schema schema, JsonRecordBatch jsonRecordBatch) + { + if (schema.FieldsList.Count != jsonRecordBatch.Columns.Count) + { + throw new NotSupportedException($"jsonRecordBatch.Columns.Count '{jsonRecordBatch.Columns.Count}' doesn't match schema field count '{schema.FieldsList.Count}'"); + } + + List arrays = new List(jsonRecordBatch.Columns.Count); + for (int i = 0; i < jsonRecordBatch.Columns.Count; i++) + { + JsonFieldData data = jsonRecordBatch.Columns[i]; + Field field = schema.FieldsList[i]; + ArrayCreator creator = new ArrayCreator(data); + field.DataType.Accept(creator); + arrays.Add(creator.Array); + } + + return new RecordBatch(schema, arrays, jsonRecordBatch.Count); + } + + private class ArrayCreator : + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor, + IArrowTypeVisitor + { + private JsonFieldData JsonFieldData { get; set; } + public IArrowArray Array { get; private set; } + + public ArrayCreator(JsonFieldData jsonFieldData) + { + JsonFieldData = jsonFieldData; + } + + public void Visit(BooleanType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + ArrowBuffer.BitmapBuilder valueBuilder = new ArrowBuffer.BitmapBuilder(validityBuffer.Length); + + var json = JsonFieldData.Data.GetRawText(); + bool[] values = JsonSerializer.Deserialize(json); + + foreach (bool value in values) + { + valueBuilder.Append(value); + } + ArrowBuffer valueBuffer = valueBuilder.Build(); + + Array = new BooleanArray( + valueBuffer, validityBuffer, + JsonFieldData.Count, nullCount, 0); + } + + public void Visit(Int8Type type) => GenerateArray((v, n, c, nc, o) => new Int8Array(v, n, c, nc, o)); + public void Visit(Int16Type type) => GenerateArray((v, n, c, nc, o) => new Int16Array(v, n, c, nc, o)); + public void Visit(Int32Type type) => GenerateArray((v, n, c, nc, o) => new Int32Array(v, n, c, nc, o)); + public void Visit(Int64Type type) => GenerateLongArray((v, n, c, nc, o) => new Int64Array(v, n, c, nc, o), s => long.Parse(s)); + public void Visit(UInt8Type type) => GenerateArray((v, n, c, nc, o) => new UInt8Array(v, n, c, nc, o)); + public void Visit(UInt16Type type) => GenerateArray((v, n, c, nc, o) => new UInt16Array(v, n, c, nc, o)); + public void Visit(UInt32Type type) => GenerateArray((v, n, c, nc, o) => new UInt32Array(v, n, c, nc, o)); + public void Visit(UInt64Type type) => GenerateLongArray((v, n, c, nc, o) => new UInt64Array(v, n, c, nc, o), s => ulong.Parse(s)); + public void Visit(FloatType type) => GenerateArray((v, n, c, nc, o) => new FloatArray(v, n, c, nc, o)); + public void Visit(DoubleType type) => GenerateArray((v, n, c, nc, o) => new DoubleArray(v, n, c, nc, o)); + public void Visit(Time32Type type) => GenerateArray((v, n, c, nc, o) => new Time32Array(type, v, n, c, nc, o)); + public void Visit(Time64Type type) => GenerateLongArray((v, n, c, nc, o) => new Time64Array(type, v, n, c, nc, o), s => long.Parse(s)); + + public void Visit(Decimal128Type type) + { + Array = new Decimal128Array(GetDecimalArrayData(type)); + } + + public void Visit(Decimal256Type type) + { + Array = new Decimal256Array(GetDecimalArrayData(type)); + } + + public void Visit(NullType type) + { + Array = new NullArray(JsonFieldData.Count); + } + + private ArrayData GetDecimalArrayData(FixedSizeBinaryType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json, s_options); + + Span buffer = stackalloc byte[type.ByteWidth]; + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); + foreach (string value in values) + { + buffer.Fill(0); + + BigInteger bigInteger = BigInteger.Parse(value); + if (!bigInteger.TryWriteBytes(buffer, out int bytesWritten, false, !BitConverter.IsLittleEndian)) + { + throw new InvalidDataException($"Decimal data was too big to fit into {type.BitWidth} bits."); + } + + if (bigInteger.Sign == -1) + { + buffer.Slice(bytesWritten).Fill(255); + } + + valueBuilder.Append(buffer); + } + ArrowBuffer valueBuffer = valueBuilder.Build(default); + + return new ArrayData(type, JsonFieldData.Count, nullCount, 0, new[] { validityBuffer, valueBuffer }); + } + + public void Visit(Date32Type type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); + var json = JsonFieldData.Data.GetRawText(); + int[] values = JsonSerializer.Deserialize(json, s_options); + + foreach (int value in values) + { + valueBuilder.Append(value); + } + ArrowBuffer valueBuffer = valueBuilder.Build(); + + Array = new Date32Array( + valueBuffer, validityBuffer, + JsonFieldData.Count, nullCount, 0); + } + + public void Visit(Date64Type type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json, s_options); + + foreach (string value in values) + { + valueBuilder.Append(long.Parse(value)); + } + ArrowBuffer valueBuffer = valueBuilder.Build(); + + Array = new Date64Array( + valueBuffer, validityBuffer, + JsonFieldData.Count, nullCount, 0); + } + + public void Visit(TimestampType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json, s_options); + + foreach (string value in values) + { + valueBuilder.Append(long.Parse(value)); + } + ArrowBuffer valueBuffer = valueBuilder.Build(); + + Array = new TimestampArray( + type, valueBuffer, validityBuffer, + JsonFieldData.Count, nullCount, 0); + } + + public void Visit(StringType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + ArrowBuffer offsetBuffer = GetOffsetBuffer(); + + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json, s_options); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); + foreach (string value in values) + { + valueBuilder.Append(Encoding.UTF8.GetBytes(value)); + } + ArrowBuffer valueBuffer = valueBuilder.Build(default); + + Array = new StringArray(JsonFieldData.Count, offsetBuffer, valueBuffer, validityBuffer, nullCount); + } + + public void Visit(BinaryType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + ArrowBuffer offsetBuffer = GetOffsetBuffer(); + + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json, s_options); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); + foreach (string value in values) + { + valueBuilder.Append(ConvertHexStringToByteArray(value)); + } + ArrowBuffer valueBuffer = valueBuilder.Build(default); + + ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, new[] { validityBuffer, offsetBuffer, valueBuffer }); + Array = new BinaryArray(arrayData); + } + + public void Visit(FixedSizeBinaryType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json, s_options); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(); + foreach (string value in values) + { + valueBuilder.Append(ConvertHexStringToByteArray(value)); + } + ArrowBuffer valueBuffer = valueBuilder.Build(default); + + ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, new[] { validityBuffer, valueBuffer }); + Array = new FixedSizeBinaryArray(arrayData); + } + + public void Visit(ListType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + ArrowBuffer offsetBuffer = GetOffsetBuffer(); + + var data = JsonFieldData; + JsonFieldData = data.Children[0]; + type.ValueDataType.Accept(this); + JsonFieldData = data; + + ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, + new[] { validityBuffer, offsetBuffer }, new[] { Array.Data }); + Array = new ListArray(arrayData); + } + + public void Visit(FixedSizeListType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + var data = JsonFieldData; + JsonFieldData = data.Children[0]; + type.ValueDataType.Accept(this); + JsonFieldData = data; + + ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, + new[] { validityBuffer }, new[] { Array.Data }); + Array = new FixedSizeListArray(arrayData); + } + + public void Visit(StructType type) + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + ArrayData[] children = new ArrayData[type.Fields.Count]; + + var data = JsonFieldData; + for (int i = 0; i < children.Length; i++) + { + JsonFieldData = data.Children[i]; + type.Fields[i].DataType.Accept(this); + children[i] = Array.Data; + } + JsonFieldData = data; + + ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, + new[] { validityBuffer }, children); + Array = new StructArray(arrayData); + } + + public void Visit(UnionType type) + { + ArrowBuffer[] buffers; + if (type.Mode == UnionMode.Dense) + { + buffers = new ArrowBuffer[2]; + buffers[1] = GetOffsetBuffer(); + } + else + { + buffers = new ArrowBuffer[1]; + } + buffers[0] = GetTypeIdBuffer(); + + ArrayData[] children = GetChildren(type); + + int nullCount = 0; + ArrayData arrayData = new ArrayData(type, JsonFieldData.Count, nullCount, 0, buffers, children); + Array = UnionArray.Create(arrayData); + } + + private ArrayData[] GetChildren(NestedType type) + { + ArrayData[] children = new ArrayData[type.Fields.Count]; + + var data = JsonFieldData; + for (int i = 0; i < children.Length; i++) + { + JsonFieldData = data.Children[i]; + type.Fields[i].DataType.Accept(this); + children[i] = Array.Data; + } + JsonFieldData = data; + + return children; + } + + private static byte[] ConvertHexStringToByteArray(string hexString) + { + byte[] data = new byte[hexString.Length / 2]; + for (int index = 0; index < data.Length; index++) + { + data[index] = byte.Parse(hexString.AsSpan(index * 2, 2), NumberStyles.HexNumber, CultureInfo.InvariantCulture); + } + + return data; + } + + private static readonly JsonSerializerOptions s_options = new JsonSerializerOptions() + { + Converters = + { + new ByteArrayConverter() + } + }; + + private void GenerateArray(Func createArray) + where TArray : PrimitiveArray + where T : struct + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); + var json = JsonFieldData.Data.GetRawText(); + T[] values = JsonSerializer.Deserialize(json, s_options); + + foreach (T value in values) + { + valueBuilder.Append(value); + } + ArrowBuffer valueBuffer = valueBuilder.Build(); + + Array = createArray( + valueBuffer, validityBuffer, + JsonFieldData.Count, nullCount, 0); + } + + private void GenerateLongArray(Func createArray, Func parse) + where TArray : PrimitiveArray + where T : struct + { + ArrowBuffer validityBuffer = GetValidityBuffer(out int nullCount); + + ArrowBuffer.Builder valueBuilder = new ArrowBuffer.Builder(JsonFieldData.Count); + var json = JsonFieldData.Data.GetRawText(); + string[] values = JsonSerializer.Deserialize(json); + + foreach (string value in values) + { + valueBuilder.Append(parse(value)); + } + ArrowBuffer valueBuffer = valueBuilder.Build(); + + Array = createArray( + valueBuffer, validityBuffer, + JsonFieldData.Count, nullCount, 0); + } + + private ArrowBuffer GetOffsetBuffer() + { + ArrowBuffer.Builder valueOffsets = new ArrowBuffer.Builder(JsonFieldData.Offset.Length); + valueOffsets.AppendRange(JsonFieldData.Offset); + return valueOffsets.Build(default); + } + + private ArrowBuffer GetTypeIdBuffer() + { + ArrowBuffer.Builder typeIds = new ArrowBuffer.Builder(JsonFieldData.TypeId.Length); + for (int i = 0; i < JsonFieldData.TypeId.Length; i++) + { + typeIds.Append(checked((byte)JsonFieldData.TypeId[i])); + } + return typeIds.Build(default); + } + + private ArrowBuffer GetValidityBuffer(out int nullCount) + { + if (JsonFieldData.Validity == null) + { + nullCount = 0; + return ArrowBuffer.Empty; + } + + ArrowBuffer.BitmapBuilder validityBuilder = new ArrowBuffer.BitmapBuilder(JsonFieldData.Validity.Length); + validityBuilder.AppendRange(JsonFieldData.Validity); + + nullCount = validityBuilder.UnsetBitCount; + return validityBuilder.Build(); + } + + public void Visit(IArrowType type) + { + throw new NotImplementedException($"{type.Name} not implemented"); + } + } } public class JsonFieldData diff --git a/csharp/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs b/csharp/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs index f28b89a9cd17e..b6b65a582d953 100644 --- a/csharp/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs +++ b/csharp/test/Apache.Arrow.Tests/CDataInterfacePythonTests.cs @@ -99,8 +99,8 @@ private static Schema GetTestSchema() .Field(f => f.Name("time64_us").DataType(new Time64Type(TimeUnit.Microsecond)).Nullable(false)) .Field(f => f.Name("time64_ns").DataType(new Time64Type(TimeUnit.Nanosecond)).Nullable(false)) - .Field(f => f.Name("timestamp_ns").DataType(new TimestampType(TimeUnit.Nanosecond, "")).Nullable(false)) - .Field(f => f.Name("timestamp_us").DataType(new TimestampType(TimeUnit.Microsecond, "")).Nullable(false)) + .Field(f => f.Name("timestamp_ns").DataType(new TimestampType(TimeUnit.Nanosecond, (string) null)).Nullable(false)) + .Field(f => f.Name("timestamp_us").DataType(new TimestampType(TimeUnit.Microsecond, (string) null)).Nullable(false)) .Field(f => f.Name("timestamp_us_paris").DataType(new TimestampType(TimeUnit.Microsecond, "Europe/Paris")).Nullable(true)) .Field(f => f.Name("list_string").DataType(new ListType(StringType.Default)).Nullable(false)) diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index a780d33cbf323..67f0693df14aa 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -124,8 +124,8 @@ def run_c_data(self): enabled implementations. """ for producer, consumer in itertools.product( - filter(lambda t: t.C_DATA_EXPORTER, self.testers), - filter(lambda t: t.C_DATA_IMPORTER, self.testers)): + filter(lambda t: t.C_DATA_SCHEMA_EXPORTER, self.testers), + filter(lambda t: t.C_DATA_SCHEMA_IMPORTER, self.testers)): self._compare_c_data_implementations(producer, consumer) log('\n') @@ -428,9 +428,10 @@ def _compare_c_data_implementations( exporter, importer) self._run_test_cases(case_runner, self.json_files, serial=serial) - case_runner = partial(self._run_c_array_test_cases, producer, consumer, - exporter, importer) - self._run_test_cases(case_runner, self.json_files, serial=serial) + if producer.C_DATA_ARRAY_EXPORTER and consumer.C_DATA_ARRAY_IMPORTER: + case_runner = partial(self._run_c_array_test_cases, producer, consumer, + exporter, importer) + self._run_test_cases(case_runner, self.json_files, serial=serial) def _run_c_schema_test_case(self, producer: Tester, consumer: Tester, diff --git a/dev/archery/archery/integration/tester.py b/dev/archery/archery/integration/tester.py index 6a3061992d006..6cde20e61b321 100644 --- a/dev/archery/archery/integration/tester.py +++ b/dev/archery/archery/integration/tester.py @@ -204,9 +204,11 @@ class Tester: # whether the language supports receiving Flight FLIGHT_CLIENT = False # whether the language supports the C Data Interface as an exporter - C_DATA_EXPORTER = False + C_DATA_SCHEMA_EXPORTER = False + C_DATA_ARRAY_EXPORTER = False # whether the language supports the C Data Interface as an importer - C_DATA_IMPORTER = False + C_DATA_SCHEMA_IMPORTER = False + C_DATA_ARRAY_IMPORTER = False # the name used for skipping and shown in the logs name = "unknown" diff --git a/dev/archery/archery/integration/tester_cpp.py b/dev/archery/archery/integration/tester_cpp.py index 9ddc3c480002a..ab642c31aacc6 100644 --- a/dev/archery/archery/integration/tester_cpp.py +++ b/dev/archery/archery/integration/tester_cpp.py @@ -52,8 +52,10 @@ class CppTester(Tester): CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True - C_DATA_EXPORTER = True - C_DATA_IMPORTER = True + C_DATA_SCHEMA_EXPORTER = True + C_DATA_ARRAY_EXPORTER = True + C_DATA_SCHEMA_IMPORTER = True + C_DATA_ARRAY_IMPORTER = True name = 'C++' diff --git a/dev/archery/archery/integration/tester_csharp.py b/dev/archery/archery/integration/tester_csharp.py index 018731d573cac..bfa1dc1e45514 100644 --- a/dev/archery/archery/integration/tester_csharp.py +++ b/dev/archery/archery/integration/tester_csharp.py @@ -15,23 +15,138 @@ # specific language governing permissions and limitations # under the License. +from contextlib import contextmanager import os -from .tester import Tester +from . import cdata +from .tester import Tester, CDataExporter, CDataImporter from .util import run_cmd, log from ..utils.source import ARROW_ROOT_DEFAULT -_EXE_PATH = os.path.join( - ARROW_ROOT_DEFAULT, - "csharp/artifacts/Apache.Arrow.IntegrationTest", - "Debug/net7.0/Apache.Arrow.IntegrationTest", -) +_ARTIFACTS_PATH = os.path.join(ARROW_ROOT_DEFAULT, "csharp/artifacts") + +_EXE_PATH = os.path.join(_ARTIFACTS_PATH, + "Apache.Arrow.IntegrationTest", + "Debug/net7.0/Apache.Arrow.IntegrationTest", + ) + +_clr_loaded = False + + +def _load_clr(): + global _clr_loaded + if not _clr_loaded: + _clr_loaded = True + import pythonnet + pythonnet.load("coreclr") + import clr + clr.AddReference( + f"{_ARTIFACTS_PATH}/Apache.Arrow.IntegrationTest/" + f"Debug/net7.0/Apache.Arrow.IntegrationTest.dll") + clr.AddReference( + f"{_ARTIFACTS_PATH}/Apache.Arrow.Tests/" + f"Debug/net7.0/Apache.Arrow.Tests.dll") + + from Apache.Arrow.IntegrationTest import CDataInterface + CDataInterface.Initialize() + + +@contextmanager +def _disposing(disposable): + """ + Ensure the IDisposable object is disposed of when the enclosed block exits. + """ + try: + yield disposable + finally: + disposable.Dispose() + + +class _CDataBase: + + def __init__(self, debug, args): + self.debug = debug + self.args = args + self.ffi = cdata.ffi() + _load_clr() + + def _pointer_to_int(self, c_ptr): + return int(self.ffi.cast('uintptr_t', c_ptr)) + + def _read_batch_from_json(self, json_path, num_batch): + from Apache.Arrow.IntegrationTest import CDataInterface + + jf = CDataInterface.ParseJsonFile(json_path) + schema = jf.Schema.ToArrow() + return schema, jf.Batches[num_batch].ToArrow(schema) + + +class CSharpCDataExporter(CDataExporter, _CDataBase): + + def export_schema_from_json(self, json_path, c_schema_ptr): + from Apache.Arrow.IntegrationTest import CDataInterface + + jf = CDataInterface.ParseJsonFile(json_path) + CDataInterface.ExportSchema(jf.Schema.ToArrow(), + self._pointer_to_int(c_schema_ptr)) + + def export_batch_from_json(self, json_path, num_batch, c_array_ptr): + from Apache.Arrow.IntegrationTest import CDataInterface + + _, batch = self._read_batch_from_json(json_path, num_batch) + with _disposing(batch): + CDataInterface.ExportRecordBatch(batch, + self._pointer_to_int(c_array_ptr)) + + @property + def supports_releasing_memory(self): + # XXX the C# GC doesn't give reliable allocation measurements + return False + + +class CSharpCDataImporter(CDataImporter, _CDataBase): + + def import_schema_and_compare_to_json(self, json_path, c_schema_ptr): + from Apache.Arrow.IntegrationTest import CDataInterface + from Apache.Arrow.Tests import SchemaComparer + + jf = CDataInterface.ParseJsonFile(json_path) + imported_schema = CDataInterface.ImportSchema( + self._pointer_to_int(c_schema_ptr)) + SchemaComparer.Compare(jf.Schema.ToArrow(), imported_schema) + + def import_batch_and_compare_to_json(self, json_path, num_batch, + c_array_ptr): + from Apache.Arrow.IntegrationTest import CDataInterface + from Apache.Arrow.Tests import ArrowReaderVerifier + + schema, batch = self._read_batch_from_json(json_path, num_batch) + with _disposing(batch): + imported_batch = CDataInterface.ImportRecordBatch( + self._pointer_to_int(c_array_ptr), schema) + with _disposing(imported_batch): + # FIXME strictCompare=True adds more failures + ArrowReaderVerifier.CompareBatches(batch, imported_batch, + strictCompare=False) + + @property + def supports_releasing_memory(self): + return True + + def gc_until(self, predicate): + from Apache.Arrow.IntegrationTest import CDataInterface + CDataInterface.GetAllocatedBytes() # implicit GC + return predicate() class CSharpTester(Tester): PRODUCER = True CONSUMER = True + C_DATA_SCHEMA_EXPORTER = True + C_DATA_SCHEMA_IMPORTER = True + C_DATA_ARRAY_EXPORTER = True + C_DATA_ARRAY_IMPORTER = True name = 'C#' @@ -68,3 +183,9 @@ def file_to_stream(self, file_path, stream_path): cmd.extend(['--mode', 'file-to-stream']) cmd.extend(['-a', file_path, '>', stream_path]) self.run_shell_command(cmd) + + def make_c_data_exporter(self): + return CSharpCDataExporter(self.debug, self.args) + + def make_c_data_importer(self): + return CSharpCDataImporter(self.debug, self.args) diff --git a/dev/archery/archery/integration/tester_go.py b/dev/archery/archery/integration/tester_go.py index 6fa26ea02b8e7..75333db8d66d5 100644 --- a/dev/archery/archery/integration/tester_go.py +++ b/dev/archery/archery/integration/tester_go.py @@ -55,8 +55,10 @@ class GoTester(Tester): CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True - C_DATA_EXPORTER = True - C_DATA_IMPORTER = True + C_DATA_SCHEMA_EXPORTER = True + C_DATA_ARRAY_EXPORTER = True + C_DATA_SCHEMA_IMPORTER = True + C_DATA_ARRAY_IMPORTER = True name = 'Go'