corentin-regent/spark-constraints


Purpose

When ingesting data into an ad-hoc database system with Spark as the processing engine, incoming data may not be parsed correctly, or it may not conform to the interface contract, which introduces inconsistencies in the data lake.

This project is a proof-of-concept data cleaning solution that detects such anomalies during the data ingestion process. Inconsistent lines are isolated and annotated, pending human intervention.

Proposed solution

Ideally, so that no incoming data is lost, this data normalization process is a bijection from a raw schema to a clean schema. An anomaly in this process therefore means that the bijection is violated. Thus, if we can convert the clean data back to raw (i.e. cast all values back to String) and recover the input, it means that all incoming data could be parsed properly.

In practice, however, converting back to String types and then comparing Strings is tedious and error-prone, since e.g. a timestamp can be represented in many different ways. The only reliable comparison method is therefore to parse both strings again and check whether the resulting values are equal. This means relying on standard parsing tools, notably for timestamps, which Spark already uses internally, so we would always land back on the same results, which defeats the purpose of the check.
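The difficulty can be illustrated with a minimal sketch in plain Python (standard library only, not Spark). The function names, the sample timestamp and the output format are assumptions chosen for illustration. The string comparison fails on a perfectly valid value, while the value comparison reuses the same parser and therefore cannot disagree with it.

```python
from datetime import datetime

def round_trips_as_string(raw: str, out_format: str) -> bool:
    """Parse the raw timestamp, render it back, and compare the two strings."""
    parsed = datetime.fromisoformat(raw)
    return parsed.strftime(out_format) == raw

def round_trips_as_value(raw: str, out_format: str) -> bool:
    """Parse the raw timestamp, render it back, re-parse, and compare the values."""
    parsed = datetime.fromisoformat(raw)
    rendered = parsed.strftime(out_format)
    reparsed = datetime.strptime(rendered, out_format)
    if reparsed.tzinfo is None:
        # The rendered format dropped the offset, so restore it before comparing.
        reparsed = reparsed.replace(tzinfo=parsed.tzinfo)
    return reparsed == parsed

raw_value = "2024-01-15T10:30:00+00:00"   # hypothetical input value
clean_format = "%Y-%m-%d %H:%M:%S"        # hypothetical output representation

# Same instant, different spelling: the string check reports a false anomaly.
string_check = round_trips_as_string(raw_value, clean_format)
# The value check passes, but only because it relies on the same parsing logic.
value_check = round_trips_as_value(raw_value, clean_format)
```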

Thus, the approach adopted in this project is to consider invalid any value that is non-null in the input dataset and became null during the ingestion (or vice versa, although that is unlikely).

The anomaly detection process is implemented as denial constraints over an intermediate dataframe. For each field in the table's schema, recursively (for nested complex types), we enforce the constraint: ∀ x ∈ R ¬(x.rawField.isNull != x.cleanField.isNull). Digging into the code, these constraints are defined in Ingestion, and enforced in ConstraintChecker.
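The constraint can be sketched in plain Python, modelling a row as a dict, a struct as a nested dict and an array as a list. This is an illustration of the rule under those assumptions, not the code of Ingestion or ConstraintChecker; the function names and sample rows are hypothetical.

```python
from typing import Any, Dict, List

def violates(raw: Any, clean: Any) -> bool:
    """True when the null-mismatch constraint is violated for this pair of values."""
    if (raw is None) != (clean is None):
        return True
    if isinstance(raw, dict) and isinstance(clean, dict):
        # Struct: check every nested field recursively.
        return any(violates(raw.get(k), clean.get(k)) for k in raw.keys() | clean.keys())
    if isinstance(raw, list) and isinstance(clean, list):
        # Array: check elements pairwise (assumes both sides have the same length).
        return any(violates(r, c) for r, c in zip(raw, clean))
    return False

def violating_columns(raw_row: Dict[str, Any], clean_row: Dict[str, Any]) -> List[str]:
    """Names of the top-level columns that contain at least one violation."""
    return sorted(name for name in raw_row if violates(raw_row[name], clean_row.get(name)))

# Hypothetical row: every raw value is a string, as read from the input file.
raw_row = {"id": "42", "created_at": "not-a-date", "address": {"zip": "7500A", "city": "Paris"}}
# The same row after casting: unparseable values became null.
clean_row = {"id": 42, "created_at": None, "address": {"zip": None, "city": "Paris"}}

flagged = violating_columns(raw_row, clean_row)
```

Here `flagged` contains `address` and `created_at`, while `id` passes because it is non-null on both sides.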

Limitations

Misrepresented complex data types in the input file, e.g. trying to parse "a,b,c" as an Array<String>, cause the raw input value to be null, as well as the clean cast value. Both sides are then null, so the anomaly is not detected.

Also, for nested complex types, only the name of the top-level column is flagged as an anomaly, rather than the whole path leading to the mis-parsed primitive value. Improving this would not be too hard, though.
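One possible shape for that improvement, again as a plain-Python sketch over dicts and lists rather than Spark rows, is to carry the path along during the recursion. The `violation_paths` helper and the path syntax (`a.b[1].c`) are assumptions made for this example.

```python
from typing import Any, Iterator

def violation_paths(raw: Any, clean: Any, path: str = "") -> Iterator[str]:
    """Yield the full path of every value whose nullness differs between raw and clean."""
    if (raw is None) != (clean is None):
        yield path
    elif isinstance(raw, dict) and isinstance(clean, dict):
        for key in sorted(raw.keys() | clean.keys()):
            child = f"{path}.{key}" if path else key
            yield from violation_paths(raw.get(key), clean.get(key), child)
    elif isinstance(raw, list) and isinstance(clean, list):
        # Assumes both arrays have the same length; extra elements are ignored.
        for index, (r, c) in enumerate(zip(raw, clean)):
            yield from violation_paths(r, c, f"{path}[{index}]")

# Hypothetical nested row: the second item has a quantity that cannot be cast.
raw_row = {"order": {"items": [{"qty": "2"}, {"qty": "two"}]}}
clean_row = {"order": {"items": [{"qty": 2}, {"qty": None}]}}

paths = list(violation_paths(raw_row, clean_row))
```

With this input the only reported path is `order.items[1].qty`, instead of the top-level name `order`.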

One may also want to make sure that the schema of the input file matches that of the target table, before even starting to ingest it. In a production setup though, beware of schema evolution.

Finally, I only implemented unary denial constraints here, which are simple and cheap performance-wise. More complex and costly constraints, such as binary denial constraints for detecting primary key violations, are not supported. The performance of the algorithms implementing such constraints should be taken into consideration. One should also study the problem of minimal data repair, i.e. removing the smallest possible number of elements needed to obtain a consistent database. One may read e.g. "Fast Detection of Denial Constraint Violations" and "Consistent Query Answers in Inconsistent Databases" in order to design a robust solution.
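To make the difference with unary constraints concrete, a primary key can be written as the binary denial constraint ∀ x, y ∈ R ¬(x ≠ y ∧ x.key = y.key). The sketch below is plain Python and is not part of this project; `primary_key_violations` and the sample rows are hypothetical. It groups rows by key instead of comparing every pair, which avoids the quadratic scan when keys are mostly unique.

```python
from collections import defaultdict
from itertools import combinations
from typing import Any, Dict, List, Tuple

def primary_key_violations(rows: List[Dict[str, Any]], key: str) -> List[Tuple[int, int]]:
    """Return index pairs of distinct rows that share the same key value."""
    groups: Dict[Any, List[int]] = defaultdict(list)
    for index, row in enumerate(rows):
        groups[row[key]].append(index)
    # Every pair inside a group of size >= 2 violates the constraint.
    return [pair for indices in groups.values() for pair in combinations(indices, 2)]

rows = [
    {"id": 1, "name": "a"},
    {"id": 2, "name": "b"},
    {"id": 1, "name": "c"},  # duplicates the key of the first row
]

violations = primary_key_violations(rows, "id")
```

Unlike the unary case, a violation here involves two rows, so deciding which of them to remove is where the minimal repair problem comes in.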

About

[PoC] Denial constraints applied to data repair for ingestions
