-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy path_array.pyx
956 lines (768 loc) · 32.5 KB
/
_array.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from libc.stdint cimport uintptr_t, uint8_t, int64_t
from cpython.pycapsule cimport PyCapsule_GetPointer
from cpython.unicode cimport PyUnicode_AsUTF8AndSize
from cpython cimport (
Py_buffer,
PyBuffer_Release,
PyBUF_ANY_CONTIGUOUS,
PyBUF_FORMAT,
PyBytes_FromStringAndSize,
PyObject_GetBuffer,
PyUnicode_FromStringAndSize,
)
from nanoarrow_c cimport (
ArrowArray,
ArrowArrayAppendBytes,
ArrowArrayAppendNull,
ArrowArrayAppendString,
ArrowArrayBuffer,
ArrowArrayFinishBuilding,
ArrowArrayInitFromSchema,
ArrowArrayInitFromType,
ArrowArrayMove,
ArrowArrayRelease,
ArrowArrayStartAppending,
ArrowArrayView,
ArrowArrayViewComputeNullCount,
ArrowArrayViewInitFromSchema,
ArrowArrayViewIsNull,
ArrowArrayViewGetBytesUnsafe,
ArrowArrayViewGetBufferDataType,
ArrowArrayViewGetBufferElementSizeBits,
ArrowArrayViewGetBufferType,
ArrowArrayViewGetBufferView,
ArrowArrayViewGetNumBuffers,
ArrowArrayViewGetStringUnsafe,
ArrowArrayViewSetArray,
ArrowArrayViewSetArrayMinimal,
ArrowBitCountSet,
ArrowBuffer,
ArrowBufferMove,
ArrowBufferType,
ArrowBufferView,
ArrowSchemaInitFromType,
ArrowStringView,
ArrowType,
ArrowTypeString,
ArrowValidationLevel,
NANOARROW_BUFFER_TYPE_DATA,
NANOARROW_BUFFER_TYPE_DATA_OFFSET,
NANOARROW_BUFFER_TYPE_VARIADIC_DATA,
NANOARROW_BUFFER_TYPE_VARIADIC_SIZE,
NANOARROW_BUFFER_TYPE_TYPE_ID,
NANOARROW_BUFFER_TYPE_UNION_OFFSET,
NANOARROW_BUFFER_TYPE_VALIDITY,
NANOARROW_VALIDATION_LEVEL_DEFAULT,
NANOARROW_VALIDATION_LEVEL_FULL,
NANOARROW_VALIDATION_LEVEL_MINIMAL,
NANOARROW_VALIDATION_LEVEL_NONE,
)
from nanoarrow_macros cimport (
NANOARROW_OK,
ARROW_DEVICE_CPU,
)
from nanoarrow_device_c cimport (
ArrowDeviceType,
ArrowDeviceArray,
ArrowDeviceArrayInit,
)
from nanoarrow._device cimport Device, CSharedSyncEvent
from nanoarrow._buffer cimport CBuffer, CBufferView
from nanoarrow._schema cimport CSchema, CLayout
from nanoarrow._utils cimport (
alloc_c_array,
alloc_c_device_array,
alloc_c_array_view,
c_array_shallow_copy,
c_device_array_shallow_copy,
Error
)
from typing import Iterable, Tuple, Union
from nanoarrow import _repr_utils
from nanoarrow._device import DEVICE_CPU, DeviceType
cdef class CArrayView:
    """Low-level ArrowArrayView wrapper

    This object is a literal wrapper around an ArrowArrayView. It provides field
    accessors that return Python objects and handles the structure lifecycle
    (i.e., initialized ArrowArrayView structures are always released).

    See `nanoarrow.c_array_view()` for construction and usage examples.
    """

    def __cinit__(self, object base, uintptr_t addr):
        # base is the Python object kept referenced for as long as this wrapper
        # exists; addr is the address of the ArrowArrayView being wrapped.
        self._base = base
        self._ptr = <ArrowArrayView*>addr
        self._event = CSharedSyncEvent(DEVICE_CPU)

    def _set_array(self, CArray array, Device device=DEVICE_CPU):
        """Point this view at ``array`` and return ``self``

        For ``DEVICE_CPU`` this calls ArrowArrayViewSetArray(); for any other
        device it calls ArrowArrayViewSetArrayMinimal(). A non-OK return code
        is raised via ``Error.raise_message_not_ok()``.
        """
        cdef Error error = Error()
        cdef int code

        if device is DEVICE_CPU:
            code = ArrowArrayViewSetArray(self._ptr, array._ptr, &error.c_error)
        else:
            code = ArrowArrayViewSetArrayMinimal(self._ptr, array._ptr, &error.c_error)

        error.raise_message_not_ok("ArrowArrayViewSetArray()", code)

        # Remember the array's base object: it is handed to every CBufferView
        # created by buffer().
        self._array_base = array._base
        self._event = CSharedSyncEvent(device, <uintptr_t>array._sync_event)

        return self

    @property
    def storage_type_id(self):
        """The integer storage type identifier of the wrapped view"""
        return self._ptr.storage_type

    @property
    def storage_type(self):
        """The storage type name, or None if ArrowTypeString() returns NULL"""
        cdef const char* type_str = ArrowTypeString(self._ptr.storage_type)
        if type_str != NULL:
            return type_str.decode('UTF-8')

    @property
    def layout(self):
        """A :class:`CLayout` wrapping this view's layout member"""
        return CLayout(self, <uintptr_t>&self._ptr.layout)

    def __len__(self):
        return self._ptr.length

    @property
    def length(self):
        return len(self)

    @property
    def offset(self):
        return self._ptr.offset

    @property
    def null_count(self):
        """The null count, computed and cached on first access when possible

        If the stored null count is -1 it is resolved as follows: 0 when the
        first buffer is not a validity buffer or the validity pointer is NULL;
        otherwise computed with ArrowArrayViewComputeNullCount() when the view's
        device is ``DEVICE_CPU``. For any other device the stored value (-1) is
        returned unchanged.
        """
        if self._ptr.null_count != -1:
            return self._ptr.null_count

        cdef ArrowBufferType buffer_type = self._ptr.layout.buffer_type[0]
        cdef const uint8_t* validity_bits = self._ptr.buffer_views[0].data.as_uint8

        if buffer_type != NANOARROW_BUFFER_TYPE_VALIDITY:
            self._ptr.null_count = 0
        elif validity_bits == NULL:
            self._ptr.null_count = 0
        elif self._event.device is DEVICE_CPU:
            self._ptr.null_count = ArrowArrayViewComputeNullCount(self._ptr)

        return self._ptr.null_count

    @property
    def n_children(self):
        return self._ptr.n_children

    def child(self, int64_t i):
        """Wrap child ``i`` of this view

        Raises
        ------
        IndexError
            If ``i`` is outside ``[0, n_children)``.
        """
        if i < 0 or i >= self._ptr.n_children:
            raise IndexError(f"{i} out of range [0, {self._ptr.n_children})")

        cdef CArrayView child = CArrayView(
            self._base,
            <uintptr_t>self._ptr.children[i]
        )

        child._event = self._event
        # Propagate the array owner so that buffer views created from the child
        # are constructed with the same base object as those of the parent.
        child._array_base = self._array_base

        return child

    @property
    def children(self):
        """Iterate over the child views in order"""
        for i in range(self.n_children):
            yield self.child(i)

    @property
    def n_buffers(self):
        return ArrowArrayViewGetNumBuffers(self._ptr)

    def _buffer_info(self, int64_t i):
        """Describe buffer ``i``

        Returns a tuple of (buffer type, buffer data type, element size in
        bits, data address, size in bytes).

        Raises
        ------
        IndexError
            If ``i`` is outside ``[0, n_buffers)``.
        """
        if i < 0 or i >= self.n_buffers:
            raise IndexError(f"{i} out of range [0, {self.n_buffers})")

        cdef ArrowBufferView view = ArrowArrayViewGetBufferView(self._ptr, i)

        return (
            ArrowArrayViewGetBufferType(self._ptr, i),
            ArrowArrayViewGetBufferDataType(self._ptr, i),
            ArrowArrayViewGetBufferElementSizeBits(self._ptr, i),
            <uintptr_t>view.data.data,
            view.size_bytes
        )

    def buffer_type(self, int64_t i):
        """The type of buffer ``i`` as a string

        Returns ``"none"`` for any buffer type not listed below.
        """
        buffer_type = self._buffer_info(i)[0]
        if buffer_type == NANOARROW_BUFFER_TYPE_VALIDITY:
            return "validity"
        elif buffer_type == NANOARROW_BUFFER_TYPE_TYPE_ID:
            return "type_id"
        elif buffer_type == NANOARROW_BUFFER_TYPE_UNION_OFFSET:
            return "union_offset"
        elif buffer_type == NANOARROW_BUFFER_TYPE_DATA_OFFSET:
            return "data_offset"
        elif buffer_type == NANOARROW_BUFFER_TYPE_DATA:
            return "data"
        elif buffer_type == NANOARROW_BUFFER_TYPE_VARIADIC_DATA:
            return "variadic_data"
        elif buffer_type == NANOARROW_BUFFER_TYPE_VARIADIC_SIZE:
            return "variadic_size"
        else:
            return "none"

    def buffer(self, int64_t i):
        """Wrap buffer ``i`` as a :class:`CBufferView`

        Raises
        ------
        IndexError
            If ``i`` is outside ``[0, n_buffers)``.
        RuntimeError
            If the buffer's size in bytes is negative.
        """
        _, data_type, element_size_bits, addr, size = self._buffer_info(i)

        # Check the buffer size here because the error later is cryptic.
        # Buffer sizes are set to -1 when they are "unknown", so because of errors
        # in nanoarrow/C or because the array is on a non-CPU device, that -1 value
        # could leak its way here.
        if size < 0:
            raise RuntimeError(f"ArrowArrayView buffer {i} has size_bytes < 0")

        return CBufferView(
            self._array_base,
            addr,
            size,
            data_type,
            element_size_bits,
            self._event
        )

    @property
    def buffers(self):
        """Iterate over the buffers of this view as :class:`CBufferView` objects"""
        for i in range(self.n_buffers):
            yield self.buffer(i)

    @property
    def dictionary(self):
        """The dictionary view, or None if this view has no dictionary"""
        if self._ptr.dictionary == NULL:
            return None

        cdef CArrayView dictionary = CArrayView(
            self,
            <uintptr_t>self._ptr.dictionary
        )
        dictionary._event = self._event
        # See child(): keep the array owner available to the dictionary's
        # buffer views.
        dictionary._array_base = self._array_base

        return dictionary

    def _iter_bytes(self, int64_t offset, int64_t length) -> bytes | None:
        """Yield each element as ``bytes``, or None for null elements

        NOTE(review): iteration is over ``range(offset, length)``, so ``length``
        acts as an exclusive end index rather than an element count. Confirm
        against callers before changing.
        """
        cdef ArrowBufferView item_view
        for i in range(offset, length):
            if ArrowArrayViewIsNull(self._ptr, i):
                yield None
            else:
                item_view = ArrowArrayViewGetBytesUnsafe(self._ptr, i)
                yield PyBytes_FromStringAndSize(item_view.data.as_char, item_view.size_bytes)

    def _iter_str(self, int64_t offset, int64_t length) -> str | None:
        """Yield each element as ``str``, or None for null elements

        NOTE(review): iteration is over ``range(offset, length)``, so ``length``
        acts as an exclusive end index rather than an element count. Confirm
        against callers before changing.
        """
        cdef ArrowStringView item_view
        for i in range(offset, length):
            if ArrowArrayViewIsNull(self._ptr, i):
                yield None
            else:
                item_view = ArrowArrayViewGetStringUnsafe(self._ptr, i)
                yield PyUnicode_FromStringAndSize(item_view.data, item_view.size_bytes)

    def __repr__(self):
        return _repr_utils.array_view_repr(self)

    @staticmethod
    def from_schema(CSchema schema):
        """Allocate a view and initialize it from ``schema``

        A non-OK return code from ArrowArrayViewInitFromSchema() is raised via
        ``Error.raise_message_not_ok()``.
        """
        cdef ArrowArrayView* c_array_view
        base = alloc_c_array_view(&c_array_view)

        cdef Error error = Error()
        cdef int code = ArrowArrayViewInitFromSchema(c_array_view,
                                                     schema._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowArrayViewInitFromSchema()", code)

        return CArrayView(base, <uintptr_t>c_array_view)

    @staticmethod
    def from_array(CArray array, Device device=DEVICE_CPU):
        """Create a view from the schema of ``array`` and point it at ``array``"""
        out = CArrayView.from_schema(array._schema)
        return out._set_array(array, device)
cdef class CArray:
    """Low-level ArrowArray wrapper

    This object is a literal wrapper around a read-only ArrowArray. It provides
    field accessors that return Python objects and handles the C Data interface
    lifecycle (i.e., initialized ArrowArray structures are always released).

    See `nanoarrow.c_array()` for construction and usage examples.
    """

    @staticmethod
    def allocate(CSchema schema) -> CArray:
        """Allocate a released ArrowArray"""
        cdef ArrowArray* c_array_out
        base = alloc_c_array(&c_array_out)
        return CArray(base, <uintptr_t>c_array_out, schema)

    def __cinit__(self, object base, uintptr_t addr, CSchema schema):
        # base is the Python object kept referenced for as long as this wrapper
        # exists; addr is the address of the ArrowArray being wrapped.
        self._base = base
        self._ptr = <ArrowArray*>addr
        self._schema = schema
        # Defaults describe a CPU array with no sync event; _set_device()
        # overrides them.
        self._device_type = ARROW_DEVICE_CPU
        self._device_id = -1
        self._sync_event = NULL

    cdef _set_device(self, ArrowDeviceType device_type, int64_t device_id, void* sync_event):
        # Record the device information associated with this array.
        self._device_type = device_type
        self._device_id = device_id
        self._sync_event = sync_event

    @staticmethod
    def _import_from_c_capsule(schema_capsule, array_capsule) -> CArray:
        """Import from a ArrowSchema and ArrowArray PyCapsule tuple.

        Parameters
        ----------
        schema_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_schema' containing an
            ArrowSchema pointer.
        array_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_array' containing an
            ArrowArray pointer.
        """
        cdef:
            CSchema out_schema
            CArray out

        out_schema = CSchema._import_from_c_capsule(schema_capsule)
        out = CArray(
            array_capsule,
            <uintptr_t>PyCapsule_GetPointer(array_capsule, 'arrow_array'),
            out_schema
        )

        return out

    def __getitem__(self, k) -> CArray:
        """Slice this array without copying buffers

        Only ``slice`` objects without a step are supported. Negative bounds
        are interpreted relative to the end of the array. The result is a
        shallow copy whose offset and length are adjusted.

        Raises
        ------
        TypeError
            If ``k`` is not a slice.
        ValueError
            If ``k`` has a step.
        IndexError
            If the normalized bounds do not describe a valid slice.
        """
        self._assert_valid()

        if not isinstance(k, slice):
            raise TypeError(
                f"Can't subset CArray with object of type {type(k).__name__}")

        if k.step is not None:
            raise ValueError("Can't slice CArray with step")

        cdef int64_t start = 0 if k.start is None else k.start
        cdef int64_t stop = self._ptr.length if k.stop is None else k.stop
        if start < 0:
            start = self._ptr.length + start
        if stop < 0:
            stop = self._ptr.length + stop

        # start may still be negative after normalization (e.g., -100 for an
        # array of length 5); reject it so that the output offset cannot point
        # before the beginning of the array. A negative stop is covered by
        # stop < start once start is known to be non-negative.
        if (
            start < 0
            or start > self._ptr.length
            or stop > self._ptr.length
            or stop < start
        ):
            raise IndexError(
                f"{k} does not describe a valid slice of CArray "
                f"with length {self._ptr.length}"
            )

        cdef ArrowArray* c_array_out
        base = alloc_c_array(&c_array_out)
        c_array_shallow_copy(self._base, self._ptr, c_array_out)

        c_array_out.offset = c_array_out.offset + start
        c_array_out.length = stop - start
        cdef CArray out = CArray(base, <uintptr_t>c_array_out, self._schema)
        out._set_device(self._device_type, self._device_id, self._sync_event)
        return out

    def __arrow_c_array__(self, requested_schema=None):
        """
        Get a pair of PyCapsules containing a C ArrowArray representation of the object.

        Parameters
        ----------
        requested_schema : PyCapsule | None
            A PyCapsule containing a C ArrowSchema representation of a requested
            schema. Not supported.

        Returns
        -------
        Tuple[PyCapsule, PyCapsule]
            A pair of PyCapsules containing a C ArrowSchema and ArrowArray,
            respectively.

        Raises
        ------
        ValueError
            If this array is not a CPU array.
        NotImplementedError
            If ``requested_schema`` is not None.
        """
        self._assert_valid()

        if self._device_type != ARROW_DEVICE_CPU:
            raise ValueError(
                "Can't invoke __arrow_c_array__ on non-CPU array "
                f"with device_type {self._device_type}")

        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        # Export a shallow copy pointing to the same data in a way
        # that ensures this object stays valid.
        # TODO optimize this to export a version where children are reference
        # counted and can be released separately
        cdef ArrowArray* c_array_out
        array_capsule = alloc_c_array(&c_array_out)
        c_array_shallow_copy(self._base, self._ptr, c_array_out)

        return self._schema.__arrow_c_schema__(), array_capsule

    def _addr(self) -> int:
        """The address of the wrapped ArrowArray as an integer"""
        return <uintptr_t>self._ptr

    def is_valid(self) -> bool:
        """Check for a non-null and non-released underlying ArrowArray"""
        return self._ptr != NULL and self._ptr.release != NULL

    def _assert_valid(self):
        """Raise RuntimeError if the underlying ArrowArray is NULL or released"""
        if self._ptr == NULL:
            raise RuntimeError("CArray is NULL")
        if self._ptr.release == NULL:
            raise RuntimeError("CArray is released")

    def view(self) -> CArrayView:
        """Allocate a :class:`CArrayView` to access the buffers of this array"""
        device = Device.resolve(self._device_type, self._device_id)
        return CArrayView.from_array(self, device)

    @property
    def schema(self) -> CSchema:
        return self._schema

    @property
    def device_type(self) -> DeviceType:
        return DeviceType(self._device_type)

    @property
    def device_type_id(self) -> int:
        return self._device_type

    @property
    def device_id(self) -> int:
        return self._device_id

    def __len__(self) -> int:
        self._assert_valid()
        return self._ptr.length

    @property
    def length(self) -> int:
        return len(self)

    @property
    def offset(self) -> int:
        self._assert_valid()
        return self._ptr.offset

    @property
    def null_count(self) -> int:
        self._assert_valid()
        return self._ptr.null_count

    @property
    def n_buffers(self) -> int:
        self._assert_valid()
        return self._ptr.n_buffers

    @property
    def buffers(self) -> Tuple[int, ...]:
        """The buffer pointers of this array as a tuple of integer addresses"""
        self._assert_valid()
        return tuple(<uintptr_t>self._ptr.buffers[i] for i in range(self._ptr.n_buffers))

    @property
    def n_children(self) -> int:
        self._assert_valid()
        return self._ptr.n_children

    def child(self, int64_t i):
        """Wrap child ``i`` of this array, carrying over the device information

        Raises
        ------
        IndexError
            If ``i`` is outside ``[0, n_children)``.
        """
        self._assert_valid()
        if i < 0 or i >= self._ptr.n_children:
            raise IndexError(f"{i} out of range [0, {self._ptr.n_children})")

        cdef CArray out = CArray(
            self._base,
            <uintptr_t>self._ptr.children[i],
            self._schema.child(i)
        )
        out._set_device(self._device_type, self._device_id, self._sync_event)
        return out

    @property
    def children(self) -> Iterable[CArray]:
        """Iterate over the children of this array in order"""
        for i in range(self.n_children):
            yield self.child(i)

    @property
    def dictionary(self) -> Union[CArray, None]:
        """The dictionary array, or None if this array has no dictionary"""
        self._assert_valid()
        cdef CArray out
        if self._ptr.dictionary != NULL:
            out = CArray(self, <uintptr_t>self._ptr.dictionary, self._schema.dictionary)
            out._set_device(self._device_type, self._device_id, self._sync_event)
            return out
        else:
            return None

    def __repr__(self) -> str:
        return _repr_utils.array_repr(self)
cdef class CArrayBuilder:
    """Helper for constructing an ArrowArray

    The primary function of this class is to wrap the nanoarrow C library calls
    that build up the components of an ArrowArray.
    """
    # The array under construction and a cached pointer to its ArrowArray
    cdef CArray c_array
    cdef ArrowArray* _ptr
    # The device passed at construction
    cdef Device _device
    # Whether finish() may validate via ArrowArrayFinishBuilding()
    cdef bint _can_validate

    def __cinit__(self, CArray array, Device device=DEVICE_CPU):
        self.c_array = array
        self._ptr = array._ptr
        self._device = device
        self._can_validate = device is DEVICE_CPU

    @staticmethod
    def allocate(Device device=DEVICE_CPU):
        """Create a CArrayBuilder

        Allocates memory for an ArrowArray and populates it with nanoarrow's
        ArrowArray private_data/release callback implementation. This should
        usually be followed by :meth:`init_from_type` or :meth:`init_from_schema`.
        """
        return CArrayBuilder(CArray.allocate(CSchema.allocate()), device)

    def is_empty(self) -> bool:
        """Check if any items have been appended to this builder

        Raises
        ------
        RuntimeError
            If the builder has not been initialized.
        """
        if self._ptr.release == NULL:
            raise RuntimeError("CArrayBuilder is not initialized")
        return self._ptr.length == 0

    def init_from_type(self, int type_id) -> CArrayBuilder:
        """Initialize the array and its schema from a type identifier

        Raises
        ------
        RuntimeError
            If the builder is already initialized.
        """
        if self._ptr.release != NULL:
            raise RuntimeError("CArrayBuilder is already initialized")

        cdef int code = ArrowArrayInitFromType(self._ptr, <ArrowType>type_id)
        Error.raise_error_not_ok("ArrowArrayInitFromType()", code)

        code = ArrowSchemaInitFromType(self.c_array._schema._ptr, <ArrowType>type_id)
        Error.raise_error_not_ok("ArrowSchemaInitFromType()", code)

        return self

    def init_from_schema(self, CSchema schema) -> CArrayBuilder:
        """Initialize the array from ``schema`` and adopt it as the array's schema

        Raises
        ------
        RuntimeError
            If the builder is already initialized.
        """
        if self._ptr.release != NULL:
            raise RuntimeError("CArrayBuilder is already initialized")

        cdef Error error = Error()
        cdef int code = ArrowArrayInitFromSchema(self._ptr, schema._ptr, &error.c_error)
        # Report the function that was actually called
        error.raise_message_not_ok("ArrowArrayInitFromSchema()", code)

        self.c_array._schema = schema
        return self

    def start_appending(self) -> CArrayBuilder:
        """Use append mode for building this ArrowArray

        Calling this method is required to produce a valid array prior to calling
        :meth:`append_strings` or `append_bytes`.

        Raises
        ------
        ValueError
            If the builder's device is not the CPU device.
        """
        if self._device != DEVICE_CPU:
            raise ValueError("Can't append to non-CPU array")

        cdef int code = ArrowArrayStartAppending(self._ptr)
        Error.raise_error_not_ok("ArrowArrayStartAppending()", code)
        return self

    def append_strings(self, obj: Iterable[Union[str, None]]) -> CArrayBuilder:
        """Append each item of ``obj`` as a string element (None appends a null)"""
        cdef int code
        cdef Py_ssize_t item_utf8_size
        cdef ArrowStringView item

        for py_item in obj:
            if py_item is None:
                code = ArrowArrayAppendNull(self._ptr, 1)
            else:
                # Cython raises the error from PyUnicode_AsUTF8AndSize()
                # in the event that py_item is not a str(); however, we
                # set item_utf8_size = 0 to be safe.
                item_utf8_size = 0
                item.data = PyUnicode_AsUTF8AndSize(py_item, &item_utf8_size)
                item.size_bytes = item_utf8_size
                code = ArrowArrayAppendString(self._ptr, item)

            if code != NANOARROW_OK:
                Error.raise_error(f"append string item {py_item}", code)

        return self

    def append_bytes(self, obj: Iterable[Union[str, None]]) -> CArrayBuilder:
        """Append each item of ``obj`` as a binary element (None appends a null)

        Non-None items are accessed through the Python buffer protocol and must
        expose a one-dimensional buffer with an item size of 1.

        Raises
        ------
        ValueError
            If an item's buffer has ``ndim != 1`` or ``itemsize != 1``.
        """
        cdef int code
        cdef Py_buffer buffer
        cdef ArrowBufferView item

        for py_item in obj:
            if py_item is None:
                code = ArrowArrayAppendNull(self._ptr, 1)
            else:
                PyObject_GetBuffer(py_item, &buffer, PyBUF_ANY_CONTIGUOUS | PyBUF_FORMAT)
                # Release the buffer on every exit path, including the
                # validation errors below.
                try:
                    if buffer.ndim != 1:
                        raise ValueError("Can't append buffer with dimensions != 1 to binary array")

                    if buffer.itemsize != 1:
                        raise ValueError("Can't append buffer with itemsize != 1 to binary array")

                    item.data.data = buffer.buf
                    item.size_bytes = buffer.len
                    code = ArrowArrayAppendBytes(self._ptr, item)
                finally:
                    PyBuffer_Release(&buffer)

            if code != NANOARROW_OK:
                Error.raise_error(f"append bytes item {py_item}", code)

        return self

    def set_offset(self, int64_t offset) -> CArrayBuilder:
        """Set the offset of the array under construction"""
        self.c_array._assert_valid()
        self._ptr.offset = offset
        return self

    def set_length(self, int64_t length) -> CArrayBuilder:
        """Set the length of the array under construction"""
        self.c_array._assert_valid()
        self._ptr.length = length
        return self

    def set_null_count(self, int64_t null_count) -> CArrayBuilder:
        """Set the null count of the array under construction"""
        self.c_array._assert_valid()
        self._ptr.null_count = null_count
        return self

    def resolve_null_count(self) -> CArrayBuilder:
        """Ensure the output null count is synchronized with existing buffers

        Note that this will not attempt to access non-CPU buffers such that
        :attr:`null_count` might still be -1 after calling this method.

        Raises
        ------
        ValueError
            If the validity bitmap is too small for ``offset + length`` bits.
        """
        self.c_array._assert_valid()

        # This doesn't apply to unions. We currently don't have a schema view
        # or array view we can use to query the type ID, so just use the format
        # string for now.
        schema_format = self.c_array.schema.format
        if schema_format.startswith(("+us:", "+ud:")):
            return self

        # Don't overwrite an explicit null count
        if self._ptr.null_count != -1:
            return self

        cdef ArrowBuffer* validity_buffer = ArrowArrayBuffer(self._ptr, 0)
        if validity_buffer.size_bytes == 0:
            self._ptr.null_count = 0
            return self

        # Don't attempt to access a non-cpu buffer
        if self._device != DEVICE_CPU:
            return self

        # From _ArrowBytesForBits(), which is not included in nanoarrow_c.pxd
        # because it's an internal inline function.
        cdef int64_t bits = self._ptr.offset + self._ptr.length
        cdef int64_t bytes_required = (bits >> 3) + ((bits & 7) != 0)

        if validity_buffer.size_bytes < bytes_required:
            raise ValueError(
                f"Expected validity bitmap >= {bytes_required} bytes "
                f"but got validity bitmap with {validity_buffer.size_bytes} bytes"
            )

        cdef int64_t count = ArrowBitCountSet(
            validity_buffer.data,
            self._ptr.offset,
            self._ptr.length
        )
        self._ptr.null_count = self._ptr.length - count
        return self

    def set_buffer(self, int64_t i, CBuffer buffer, move=False) -> CArrayBuilder:
        """Set an ArrowArray buffer

        Sets a buffer of this ArrowArray such the pointer at array->buffers[i] is
        equal to buffer->data and such that the buffer's lifecycle is managed by
        the array. If move is True, the input Python object that previously wrapped
        the ArrowBuffer will be invalidated, which is usually the desired behaviour
        if you built or imported a buffer specifically to build this array. If move
        is False (the default), this function will make a shallow copy via another
        layer of Python object wrapping.

        Raises
        ------
        IndexError
            If ``i`` is outside ``[0, 3]``.
        ValueError
            If the buffer's device differs from the builder's device.
        """
        if i < 0 or i > 3:
            raise IndexError("i must be >= 0 and <= 3")

        if buffer._device != self._device:
            raise ValueError(
                f"Builder device ({self._device.device_type}/{self._device.device_id})"
                " and buffer device "
                f"({buffer._device.device_type}/{buffer._device.device_id})"
                " are not identical"
            )

        self.c_array._assert_valid()
        if not move:
            buffer = CBuffer.from_pybuffer(buffer)

        ArrowBufferMove(buffer._ptr, ArrowArrayBuffer(self._ptr, i))

        # The buffer's lifecycle is now owned by the array; however, we need
        # array->buffers[i] to be updated such that it equals
        # ArrowArrayBuffer(array, i)->data.
        self._ptr.buffers[i] = ArrowArrayBuffer(self._ptr, i).data

        return self

    def set_child(self, int64_t i, CArray c_array, move=False) -> CArrayBuilder:
        """Set an ArrowArray child

        Set a child of this array by performing a shallow copy or optionally
        transferring ownership to this object. The initialized child array
        must have been initialized before this call by initializing this
        builder with a schema containing the correct number of children.

        Raises
        ------
        IndexError
            If ``i`` is not a valid child index.
        ValueError
            If the child's device differs from the builder's device.
        """
        cdef CArray child = self.c_array.child(i)

        # Validate the input before touching the existing child so that a
        # rejected call leaves the builder unchanged.
        if (
            self._device.device_type_id != c_array.device_type_id
            or self._device.device_id != c_array.device_id
        ):
            raise ValueError(
                f"Builder device ({self._device.device_type}/{self._device.device_id})"
                " and child device "
                f"({c_array.device_type}/{c_array.device_id}) are not identical"
            )

        # There is probably a way to avoid a full synchronize for each child
        # (e.g., perhaps the ArrayBuilder could allocate a stream to use such
        # that an event can be allocated on finish_device() and synchronization
        # could be avoided entirely). Including this for now for safety.
        cdef CSharedSyncEvent sync = CSharedSyncEvent(
            self._device,
            <uintptr_t>c_array._sync_event
        )
        sync.synchronize()

        # Release whatever currently occupies the child slot before replacing it
        if child._ptr.release != NULL:
            ArrowArrayRelease(child._ptr)

        if not move:
            c_array_shallow_copy(c_array._base, c_array._ptr, child._ptr)
        else:
            ArrowArrayMove(c_array._ptr, child._ptr)

        # After setting children, we can't use the built-in validation done by
        # ArrowArrayFinishBuilding() because it assumes that the private_data of
        # each array (recursively) is one that was initialized by ArrowArrayInit()
        self._can_validate = False

        return self

    def finish(self, validation_level=None) -> CArray:
        """Finish building this array

        Performs any steps required to return a valid ArrowArray and optionally
        validates the output to ensure that the result is valid (given the information
        the array has available to it). After this call the builder wraps a
        newly allocated, uninitialized array.

        Parameters
        ----------
        validation_level : None, "full", "default", "minimal", or "none", optional
            Explicitly define a validation level or use None to perform default
            validation if possible. Validation may not be possible if children
            were set that were not created by nanoarrow.

        Raises
        ------
        NotImplementedError
            If validation was requested but is not possible for this builder.
        """
        self.c_array._assert_valid()
        cdef ArrowValidationLevel c_validation_level
        cdef Error error = Error()
        cdef int code

        if self._can_validate:
            # Any value other than "full", "minimal", or "none" uses the default
            c_validation_level = NANOARROW_VALIDATION_LEVEL_DEFAULT
            if validation_level == "full":
                c_validation_level = NANOARROW_VALIDATION_LEVEL_FULL
            elif validation_level == "minimal":
                c_validation_level = NANOARROW_VALIDATION_LEVEL_MINIMAL
            elif validation_level == "none":
                c_validation_level = NANOARROW_VALIDATION_LEVEL_NONE

            code = ArrowArrayFinishBuilding(self._ptr, c_validation_level, &error.c_error)
            error.raise_message_not_ok("ArrowArrayFinishBuildingDefault()", code)

        elif validation_level not in (None, "none"):
            raise NotImplementedError("Validation for array with children is not implemented")

        out = self.c_array

        # Reset the builder to the same state that __cinit__ establishes
        self.c_array = CArray.allocate(CSchema.allocate())
        self._ptr = self.c_array._ptr
        self._can_validate = self._device is DEVICE_CPU

        return out

    def finish_device(self):
        """Finish building this array and export to an ArrowDeviceArray

        Calls :meth:`finish`, propagating device information into an ArrowDeviceArray.
        """
        cdef CArray array = self.finish()

        cdef ArrowDeviceArray* device_array_ptr
        holder = alloc_c_device_array(&device_array_ptr)
        cdef int code = ArrowDeviceArrayInit(self._device._ptr, device_array_ptr, array._ptr, NULL)
        Error.raise_error_not_ok("ArrowDeviceArrayInit", code)

        return CDeviceArray(holder, <uintptr_t>device_array_ptr, array._schema)
cdef class CDeviceArray:
    """Low-level ArrowDeviceArray wrapper

    A thin wrapper around an ArrowDeviceArray structure. Its accessors return
    Python objects, and it manages the lifecycle of the wrapped structure (an
    initialized ArrowDeviceArray is always released).

    See `nanoarrow.device.c_device_array()` for construction and usage examples.
    """

    def __cinit__(self, object base, uintptr_t addr, CSchema schema):
        # base is the Python object kept referenced for as long as this wrapper
        # exists; addr is the address of the ArrowDeviceArray being wrapped.
        self._schema = schema
        self._base = base
        self._ptr = <ArrowDeviceArray*>addr

    @staticmethod
    def _init_from_array(Device device, uintptr_t array_addr, CSchema schema):
        """Wrap the ArrowArray at ``array_addr`` in a new ArrowDeviceArray

        The device array is initialized with ArrowDeviceArrayInit() using a
        NULL sync event.
        """
        cdef ArrowDeviceArray* c_device_array
        cdef ArrowArray* c_array = <ArrowArray*>array_addr

        holder = alloc_c_device_array(&c_device_array)
        cdef int code = ArrowDeviceArrayInit(device._ptr, c_device_array, c_array, NULL)
        Error.raise_error_not_ok("ArrowDeviceArrayInit", code)

        return CDeviceArray(holder, <uintptr_t>c_device_array, schema)

    @property
    def schema(self) -> CSchema:
        return self._schema

    @property
    def device_type(self) -> DeviceType:
        return DeviceType(self._ptr.device_type)

    @property
    def device_type_id(self) -> int:
        return self._ptr.device_type

    @property
    def device_id(self) -> int:
        return self._ptr.device_id

    @property
    def array(self) -> CArray:
        """The embedded ArrowArray as a :class:`CArray` carrying this device info"""
        cdef CArray wrapped = CArray(self, <uintptr_t>&self._ptr.array, self._schema)
        wrapped._set_device(
            self._ptr.device_type,
            self._ptr.device_id,
            self._ptr.sync_event
        )
        return wrapped

    def view(self) -> CArrayView:
        """Allocate a :class:`CArrayView` for the embedded array"""
        return self.array.view()

    def __arrow_c_array__(self, requested_schema=None):
        """Delegate to the embedded array's ``__arrow_c_array__()``"""
        return self.array.__arrow_c_array__(requested_schema=requested_schema)

    def __arrow_c_device_array__(self, requested_schema=None):
        """Export a shallow copy as an (ArrowSchema, ArrowDeviceArray) capsule pair

        Raises
        ------
        NotImplementedError
            If ``requested_schema`` is not None.
        """
        if requested_schema is not None:
            raise NotImplementedError("requested_schema")

        # TODO: evaluate whether we need to synchronize here or whether we should
        # move device arrays instead of shallow-copying them
        cdef ArrowDeviceArray* c_device_array_out
        capsule = alloc_c_device_array(&c_device_array_out)
        c_device_array_shallow_copy(self._base, self._ptr, c_device_array_out)

        schema_capsule = self._schema.__arrow_c_schema__()
        return schema_capsule, capsule

    @staticmethod
    def _import_from_c_capsule(schema_capsule, device_array_capsule) -> CDeviceArray:
        """
        Import from an ArrowSchema and ArrowDeviceArray PyCapsule tuple.

        Parameters
        ----------
        schema_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_schema' containing an
            ArrowSchema pointer.
        device_array_capsule : PyCapsule
            A valid PyCapsule with name 'arrow_device_array' containing an
            ArrowDeviceArray pointer.
        """
        cdef CSchema imported_schema = CSchema._import_from_c_capsule(schema_capsule)
        cdef uintptr_t addr = <uintptr_t>PyCapsule_GetPointer(
            device_array_capsule, 'arrow_device_array'
        )

        return CDeviceArray(device_array_capsule, addr, imported_schema)

    def __repr__(self) -> str:
        return _repr_utils.device_array_repr(self)