diff --git a/.bumpversion.cfg b/.bumpversion.cfg deleted file mode 100644 index 6e3ad036..00000000 --- a/.bumpversion.cfg +++ /dev/null @@ -1,28 +0,0 @@ -[bumpversion] -current_version = 3.0.0rc1 -commit = True -tag = False - -[bumpversion:file:pyproject.toml] -search = version = "{current_version}" -replace = version = "{new_version}" - -[bumpversion:file:setup.py] -search = version='{current_version}' -replace = version='{new_version}' - -[bumpversion:file:README.rst] -search = library is {current_version} -replace = library is {new_version} - -[bumpversion:file:docs/conf.py] -search = version = release = '{current_version}' -replace = version = release = '{new_version}' - -[bumpversion:file:src/questdb/__init__.py] -search = __version__ = '{current_version}' -replace = __version__ = '{new_version}' - -[bumpversion:file:src/questdb/ingress.pyx] -search = VERSION = '{current_version}' -replace = VERSION = '{new_version}' diff --git a/.bumpversion.toml b/.bumpversion.toml new file mode 100644 index 00000000..1417a603 --- /dev/null +++ b/.bumpversion.toml @@ -0,0 +1,39 @@ +[tool.bumpversion] +current_version = "3.0.0rc1" +commit = false +tag = false + +[[tool.bumpversion.files]] +filename = "pyproject.toml" +search = "version = \"{current_version}\"" +replace = "version = \"{new_version}\"" + +[[tool.bumpversion.files]] +filename = "setup.py" +search = "version='{current_version}'" +replace = "version='{new_version}'" + +[[tool.bumpversion.files]] +filename = "README.rst" +search = "library is {current_version}" +replace = "library is {new_version}" + +[[tool.bumpversion.files]] +filename = "docs/conf.py" +search = "version = release = '{current_version}'" +replace = "version = release = '{new_version}'" + +[[tool.bumpversion.files]] +filename = "src/questdb/__init__.py" +search = "__version__ = '{current_version}'" +replace = "__version__ = '{new_version}'" + +[[tool.bumpversion.files]] +filename = "src/questdb/ingress.pyx" +search = "VERSION = '{current_version}'" 
+replace = "VERSION = '{new_version}'" + +[[tool.bumpversion.files]] +filename = ".bumpversion.toml" +search = "current_version = \"{current_version}\"" +replace = "current_version = \"{new_version}\"" diff --git a/CHANGELOG.rst b/CHANGELOG.rst index ca005f9e..3598c1f8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,10 @@ Features * Array Data Type Support. Adds native support for NumPy arrays (currently only for ``np.float64`` element type and up to 32 dimensions). +.. note:: + **Server Requirement**: This feature requires QuestDB server version 8.4.0 or higher. + Ensure your server is upgraded before ingesting array types, otherwise data ingestion will fail. + .. code-block:: python import numpy as np diff --git a/DEV_NOTES.rst b/DEV_NOTES.rst index fd899f57..b4b40855 100644 --- a/DEV_NOTES.rst +++ b/DEV_NOTES.rst @@ -55,7 +55,6 @@ The development requirements are these if you prefer to install them one by one: python3 -m pip install wheel python3 -m pip install twine python3 -m pip install cibuildwheel - python3 -m pip install bump2version Building and packaging diff --git a/README.rst b/README.rst index 04923f6f..3d4a0ac2 100644 --- a/README.rst +++ b/README.rst @@ -5,7 +5,7 @@ QuestDB Client Library for Python This is the official Python client library for `QuestDB `_. This client library implements QuestDB's variant of the -`Ingestion Line Protocol `_ +`InfluxDB Line Protocol `_ (ILP) over HTTP and TCP. ILP provides the fastest way to insert data into QuestDB. @@ -53,6 +53,7 @@ The most common way to insert data is from a Pandas dataframe. 'amount': [0.00044, 0.001], # NumPy float64 arrays are supported from v3.0.0rc1 onwards. + # Note that requires QuestDB server >= 8.4.0 for array support 'ord_book_bids': [ np.array([2615.54, 2618.63]), np.array([39269.98, 39270.00]) @@ -82,6 +83,7 @@ You can also send individual rows. This only requires a more minimal installatio 'amount': 0.00044, # NumPy float64 arrays are supported from v3.0.0rc1 onwards. 
+ # Note: this requires QuestDB server >= 8.4.0 for array support 'ord_book_bids': np.array([2615.54, 2618.63]), }, at=TimestampNanos.now()) diff --git a/RELEASING.rst b/RELEASING.rst index 68c77e69..c5444cdd 100644 --- a/RELEASING.rst +++ b/RELEASING.rst @@ -20,21 +20,17 @@ Create a new PR with the new changes in ``CHANGELOG.rst``. Make a commit and push the changes to a new branch. -You also want to bump the version. +You also want to bump the version. This process is semi-automated. -This process is automated by the following command: +* Ensure you have ``uv`` and ``bump-my-version`` installed: + * ``curl -LsSf https://astral.sh/uv/install.sh | sh`` : see https://docs.astral.sh/uv/getting-started/installation/ + * ``uv tool install bump-my-version``: see https://github.com/callowayproject/bump-my-version. -.. code-block:: bash - - bump2version --config-file .bumpversion.cfg --no-tag patch - -Here use: - -* ``patch`` to bump the version to the next patch version, e.g. 1.0.0 -> 1.0.1 - -* ``minor`` to bump the version to the next minor version, e.g. 1.0.0 -> 1.1.0 +.. code-block:: console + +    bump-my-version replace --new-version NEW_VERSION -* ``major`` to bump the version to the next major version, e.g. 1.0.0 -> 2.0.0 +If you're unsure, append ``--dry-run`` to preview changes. Now merge the PR with the title "Bump version: V.V.V → W.W.W". 
diff --git a/c-questdb-client b/c-questdb-client index fd24e025..5af7515a 160000 --- a/c-questdb-client +++ b/c-questdb-client @@ -1 +1 @@ -Subproject commit fd24e0258f6b86a457037013cc42459e5bb9475b +Subproject commit 5af7515a29bc5b612516474a83e1186c583a73b3 diff --git a/ci/cibuildwheel.yaml b/ci/cibuildwheel.yaml index fdbb5850..2f684b5d 100644 --- a/ci/cibuildwheel.yaml +++ b/ci/cibuildwheel.yaml @@ -122,6 +122,7 @@ stages: displayName: Build wheels env: CIBW_BUILD: pp* + CIBW_ENABLE: pypy pypy-eol - task: PublishBuildArtifacts@1 inputs: {pathtoPublish: 'wheelhouse'} diff --git a/ci/run_tests_pipeline.yaml b/ci/run_tests_pipeline.yaml index 3a3d848d..4d88f61c 100644 --- a/ci/run_tests_pipeline.yaml +++ b/ci/run_tests_pipeline.yaml @@ -35,6 +35,8 @@ stages: lfs: false submodules: true - task: UsePythonVersion@0 + inputs: + versionSpec: '3.12' - script: | python3 --version python3 -m pip install cython @@ -68,3 +70,51 @@ stages: JAVA_HOME: $(JAVA_HOME_11_X64) QDB_REPO_PATH: './questdb' condition: eq(variables.vsQuestDbMaster, true) + - job: TestsAgainstVariousNumpyVersion1x + pool: + name: "Azure Pipelines" + vmImage: "ubuntu-latest" + timeoutInMinutes: 45 + steps: + - checkout: self + fetchDepth: 1 + lfs: false + submodules: true + - task: UsePythonVersion@0 + inputs: + versionSpec: '3.9' + - script: | + python3 --version + python3 -m pip install uv + sudo apt-get install -y libopenblas-dev pkg-config + displayName: "Install uv" + - script: uv run --with 'numpy==1.21.0' test/test.py -v TestBufferProtocolVersionV2 + displayName: "Test vs numpy 1.21" + - script: uv run --with 'numpy==1.24.0' test/test.py -v TestBufferProtocolVersionV2 + displayName: "Test vs numpy 1.24" + - script: uv run --with 'numpy==1.26.0' test/test.py -v TestBufferProtocolVersionV2 + displayName: "Test vs numpy 1.26" + - job: TestsAgainstVariousNumpyVersion2x + pool: + name: "Azure Pipelines" + vmImage: "ubuntu-latest" + timeoutInMinutes: 45 + steps: + - checkout: self + fetchDepth: 1 + lfs: 
false + submodules: true + - task: UsePythonVersion@0 + inputs: + versionSpec: '3.11' + - script: | + python3 --version + python3 -m pip install uv + sudo apt-get install -y libopenblas-dev pkg-config + displayName: "Install uv" + - script: uv run --with 'numpy==2.0.0' test/test.py -v TestBufferProtocolVersionV2 + displayName: "Test vs numpy 2.0" + - script: uv run --with 'numpy==2.2.0' test/test.py -v TestBufferProtocolVersionV2 + displayName: "Test vs numpy 2.2" + - script: uv run --with 'numpy==2.3.0' test/test.py -v TestBufferProtocolVersionV2 + displayName: "Test vs numpy 2.3" diff --git a/dev_requirements.txt b/dev_requirements.txt index 9d2076dc..36930bfb 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -7,6 +7,6 @@ sphinx-rtd-theme>=1.0.0 twine>=4.0.1 bump2version>=1.0.1 pandas>=1.3.5 -numpy>=1.21.6 +numpy>=1.21.0 pyarrow>=10.0.1 fastparquet>=2023.10.1 diff --git a/docs/conf.py b/docs/conf.py index 1c68b50e..c50d9dc0 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -5,6 +5,11 @@ from questdb.ingress import * +autodoc_mock_imports = ["_cython"] +autodoc_type_aliases = { + 'datetime': 'datetime.datetime', +} + extensions = [ 'sphinx.ext.autodoc', 'sphinx.ext.autosummary', diff --git a/docs/conf.rst b/docs/conf.rst index 6b0853b2..71812a3d 100644 --- a/docs/conf.rst +++ b/docs/conf.rst @@ -240,6 +240,9 @@ Specifies the version of InfluxDB Line Protocol to use. Valid options are: TCP/TCPS: Defaults to version 1 for compatibility +.. note:: + Protocol version ``2`` requires QuestDB server version 8.4.0 or higher. + .. _sender_conf_buffer: Buffer diff --git a/docs/sender.rst b/docs/sender.rst index 602f1268..840c1b1a 100644 --- a/docs/sender.rst +++ b/docs/sender.rst @@ -302,6 +302,9 @@ Here is a configuration string with ``protocol_version=2`` for ``TCP``: See the :ref:`sender_conf_protocol_version` section for more details. +.. note:: + Protocol version ``2`` requires QuestDB server version 8.4.0 or higher. 
+ Error Reporting =============== diff --git a/pyproject.toml b/pyproject.toml index b7b94fe3..1ed3bfd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ "Topic :: Database :: Front-Ends", "Topic :: Scientific/Engineering" ] -dependencies = ["numpy>=1.26.0"] # Keep in sync with build-system.requires +dependencies = ["numpy>=1.21.0"] # Keep in sync with build-system.requires [project.license] text = "Apache License 2.0" @@ -46,7 +46,7 @@ requires = [ "setuptools>=45.2.0", "wheel>=0.34.2", "cython>=0.29.24", - "numpy>=1.26.0", # keep in sync with project.dependencies + "numpy>=1.21.0", # keep in sync with project.dependencies ] diff --git a/src/questdb/dataframe.pxi b/src/questdb/dataframe.pxi index 1741f097..dcd6624b 100644 --- a/src/questdb/dataframe.pxi +++ b/src/questdb/dataframe.pxi @@ -2064,18 +2064,30 @@ cdef void_int _dataframe_serialize_cell_column_arr_f64__arr_f64_numpyobj( f'Only float64 numpy arrays are supported, got dtype: {arr_descr}') cdef: size_t rank = PyArray_NDIM(arr) - const uint8_t* data_ptr = PyArray_DATA(arr) + const double* data_ptr = PyArray_DATA(arr) line_sender_error * err = NULL - if not line_sender_buffer_column_f64_arr_byte_strides( - ls_buf, - col.name, - rank, - PyArray_DIMS(arr), - PyArray_STRIDES(arr), # N.B.: Strides expressed as byte jumps - data_ptr, - PyArray_NBYTES(arr), - &err): - raise c_err_to_py(err) + + if PyArray_FLAGS(arr) & NPY_ARRAY_C_CONTIGUOUS != 0: + if not line_sender_buffer_column_f64_arr_c_major( + ls_buf, + col.name, + rank, + PyArray_DIMS(arr), + data_ptr, + PyArray_SIZE(arr), + &err): + raise c_err_to_py(err) + else: + if not line_sender_buffer_column_f64_arr_byte_strides( + ls_buf, + col.name, + rank, + PyArray_DIMS(arr), + PyArray_STRIDES(arr), # N.B.: Strides expressed as byte jumps + data_ptr, + PyArray_SIZE(arr), + &err): + raise c_err_to_py(err) cdef void_int _dataframe_serialize_cell_column_ts__dt64ns_tz_arrow( line_sender_buffer* ls_buf, diff --git 
a/src/questdb/extra_numpy.pxd b/src/questdb/extra_numpy.pxd index 3aa5f71f..ec7ce57d 100644 --- a/src/questdb/extra_numpy.pxd +++ b/src/questdb/extra_numpy.pxd @@ -8,6 +8,7 @@ from cpython.object cimport PyObject from numpy cimport ( # Constants NPY_DOUBLE, # N.B.: From `#include `: `#define NPY_FLOAT64 NPY_DOUBLE` + NPY_ARRAY_C_CONTIGUOUS, # Types PyArrayObject, @@ -20,7 +21,7 @@ cdef extern from "numpy/arrayobject.h": bint PyArray_CheckExact(PyObject * o) # PyArrayObject - npy_intp PyArray_NBYTES(PyArrayObject*) nogil + npy_intp PyArray_SIZE(PyArrayObject*) nogil npy_intp* PyArray_STRIDES(PyArrayObject*) nogil npy_intp* PyArray_DIMS(PyArrayObject*) nogil npy_int PyArray_TYPE(PyArrayObject* arr) nogil @@ -31,3 +32,4 @@ cdef extern from "numpy/arrayobject.h": npy_intp PyArray_DIM(PyArrayObject*, size_t) nogil npy_intp PyArray_STRIDE(PyArrayObject*, size_t) nogil int PyArray_NDIM(PyArrayObject*) nogil + int PyArray_FLAGS(PyArrayObject*) diff --git a/src/questdb/ingress.pyi b/src/questdb/ingress.pyi index cd6f2085..7d5c206f 100644 --- a/src/questdb/ingress.pyi +++ b/src/questdb/ingress.pyi @@ -55,9 +55,7 @@ class IngressErrorCode(Enum): HttpNotSupported = ... ServerFlushError = ... ConfigError = ... - ArrayLargeDimError = ... - ArrayInternalError = ... - ArrayWriteToBufferError = ... + ArrayError = ... ProtocolVersionError = ... BadDataFrame = ... @@ -189,7 +187,7 @@ class SenderTransaction: To create a transaction: - .. code_block:: python + .. code-block:: python with sender.transaction('table_name') as txn: txn.row(..) @@ -212,6 +210,8 @@ class SenderTransaction: Write a row for the table in the transaction. The table name is taken from the transaction. + + **Note**: Support for NumPy arrays (``np.array``) requires QuestDB server version 8.4.0 or higher. """ def dataframe( @@ -295,6 +295,8 @@ class Buffer: This should match the ``cairo.max.file.name.length`` setting of the QuestDB instance you're connecting to. 
+ **Note**: Protocol version ``2`` requires QuestDB server version 8.4.0 or higher. + .. code-block:: python # These two buffer constructions are equivalent. @@ -450,6 +452,8 @@ class Buffer: * - ``None`` - *Column is skipped and not serialized.* + **Note**: Support for NumPy arrays (``np.array``) requires QuestDB server version 8.4.0 or higher. + If the destination table was already created, then the columns types will be cast to the types of the existing columns whenever possible (Refer to the QuestDB documentation pages linked above). @@ -727,6 +731,9 @@ class Buffer: interpreted as the current QuestDB server time set on receipt of message. + * **η**: Support for NumPy arrays (``np.array``) requires QuestDB + server version 8.4.0 or higher. + **Error Handling and Recovery** In case an exception is raised during dataframe serialization, the @@ -834,6 +841,7 @@ class Sender: auto_flush_rows: Optional[int] = None, auto_flush_bytes: bool = False, auto_flush_interval: int = 1000, + protocol_version=None, init_buf_size: int = 65536, max_name_len: int = 127, ): ... @@ -859,6 +867,7 @@ class Sender: auto_flush_rows: Optional[int] = None, auto_flush_bytes: bool = False, auto_flush_interval: int = 1000, + protocol_version=None, init_buf_size: int = 65536, max_name_len: int = 127, ) -> Sender: @@ -894,6 +903,7 @@ class Sender: auto_flush_rows: Optional[int] = None, auto_flush_bytes: bool = False, auto_flush_interval: int = 1000, + protocol_version=None, init_buf_size: int = 65536, max_name_len: int = 127, ) -> Sender: @@ -956,7 +966,13 @@ class Sender: @property def protocol_version(self) -> int: """ - Returns the QuestDB server's recommended default line protocol version. + The protocol version used by the sender. + + Protocol version 1 is retained for backwards compatibility with + older QuestDB versions. + + Protocol version 2 introduces binary floating point support and + the array datatype. """ @property @@ -1020,6 +1036,8 @@ class Sender: in the constructor. 
Refer to the :func:`Buffer.row` documentation for details on arguments. + + **Note**: Support for NumPy arrays (``np.array``) requires QuestDB server version 8.4.0 or higher. """ def dataframe( diff --git a/src/questdb/ingress.pyx b/src/questdb/ingress.pyx index 524ae2e3..881bd3b6 100644 --- a/src/questdb/ingress.pyx +++ b/src/questdb/ingress.pyx @@ -51,7 +51,8 @@ from libc.string cimport strncmp, memset from libc.math cimport isnan from libc.errno cimport errno # from libc.stdio cimport stderr, fprintf -from cpython.datetime cimport datetime, timedelta +from cpython.datetime cimport datetime as cp_datetime +from cpython.datetime cimport timedelta as cp_timedelta from cpython.bool cimport bool from cpython.weakref cimport PyWeakref_NewRef, PyWeakref_GetObject from cpython.object cimport PyObject @@ -74,7 +75,6 @@ ctypedef int void_int import cython include "dataframe.pxi" - from enum import Enum from typing import List, Tuple, Dict, Union, Any, Optional, Callable, \ Iterable @@ -82,9 +82,15 @@ import pathlib from cpython.bytes cimport PyBytes_FromStringAndSize import sys +import datetime import os +import threading +import collections +import time +import heapq +import warnings -import numpy as np +import numpy cimport numpy as cnp from numpy cimport NPY_DOUBLE, PyArrayObject @@ -100,6 +106,112 @@ cnp.import_array() VERSION = '3.0.0rc1' +_SENDER_RECONNECT_WARN_THRESHOLD = 25 # reconnections +_SENDER_RECONNECT_WARN_WINDOW_NS = 5_000_000_000 # 5 seconds in nanoseconds + + +class _ActiveSenders: + def __init__(self): + self._lock = threading.Lock() + + # The slots fields manage a pool of unsigned integer slot IDs. These slot IDs are: + # * Always non-negative integers (starting from 0). + # * Reused when returned. + # * Allocated in the lowest-available order to keep them compact. + self._next_slot = 0 # Next available slot ID in the linear range. + self._returned_slots = [] # I.e. "holes" in the range `0..self._next_slot`. 
+ + # Tracked established/closed connection events. + # Keys are slot IDs, which are always non-negative integers. + # Values are `collections.deque(maxlen=100)` containing established `time.monotonic_ns()` timestamps. + self._series = {} + + # Timestamp of last warning (monotonic_ns) + self._last_warning_ns = None # Track last warning time (monotonic_ns) + + def _get_next_slot(self) -> int: + # Always called with a lock held. + if self._returned_slots: + return heapq.heappop(self._returned_slots) + else: + self._next_slot += 1 + return self._next_slot - 1 + + def _return_slot(self, slot_id): + # Always called with a lock held. + if slot_id == self._next_slot - 1: + # Not optimal since we're not dealing with "trailing" slots, + # but at least the code is simple :-) + self._next_slot -= 1 + else: + heapq.heappush(self._returned_slots, slot_id) + + def _count_recent_reconnections(self, window_ns) -> int: + """ + Return the highest number of establishment timestamps recorded for any single + slot within the last `window_ns` window; stale timestamps are pruned in passing. + """ + # Always called with a lock held. + now = time.monotonic_ns() + cutoff = now - window_ns + max_count = 0 + to_delete = [] + for slot_id, serie in self._series.items(): + while serie and serie[0] < cutoff: + serie.popleft() + count = len(serie) + if not serie: + to_delete.append(slot_id) + elif count > max_count: + max_count = count + for slot_id in to_delete: + del self._series[slot_id] + return max_count + + def track_established(self) -> int: + """ + Track a sender connection event (threadsafe). 
+ """ + with self._lock: + slot_id = self._get_next_slot() + serie = self._series.setdefault(slot_id, collections.deque(maxlen=100)) + serie.append(time.monotonic_ns()) + + max_recent_reconnections = self._count_recent_reconnections( + _SENDER_RECONNECT_WARN_WINDOW_NS) + + if max_recent_reconnections >= _SENDER_RECONNECT_WARN_THRESHOLD: + now = time.monotonic_ns() + # 10 minutes in nanoseconds + min_rewarn_interval_ns = 10 * 60 * 1_000_000_000 + no_recent_warnings = self._last_warning_ns is None or \ + (now - self._last_warning_ns > min_rewarn_interval_ns) + if no_recent_warnings: + warnings.warn( + "questdb.ingress.Sender: " + f"Detected {max_recent_reconnections} reconnections " + f"within the last {_SENDER_RECONNECT_WARN_WINDOW_NS / 1_000_000_000} seconds. " + "This may indicate an inefficient coding pattern where the sender is " + "frequently created and destroyed. " + "Consider reusing sender instance whenever possible.", + UserWarning, + stacklevel=2 + ) + self._last_warning_ns = now + return slot_id + + def track_closed(self, slot_id: int): + """ + Track a sender connection closed event (threadsafe). 
+ """ + with self._lock: + self._return_slot(slot_id) + + +_ACTIVE_SENDERS = _ActiveSenders() + + + cdef bint _has_gil(PyThreadState** gs): return gs[0] == NULL @@ -131,9 +243,7 @@ class IngressErrorCode(Enum): HttpNotSupported = line_sender_error_http_not_supported ServerFlushError = line_sender_error_server_flush_error ConfigError = line_sender_error_config_error - ArrayLargeDimError = line_sender_error_array_large_dim - ArrayInternalError = line_sender_error_array_view_internal_error - ArrayWriteToBufferError = line_sender_error_array_view_write_to_buffer_error + ArrayError = line_sender_error_array_error ProtocolVersionError = line_sender_error_protocol_version_error BadDataFrame = line_sender_error_protocol_version_error + 1 @@ -177,12 +287,8 @@ cdef inline object c_err_code_to_py(line_sender_error_code code): return IngressErrorCode.ServerFlushError elif code == line_sender_error_config_error: return IngressErrorCode.ConfigError - elif code == line_sender_error_array_large_dim: - return IngressErrorCode.ArrayLargeDimError - elif code == line_sender_error_array_view_internal_error: - return IngressErrorCode.ArrayInternalError - elif code == line_sender_error_array_view_write_to_buffer_error: - return IngressErrorCode.ArrayWriteToBufferError + elif code == line_sender_error_array_error: + return IngressErrorCode.ArrayError elif code == line_sender_error_protocol_version_error: return IngressErrorCode.ProtocolVersionError else: @@ -386,7 +492,7 @@ cdef void_int str_to_column_name_copy( raise c_err_to_py(err) -cdef int64_t datetime_to_micros(datetime dt): +cdef int64_t datetime_to_micros(cp_datetime dt): """ Convert a :class:`datetime.datetime` to microseconds since the epoch. """ @@ -396,7 +502,7 @@ cdef int64_t datetime_to_micros(datetime dt): (dt.microsecond)) -cdef int64_t datetime_to_nanos(datetime dt): +cdef int64_t datetime_to_nanos(cp_datetime dt): """ Convert a `datetime.datetime` to nanoseconds since the epoch. 
""" @@ -469,11 +575,11 @@ cdef class TimestampMicros: self._value = value @classmethod - def from_datetime(cls, dt: datetime): + def from_datetime(cls, dt: datetime.datetime): """ Construct a ``TimestampMicros`` from a :class:`datetime.datetime` object. """ - if not isinstance(dt, datetime): + if not isinstance(dt, cp_datetime): raise TypeError('dt must be a datetime object.') return cls(datetime_to_micros(dt)) @@ -534,11 +640,11 @@ cdef class TimestampNanos: self._value = value @classmethod - def from_datetime(cls, dt: datetime): + def from_datetime(cls, dt: datetime.datetime): """ Construct a ``TimestampNanos`` from a ``datetime.datetime`` object. """ - if not isinstance(dt, datetime): + if not isinstance(dt, cp_datetime): raise TypeError('dt must be a datetime object.') return cls(datetime_to_nanos(dt)) @@ -593,7 +699,7 @@ cdef class SenderTransaction: To create a transaction: - .. code_block:: python + .. code-block:: python with sender.transaction('table_name') as txn: txn.row(..) @@ -645,13 +751,15 @@ cdef class SenderTransaction: symbols: Optional[Dict[str, Optional[str]]]=None, columns: Optional[Dict[ str, - Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] + Union[None, bool, int, float, str, TimestampMicros, datetime.datetime, numpy.ndarray]] ]=None, - at: Union[ServerTimestampType, TimestampNanos, datetime]): + at: Union[ServerTimestampType, TimestampNanos, datetime.datetime]): """ Write a row for the table in the transaction. The table name is taken from the transaction. + + **Note**: Support for NumPy arrays (``numpy.array``) requires QuestDB server version 8.4.0 or higher. 
""" if at is None: raise IngressError( @@ -678,7 +786,7 @@ cdef class SenderTransaction: df, # : pd.DataFrame *, symbols: Union[str, bool, List[int], List[str]] = 'auto', - at: Union[ServerTimestampType, int, str, TimestampNanos, datetime]): + at: Union[ServerTimestampType, int, str, TimestampNanos, datetime.datetime]): """ Write a dataframe for the table in the transaction. @@ -777,6 +885,7 @@ cdef class Buffer: Buffer Constructor Arguments: + * protocol_version (``int``): The protocol version to use. * ``init_buf_size`` (``int``): Initial capacity of the buffer in bytes. Defaults to ``65536`` (64KiB). * ``max_name_len`` (``int``): Maximum length of a column name. @@ -784,6 +893,8 @@ cdef class Buffer: This should match the ``cairo.max.file.name.length`` setting of the QuestDB instance you're connecting to. + **Note**: Protocol version ``2`` requires QuestDB server version 8.4.0 or higher. + .. code-block:: python # These two buffer constructions are equivalent. @@ -964,26 +1075,37 @@ cdef class Buffer: self, line_sender_column_name c_name, cnp.ndarray arr) except -1: if cnp.PyArray_TYPE(arr) != cnp.NPY_FLOAT64: raise IngressError( - IngressErrorCode.ArrayWriteToBufferError, + IngressErrorCode.ArrayError, f'Only float64 numpy arrays are supported, got dtype: {arr.dtype}') cdef: size_t rank = cnp.PyArray_NDIM(arr) - const uint8_t * data_ptr = cnp.PyArray_DATA(arr) + const double * data_ptr = cnp.PyArray_DATA(arr) line_sender_error * err = NULL - if not line_sender_buffer_column_f64_arr_byte_strides( - self._impl, - c_name, - rank, - cnp.PyArray_DIMS(arr), - cnp.PyArray_STRIDES(arr), # N.B.: Strides expressed as byte jumps - data_ptr, - cnp.PyArray_NBYTES(arr), - &err): - raise c_err_to_py(err) + if cnp.PyArray_FLAGS(arr) & cnp.NPY_ARRAY_C_CONTIGUOUS != 0: + if not line_sender_buffer_column_f64_arr_c_major( + self._impl, + c_name, + rank, + cnp.PyArray_DIMS(arr), + data_ptr, + cnp.PyArray_SIZE(arr), + &err): + raise c_err_to_py(err) + else: + if not 
line_sender_buffer_column_f64_arr_byte_strides( + self._impl, + c_name, + rank, + cnp.PyArray_DIMS(arr), + cnp.PyArray_STRIDES(arr), # N.B.: Strides expressed as byte jumps + data_ptr, + cnp.PyArray_SIZE(arr), + &err): + raise c_err_to_py(err) cdef inline void_int _column_dt( - self, line_sender_column_name c_name, datetime dt) except -1: + self, line_sender_column_name c_name, cp_datetime dt) except -1: cdef line_sender_error* err = NULL if not line_sender_buffer_column_ts_micros( self._impl, c_name, datetime_to_micros(dt), &err): @@ -1004,7 +1126,7 @@ cdef class Buffer: self._column_ts(c_name, value) elif PyArray_CheckExact( value): self._column_numpy(c_name, value) - elif isinstance(value, datetime): + elif isinstance(value, cp_datetime): self._column_dt(c_name, value) else: valid = ', '.join(( @@ -1014,7 +1136,7 @@ cdef class Buffer: 'str', 'TimestampMicros', 'datetime.datetime' - 'np.ndarray')) + 'numpy.ndarray')) raise TypeError( f'Unsupported type: {_fqn(type(value))}. Must be one of: {valid}') @@ -1031,7 +1153,7 @@ cdef class Buffer: if not line_sender_buffer_at_nanos(self._impl, ts._value, &err): raise c_err_to_py(err) - cdef inline void_int _at_dt(self, datetime dt) except -1: + cdef inline void_int _at_dt(self, cp_datetime dt) except -1: cdef int64_t value = datetime_to_nanos(dt) cdef line_sender_error* err = NULL if not line_sender_buffer_at_nanos(self._impl, value, &err): @@ -1047,7 +1169,7 @@ cdef class Buffer: self._at_now() elif isinstance(ts, TimestampNanos): self._at_ts(ts) - elif isinstance(ts, datetime): + elif isinstance(ts, cp_datetime): self._at_dt(ts) else: raise TypeError( @@ -1096,9 +1218,9 @@ cdef class Buffer: symbols: Optional[Dict[str, Optional[str]]]=None, columns: Optional[Dict[ str, - Union[None, bool, int, float, str, TimestampMicros, datetime, np.ndarray]] + Union[None, bool, int, float, str, TimestampMicros, datetime.datetime, numpy.ndarray]] ]=None, - at: Union[ServerTimestampType, TimestampNanos, datetime]): + at: 
Union[ServerTimestampType, TimestampNanos, datetime.datetime]): """ Add a single row (line) to the buffer. @@ -1115,7 +1237,7 @@ cdef class Buffer: 'col4': 'xyz', 'col5': TimestampMicros(123456789), 'col6': datetime(2019, 1, 1, 12, 0, 0), - 'col7': np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]), + 'col7': numpy.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]), 'col8': None}, at=TimestampNanos(123456789)) @@ -1159,13 +1281,15 @@ cdef class Buffer: - `FLOAT `_ * - ``str`` - `STRING `_ - * - ``np.ndarray`` + * - ``numpy.ndarray`` - `ARRAY `_ * - ``datetime.datetime`` and ``TimestampMicros`` - `TIMESTAMP `_ * - ``None`` - *Column is skipped and not serialized.* + **Note**: Support for NumPy arrays (``numpy.array``) requires QuestDB server version 8.4.0 or higher. + If the destination table was already created, then the columns types will be cast to the types of the existing columns whenever possible (Refer to the QuestDB documentation pages linked above). @@ -1210,7 +1334,7 @@ cdef class Buffer: table_name: Optional[str] = None, table_name_col: Union[None, int, str] = None, symbols: Union[str, bool, List[int], List[str]] = 'auto', - at: Union[ServerTimestampType, int, str, TimestampNanos, datetime]): + at: Union[ServerTimestampType, int, str, TimestampNanos, datetime.datetime]): """ Add a pandas DataFrame to the buffer. @@ -1498,7 +1622,7 @@ _FLUSH_FMT = ('{} - See https://py-questdb-client.readthedocs.io/en/' '/troubleshooting.html#inspecting-and-debugging-errors#flush-failed') -cdef uint64_t _timedelta_to_millis(object timedelta): +cdef uint64_t _timedelta_to_millis(cp_timedelta timedelta): """ Convert a timedelta to milliseconds. 
""" @@ -1591,7 +1715,7 @@ cdef void_int _parse_auto_flush( auto_flush_interval = int(auto_flush_interval) elif auto_flush_interval is False or isinstance(auto_flush_interval, int): pass - elif isinstance(auto_flush_interval, timedelta): + elif isinstance(auto_flush_interval, cp_timedelta): auto_flush_interval = _timedelta_to_millis(auto_flush_interval) else: raise TypeError( @@ -1818,6 +1942,7 @@ cdef class Sender: cdef int64_t* _last_flush_ms cdef size_t _init_buf_size cdef bint _in_txn + cdef int64_t _slot_id cdef void_int _set_sender_fields( self, @@ -1923,7 +2048,7 @@ cdef class Sender: if auth_timeout is not None: if isinstance(auth_timeout, int): c_auth_timeout = auth_timeout - elif isinstance(auth_timeout, timedelta): + elif isinstance(auth_timeout, cp_timedelta): c_auth_timeout = _timedelta_to_millis(auth_timeout) else: raise TypeError( @@ -1971,7 +2096,7 @@ cdef class Sender: c_retry_timeout = retry_timeout if not line_sender_opts_retry_timeout(self._opts, c_retry_timeout, &err): raise c_err_to_py(err) - elif isinstance(retry_timeout, timedelta): + elif isinstance(retry_timeout, cp_timedelta): c_retry_timeout = _timedelta_to_millis(retry_timeout) if not line_sender_opts_retry_timeout(self._opts, c_retry_timeout, &err): raise c_err_to_py(err) @@ -1995,7 +2120,7 @@ cdef class Sender: c_request_timeout = request_timeout if not line_sender_opts_request_timeout(self._opts, c_request_timeout, &err): raise c_err_to_py(err) - elif isinstance(request_timeout, timedelta): + elif isinstance(request_timeout, cp_timedelta): c_request_timeout = _timedelta_to_millis(request_timeout) if not line_sender_opts_request_timeout(self._opts, c_request_timeout, &err): raise c_err_to_py(err) @@ -2024,6 +2149,7 @@ cdef class Sender: self._last_flush_ms = NULL self._init_buf_size = 0 self._in_txn = False + self._slot_id = -1 def __init__( self, @@ -2350,7 +2476,7 @@ cdef class Sender: return self._auto_flush_mode.byte_count @property - def auto_flush_interval(self) -> 
Optional[timedelta]: + def auto_flush_interval(self) -> Optional[datetime.timedelta]: """ Time interval threshold for the auto-flush logic, or None if disabled. """ @@ -2358,7 +2484,7 @@ cdef class Sender: return None if self._auto_flush_mode.interval == -1: return None - return timedelta(milliseconds=self._auto_flush_mode.interval) + return cp_timedelta(milliseconds=self._auto_flush_mode.interval) @property def protocol_version(self) -> int: @@ -2418,6 +2544,9 @@ cdef class Sender: self._buffer._row_complete_sender = PyWeakref_NewRef(self, None) self._last_flush_ms[0] = line_sender_now_micros() // 1000 + # Track and warn about overly quick reconnections to the server. + self._slot_id = _ACTIVE_SENDERS.track_established() + def __enter__(self) -> Sender: """Call :func:`Sender.establish` at the start of a ``with`` block.""" self.establish() @@ -2459,8 +2588,8 @@ cdef class Sender: symbols: Optional[Dict[str, str]]=None, columns: Optional[Dict[ str, - Union[bool, int, float, str, TimestampMicros, datetime, np.ndarray]]]=None, - at: Union[TimestampNanos, datetime, ServerTimestampType]): + Union[bool, int, float, str, TimestampMicros, datetime.datetime, numpy.ndarray]]]=None, + at: Union[TimestampNanos, datetime.datetime, ServerTimestampType]): """ Write a row to the internal buffer. @@ -2468,6 +2597,8 @@ cdef class Sender: in the constructor. Refer to the :func:`Buffer.row` documentation for details on arguments. + + **Note**: Support for NumPy arrays (``numpy.array``) requires QuestDB server version 8.4.0 or higher. """ if self._in_txn: raise IngressError( @@ -2494,7 +2625,7 @@ cdef class Sender: table_name: Optional[str] = None, table_name_col: Union[None, int, str] = None, symbols: Union[str, bool, List[int], List[str]] = 'auto', - at: Union[ServerTimestampType, int, str, TimestampNanos, datetime]): + at: Union[ServerTimestampType, int, str, TimestampNanos, datetime.datetime]): """ Write a Pandas DataFrame to the internal buffer. 
@@ -2658,6 +2789,9 @@ cdef class Sender: self._opts = NULL line_sender_close(self._impl) self._impl = NULL + if self._slot_id != -1: + _ACTIVE_SENDERS.track_closed(self._slot_id) + self._slot_id = -1 cpdef close(self, bint flush=True): """ diff --git a/src/questdb/line_sender.pxd b/src/questdb/line_sender.pxd index 8a28c0d3..2b00404e 100644 --- a/src/questdb/line_sender.pxd +++ b/src/questdb/line_sender.pxd @@ -40,9 +40,7 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error_http_not_supported, line_sender_error_server_flush_error, line_sender_error_config_error, - line_sender_error_array_large_dim - line_sender_error_array_view_internal_error - line_sender_error_array_view_write_to_buffer_error + line_sender_error_array_error line_sender_error_protocol_version_error cdef enum line_sender_protocol: @@ -230,14 +228,24 @@ cdef extern from "questdb/ingress/line_sender.h": line_sender_error** err_out ) noexcept nogil + bint line_sender_buffer_column_f64_arr_c_major( + line_sender_buffer* buffer, + line_sender_column_name name, + size_t rank, + const size_t* shapes, + const double* data, + size_t data_len, + line_sender_error** err_out + ) noexcept nogil + bint line_sender_buffer_column_f64_arr_byte_strides( line_sender_buffer* buffer, line_sender_column_name name, size_t rank, const size_t* shapes, const ssize_t* strides, - const uint8_t* data_buffer, - size_t data_buffer_len, + const double* data, + size_t data_len, line_sender_error** err_out ) noexcept nogil diff --git a/test/test_dataframe.py b/test/test_dataframe.py index 7e6b909d..df1822e2 100644 --- a/test/test_dataframe.py +++ b/test/test_dataframe.py @@ -1581,7 +1581,7 @@ def test_arrow_chunked_array(self): # need to, so - as for now - we just test that we raise a nice error. 
with self.assertRaisesRegex( qi.IngressError, - "Unsupported dtype int16\[pyarrow\] for column 'a'.*github"): + r"Unsupported dtype int16\[pyarrow\] for column 'a'.*github"): _dataframe(self.version, df, table_name='tbl1', at = qi.ServerTimestamp) @unittest.skipIf(not fastparquet, 'fastparquet not installed')