diff --git a/app/utils/dataframes.py b/app/utils/dataframes.py
index 62445e6..e3fda9f 100644
--- a/app/utils/dataframes.py
+++ b/app/utils/dataframes.py
@@ -1,4 +1,5 @@
 import datetime
+from typing import Union
 from urllib.parse import urlparse
 
 import numpy as np
@@ -15,24 +16,52 @@
 setting = Settings()
 
 
-def get_encoding(obj=None, url=None):
-    if url:
-        obj = get(url).content
-    encoding = from_bytes(obj).best().encoding
+def get_encoding(obj: Union[str, bytes], is_object=False) -> str:
+    """Get encoding for a csv File from its url or from its content in bytes
+
+    Args:
+        obj (Union[str, bytes]): url in string format, or file content in bytes
+        is_object (bool, optional): pass True when obj is file content in bytes.
+            Defaults to False.
+
+    Returns:
+        str: encoding name
+
+    Raises:
+        ValueError: if no encoding could be detected for the content
+    """
+    content: bytes = obj if is_object else get(obj).content
+
+    # best() can come back empty (None) when no candidate encoding fits
+    best_match = from_bytes(content).best()
+    if best_match is None:
+        raise ValueError("Could not detect File encoding")
+    encoding: str = best_match.encoding
     return encoding
 
 
 async def get_dataframe_honouring_encoding_async(
-    file_url: str,
+    source: Union[str, bytes],
+    is_object=False,
 ) -> pl.DataFrame:
+    """Get Dataframe irrespective of encoding for csv file with async
+
+    Args:
+        source (Union[str, bytes]): url string in case of a file url, else in case of s3
+            file content in bytes
+        is_object (bool, optional): pass True in case of s3 file content. Defaults to False.
+
+    Returns:
+        pl.DataFrame: polars Dataframe object
+    """
     try:
-        df = pl.read_csv(file_url, null_values="NA", infer_schema_length=0)
+        df = pl.read_csv(source, null_values="NA", infer_schema_length=0)
     except (UnicodeDecodeError, pl_exc.ComputeError) as err:
-        logger.exception(f"Could not interpret File encoding : {err}")
-        encoding = get_encoding(url=file_url)
-        logger.info(f"File encoding for `{file_url}` : {encoding}")
+        logger.error(f"Could not interpret File encoding : {err}")
+        encoding = get_encoding(obj=source, is_object=is_object)
+        logger.info(f"File encoding : {encoding}")
         df = pl.read_csv(
-            file_url,
+            source,
             null_values="NA",
             encoding=encoding,
             infer_schema_length=0,
@@ -40,15 +69,27 @@ async def get_dataframe_honouring_encoding_async(
     return df
 
 
-def get_dataframe_honouring_encoding(file_url: str) -> pl.DataFrame:
+def get_dataframe_honouring_encoding(
+    source: Union[str, bytes], is_object=False
+) -> pl.DataFrame:
+    """Get Dataframe irrespective of encoding for csv file with sync
+
+    Args:
+        source (Union[str, bytes]): url string in case of a file url, else in case of s3
+            file content in bytes
+        is_object (bool, optional): pass True in case of s3 file content. Defaults to False.
+
+    Returns:
+        pl.DataFrame: polars Dataframe object
+    """
     try:
-        df = pl.read_csv(file_url, null_values="NA", infer_schema_length=0)
+        df = pl.read_csv(source, null_values="NA", infer_schema_length=0)
     except (UnicodeDecodeError, pl_exc.ComputeError) as err:
-        logger.exception(f"Could not interpret File encoding : {err}")
-        encoding = get_encoding(url=file_url)
-        logger.info(f"File encoding for `{file_url}` : {encoding}")
+        logger.error(f"Could not interpret File encoding : {err}")
+        encoding = get_encoding(obj=source, is_object=is_object)
+        logger.info(f"File encoding : {encoding}")
         df = pl.read_csv(
-            file_url,
+            source,
             null_values="NA",
             encoding=encoding,
             infer_schema_length=0,
@@ -92,16 +133,17 @@ async def get_dataframe_async(file_url: str):
         return df
 
     elif url.scheme == "s3":
-
+        logger.info("Check for files with s3 extension")
         fs = s3fs.S3FileSystem(
             key=setting.S3_ACCESS_KEY_ID,
             secret=setting.S3_SECRET_ACCESS_KEY,
             client_kwargs={"endpoint_url": setting.S3_ENDPOINT_URL},
         )
 
-        with fs.open(f"{url.netloc}{url.path}") as f:
-            df = await get_dataframe_honouring_encoding_async(f)
+        with fs.open(f"{url.netloc}{url.path}", "rb") as f:
+            obj = f.read()
+            df = await get_dataframe_honouring_encoding_async(obj, is_object=True)
 
         return df
 
 
@@ -121,7 +163,7 @@ def get_dataframe(file_url: str):
     url = urlparse(file_url)
 
     if url.scheme == "http" or url.scheme == "https":
-        df = get_dataframe_honouring_encoding(file_url)
+        df = get_dataframe_honouring_encoding(source=file_url, is_object=False)
         return df
 
     elif url.scheme == "s3":
@@ -132,7 +174,9 @@ def get_dataframe(file_url: str):
             client_kwargs={"endpoint_url": setting.S3_ENDPOINT_URL},
         )
 
-        with fs.open(f"{url.netloc}{url.path}") as f:
-            df = get_dataframe_honouring_encoding(f)
-
+        with fs.open(f"{url.netloc}{url.path}", "rb") as f:
+            file_content = f.read()
+            df = get_dataframe_honouring_encoding(
+                source=file_content, is_object=True
+            )
         return df