diff --git a/.checklist.yaml b/.checklist.yaml
deleted file mode 100644
index f0c21171..00000000
--- a/.checklist.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-apiVersion: quintoandar.com.br/checklist/v2
-kind: ServiceChecklist
-metadata:
-  name: butterfree
-spec:
-  description: >-
-    A solution for Feature Stores.
-
-  costCenter: C055
-  department: engineering
-  lifecycle: production
-  docs: true
-
-  ownership:
-    team: data_products_mlops
-    line: tech_platform
-    owner: otavio.cals@quintoandar.com.br
-
-  libraries:
-    - name: butterfree
-      type: common-usage
-      path: https://quintoandar.github.io/python-package-server/
-      description: A lib to build Feature Stores.
-      registries:
-        - github-packages
-      tier: T0
-
-  channels:
-    squad: 'mlops'
-    alerts: 'data-products-reports'
diff --git a/.github/workflows/skip_lint.yml b/.github/workflows/skip_lint.yml
deleted file mode 100644
index 1c768a23..00000000
--- a/.github/workflows/skip_lint.yml
+++ /dev/null
@@ -1,17 +0,0 @@
-# This step is used only because we want to mark the runner-linter check as required
-# for PRs to develop, but not for the merge queue to merge into develop,
-# github does not have this functionality yet
-
-name: 'Skip github-actions/runner-linter check at merge queue'
-
-on:
-  merge_group:
-
-jobs:
-  empty_job:
-    name: 'github-actions/runner-linter'
-    runs-on: github-actions-developers-runner
-    steps:
-      - name: Skip github-actions/runner-linter check at merge queue
-        run: |
-          echo "Done"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 26d5f80d..51cbfdfd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,7 +3,11 @@ All notable changes to this project will be documented in this file.
 
 Preferably use **Added**, **Changed**, **Removed** and **Fixed** topics in each release or unreleased log for a better organization.
 
-## [Unreleased]
+## [1.4.6](https://github.com/quintoandar/butterfree/releases/tag/1.4.6)
+
+### Fixed
+* [MLOP-2519] avoid configuring logger at lib level ([#393](https://github.com/quintoandar/butterfree/pull/393))
+* Rollback to latest stable release ([#391](https://github.com/quintoandar/butterfree/pull/391))
 
 ## [1.4.5](https://github.com/quintoandar/butterfree/releases/tag/1.4.5)
 * Rollback repartitions ([#386](https://github.com/quintoandar/butterfree/pull/386))
diff --git a/butterfree/_cli/migrate.py b/butterfree/_cli/migrate.py
index 207e7daf..811796d3 100644
--- a/butterfree/_cli/migrate.py
+++ b/butterfree/_cli/migrate.py
@@ -1,10 +1,11 @@
 import datetime
 import importlib
 import inspect
+import logging
 import os
 import pkgutil
 import sys
-from typing import Set, Type
+from typing import Set
 
 import boto3
 import setuptools
@@ -12,7 +13,6 @@
 from botocore.exceptions import ClientError
 
 from butterfree.configs import environment
-from butterfree.configs.logger import __logger
 from butterfree.migrations.database_migration import ALLOWED_DATABASE
 from butterfree.pipelines import FeatureSetPipeline
 
@@ -20,7 +20,7 @@
     help="Apply the automatic migrations in a database.", no_args_is_help=True
 )
 
-logger = __logger("migrate", True)
+logger = logging.getLogger(__name__)
 
 
 def __find_modules(path: str) -> Set[str]:
@@ -90,18 +90,8 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
 
                 instances.add(value)
 
-    def create_instance(cls: Type[FeatureSetPipeline]) -> FeatureSetPipeline:
-        sig = inspect.signature(cls.__init__)
-        parameters = sig.parameters
-
-        if "run_date" in parameters:
-            run_date = datetime.datetime.today().strftime("%Y-%m-%d")
-            return cls(run_date)
-
-        return cls()
-
     logger.info("Creating instances...")
-    return set(create_instance(value) for value in instances)  # type: ignore
+    return set(value() for value in instances)  # type: ignore
 
 
 PATH = typer.Argument(
diff --git a/butterfree/clients/cassandra_client.py b/butterfree/clients/cassandra_client.py
index 0b300844..f50c5b6c 100644
--- a/butterfree/clients/cassandra_client.py
+++ b/butterfree/clients/cassandra_client.py
@@ -1,5 +1,6 @@
 """CassandraClient entity."""
 
+import logging
 from ssl import CERT_REQUIRED, PROTOCOL_TLSv1
 from typing import Dict, List, Optional, Union
 
@@ -23,6 +24,11 @@
 EMPTY_STRING_HOST_ERROR = "The value of Cassandra host is empty. Please fill correctly with your endpoints"  # noqa: E501
 GENERIC_INVALID_HOST_ERROR = "The Cassandra host must be a valid string, a string that represents a list or list of strings"  # noqa: E501
 
+logger = logging.getLogger(__name__)
+
+EMPTY_STRING_HOST_ERROR = "The value of Cassandra host is empty. Please fill correctly with your endpoints"  # noqa: E501
+GENERIC_INVALID_HOST_ERROR = "The Cassandra host must be a valid string, a string that represents a list or list of strings"  # noqa: E501
+
 
 class CassandraColumn(TypedDict):
     """Type for cassandra columns.
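
With `__logger(..., True)` gone, the modules above only call `logging.getLogger(__name__)` and no longer attach handlers or set levels at import time, so log output becomes the consuming application's responsibility. A minimal sketch of what an application using butterfree might now do (handler, format and level choices here are illustrative, not part of this change):

    import logging

    # The library no longer configures logging at import time, so the
    # application opts in to whatever verbosity and format it wants.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )

    # Optionally tune butterfree's loggers specifically.
    logging.getLogger("butterfree").setLevel(logging.DEBUG)
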
""" - # Step 1: Build temporary views for each reader for reader in self.readers: - reader.build(client=client, start_date=start_date, end_date=end_date) + reader.build( + client=client, start_date=start_date, end_date=end_date + ) # create temporary views for each reader - # Step 2: Execute SQL query on the combined readers dataframe = client.sql(self.query) - # Step 3: Cache the dataframe if necessary, using memory and disk storage if not dataframe.isStreaming and self.eager_evaluation: - # Persist to ensure the DataFrame is stored in mem and disk (if necessary) - dataframe.persist(StorageLevel.MEMORY_AND_DISK) - # Trigger the cache/persist operation by performing an action - dataframe.count() + dataframe.cache().count() - # Step 4: Run post-processing hooks on the dataframe post_hook_df = self.run_post_hooks(dataframe) return post_hook_df diff --git a/butterfree/load/writers/delta_writer.py b/butterfree/load/writers/delta_writer.py index 933f1adb..45ce1c0c 100644 --- a/butterfree/load/writers/delta_writer.py +++ b/butterfree/load/writers/delta_writer.py @@ -1,10 +1,11 @@ +import logging + from delta.tables import DeltaTable from pyspark.sql.dataframe import DataFrame from butterfree.clients import SparkClient -from butterfree.configs.logger import __logger -logger = __logger("delta_writer", True) +logger = logging.getLogger(__name__) class DeltaWriter: diff --git a/butterfree/migrations/database_migration/cassandra_migration.py b/butterfree/migrations/database_migration/cassandra_migration.py index 4d50746c..5a4f755f 100644 --- a/butterfree/migrations/database_migration/cassandra_migration.py +++ b/butterfree/migrations/database_migration/cassandra_migration.py @@ -78,9 +78,6 @@ def _get_alter_table_add_query(self, columns: List[Diff], table_name: str) -> st def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: """Creates CQL statement to alter columns' types. - In Cassandra 3.4.x to 3.11.x alter type is not allowed. - This method creates a temp column to comply. - Args: columns: list of Diff objects with ALTER_TYPE kind. table_name: table name. @@ -89,23 +86,10 @@ def _get_alter_column_type_query(self, column: Diff, table_name: str) -> str: Alter column type query. 
""" - temp_column_name = f"{column.column}_temp" - - add_temp_column_query = ( - f"ALTER TABLE {table_name} ADD {temp_column_name} {column.value};" - ) - copy_data_to_temp_query = ( - f"UPDATE {table_name} SET {temp_column_name} = {column.column};" - ) - - drop_old_column_query = f"ALTER TABLE {table_name} DROP {column.column};" - rename_temp_column_query = ( - f"ALTER TABLE {table_name} RENAME {temp_column_name} TO {column.column};" - ) + parsed_columns = self._get_parsed_columns([column]) return ( - f"{add_temp_column_query} {copy_data_to_temp_query} " - f"{drop_old_column_query} {rename_temp_column_query};" + f"ALTER TABLE {table_name} ALTER {parsed_columns.replace(' ', ' TYPE ')};" ) @staticmethod diff --git a/butterfree/migrations/database_migration/database_migration.py b/butterfree/migrations/database_migration/database_migration.py index 351a4724..aa07fb35 100644 --- a/butterfree/migrations/database_migration/database_migration.py +++ b/butterfree/migrations/database_migration/database_migration.py @@ -1,16 +1,16 @@ """Migration entity.""" +import logging from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, auto from typing import Any, Dict, List, Optional, Set from butterfree.clients import AbstractClient -from butterfree.configs.logger import __logger from butterfree.load.writers.writer import Writer from butterfree.transform import FeatureSet -logger = __logger("database_migrate", True) +logger = logging.getLogger(__name__) @dataclass diff --git a/butterfree/pipelines/feature_set_pipeline.py b/butterfree/pipelines/feature_set_pipeline.py index c33f3bb9..8ba1a636 100644 --- a/butterfree/pipelines/feature_set_pipeline.py +++ b/butterfree/pipelines/feature_set_pipeline.py @@ -2,8 +2,6 @@ from typing import List, Optional -from pyspark.storagelevel import StorageLevel - from butterfree.clients import SparkClient from butterfree.dataframe_service import repartition_sort_df from butterfree.extract import Source @@ -211,25 +209,19 @@ def run( soon. Use only if strictly necessary. """ - # Step 1: Construct input dataframe from the source. dataframe = self.source.construct( client=self.spark_client, start_date=self.feature_set.define_start_date(start_date), end_date=end_date, ) - # Step 2: Repartition and sort if required, avoid if not necessary. if partition_by: order_by = order_by or partition_by - current_partitions = dataframe.rdd.getNumPartitions() - optimal_partitions = num_processors or current_partitions - if current_partitions != optimal_partitions: - dataframe = repartition_sort_df( - dataframe, partition_by, order_by, num_processors - ) - - # Step 3: Construct the feature set dataframe using defined transformations. - transformed_dataframe = self.feature_set.construct( + dataframe = repartition_sort_df( + dataframe, partition_by, order_by, num_processors + ) + + dataframe = self.feature_set.construct( dataframe=dataframe, client=self.spark_client, start_date=start_date, @@ -237,22 +229,15 @@ def run( num_processors=num_processors, ) - if transformed_dataframe.storageLevel != StorageLevel( - False, False, False, False, 1 - ): - dataframe.unpersist() # Clear the data from the cache (disk and memory) - - # Step 4: Load the data into the configured sink. self.sink.flush( - dataframe=transformed_dataframe, + dataframe=dataframe, feature_set=self.feature_set, spark_client=self.spark_client, ) - # Step 5: Validate the output if not streaming and data volume is reasonable. 
diff --git a/butterfree/pipelines/feature_set_pipeline.py b/butterfree/pipelines/feature_set_pipeline.py
index c33f3bb9..8ba1a636 100644
--- a/butterfree/pipelines/feature_set_pipeline.py
+++ b/butterfree/pipelines/feature_set_pipeline.py
@@ -2,8 +2,6 @@
 
 from typing import List, Optional
 
-from pyspark.storagelevel import StorageLevel
-
 from butterfree.clients import SparkClient
 from butterfree.dataframe_service import repartition_sort_df
 from butterfree.extract import Source
@@ -211,25 +209,19 @@ def run(
             soon. Use only if strictly necessary.
 
         """
-        # Step 1: Construct input dataframe from the source.
         dataframe = self.source.construct(
             client=self.spark_client,
             start_date=self.feature_set.define_start_date(start_date),
             end_date=end_date,
         )
 
-        # Step 2: Repartition and sort if required, avoid if not necessary.
         if partition_by:
             order_by = order_by or partition_by
-            current_partitions = dataframe.rdd.getNumPartitions()
-            optimal_partitions = num_processors or current_partitions
-            if current_partitions != optimal_partitions:
-                dataframe = repartition_sort_df(
-                    dataframe, partition_by, order_by, num_processors
-                )
-
-        # Step 3: Construct the feature set dataframe using defined transformations.
-        transformed_dataframe = self.feature_set.construct(
+            dataframe = repartition_sort_df(
+                dataframe, partition_by, order_by, num_processors
+            )
+
+        dataframe = self.feature_set.construct(
             dataframe=dataframe,
             client=self.spark_client,
             start_date=start_date,
@@ -237,22 +229,15 @@
             num_processors=num_processors,
         )
 
-        if transformed_dataframe.storageLevel != StorageLevel(
-            False, False, False, False, 1
-        ):
-            dataframe.unpersist()  # Clear the data from the cache (disk and memory)
-
-        # Step 4: Load the data into the configured sink.
         self.sink.flush(
-            dataframe=transformed_dataframe,
+            dataframe=dataframe,
             feature_set=self.feature_set,
             spark_client=self.spark_client,
         )
 
-        # Step 5: Validate the output if not streaming and data volume is reasonable.
-        if not transformed_dataframe.isStreaming:
+        if not dataframe.isStreaming:
             self.sink.validate(
-                dataframe=transformed_dataframe,
+                dataframe=dataframe,
                 feature_set=self.feature_set,
                 spark_client=self.spark_client,
             )
diff --git a/butterfree/transform/aggregated_feature_set.py b/butterfree/transform/aggregated_feature_set.py
index 0760af14..6706bf8c 100644
--- a/butterfree/transform/aggregated_feature_set.py
+++ b/butterfree/transform/aggregated_feature_set.py
@@ -387,7 +387,6 @@ def _aggregate(
         ]
 
         groupby = self.keys_columns.copy()
-
         if window is not None:
             dataframe = dataframe.withColumn("window", window.get())
             groupby.append("window")
@@ -411,23 +410,19 @@ def _aggregate(
                 "keep_rn", functions.row_number().over(partition_window)
             ).filter("keep_rn = 1")
 
-        current_partitions = dataframe.rdd.getNumPartitions()
-        optimal_partitions = num_processors or current_partitions
-
-        if current_partitions != optimal_partitions:
-            dataframe = repartition_df(
-                dataframe,
-                partition_by=groupby,
-                num_processors=optimal_partitions,
-            )
-
+        # repartition to have all rows for each group at the same partition
+        # by doing that, we won't have to shuffle data on grouping by id
+        dataframe = repartition_df(
+            dataframe,
+            partition_by=groupby,
+            num_processors=num_processors,
+        )
         grouped_data = dataframe.groupby(*groupby)
 
-        if self._pivot_column and self._pivot_values:
+        if self._pivot_column:
             grouped_data = grouped_data.pivot(self._pivot_column, self._pivot_values)
 
         aggregated = grouped_data.agg(*aggregations)
-
         return self._with_renamed_columns(aggregated, features, window)
 
     def _with_renamed_columns(
@@ -576,16 +571,14 @@ def construct(
 
         pre_hook_df = self.run_pre_hooks(dataframe)
 
-        output_df = pre_hook_df
-        for feature in self.keys + [self.timestamp]:
-            output_df = feature.transform(output_df)
-
-        output_df = self.incremental_strategy.filter_with_incremental_strategy(
-            dataframe=output_df, start_date=start_date, end_date=end_date
+        output_df = reduce(
+            lambda df, feature: feature.transform(df),
+            self.keys + [self.timestamp],
+            pre_hook_df,
         )
 
         if self._windows and end_date is not None:
-            # Run aggregations for each window
+            # run aggregations for each window
             agg_list = [
                 self._aggregate(
                     dataframe=output_df,
@@ -605,12 +598,13 @@ def construct(
 
             # keeping this logic to maintain the same behavior for already implemented
             # feature sets
+
             if self._windows[0].slide == "1 day":
                 base_df = self._get_base_dataframe(
                     client=client, dataframe=output_df, end_date=end_date
                 )
 
-                # Left join each aggregation result to our base dataframe
+                # left join each aggregation result to our base dataframe
                 output_df = reduce(
                     lambda left, right: self._dataframe_join(
                         left,
@@ -643,18 +637,12 @@ def construct(
         output_df = output_df.select(*self.columns).replace(  # type: ignore
             float("nan"), None
         )
-
-        if not output_df.isStreaming and self.deduplicate_rows:
-            output_df = self._filter_duplicated_rows(output_df)
+        if not output_df.isStreaming:
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()
 
         post_hook_df = self.run_post_hooks(output_df)
 
-        # Eager evaluation, only if needed and managable
-        if not output_df.isStreaming and self.eager_evaluation:
-            # Small dataframes only
-            if output_df.count() < 1_000_000:
-                post_hook_df.cache().count()
-            else:
-                post_hook_df.cache()  # Cache without materialization for large volumes
-
         return post_hook_df
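
In `AggregatedFeatureSet.construct` above, the explicit loop over `self.keys + [self.timestamp]` goes back to a `functools.reduce` fold, where each feature's `transform` receives the output of the previous one. A self-contained sketch of that fold pattern (the objects below are stand-ins, not butterfree classes):

    from functools import reduce

    class AppendSuffix:
        """Stand-in for a Feature-like object exposing transform()."""

        def __init__(self, suffix: str) -> None:
            self.suffix = suffix

        def transform(self, value: str) -> str:
            return f"{value}{self.suffix}"

    transformations = [AppendSuffix("_key"), AppendSuffix("_ts")]

    # Feed each transform the result of the previous one, starting from "df".
    result = reduce(lambda acc, t: t.transform(acc), transformations, "df")
    assert result == "df_key_ts"
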
diff --git a/butterfree/transform/feature_set.py b/butterfree/transform/feature_set.py
index 2c4b9b51..369eaf29 100644
--- a/butterfree/transform/feature_set.py
+++ b/butterfree/transform/feature_set.py
@@ -436,8 +436,11 @@ def construct(
             pre_hook_df,
         ).select(*self.columns)
 
-        if not output_df.isStreaming and self.deduplicate_rows:
-            output_df = self._filter_duplicated_rows(output_df)
+        if not output_df.isStreaming:
+            if self.deduplicate_rows:
+                output_df = self._filter_duplicated_rows(output_df)
+            if self.eager_evaluation:
+                output_df.cache().count()
 
         output_df = self.incremental_strategy.filter_with_incremental_strategy(
             dataframe=output_df, start_date=start_date, end_date=end_date
diff --git a/docs/source/butterfree.dataframe_service.rst b/docs/source/butterfree.dataframe_service.rst
index 4fb54fd3..ae9658a5 100644
--- a/docs/source/butterfree.dataframe_service.rst
+++ b/docs/source/butterfree.dataframe_service.rst
@@ -23,22 +23,6 @@ butterfree.dataframe\_service.partitioning module
 butterfree.dataframe\_service.repartition module
 ------------------------------------------------
-.. automodule:: butterfree.dataframe_service.incremental_strategy
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-butterfree.dataframe\_service.partitioning module
--------------------------------------------------
-
-.. automodule:: butterfree.dataframe_service.partitioning
-   :members:
-   :undoc-members:
-   :show-inheritance:
-
-butterfree.dataframe\_service.repartition module
-------------------------------------------------
-
 .. automodule:: butterfree.dataframe_service.repartition
    :members:
    :undoc-members:
    :show-inheritance:
diff --git a/logging.json b/logging.json
deleted file mode 100644
index e69de29b..00000000
diff --git a/setup.py b/setup.py
index 8ff386d9..d51e1866 100644
--- a/setup.py
+++ b/setup.py
@@ -1,7 +1,7 @@
 from setuptools import find_packages, setup
 
 __package_name__ = "butterfree"
-__version__ = "1.4.5"
+__version__ = "1.4.6"
 __repository_url__ = "https://github.com/quintoandar/butterfree"
 
 with open("requirements.txt") as f:
diff --git a/tests/unit/butterfree/transform/test_feature_set.py b/tests/unit/butterfree/transform/test_feature_set.py
index 37a69be2..e907dc0a 100644
--- a/tests/unit/butterfree/transform/test_feature_set.py
+++ b/tests/unit/butterfree/transform/test_feature_set.py
@@ -220,7 +220,7 @@ def test_construct(
             + feature_divide.get_output_columns()
         )
         assert_dataframe_equality(result_df, feature_set_dataframe)
-        assert not result_df.is_cached
+        assert result_df.is_cached
 
     def test_construct_invalid_df(
         self, key_id, timestamp_c, feature_add, feature_divide
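
The test change at the end ties back to the eager-evaluation block restored in `FeatureSet.construct`: once `cache().count()` runs, the constructed DataFrame reports itself as cached, which is what `assert result_df.is_cached` checks. A minimal, generic PySpark illustration of that flag (not butterfree-specific):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    df = spark.range(10)
    assert not df.is_cached   # nothing persisted yet

    df.cache().count()        # mark for caching and materialize it
    assert df.is_cached       # the DataFrame now has a non-default storage level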