Skip to content

Commit

Permalink
Added endpoint url parameter (#34)
Browse files Browse the repository at this point in the history
* add endpoint url parameter

* fix creation of S3ParquetFile

* add default value for endpoint url in S3ParquetFile class

* fix argument parser test
  • Loading branch information
sattler authored Jan 18, 2023
1 parent c80cb3b commit fd97598
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 14 deletions.
8 changes: 7 additions & 1 deletion parquet_tools/commands/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ def configure_parser(paser: ArgumentParser) -> ArgumentParser:
type=str,
required=False,
help='awscli profile in ~/.aws/credentials. You use this option when you read parquet file on s3.')
paser.add_argument('--endpoint-url',
type=str,
required=False,
default=None,
help='A custom S3 endpoint URL')
paser.set_defaults(handler=_cli)
return paser


def _cli(args: Namespace) -> None:
try:
pfs: List[ParquetFile] = [
to_parquet_file(file_exp=f, awsprofile=args.awsprofile)
to_parquet_file(file_exp=f, awsprofile=args.awsprofile, endpoint_url=args.endpoint_url)
for f in args.file]
with get_datafame_from_objs(pfs, args.head) as df:
if df is None:
Expand All @@ -52,6 +57,7 @@ def _cli(args: Namespace) -> None:
print(str(e), file=sys.stderr)
except FileNotFoundException as e:
print(str(e), file=sys.stderr)
raise e


def _execute(df: pd.DataFrame, head: int, columns: list) -> None:
Expand Down
7 changes: 6 additions & 1 deletion parquet_tools/commands/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ def configure_parser(paser: ArgumentParser) -> ArgumentParser:
required=False,
default=False,
help='Detail expression using thrift.')
paser.add_argument('--endpoint-url',
type=str,
required=False,
default=None,
help='A custom S3 endpoint URL')

paser.set_defaults(handler=_cli)
return paser


def _cli(args: Namespace) -> None:
pf: ParquetFile = to_parquet_file(file_exp=args.file, awsprofile=args.awsprofile)
pf: ParquetFile = to_parquet_file(file_exp=args.file, awsprofile=args.awsprofile, endpoint_url=args.endpoint_url)
if pf.is_wildcard():
print('Cannot use wildcard for inspection.', file=sys.stderr)
else:
Expand Down
7 changes: 6 additions & 1 deletion parquet_tools/commands/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def configure_parser(paser: ArgumentParser) -> ArgumentParser:
type=str,
required=False,
help='awscli profile in ~/.aws/credentials. You use this option when you read parquet file on s3.')
paser.add_argument('--endpoint-url',
type=str,
required=False,
default=None,
help='A custom S3 endpoint URL')

paser.set_defaults(handler=_cli)
return paser
Expand All @@ -54,7 +59,7 @@ def configure_parser(paser: ArgumentParser) -> ArgumentParser:
def _cli(args: Namespace) -> None:
try:
pfs: List[ParquetFile] = [
to_parquet_file(file_exp=f, awsprofile=args.awsprofile)
to_parquet_file(file_exp=f, awsprofile=args.awsprofile, endpoint_url=args.endpoint_url)
for f in args.file]
with get_datafame_from_objs(pfs, args.head) as df:
if df is None:
Expand Down
12 changes: 7 additions & 5 deletions parquet_tools/commands/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class S3ParquetFile(ParquetFile):
aws_session: boto3.Session
bucket: str
key: str
endpoint_url: Optional[str] = None

def validation(self):
''' key can have *. But it must be last of the string.
Expand All @@ -116,7 +117,7 @@ def is_wildcard(self) -> bool:
return '*' in self.key

def resolve_wildcard(self) -> List[ParquetFile]:
list_res = self.aws_session.client('s3')\
list_res = self.aws_session.client('s3', endpoint_url=self.endpoint_url)\
.list_objects_v2(
Bucket=self.bucket,
Prefix=self.key[:-1] # remove *
Expand All @@ -128,7 +129,7 @@ def resolve_wildcard(self) -> List[ParquetFile]:
return []
keys = [e['Key'] for e in list_res['Contents']]
return sorted(
[S3ParquetFile(aws_session=self.aws_session, bucket=self.bucket, key=key) for key in keys],
[S3ParquetFile(aws_session=self.aws_session, bucket=self.bucket, key=key, endpoint_url=self.endpoint_url) for key in keys],
key=lambda x: x.key
)

Expand All @@ -141,7 +142,7 @@ def get_local_path(self) -> Iterator[str]:
logger.info(f'Download stat parquet file on s3://{self.bucket}/{self.key} -> {localfile}')
try:
with Halo(text='Downloading from s3', spinner='dots', stream=stderr) as spinner:
self.aws_session.resource('s3')\
self.aws_session.resource('s3', endpoint_url=self.endpoint_url)\
.meta.client.download_file(self.bucket, self.key, localfile)
spinner.info(f's3://{self.bucket}/{self.key} => {localfile}')
except Exception:
Expand All @@ -158,15 +159,16 @@ def _is_s3_file(filename: str) -> bool:
return filename[:5] == 's3://'


def to_parquet_file(file_exp: str, awsprofile: Optional[str]) -> ParquetFile:
def to_parquet_file(file_exp: str, awsprofile: Optional[str], endpoint_url: Optional[str]) -> ParquetFile:
'''Transform file_exp to ParquetFile object.
'''
if _is_s3_file(file_exp):
parsed_url = urlparse(file_exp)
return S3ParquetFile(
aws_session=get_aws_session(awsprofile),
bucket=parsed_url.netloc,
key=parsed_url.path[1:]
key=parsed_url.path[1:],
endpoint_url=endpoint_url
)
else:
return LocalParquetFile(
Expand Down
8 changes: 5 additions & 3 deletions tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ def parquet_file():
'columns': [],
'file': ['file1.parquet'],
'head': -1,
'awsprofile': None
'awsprofile': None,
'endpoint_url': None
}
),
# most complex one
(
'--columns col1,col2 --head 100 --awsprofile user1 file1.parquet file2.parquet', True,
'--columns col1,col2 --head 100 --endpoint-url http://localhost:9000 --awsprofile user1 file1.parquet file2.parquet', True,
{
'columns': ['col1', 'col2'],
'file': ['file1.parquet', 'file2.parquet'],
'head': 100,
'awsprofile': 'user1'
'awsprofile': 'user1',
'endpoint_url': 'http://localhost:9000'
}
),
# empty columns
Expand Down
9 changes: 6 additions & 3 deletions tests/test_show.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def parser():
'file': ['file1.parquet'],
'format': 'psql',
'head': -1,
'awsprofile': None
'awsprofile': None,
'endpoint_url': None
}
),
# most complex one
Expand All @@ -33,7 +34,8 @@ def parser():
'file': ['file1.parquet', 'file2.parquet'],
'format': 'github',
'head': 100,
'awsprofile': None
'awsprofile': None,
'endpoint_url': None
}
),
# file is on S3
Expand All @@ -44,7 +46,8 @@ def parser():
'file': ['s3://bucket-name/file1.parquet'],
'format': 'psql',
'head': -1,
'awsprofile': 'user1'
'awsprofile': 'user1',
'endpoint_url': None
}
),
# empty columns
Expand Down

0 comments on commit fd97598

Please sign in to comment.