Parser error but incomplete file location #194

Open
juliocpmelo opened this issue Aug 23, 2023 · 0 comments

juliocpmelo commented Aug 23, 2023

Hello, I am facing several parser errors while processing many CSV files with Glue. The environment is configured to run with Glue 4.0 and all of the default configurations (I also tried 3.0). The ETL script I'm using is as follows:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
import re

# Script generated for node Custom Transform
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    import pyspark.sql.functions as f
    from pyspark.sql.functions import to_timestamp, date_format

    df = dfc.select(list(dfc.keys())[0]).toDF()
    df_filtered_1 = df.withColumn("year", date_format(f.col("ts"), "yyyy"))
    df_filtered_2 = df_filtered_1.withColumn("month", date_format(f.col("ts"), "MM"))
    df_filtered_3 = df_filtered_2.withColumn("day", date_format(f.col("ts"), "d"))

    dyf_filtered = DynamicFrame.fromDF(df_filtered_3, glueContext, "add_index_columns")

    return DynamicFrameCollection(
        {"CustomTransformAddColumns": dyf_filtered}, glueContext
    )


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": '"',
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ['s3://path/with/many/csvs'],
        "recurse": True
    },
    transformation_ctx="S3bucket_node1",
)

# Keep only rows with an alphanumeric serial_number, a 2023 timestamp,
# and a parameter that does not start with "connections".
def testRows(row):
    return (bool(re.match("^[A-Za-z0-9]+$", row["serial_number"]))
            and bool(re.match("^2023-.*", row["ts"]))
            and not bool(re.match("connections", row["parameter"])))

# Script generated for node Filter
Filter_node1692709557667 = Filter.apply(
    frame=S3bucket_node1,
    f=testRows,
    transformation_ctx="Filter_node1692709557667",
)

# Script generated for node Change Schema
ChangeSchema_node1692709669764 = ApplyMapping.apply(
    frame=Filter_node1692709557667,
    mappings=[
        ("id", "string", "id", "string"),
        ("created_at", "string", "created_at", "timestamp"),
        ("parameter", "string", "parameter", "string"),
        ("serial_number", "string", "serial_number", "string"),
        ("ts", "string", "ts", "timestamp"),
        ("value", "string", "value", "string"),
        ("device_model_id", "string", "device_model_id", "string"),
    ],
    transformation_ctx="ChangeSchema_node1692709669764",
)

# Script generated for node Custom Transform
CustomTransform_node1692709691084 = MyTransform(
    glueContext,
    DynamicFrameCollection(
        {"ChangeSchema_node1692709669764": ChangeSchema_node1692709669764}, glueContext
    ),
)

# Script generated for node Select From Collection
SelectFromCollection_node1692709727564 = SelectFromCollection.apply(
    dfc=CustomTransform_node1692709691084,
    key=list(CustomTransform_node1692709691084.keys())[0],
    transformation_ctx="SelectFromCollection_node1692709727564",
)

# Script generated for node Amazon S3
AmazonS3_node1692709906215 = glueContext.getSink(
    path="s3://dest/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["serial_number", "year", "month", "day"],
    compression="snappy",
    enableUpdateCatalog=True,
    transformation_ctx="AmazonS3_node1692709906215",
)
AmazonS3_node1692709906215.setCatalogInfo(
    catalogDatabase="dest-db", catalogTableName="dest"
)
AmazonS3_node1692709906215.setFormat("glueparquet")
AmazonS3_node1692709906215.writeFrame(SelectFromCollection_node1692709727564)
job.commit()

The structure of my S3 bucket is:
deviceId (eg: 21134412AB)
|--timestamp (eg: 2023-05-10)
|--|--data.csv

There are many folders, each representing a deviceId, with many timestamps inside. When running the ETL job, the process ends with the following error:

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 29745 in stage 0.0 failed 4 times, most recent failure: Lost task 29745.3 in stage 0.0 (TID 30301) (172.34.138.241 executor 8): com.amazonaws.services.glue.util.FatalException: Unable to parse file: data.csv

As you can see, there is no information about which file caused the error. The output should include at least the full path of "data.csv", something like "s3://bucket/21134412AB/2023-05-10/data.csv", so that I could locate and fix the file.
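
As a hedged workaround sketch, assuming the same prefix can be re-read with plain Spark outside the Glue DynamicFrame reader: one way to narrow down the offending file is to parse the CSVs in PERMISSIVE mode with an explicit schema that includes a _corrupt_record column, tag each row with input_file_name(), and list the distinct source paths that produced unparsable rows. The column names below are taken from the ApplyMapping node above; everything else is an assumption about the data, not the behaviour of the Glue reader.

# Diagnostic sketch (plain Spark, not the Glue DynamicFrame reader):
# find which files contain rows that fail to parse.
from pyspark.sql.functions import input_file_name, col
from pyspark.sql.types import StructType, StructField, StringType

# Columns taken from the ApplyMapping node above, all read as strings,
# plus the corrupt-record column required by PERMISSIVE mode for CSV.
columns = ["id", "created_at", "parameter", "serial_number",
           "ts", "value", "device_model_id"]
schema = StructType(
    [StructField(c, StringType(), True) for c in columns]
    + [StructField("_corrupt_record", StringType(), True)]
)

suspect = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("quote", '"')
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv("s3://path/with/many/csvs/")  # same source path as the job above
    .withColumn("source_file", input_file_name())
)

# Cache before filtering: Spark disallows queries that reference only the
# corrupt-record column directly against the raw CSV source.
suspect.cache()
suspect.filter(col("_corrupt_record").isNotNull()) \
    .select("source_file").distinct() \
    .show(truncate=False)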
