Skip to content

Commit

Permalink
Merge Arrow into Main for Release (#37)
Browse files Browse the repository at this point in the history
* Threadpool executor (#22)

* Release v5.15.0

* update protobuf to v4.22.3

* Add threaded streamset calls

Using concurrent.futures.ThreadPoolExecutor

* Blacken code

* Update for failing tests

* Ignore flake8 as part of testing

pytest-flake8 seems to have issues with the later versions of flake8

tholo/pytest-flake8#92

* Update .gitignore

* Update ignore and remove extra print.

* Remove idea folder (pycharm)

---------

Co-authored-by: David Konigsberg <[email protected]>
Co-authored-by: Jeff Lin <[email protected]>

* Threaded arrow (#23)

* Release v5.15.0

* update protobuf to v4.22.3

* Add threaded streamset calls

Using concurrent.futures.ThreadPoolExecutor

* Blacken code

* Update for failing tests

* Ignore flake8 as part of testing

pytest-flake8 seems to have issues with the later versions of flake8

tholo/pytest-flake8#92

* Update .gitignore

* Update proto definitions.

* Update endpoint to support arrow methods

* Support arrow endpoints

* Additional arrow updates

* Update transformers, add polars conversion

* Update .gitignore

* Update ignore and remove extra print.

* Remove idea folder (pycharm)

* Update requirements.txt

* Update btrdb/transformers.py

* Update the way to check for arrow-enabled btrdb

This has not been "turned on" yet though, since we dont know the version number this will be enabled for. The method is currently commented out, but can be re-enabled pretty easily.

* Use IPC streams to send the arrow bytes for insert

Instead of writing out feather files to an `io.BytesIO` stream and then sending the feather files over the wire, this creates a buffered outputstream and then sends that data back as bytes to btrdb.

* Create arrow specific stream methods.

* Update test conn object to support minor version

* Update tests and migrate arrow code.

* Arrow and standard streamset insert

* Create basic arrow to dataframe transformer

* Support multirawvalues, arrow transformers

* Multivalue arrow queries, in progress

* Update stream filter to properly filter for sampling frequency

* Update arrow values queries for multivalues

* Update param passing for sampling frequency

* Update index passing, and ignore depth

* benchmark raw values queries for arrow and current api

* Add aligned windows and run func

* Streamset read benchmarks (WIP)

In addition:
* update streamset.count to support the `precise` boolean flag.

* Update mock return value for versionMajor

* In progress validation of stream benchs

---------

Co-authored-by: David Konigsberg <[email protected]>
Co-authored-by: Jeff Lin <[email protected]>

* Add 3.10 python to the testing matrix (#21)

* Add 3.10 python to the testing matrix

* Fix yaml parsing

* Update requirements to support 3.10

* Use pip-tools `pip-compile` cli tool to generate requirements.txt files from the updated pyproject.toml file
* Include pyproject.toml with basic features to support proper extra deps
* Support different ways to install btrdb from pip
  * `btrdb, btrdb[data], btrdb[all], btrdb[testing], btrdb[ray]`
* Update transformers.py to build up a numpy array when the subarrays are not the same size (number of entries)
  * This converts the main array's dtype to `object`
  * tests still pass with this change
* recompile the btrdb proto files with latest protobuf and grpc plugins
* Create multiple requirements.txt files for easier updating in the future as well as a locked version with pinned dependencies

* Ignore protoc generated flake errors

* Update test requirements

* Include pre-commit and setup.

* Pre-commit lints.

* Update pre-commit.yaml

add staging to pre-commit checks

* Fix missing logging import, rerun pre-commit (#24)

* Add basic doc string to endpoint object (#25)

* Update benchmark scripts.

* Multistream read bench insert bench (#26)

* Fix multistream endpoint bugs

* The streamset was passing the incorrect params to the endpoint
* The endpoint does not return a `version` in its response, just `stat` and `arrowBytes`

Params have been updated and a NoneType is passed around to ignore the
lack of version info, which lets us use the same logic for all bytes
decoding.

* Add multistream benchmark methods for timesnap and no timesnap.

* Add insert benchmarking methods (#27)

Benchmarking methods added for:

* stream inserts using tuples of time, value data
* stream inserts using pyarrow tables of timestamps, value columns

* streamset inserts using a dict map of streamset stream uuids, and lists of tuples of time, value data
* streamset inserts using a dict map of streamset stream uuids, and pyarrow tables of timestamps, values.

* Fix arrow inserts (#28)

* Add insert benchmarking methods

Benchmarking methods added for:

* stream inserts using tuples of time, value data
* stream inserts using pyarrow tables of timestamps, value columns

* streamset inserts using a dict map of streamset stream uuids, and lists of tuples of time, value data
* streamset inserts using a dict map of streamset stream uuids, and pyarrow tables of timestamps, values.

* Include nullable false in pyarrow schema inserts

* This was the only difference in the schemas between go and python.
* also using a bytesIO stream to act as the sink for the ipc bytes.

* Start integration test suite

* Add more streamset integration tests.

* Add support for authenticated requests without encryption.

* Optimize logging calls (#30)

Previously, the debug logging in the api would create the f-strings no matter if logging.DEBUG was the current log level or not.

This can impact the performance, especially for benchmarking.

Now, a cached IS_DEBUG flag is created for the stream operations, and other locations, the logger.isEnabledFor boolean is checked.

Note that in the stream.py, this same function call is only executed once, and the results are cached for the rest of the logic.

* Add more arrow tests and minor refactoring.

* More integration test cases

* Restructure tests.

* Mark new failing tests as expected failures for now.

* Disable gzip compression, it is very slow.

* Reenable test, server has been fixed.

* Update pandas testing and fix flake8 issues (#31)

* Update pandas testing and fix flake8 issues

* Update stream logic for unpacking arrow tables, update integration tests.

* add init.py for integration tests.

* Add additional tests for arrow methods vs their old api counterparts.

* Add tests for timesnap boundary conditions. (#32)

* Add more integration tests.

* Add additional integration tests, modify the name_callable ability of the arrow_values.

* remove extraneous prints.

* Include retry logic.

* Update statpoint order in arrow, fix some bugs with the arrow methods.

* Update testing to account for NaNs.

* Update github action versions.

* Update tests, add in a test for duplicate values.

* Remove empty test, remove extraneous prints

---------

Co-authored-by: andrewchambers <[email protected]>

* Update docs for arrow (#35)

* Update docs, add in final enhanced edits.

* Only enable arrow-endpoints when version >= 5.30 (#36)

Once we have a v5.30tag of the server with arrow/multistream, we can
merge this and complete the ticket.

* Update arrow notes, small doc changes. (#38)

---------

Co-authored-by: Justin Gilmer <[email protected]>
Co-authored-by: David Konigsberg <[email protected]>
Co-authored-by: Jeff Lin <[email protected]>
Co-authored-by: Andrew Chambers <[email protected]>
Co-authored-by: andrewchambers <[email protected]>
  • Loading branch information
6 people authored Jul 20, 2023
1 parent 74e7567 commit 1900a30
Show file tree
Hide file tree
Showing 62 changed files with 7,396 additions and 5,018 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/pre-commit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: pre-commit

on:
pull_request:
branches:
- master
- staging
types:
- opened
- reopened
- ready_for_review
- synchronize

env:
SKIP: pytest-check

jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
token: ${{ secrets.GITHUB_TOKEN }}
fetch-depth: 0 # get full git history
- uses: actions/setup-python@v3
with:
cache: 'pip'
- name: Install pre-commit
run: |
pip install pre-commit
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@v21
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: Run pre-commit
uses: pre-commit/[email protected]
with:
extra_args: --files ${{ steps.changed-files.outputs.all_changed_files }}
12 changes: 6 additions & 6 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
python-version: [3.7, 3.8, 3.9]
python-version: [3.7, 3.8, 3.9, '3.10']
os: [ubuntu-latest, macos-latest, windows-latest]

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }} ${{ matrix.os }}
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand All @@ -39,7 +39,7 @@ jobs:
if: startsWith(github.ref, 'refs/tags/')
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Create Release
id: create_release
uses: actions/create-release@v1
Expand All @@ -59,9 +59,9 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v4
with:
python-version: '3.8'
- name: Install dependencies
Expand Down
10 changes: 10 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,13 @@ dmypy.json

# Pyre type checker
.pyre/

# arrow parquet files
*.parquet

.idea
.idea/misc.xml
.idea/vcs.xml
.idea/inspectionProfiles/profiles_settings.xml
.idea/inspectionProfiles/Project_Default.xml
/.idea/
35 changes: 35 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
exclude: ^(setup.cfg|btrdb/grpcinterface)
- repo: https://github.com/psf/black
rev: 23.3.0
hooks:
- id: black-jupyter
args: [--line-length=88]
exclude: btrdb/grpcinterface/.*\.py
- repo: https://github.com/pycqa/isort
rev: 5.11.5
hooks:
- id: isort
name: isort (python)
args: [--profile=black, --line-length=88]
exclude: btrdb/grpcinterface/.*\.py
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
hooks:
- id: flake8
args: [--config=setup.cfg]
exclude: ^(btrdb/grpcinterface|tests|setup.py|btrdb4|docs|benchmarks)
- repo: local
hooks:
- id: pytest-check
name: pytest-check
entry: pytest
language: system
pass_filenames: false
always_run: true
2 changes: 1 addition & 1 deletion MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ global-exclude *.py[co]
global-exclude .ipynb_checkpoints
global-exclude .DS_Store
global-exclude .env
global-exclude .coverage.*
global-exclude .coverage.*
80 changes: 80 additions & 0 deletions benchmarks/benchmark_stream_inserts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from time import perf_counter
from typing import Dict, List, Tuple, Union

import pyarrow

import btrdb


def time_stream_insert(
stream: btrdb.stream.Stream,
data: List[Tuple[int, float]],
merge_policy: str = "never",
) -> Dict[str, Union[int, float, str]]:
"""Insert raw data into a single stream, where data is a List of tuples of int64 timestamps and float64 values.
Parameters
----------
stream : btrdb.stream.Stream, required
The stream to insert data into.
data : List[Tuple[int, float]], required
The data to insert into stream.
merge_policy : str, optional, default = 'never'
How should the platform handle duplicated data?
Valid policies:
`never`: the default, no points are merged
`equal`: points are deduplicated if the time and value are equal
`retain`: if two points have the same timestamp, the old one is kept
`replace`: if two points have the same timestamp, the new one is kept
"""
prev_ver = stream.version()
tic = perf_counter()
new_ver = stream.insert(data, merge=merge_policy)
toc = perf_counter()
run_time = toc - tic
n_points = len(data)
result = {
"uuid": stream.uuid,
"previous_version": prev_ver,
"new_version": new_ver,
"points_to_insert": n_points,
"total_time_seconds": run_time,
"merge_policy": merge_policy,
}
return result


def time_stream_arrow_insert(
stream: btrdb.stream.Stream, data: pyarrow.Table, merge_policy: str = "never"
) -> Dict[str, Union[int, float, str]]:
"""Insert raw data into a single stream, where data is a pyarrow Table of timestamps and float values.
Parameters
----------
stream : btrdb.stream.Stream, required
The stream to insert data into.
data : pyarrow.Table, required
The table of data to insert into stream.
merge_policy : str, optional, default = 'never'
How should the platform handle duplicated data?
Valid policies:
`never`: the default, no points are merged
`equal`: points are deduplicated if the time and value are equal
`retain`: if two points have the same timestamp, the old one is kept
`replace`: if two points have the same timestamp, the new one is kept
"""
prev_ver = stream.version()
tic = perf_counter()
new_ver = stream.arrow_insert(data, merge=merge_policy)
toc = perf_counter()
run_time = toc - tic
n_points = data.num_rows
result = {
"uuid": stream.uuid,
"previous_version": prev_ver,
"new_version": new_ver,
"points_to_insert": n_points,
"total_time_seconds": run_time,
"merge_policy": merge_policy,
}
return result
Loading

0 comments on commit 1900a30

Please sign in to comment.