This is an example AWS Glue Streaming ETL Job project for CDK development with Python.
This project is based on the AWS Big Data Blog post "Crafting serverless streaming ETL jobs with AWS Glue" and its AWS sample code.
In this project, we create a streaming ETL job in AWS Glue that consumes continuously generated ventilator metrics in micro-batches, applies transformations, performs aggregations, and delivers the data to a sink, so the results can be visualized or used in downstream processes. In this streaming ETL architecture, a Python script generates sample ventilator metrics and publishes them as a stream into Kinesis Data Streams.
After the data is ingested into Amazon S3, you can query it with Amazon Athena and build visual dashboards using Amazon QuickSight.
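Each record in the stream is a small JSON document of ventilator metrics. For orientation, here is an illustrative example record; the field names follow the schema defined later in this README, and the values are made up:

```python
# One illustrative ventilator metric event; the field names match the Glue
# table schema defined later in this README, the values are made up.
import json

sample_event = {
    "ventilatorid": 29,
    "eventtime": "2023-09-07 16:02:17",
    "serialnumber": "d85324d6-b426-4713-9409-aa7f5c7523b4",
    "pressurecontrol": 26,
    "o2stats": 91,
    "minutevolume": 5,
    "manufacturer": "GE",
}
print(json.dumps(sample_event))
```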
The cdk.json
file tells the CDK Toolkit how to execute your app.
This project is set up like a standard Python project. The initialization
process also creates a virtualenv within this project, stored under the .venv
directory. To create the virtualenv it assumes that there is a python3
(or python
for Windows) executable in your path with access to the venv
package. If for any reason the automatic creation of the virtualenv fails,
you can create the virtualenv manually.
To manually create a virtualenv on MacOS and Linux:
$ python3 -m venv .venv
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
If you are on a Windows platform, you would activate the virtualenv like this:
% .venv\Scripts\activate.bat
Once the virtualenv is activated, you can install the required dependencies.
$ pip install -r requirements.txt
Then you should properly set the CDK context configuration file, cdk.context.json.
For example:
{ "kinesis_stream_name": "ventilatorstream", "glue_assets_s3_bucket_name": "aws-glue-assets-123456789012-us-east-1", "glue_job_script_file_name": "glue_streaming_from_kds_to_s3.py", "glue_job_name": "glue_streaming_from_kds_to_s3", "glue_job_input_arguments": { "--aws_region": "us-east-1", "--output_path": "s3://aws-glue-streaming-output-parquet-atq4q5u/", "--glue_database": "ventilatordb", "--glue_table_name": "ventilators_table", "stream_starting_position": "LATEST" }, "glue_kinesis_table": { "database_name": "ventilatordb", "table_name": "ventilators_table", "columns": [ { "name": "ventilatorid", "type": "int" }, { "name": "eventtime", "type": "string" }, { "name": "serialnumber", "type": "string" }, { "name": "pressurecontrol", "type": "int" }, { "name": "o2stats", "type": "int" }, { "name": "minutevolume", "type": "int" }, { "name": "manufacturer", "type": "string" } ] } }
At this point you can now synthesize the CloudFormation template for this code.
(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk synth --all
To add additional dependencies, for example other CDK libraries, just add
them to your setup.py
file and rerun the pip install -r requirements.txt
command.
-
Create an S3 bucket for the Glue streaming job's output data
(.venv) $ cdk deploy GlueStreamingSinkToS3Path
-
Create a Kinesis data stream
(.venv) $ cdk deploy KinesisStreamAsGlueStreamingJobDataSource
-
Define a schema for the streaming data
(.venv) $ cdk deploy GlueSchemaOnKinesisStream
Running the cdk deploy GlueSchemaOnKinesisStream command is equivalent to manually creating a schema in the AWS Glue Data Catalog through the following steps (a CDK sketch follows the steps):
(1) On the AWS Glue console, choose Data Catalog.
(2) Choose Databases, and click Add database.
(3) Create a database with the name ventilatordb.
(4) On the Data Catalog menu, choose Tables, and click Add table.
(5) For the table name, enter ventilators_table.
(6) Select ventilatordb as the database.
(7) Choose Kinesis as the type of source.
(8) Enter the name of the stream.
(9) For the classification, choose JSON.
(10) Define the schema according to the following table.

| Column name | Data type | Example |
|---|---|---|
| ventilatorid | int | 29 |
| eventtime | string | "2023-09-07 16:02:17" |
| serialnumber | string | "d85324d6-b426-4713-9409-aa7f5c7523b4" |
| pressurecontrol | int | 26 |
| o2stats | int | 91 |
| minutevolume | int | 5 |
| manufacturer | string | "GE" |

(11) Choose Finish.
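For comparison, here is a minimal CDK Python sketch of how such a Kinesis-backed Data Catalog table could be defined. The construct ids, the stream ARN, and the table parameters are illustrative assumptions; the GlueSchemaOnKinesisStream stack in this repository is the source of truth:

```python
# Sketch of a Glue Data Catalog database and Kinesis-backed table in CDK Python.
# Construct ids, the stream ARN, and the table parameters are assumptions.
from aws_cdk import Stack, aws_glue as glue
from constructs import Construct

class GlueSchemaOnKinesisStreamSketch(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        database = glue.CfnDatabase(self, "VentilatorDb",
            catalog_id=self.account,
            database_input=glue.CfnDatabase.DatabaseInputProperty(name="ventilatordb"))

        table = glue.CfnTable(self, "VentilatorsTable",
            catalog_id=self.account,
            database_name="ventilatordb",
            table_input=glue.CfnTable.TableInputProperty(
                name="ventilators_table",
                table_type="EXTERNAL_TABLE",
                parameters={"classification": "json"},
                storage_descriptor=glue.CfnTable.StorageDescriptorProperty(
                    columns=[
                        glue.CfnTable.ColumnProperty(name="ventilatorid", type="int"),
                        glue.CfnTable.ColumnProperty(name="eventtime", type="string"),
                        glue.CfnTable.ColumnProperty(name="serialnumber", type="string"),
                        glue.CfnTable.ColumnProperty(name="pressurecontrol", type="int"),
                        glue.CfnTable.ColumnProperty(name="o2stats", type="int"),
                        glue.CfnTable.ColumnProperty(name="minutevolume", type="int"),
                        glue.CfnTable.ColumnProperty(name="manufacturer", type="string"),
                    ],
                    location="ventilatorstream",
                    parameters={
                        "streamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/ventilatorstream",
                        "typeOfData": "kinesis",
                    })))
        table.add_dependency(database)
```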
-
Create Glue Streaming Job
(.venv) $ ls src/main/python/
glue_streaming_from_kds_to_s3.py
(.venv) $ aws s3 mb s3://aws-glue-assets-123456789012-us-east-1 --region us-east-1
(.venv) $ aws s3 cp src/main/python/glue_streaming_from_kds_to_s3.py s3://aws-glue-assets-123456789012-us-east-1/scripts/
(.venv) $ cdk deploy GlueStreamingSinkToS3JobRole GrantLFPermissionsOnGlueJobRole GlueStreamingSinkToS3Job
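For orientation, here is a condensed, hypothetical sketch of what a Glue streaming script like glue_streaming_from_kds_to_s3.py typically does: read the Kinesis-backed catalog table, process micro-batches with forEachBatch, and write partitioned Parquet to S3. The actual script in src/main/python/ is the source of truth; the window size, checkpoint location, and output prefix below are assumptions:

```python
# Hypothetical, condensed sketch of a Glue streaming script that reads the
# Kinesis-backed Data Catalog table and writes partitioned Parquet to S3.
# The real glue_streaming_from_kds_to_s3.py in src/main/python/ may differ.
import sys
from datetime import datetime

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import lit

args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "glue_database", "glue_table_name", "output_path"])

glue_context = GlueContext(SparkContext())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Streaming DataFrame backed by the Kinesis table in the Glue Data Catalog.
kinesis_df = glue_context.create_data_frame.from_catalog(
    database=args["glue_database"],
    table_name=args["glue_table_name"],
    additional_options={"startingPosition": "LATEST", "inferSchema": "false"})

def process_batch(data_frame, batch_id):
    """Write each non-empty micro-batch as Parquet, partitioned by ingest time."""
    if data_frame.count() == 0:
        return
    now = datetime.now()
    (data_frame
        .withColumn("ingest_year", lit(now.strftime("%Y")))
        .withColumn("ingest_month", lit(now.strftime("%m")))
        .withColumn("ingest_day", lit(now.strftime("%d")))
        .withColumn("ingest_hour", lit(now.strftime("%H")))
        .write.mode("append")
        .partitionBy("ingest_year", "ingest_month", "ingest_day", "ingest_hour")
        # The "ventilator_metrics/" prefix matches the Athena LOCATION used later.
        .parquet(args["output_path"] + "ventilator_metrics/"))

# Process the stream in micro-batches; window size and checkpoint location
# here are assumptions, not this project's actual settings.
glue_context.forEachBatch(
    frame=kinesis_df,
    batch_function=process_batch,
    options={"windowSize": "100 seconds",
             "checkpointLocation": args["output_path"] + "checkpoint/"})
job.commit()
```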
-
Make sure the Glue job can access the Kinesis Data Streams table in the Glue Data Catalog database; otherwise, grant the required permissions to the Glue job role.
We can check the currently granted permissions by running the following command:
(.venv) $ aws lakeformation list-permissions | jq -r '.PrincipalResourcePermissions[] | select(.Principal.DataLakePrincipalIdentifier | endswith(":role/GlueStreamingJobRole"))'
If no permissions are found, we need to manually grant the required permissions to the Glue job role by running the following command:
(.venv) $ aws lakeformation grant-permissions \
    --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:role/GlueStreamingJobRole \
    --permissions SELECT DESCRIBE \
    --resource '{ "Table": {"DatabaseName": "ventilatordb", "TableWildcard": {}} }'
-
Run the Glue job to load data from Kinesis Data Streams into S3
(.venv) $ aws glue start-job-run --job-name glue-streaming-from-kds-to-s3
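To confirm that the streaming job has started, you can also check its run state with boto3 instead of the Glue console. A small sketch, using the job name from the command above (use the job name you actually deployed):

```python
# Check the streaming job's run state with boto3 (it should reach RUNNING).
# Use the job name you actually deployed (glue_job_name in cdk.context.json).
import boto3

glue = boto3.client("glue", region_name="us-east-1")
response = glue.get_job_runs(JobName="glue-streaming-from-kds-to-s3", MaxResults=5)
for run in response["JobRuns"]:
    print(run["Id"], run["JobRunState"])
```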
-
Generate streaming data
We can synthetically generate ventilator data in JSON format using a simple Python application.
(.venv) $ python src/utils/gen_fake_kinesis_stream_data.py \
    --region-name us-east-1 \
    --stream-name your-stream-name \
    --max-count 1000
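For reference, here is a minimal sketch of what such a generator can look like with boto3. The bundled src/utils/gen_fake_kinesis_stream_data.py is the source of truth; the option names below mirror the command above, and the field ranges are made up:

```python
# Minimal sketch of a fake ventilator-metrics producer for Kinesis Data Streams.
# The bundled src/utils/gen_fake_kinesis_stream_data.py may differ in details;
# field ranges and manufacturer names here are made up.
import argparse
import datetime
import json
import random
import uuid

import boto3

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--region-name", default="us-east-1")
    parser.add_argument("--stream-name", required=True)
    parser.add_argument("--max-count", type=int, default=1000)
    args = parser.parse_args()

    kinesis = boto3.client("kinesis", region_name=args.region_name)
    for _ in range(args.max_count):
        record = {
            "ventilatorid": random.randint(1, 100),
            "eventtime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "serialnumber": str(uuid.uuid4()),
            "pressurecontrol": random.randint(10, 35),
            "o2stats": random.randint(85, 100),
            "minutevolume": random.randint(2, 10),
            "manufacturer": random.choice(["GE", "Getinge", "Draeger", "Medtronic"]),
        }
        kinesis.put_record(
            StreamName=args.stream_name,
            Data=json.dumps(record).encode("utf-8"),
            PartitionKey=str(record["ventilatorid"]))

if __name__ == "__main__":
    main()
```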
-
Check the streaming data in S3
After 5~10 minutes, you can see that the streaming data has been delivered from Kinesis Data Streams to S3 and stored in a folder structure by year, month, day, and hour.
-
Create and load a table with partitioned data in Amazon Athena
Go to Athena on the AWS Management console.
-
(step 1) Create a database
In order to create a new database called ventilatordb, enter the following statement in the Athena query editor and click the Run button to execute the query.

CREATE DATABASE IF NOT EXISTS ventilatordb
-
(step 2) Create a table
Copy the following query into the Athena query editor, replace the xxxxxxx in the last line under LOCATION with the string of your S3 bucket, and execute the query to create a new table.

CREATE EXTERNAL TABLE ventilatordb.ventilators_parquet (
  `ventilatorid` integer,
  `eventtime` timestamp,
  `serialnumber` string,
  `pressurecontrol` integer,
  `o2stats` integer,
  `minutevolume` integer,
  `manufacturer` string)
PARTITIONED BY (
  `ingest_year` int,
  `ingest_month` int,
  `ingest_day` int,
  `ingest_hour` int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://aws-glue-streaming-output-parquet-xxxxx/ventilator_metrics';
If the query is successful, a table named ventilators_parquet is created and displayed on the left panel under the Tables section.

If you get an error, check if (a) you have updated the LOCATION to the correct S3 bucket name, (b) you have ventilatordb selected under the Database dropdown, and (c) you have AwsDataCatalog selected as the Data source.

ℹ️ If you fail to create the table, give Athena users access permissions on ventilatordb through AWS Lake Formation, or grant any Athena user access to ventilatordb by running the following commands:

(.venv) $ aws lakeformation grant-permissions \
    --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:user/example-user-id \
    --permissions CREATE_TABLE DESCRIBE ALTER DROP \
    --resource '{ "Database": { "Name": "ventilatordb" } }'
(.venv) $ aws lakeformation grant-permissions \
    --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:user/example-user-id \
    --permissions SELECT DESCRIBE ALTER INSERT DELETE DROP \
    --resource '{ "Table": {"DatabaseName": "ventilatordb", "TableWildcard": {}} }'
-
(step 3) Load the partition data
Run the following query to load the partition data.
MSCK REPAIR TABLE ventilatordb.ventilators_parquet;
After you run this command, the data is ready for querying.
Instead of the MSCK REPAIR TABLE command, you can use the ALTER TABLE ADD PARTITION command to add each partition manually.

For example, to load the data in s3://aws-glue-streaming-output-parquet-xxxxx/ventilator_metrics/ingest_year=2023/ingest_month=01/ingest_day=10/ingest_hour=06/, you can run the following query.

ALTER TABLE ventilatordb.ventilators_parquet ADD IF NOT EXISTS
PARTITION (ingest_year='2023', ingest_month='01', ingest_day='10', ingest_hour='06')
LOCATION 's3://aws-glue-streaming-output-parquet-xxxxx/ventilator_metrics/ingest_year=2023/ingest_month=01/ingest_day=10/ingest_hour=06/';
-
(Optional) (step 4) Check partitions
Run the following query to list all the partitions in an Athena table in unsorted order.
SHOW PARTITIONS ventilatordb.ventilators_parquet;
-
Run test query
Enter the following SQL statement and execute the query.
SELECT COUNT(*) FROM ventilatordb.ventilators_parquet;
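You can also run the same query programmatically with boto3. In the sketch below, s3://your-athena-query-results-bucket/ is a placeholder for your own query results location:

```python
# Run the same test query through the Athena API with boto3.
# "s3://your-athena-query-results-bucket/" is a placeholder to replace.
import time

import boto3

athena = boto3.client("athena", region_name="us-east-1")

query = athena.start_query_execution(
    QueryString="SELECT COUNT(*) FROM ventilatordb.ventilators_parquet",
    QueryExecutionContext={"Database": "ventilatordb"},
    ResultConfiguration={"OutputLocation": "s3://your-athena-query-results-bucket/"})
query_id = query["QueryExecutionId"]

# Poll until the query finishes, then print the row count.
while True:
    state = athena.get_query_execution(
        QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    print(rows[1]["Data"][0]["VarCharValue"])  # rows[0] is the header row
```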
-
Stop the Glue job by replacing the job name in the command below.
(.venv) $ JOB_RUN_IDS=$(aws glue get-job-runs \
    --job-name glue-streaming-from-kds-to-s3 | jq -r '.JobRuns[] | select(.JobRunState=="RUNNING") | .Id' \
    | xargs)
(.venv) $ aws glue batch-stop-job-run \
    --job-name glue-streaming-from-kds-to-s3 \
    --job-run-ids $JOB_RUN_IDS
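Equivalently, you can stop the running job runs with boto3; a small sketch using the same job name as the commands above:

```python
# Stop all RUNNING runs of the streaming job with boto3, equivalent to the
# CLI commands above; use the job name you actually deployed.
import boto3

glue = boto3.client("glue", region_name="us-east-1")
job_name = "glue-streaming-from-kds-to-s3"

runs = glue.get_job_runs(JobName=job_name)["JobRuns"]
running_ids = [run["Id"] for run in runs if run["JobRunState"] == "RUNNING"]
if running_ids:
    glue.batch_stop_job_run(JobName=job_name, JobRunIds=running_ids)
```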
-
Delete the CloudFormation stack by running the below command.
(.venv) $ cdk destroy --all
cdk ls       list all stacks in the app
cdk synth    emits the synthesized CloudFormation template
cdk deploy   deploy this stack to your default AWS account/region
cdk diff     compare deployed stack with current state
cdk docs     open CDK documentation
- AWS Glue versions: The AWS Glue version determines the versions of Apache Spark and Python that AWS Glue supports.
- Crafting serverless streaming ETL jobs with AWS Glue (2020-10-14)
- aws-samples/aws-glue-streaming-etl-blog
- Streaming ETL jobs in AWS Glue
- Best practices to optimize cost and performance for AWS Glue streaming ETL jobs (2022-08-03)
- Granting database or table permissions error using AWS CDK
-
Error message:
AWS::LakeFormation::PrincipalPermissions | CfnPrincipalPermissions Resource handler returned message: "Resource does not exist or requester is not authorized to access requested permissions. (Service: LakeFormation, Status Code: 400, Request ID: f4d5e58b-29b6-4889-9666-7e38420c9035)" (RequestToken: 4a4bb1d6-b051-032f-dd12-5951d7b4d2a9, HandlerErrorCode: AccessDenied)
-
Solution:
The role assumed by the CDK (e.g., cdk-hnb659fds-deploy-role-123456789012-us-east-1) is not a data lake administrator.
So deploying PrincipalPermissions fails with an error such as: Resource does not exist or requester is not authorized to access requested permissions.
To resolve the error, you need to promote the CDK execution role to a data lake administrator.
For example, see https://github.com/aws-samples/data-lake-as-code/blob/mainline/lib/stacks/datalake-stack.ts#L68 and the CDK sketch below.
-
Reference:
https://github.com/aws-samples/data-lake-as-code - Data Lake as Code
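For illustration, here is a minimal CDK Python sketch in the spirit of the data-lake-as-code example above. The construct id and the role name pattern are assumptions to adapt to your environment:

```python
# Sketch: register the role used by CDK deployments as a Lake Formation data
# lake administrator so that AWS::LakeFormation::PrincipalPermissions can be
# deployed. The construct id and role name pattern are assumptions; be careful
# not to drop administrators that are already configured in your account.
from aws_cdk import Stack, aws_lakeformation as lakeformation
from constructs import Construct

class DataLakeAdminSketchStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        cdk_deploy_role_arn = (
            f"arn:aws:iam::{self.account}:role/"
            f"cdk-hnb659fds-deploy-role-{self.account}-{self.region}")

        lakeformation.CfnDataLakeSettings(self, "DataLakeSettings",
            admins=[
                lakeformation.CfnDataLakeSettings.DataLakePrincipalProperty(
                    data_lake_principal_identifier=cdk_deploy_role_arn)
            ])
```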