read_parquet_from_S3_or_local.py
#!/usr/bin/env python3
from pprint import pprint
import logging

# Reset any handlers already attached to the root logger (e.g. by a notebook or runtime)
# so that basicConfig below takes effect. Iterate over a copy of the list, because removing
# handlers while iterating over logger.handlers directly would skip entries.
logger = logging.getLogger()
for handler in list(logger.handlers):
    logger.removeHandler(handler)
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
# findspark locates the local Spark installation (via SPARK_HOME) and adds pyspark to sys.path
import findspark
findspark.init()

from pathlib import Path
import os

logger.info('current folder=%s', Path.cwd())

import sys
print('Python version=' + sys.version)
print(sys.path)
from pyspark import SparkConf

conf: SparkConf = SparkConf().setAppName("pyspark-local")
# Corresponds to:
# pyspark --packages "org.apache.hadoop:hadoop-aws:3.2.1,com.amazonaws:aws-java-sdk-bundle:1.11.563"
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.1,com.amazonaws:aws-java-sdk-bundle:1.11.563')
# conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider')
# conf.set('spark.hadoop.fs.s3a.access.key', 'AWS_ACCESS_KEY')
# conf.set('spark.hadoop.fs.s3a.secret.key', 'AWS_SECRET_KEY')
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.profile.ProfileCredentialsProvider')
conf.set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('spark.hadoop.fs.s3a.impl.disable.cache', 'true')
conf.set('com.amazonaws.services.s3.enableV4', 'true')
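# Note (assumption, not part of the original script): com.amazonaws.auth.profile.ProfileCredentialsProvider
# reads credentials from ~/.aws/credentials and honours the AWS_PROFILE environment variable, so a
# non-default profile can be selected before launching, e.g.:
#   export AWS_PROFILE=my-dev-profile    # hypothetical profile name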
print('Conf :===')
pprint(conf.getAll())  # getAll() returns the (key, value) pairs; pprint(conf) would only show the object repr
# Create spark context
from pyspark import SparkContext

sc: SparkContext = SparkContext.getOrCreate(conf=conf)
print('Spark version=' + sc.version)
print('Spark Context (i.e. sc) :===')
pprint(sc)

# Create spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=conf).getOrCreate()
print('Spark Session (i.e. spark) :===')
pprint(spark)
# imports
from pyspark.sql.functions import col, lit, count
from pyspark.sql.functions import to_timestamp, to_date

PARQUET_SUFFIX: str = '.parquet'
USE_KMS: bool = False
PRIMARY_KEY_COLUMN: str = 'FLIGHT_RANGE_ID'

def register_kms_key() -> bool:
    """Configure the S3A connector for SSE-KMS so encrypted S3 objects can be read."""
    try:
        # kms_key_landing = 'arn:aws:kms:us-east-1:550060283415:key/1c9916d5-8214-4a84-b4ed-ae5570e2ea43'
        kms_key_landing = 'arn:aws:kms:us-east-1:550060283415:key/2bb9d33c-8b5b-4f67-bccc-6d9f603d7609'
        spark._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
        spark._jsc.hadoopConfiguration().set('fs.s3a.impl.disable.cache', 'true')
        spark._jsc.hadoopConfiguration().set('fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
        spark._jsc.hadoopConfiguration().set('fs.s3a.server-side-encryption.key', kms_key_landing)
        logger.info('KMS key setup for reading S3')
        return True
    except Exception as ex:
        logger.error(ex)
        return False
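# Alternative sketch (assumption, not from the original script): the same SSE-KMS options can be
# supplied up-front on the SparkConf using the 'spark.hadoop.' prefix, which Spark copies into the
# Hadoop configuration, instead of mutating it afterwards through spark._jsc:
#   conf.set('spark.hadoop.fs.s3a.server-side-encryption-algorithm', 'SSE-KMS')
#   conf.set('spark.hadoop.fs.s3a.server-side-encryption.key', kms_key_landing)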

def check_parquet(use_local: bool = False) -> None:
    """Read a parquet file (local or on S3), log its schema and row counts, and show a sample."""
    parquet_location: str = None
    if use_local:
        # Iterate over the current folder looking for a '*local_test.parquet' file
        curr_folder_path_obj: Path = Path('.')
        logger.info('Canonical current folder: %s', curr_folder_path_obj.as_posix())
        logger.info('Absolute current folder: %s', curr_folder_path_obj.absolute())
        local_filename: str = None
        for f_obj in curr_folder_path_obj.iterdir():
            logger.info(f'{type(f_obj)} : {f_obj} {f_obj.suffix}')
            if f_obj.suffix == PARQUET_SUFFIX and f_obj.stem.endswith('local_test'):
                logger.info('Found local_test parquet file: %s', f_obj)
                # Keep the full absolute path so the read does not depend on the working directory
                local_filename = str(f_obj.absolute())
                break
        if not local_filename:
            raise FileNotFoundError('Could not find any parquet file locally!')
        try:
            logger.info('Absolute path of local parquet: %s', local_filename)
            parquet_location = local_filename
            logger.info('Using local parquet file: %s', parquet_location)
        except Exception as e:
            logger.error(e)  # Exception objects have no .message attribute in Python 3
            raise
    else:
        if USE_KMS and not register_kms_key():
            raise Exception('ERROR during registering KMS key')
        s3_folder: str = 'lambda_datasync_10k/lndcdcadsprpsl_flightrange'
        partition: str = '2021/09/07/18'
        # s3_obj_name: str = '*'  # alternative: read every parquet object under the partition
        s3_obj_name: str = 'lndcdcadsprpsl_flightrange_1631041182.882919.parquet'
        s3_location = f's3a://lineardp-conformance-common-flink-dev/{s3_folder}/{partition}/{s3_obj_name}'
        parquet_location = s3_location
        logger.info(f'Using parquet from S3 location: {parquet_location}')
    try:
        df = spark.read.parquet(parquet_location)
        logger.info('Uploaded schema:')
        df.printSchema()
        total_record_cnt: int = df.count()
        distinct_record_cnt: int = df.select(PRIMARY_KEY_COLUMN).distinct().count()
        logger.info('Total Records: %d. Distinct Records: %d', total_record_cnt, distinct_record_cnt)
        df.show(truncate=False)
    except Exception as ex:
        print(f'Issue reading {parquet_location}: {ex}')

def main():
    # check_parquet(use_local=False)  # read from S3
    check_parquet(use_local=True)  # read local parquet


if __name__ == "__main__":
    main()
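# Usage sketch (assumptions: a local Spark installation discoverable by findspark, e.g. via SPARK_HOME,
# and AWS credentials available in ~/.aws/credentials when reading from S3):
#   python3 read_parquet_from_S3_or_local.py
# To read from S3 instead of a local file, switch the call in main() to check_parquet(use_local=False).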