Skip to content

Commit

Permalink
Merge pull request #20 from factly/fix/encoding
Browse files Browse the repository at this point in the history
File content to read properly from s3 file system with non-utf encoding
  • Loading branch information
100mi authored Jan 26, 2023
2 parents da4272c + 9a819a6 commit 726f100
Showing 1 changed file with 60 additions and 23 deletions.
83 changes: 60 additions & 23 deletions app/utils/dataframes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from typing import Union
from urllib.parse import urlparse

import numpy as np
Expand All @@ -15,40 +16,73 @@
setting = Settings()


def get_encoding(obj: Union[str, bytes], is_object=False) -> str:
    """Detect the character encoding of a CSV file.

    Args:
        obj (Union[str, bytes]): Either the URL of the file (str) or the raw
            file content (bytes).
        is_object (bool, optional): Pass True when ``obj`` already holds the
            file content in bytes. Defaults to False, in which case ``obj``
            is treated as a URL and downloaded with ``get``.

    Returns:
        str: Name of the best-matching encoding reported by ``from_bytes``.

    Raises:
        ValueError: If no candidate encoding could be determined for the
            content.
    """
    # Keep the payload in its own local instead of rebinding (and
    # re-annotating) the ``obj`` parameter, whose declared type is a Union.
    # NOTE(review): the HTTP status of the download is not checked here, so an
    # error page would be analysed as if it were the file - confirm upstream.
    content = obj if is_object else get(obj).content

    # NOTE(review): ``from_bytes`` is presumably charset_normalizer's, whose
    # ``best()`` returns None when nothing matches - confirm against imports.
    best_match = from_bytes(content).best()
    if best_match is None:
        # Fail with an explicit message rather than an AttributeError on None.
        raise ValueError("Could not detect the encoding of the file content")

    return best_match.encoding


async def get_dataframe_honouring_encoding_async(
    source: Union[str, bytes],
    is_object=False,
) -> pl.DataFrame:
    """Read a CSV into a polars DataFrame, retrying with a detected encoding.

    The first attempt uses the reader's default encoding. If that raises
    ``UnicodeDecodeError`` or ``pl_exc.ComputeError``, the encoding is looked
    up through ``get_encoding`` and the CSV is read a second time with it.

    Args:
        source (Union[str, bytes]): URL of the CSV file as a string, or the
            file content in bytes (e.g. content already read from s3).
        is_object (bool, optional): Pass True when ``source`` is file content
            in bytes rather than a URL. Defaults to False.

    Returns:
        pl.DataFrame: polars DataFrame object
    """
    # Options shared by both read attempts; a schema inference length of 0 is
    # passed on every read.
    csv_options = {"null_values": "NA", "infer_schema_length": 0}

    try:
        frame = pl.read_csv(source, **csv_options)
    except (UnicodeDecodeError, pl_exc.ComputeError) as err:
        logger.error(f"Could not interpret File encoding : {err}")
        detected = get_encoding(obj=source, is_object=is_object)
        logger.info(f"File encoding : {detected}")
        # Second attempt, this time with the detected encoding made explicit.
        frame = pl.read_csv(source, encoding=detected, **csv_options)
    return frame


def get_dataframe_honouring_encoding(file_url: str) -> pl.DataFrame:
def get_dataframe_honouring_encoding(
source: Union[str, bytes], is_object=False
) -> pl.DataFrame:
"""Get Dataframe irrespective of encoding for csv file with sync
Args:
source (Union[str, bytes]): source is url string incase for file url else incase for s3
file content in bytes
is_object (bool, optional): _description_. pass it as True in case for s3 file.
Returns:
pl.DataFrame: polars Dataframe object
"""
try:
df = pl.read_csv(file_url, null_values="NA", infer_schema_length=0)
df = pl.read_csv(source, null_values="NA", infer_schema_length=0)
except (UnicodeDecodeError, pl_exc.ComputeError) as err:
logger.exception(f"Could not interpret File encoding : {err}")
encoding = get_encoding(url=file_url)
logger.info(f"File encoding for `{file_url}` : {encoding}")
logger.error(f"Could not interpret File encoding : {err}")
encoding = get_encoding(obj=source, is_object=is_object)
logger.info(f"File encoding : {encoding}")
df = pl.read_csv(
file_url,
source,
null_values="NA",
encoding=encoding,
infer_schema_length=0,
Expand Down Expand Up @@ -92,16 +126,17 @@ async def get_dataframe_async(file_url: str):
return df

elif url.scheme == "s3":

logger.info("Check for files with s3 extension")
fs = s3fs.S3FileSystem(
key=setting.S3_ACCESS_KEY_ID,
secret=setting.S3_SECRET_ACCESS_KEY,
client_kwargs={"endpoint_url": setting.S3_ENDPOINT_URL},
)

with fs.open(f"{url.netloc}{url.path}") as f:
df = await get_dataframe_honouring_encoding_async(f)
with fs.open(f"{url.netloc}{url.path}", "rb") as f:
obj = f.read()

df = await get_dataframe_honouring_encoding_async(obj, is_object=True)
return df


Expand All @@ -121,7 +156,7 @@ def get_dataframe(file_url: str):
url = urlparse(file_url)

if url.scheme == "http" or url.scheme == "https":
df = get_dataframe_honouring_encoding(file_url)
df = get_dataframe_honouring_encoding(source=file_url, is_object=False)
return df

elif url.scheme == "s3":
Expand All @@ -132,7 +167,9 @@ def get_dataframe(file_url: str):
client_kwargs={"endpoint_url": setting.S3_ENDPOINT_URL},
)

with fs.open(f"{url.netloc}{url.path}") as f:
df = get_dataframe_honouring_encoding(f)

with fs.open(f"{url.netloc}{url.path}", "rb") as f:
file_content = f.read()
df = get_dataframe_honouring_encoding(
source=file_content, is_object=True
)
return df

0 comments on commit 726f100

Please sign in to comment.