Skip to content

Commit

Permalink
Declare schema for pyarrow table
Browse files Browse the repository at this point in the history
Fixes a problem where the column type was not inferable because all entries had a NULL value for that column
  • Loading branch information
manuelwedler committed Oct 25, 2024
1 parent 550c8cc commit 979b78a
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,28 @@ def process_df(df, dtypes):
df[col] = df[col].astype(pd.UInt16Dtype() if dtype == 'UInt16' else dtype)
return df

def get_pyarrow_type(dt):
match dt:
case 'bool':
return pa.bool_()
case 'Int32':
return pa.int32()
case 'Int64':
return pa.int64()
case 'string':
return pa.string()
case 'object':
return pa.binary()
case 'datetime64[ns]':
return pa.timestamp('ns')
case 'json':
return pa.string()
case _:
raise ValueError("Type not supported")

def get_pyarrow_schema(dtypes):
    """Build a PyArrow schema from a column-name -> dtype-name mapping.

    Args:
        dtypes: Mapping whose keys are column names and whose values are the
            dtype names understood by ``get_pyarrow_type``.

    Returns:
        A ``pyarrow.Schema`` with one field per entry, in the mapping's
        iteration order.
    """
    fields = []
    for column_name, dtype_name in dtypes.items():
        arrow_type = get_pyarrow_type(dtype_name)
        fields.append(pa.field(column_name, arrow_type))
    return pa.schema(fields)

def upload_to_s3(file_path, bucket_name, object_name):
logger.info(f"Uploading {object_name} to S3")
if os.getenv("DEBUG"):
Expand All @@ -156,6 +178,7 @@ def upload_to_s3(file_path, bucket_name, object_name):
def fetch_and_write(table_config, engine):
table_name = table_config['name']
dtypes = table_config['datatypes']
schema = get_pyarrow_schema(dtypes)
chunk_size = table_config['chunk_size']
if os.getenv('DEBUG'):
logger.debug(f"DEBUG: Setting chunk_size to 1/100 of {chunk_size} = {chunk_size // 100}")
Expand Down Expand Up @@ -191,7 +214,7 @@ def fetch_and_write(table_config, engine):

logger.info(f"Processed chunk {chunk_counter} of file {file_counter}")
logger.info(f"DataFrame size: {df.memory_usage(deep=True).sum() / (1024 * 1024):.2f} MB")
chunk_table = pa.Table.from_pandas(df) # Convert the dataframe to a PyArrow table
chunk_table = pa.Table.from_pandas(df, schema=schema) # Convert the dataframe to a PyArrow table

if writer is None:
# file name: contracts_0_10000_zstd.parquet, contracts_10000_20000_zstd.parquet, etc.
Expand Down

0 comments on commit 979b78a

Please sign in to comment.