Switch to Ruff for Python linting (#529)
andygrove authored Mar 4, 2024
1 parent 3a82be0 commit 76ecf56
Showing 31 changed files with 239 additions and 496 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/build.yml
@@ -24,6 +24,22 @@ on:
branches: ["branch-*"]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install ruff
# Update output format to enable automatic inline annotations.
- name: Run Ruff
run: ruff check --output-format=github datafusion

generate-license:
runs-on: ubuntu-latest
steps:
7 changes: 0 additions & 7 deletions .github/workflows/test.yaml
@@ -103,13 +103,6 @@ jobs:
source venv/bin/activate
pip install -r requirements-311.txt
- name: Run Python Linters
if: ${{ matrix.python-version == '3.10' && matrix.toolchain == 'stable' }}
run: |
source venv/bin/activate
flake8 --exclude venv,benchmarks/db-benchmark --ignore=E501,W503
black --line-length 79 --diff --check .
- name: Run tests
env:
RUST_BACKTRACE: 1
21 changes: 7 additions & 14 deletions .pre-commit-config.yaml
@@ -20,21 +20,14 @@ repos:
rev: v1.6.23
hooks:
- id: actionlint-docker
- repo: https://github.com/psf/black
rev: 22.3.0
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.3.0
hooks:
- id: black
files: datafusion/.*
# Explicitly specify the pyproject.toml at the repo root, not per-project.
args: ["--config", "pyproject.toml", "--line-length", "79", "--diff", "--check", "."]
- repo: https://github.com/PyCQA/flake8
rev: 5.0.4
hooks:
- id: flake8
files: datafusion/.*$
types: [file]
types_or: [python]
additional_dependencies: ["flake8-force"]
# Run the linter.
- id: ruff
# Run the formatter.
- id: ruff-format
- repo: local
hooks:
- id: rust-fmt
21 changes: 18 additions & 3 deletions README.md
@@ -202,7 +202,7 @@ source venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies (for Python 3.8+)
python -m pip install -r requirements-310.txt
python -m pip install -r requirements.in
```

The tests rely on test data in git submodules.
@@ -222,12 +222,27 @@ python -m pytest

### Running & Installing pre-commit hooks

arrow-datafusion-python takes advantage of [pre-commit](https://pre-commit.com/) to assist developers with code linting to help reduce the number of commits that ultimately fail in CI due to linter errors. Using the pre-commit hooks is optional for the developer but certainly helpful for keeping PRs clean and concise.
arrow-datafusion-python takes advantage of [pre-commit](https://pre-commit.com/) to assist developers with code linting to help reduce
the number of commits that ultimately fail in CI due to linter errors. Using the pre-commit hooks is optional for the
developer but certainly helpful for keeping PRs clean and concise.

Our pre-commit hooks can be installed by running `pre-commit install`, which will install the configurations in your ARROW_DATAFUSION_PYTHON_ROOT/.github directory and run each time you perform a commit, failing to complete the commit if an offending lint is found allowing you to make changes locally before pushing.
Our pre-commit hooks can be installed by running `pre-commit install`, which will install the configurations in
your ARROW_DATAFUSION_PYTHON_ROOT/.github directory and run each time you perform a commit, failing to complete
the commit if an offending lint is found allowing you to make changes locally before pushing.

The pre-commit hooks can also be run ad hoc, without installing them, by simply running `pre-commit run --all-files`.
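
For reference, a typical local workflow looks roughly like this (a minimal sketch; it assumes `pre-commit` itself is installed with `pip` and uses the `ruff` hook id defined in `.pre-commit-config.yaml`):

```shell
# install the pre-commit tool itself, if it is not already available
python -m pip install pre-commit

# install the hooks so they run automatically on every git commit
pre-commit install

# or run every configured hook ad hoc, without installing anything
pre-commit run --all-files

# run only the Ruff lint hook across the whole repository
pre-commit run ruff --all-files
```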

## Running linters without using pre-commit

There are scripts in `ci/scripts` for running Rust and Python linters.

```shell
./ci/scripts/python_lint.sh
./ci/scripts/rust_clippy.sh
./ci/scripts/rust_fmt.sh
./ci/scripts/rust_toml_fmt.sh
```
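
The Python script is a thin wrapper around Ruff; as a rough sketch of what `python_lint.sh` does (assuming `ruff` is installed in the active virtual environment), the same checks can be run directly:

```shell
# format the Python sources under datafusion/
ruff format datafusion
# lint the same tree
ruff check datafusion
```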

## How to update dependencies

To change test dependencies, change the `requirements.in` and run
16 changes: 4 additions & 12 deletions benchmarks/db-benchmark/groupby-datafusion.py
@@ -79,17 +79,13 @@ def execute(df):

data = pacsv.read_csv(
src_grp,
convert_options=pacsv.ConvertOptions(
auto_dict_encode=True, column_types=schema
),
convert_options=pacsv.ConvertOptions(auto_dict_encode=True, column_types=schema),
)
print("dataset loaded")

# create a session context with explicit runtime and config settings
runtime = (
RuntimeConfig()
.with_disk_manager_os()
.with_fair_spill_pool(64 * 1024 * 1024 * 1024)
RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(64 * 1024 * 1024 * 1024)
)
config = (
SessionConfig()
@@ -116,9 +112,7 @@ def execute(df):
if sql:
df = ctx.sql("SELECT id1, SUM(v1) AS v1 FROM x GROUP BY id1")
else:
df = ctx.table("x").aggregate(
[f.col("id1")], [f.sum(f.col("v1")).alias("v1")]
)
df = ctx.table("x").aggregate([f.col("id1")], [f.sum(f.col("v1")).alias("v1")])
ans = execute(df)

shape = ans_shape(ans)
@@ -197,9 +191,7 @@ def execute(df):
gc.collect()
t_start = timeit.default_timer()
if sql:
df = ctx.sql(
"SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3"
)
df = ctx.sql("SELECT id3, SUM(v1) AS v1, AVG(v3) AS v3 FROM x GROUP BY id3")
else:
df = ctx.table("x").aggregate(
[f.col("id3")],
24 changes: 4 additions & 20 deletions benchmarks/db-benchmark/join-datafusion.py
@@ -152,11 +152,7 @@ def ans_shape(batches):
print(f"q2: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
@@ -193,11 +189,7 @@ def ans_shape(batches):
print(f"q3: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
@@ -234,11 +226,7 @@ def ans_shape(batches):
print(f"q4: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
@@ -275,11 +263,7 @@ def ans_shape(batches):
print(f"q5: {t}")
t_start = timeit.default_timer()
df = ctx.create_dataframe([ans])
chk = (
df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))])
.collect()[0]
.column(0)[0]
)
chk = df.aggregate([], [f.sum(col("v1")), f.sum(col("v2"))]).collect()[0].column(0)[0]
chkt = timeit.default_timer() - t_start
m = memory_usage()
write_log(
4 changes: 1 addition & 3 deletions benchmarks/tpch/tpch.py
@@ -83,9 +83,7 @@ def bench(data_path, query_path):
time_millis = (end - start) * 1000
total_time_millis += time_millis
print("q{},{}".format(query, round(time_millis, 1)))
results.write(
"q{},{}\n".format(query, round(time_millis, 1))
)
results.write("q{},{}\n".format(query, round(time_millis, 1)))
results.flush()
except Exception as e:
print("query", query, "failed", e)
22 changes: 22 additions & 0 deletions ci/scripts/python_lint.sh
@@ -0,0 +1,22 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

set -ex
ruff format datafusion
ruff check datafusion
4 changes: 1 addition & 3 deletions datafusion/__init__.py
@@ -208,9 +208,7 @@ def udaf(accum, input_type, return_type, state_type, volatility, name=None):
Create a new User Defined Aggregate Function
"""
if not issubclass(accum, Accumulator):
raise TypeError(
"`accum` must implement the abstract base class Accumulator"
)
raise TypeError("`accum` must implement the abstract base class Accumulator")
if name is None:
name = accum.__qualname__.lower()
if isinstance(input_type, pa.lib.DataType):
4 changes: 1 addition & 3 deletions datafusion/cudf.py
@@ -68,9 +68,7 @@ def to_cudf_df(self, plan):
elif isinstance(node, TableScan):
return cudf.read_parquet(self.parquet_tables[node.table_name()])
else:
raise Exception(
"unsupported logical operator: {}".format(type(node))
)
raise Exception("unsupported logical operator: {}".format(type(node)))

def create_schema(self, schema_name: str, **kwargs):
logger.debug(f"Creating schema: {schema_name}")
8 changes: 2 additions & 6 deletions datafusion/input/base.py
@@ -31,13 +31,9 @@ class BaseInputSource(ABC):
"""

@abstractmethod
def is_correct_input(
self, input_item: Any, table_name: str, **kwargs
) -> bool:
def is_correct_input(self, input_item: Any, table_name: str, **kwargs) -> bool:
pass

@abstractmethod
def build_table(
self, input_item: Any, table_name: str, **kwarg
) -> SqlTable:
def build_table(self, input_item: Any, table_name: str, **kwarg) -> SqlTable:
pass
4 changes: 1 addition & 3 deletions datafusion/input/location.py
@@ -72,9 +72,7 @@ def build_table(
for _ in reader:
num_rows += 1
# TODO: Need to actually consume this row into reasonable columns
raise RuntimeError(
"TODO: Currently unable to support CSV input files."
)
raise RuntimeError("TODO: Currently unable to support CSV input files.")
else:
raise RuntimeError(
f"Input of format: `{format}` is currently not supported.\
4 changes: 1 addition & 3 deletions datafusion/pandas.py
@@ -64,9 +64,7 @@ def to_pandas_df(self, plan):
elif isinstance(node, TableScan):
return pd.read_parquet(self.parquet_tables[node.table_name()])
else:
raise Exception(
"unsupported logical operator: {}".format(type(node))
)
raise Exception("unsupported logical operator: {}".format(type(node)))

def create_schema(self, schema_name: str, **kwargs):
logger.debug(f"Creating schema: {schema_name}")
12 changes: 3 additions & 9 deletions datafusion/polars.py
@@ -51,9 +51,7 @@ def to_polars_df(self, plan):
args = [self.to_polars_expr(expr) for expr in node.projections()]
return inputs[0].select(*args)
elif isinstance(node, Aggregate):
groupby_expr = [
self.to_polars_expr(expr) for expr in node.group_by_exprs()
]
groupby_expr = [self.to_polars_expr(expr) for expr in node.group_by_exprs()]
aggs = []
for expr in node.aggregate_exprs():
expr = expr.to_variant()
@@ -67,17 +65,13 @@ def to_polars_df(self, plan):
)
)
else:
raise Exception(
"Unsupported aggregate function {}".format(expr)
)
raise Exception("Unsupported aggregate function {}".format(expr))
df = inputs[0].groupby(groupby_expr).agg(aggs)
return df
elif isinstance(node, TableScan):
return polars.read_parquet(self.parquet_tables[node.table_name()])
else:
raise Exception(
"unsupported logical operator: {}".format(type(node))
)
raise Exception("unsupported logical operator: {}".format(type(node)))

def create_schema(self, schema_name: str, **kwargs):
logger.debug(f"Creating schema: {schema_name}")
12 changes: 3 additions & 9 deletions datafusion/tests/generic.py
@@ -50,9 +50,7 @@ def data_datetime(f):
datetime.datetime.now() - datetime.timedelta(days=1),
datetime.datetime.now() + datetime.timedelta(days=1),
]
return pa.array(
data, type=pa.timestamp(f), mask=np.array([False, True, False])
)
return pa.array(data, type=pa.timestamp(f), mask=np.array([False, True, False]))


def data_date32():
@@ -61,9 +59,7 @@ def data_date32():
datetime.date(1980, 1, 1),
datetime.date(2030, 1, 1),
]
return pa.array(
data, type=pa.date32(), mask=np.array([False, True, False])
)
return pa.array(data, type=pa.date32(), mask=np.array([False, True, False]))


def data_timedelta(f):
@@ -72,9 +68,7 @@ def data_timedelta(f):
datetime.timedelta(days=1),
datetime.timedelta(seconds=1),
]
return pa.array(
data, type=pa.duration(f), mask=np.array([False, True, False])
)
return pa.array(data, type=pa.duration(f), mask=np.array([False, True, False]))


def data_binary_other():