A generic and pluggable data ingestion utility based on Apache Flink.
- Data exchanges are required to produce data in a standard format
- Downstream datasources are diverse in terms of protocols (HTTP, gRPC, etc.) and data serialization formats (JSON, XML, Protobuf, etc.)
- Most of the downstream datasources can be modeled into fixed categories in the context of ingesting and parsing their data
- A separate hand-written ingestion script for every datasource is inefficient and hard to maintain
The above necessitates a tool which:
- Ingests data from diverse sources, protocols and serialization formats
- Deduplicates data and rejects late messages to give out a stream of novel data
- Transforms the data into the formats required by the exchange
- Publishes the transformed data into the exchange with flexibility in choosing the sink
- Supports a fully configuration file based specification and operation of the entire pipeline
The figure below shows an overview of the framework. Note: Some features are work in progress.
- Based on Apache Flink
- Configuration file based specification of the pipeline
- Pluggable for extending capabilities
- JSON Path based parsing and key extraction for watermarking
- Jolt based JSON-to-JSON transformation (see the illustrative example after this list)
- Quartz based job scheduling
- Vert.x based API server with config based pipeline JAR generation, user and adaptor job management, and monitoring
- Docker development and deployment
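For orientation, the snippet below is a minimal, purely illustrative Jolt chain spec for the JSON-to-JSON transformation step; the input and output field names are hypothetical and not prescribed by the framework. The shift operation remaps input fields to the output structure, and the default operation fills in a field if it is missing.

[
  {
    "operation": "shift",
    "spec": {
      "deviceId": "id",
      "readings": {
        "pm25": "observations.pm2_5",
        "pm10": "observations.pm10"
      }
    }
  },
  {
    "operation": "default",
    "spec": {
      "observationDateTime": "1970-01-01T00:00:00Z"
    }
  }
]

How such a chain is wired into the pipeline is defined by the transformation spec described later in this document.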
The figure below shows an overview of the implementation of the framework and components involved.
The Generic layer is the standard Flink based pipeline of which all adaptors are composed. The generic layer assumes implementations of the interfaces which constitute the activity of each particular block. Further, the framework provides standard implementations of these interfaces for specification file based code generation. It may also be possible for a developer to pass their own implementations of the interfaces to obtain more control over the components of the pipeline in the Process stage (WIP).
Using the framework as an ordinary consumer involves writing a specification file for the entire pipeline (source -> transform -> sink) and using a hosted instance's API to publish the spec and perform the pipeline operations. There are two fundamental modes of operation:
- Bounded mode - The pipeline runs once based on the specified schedule
- Unbounded mode - The pipeline is always running
A pipeline may be specified in JSON format and submitted to the framework server to auto-generate JAR files and run them. The following is the spec outline to be followed when writing a configuration file.
{
  "name": "<unique name for this adaptor>",
  "schedulePattern": "<cron-like schedule pattern>",
  "failureRecoverySpec": {
  },
  "inputSpec": {
  },
  "parseSpec": {
  },
  "deduplicationSpec": {
  },
  "transformSpec": {
  },
  "publishSpec": {
  }
}
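For illustration only, a filled-in skeleton might look as follows. The adaptor name is hypothetical, the schedule assumes the cron-like pattern follows Quartz cron syntax (here: at second 0 of every 10th minute), and the sub-spec objects are left empty.

{
  "name": "city-aqm-adaptor",
  "schedulePattern": "0 0/10 * * * ?",
  "failureRecoverySpec": { },
  "inputSpec": { },
  "parseSpec": { },
  "deduplicationSpec": { },
  "transformSpec": { },
  "publishSpec": { }
}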
Detailed explanations of the individual specs are given below.
- Meta spec
- Failure Recovery Spec
- Input Spec
- Parse Spec
- Deduplication Spec
- Transformation Spec
- Publish Spec
The spec can then be submitted to the adaptor server which will validate it and generate a JAR for the entire pipeline.
The framework provides APIs to manage the lifecycle of the adaptor pipeline and to monitor it. Assuming the administrator of the framework has already provided the user with authentication credentials, the APIs of relevance to get started are:
- newAdaptor: Submit the above pipeline spec and create a new adaptor
  POST /adaptor Header: {"username": "uname", "password": "password"} Body: spec file Content-Type: application/json Response: 202 (Accepted and generating JAR), 401 (Unauthorized), 400 (Bad spec file)
- getAdaptors: List all the adaptors (including recently submitted ones), their running state and their ids
  GET /adaptor Header: {"username": "uname", "password": "password"} Content-Type: application/json Response: 200 (List of adaptors and their running state)
- startAdaptor: Start an adaptor given its id. This also starts scheduled adaptors.
  POST /adaptor/{id}/start Header: {"username": "uname", "password": "password"} Content-Type: application/json Response: 200 (Success), 404 (No such adaptor), 401 (Unauthorized)
- stopAdaptor: Stop an adaptor given its id. This also stops scheduled adaptors.
  POST /adaptor/{id}/stop Header: {"username": "uname", "password": "password"} Content-Type: application/json Response: 200 (Success), 404 (No such adaptor), 401 (Unauthorized)
- deleteAdaptors: Delete an adaptor given its id
  DELETE /adaptor/{id} Header: {"username": "uname", "password": "password"} Content-Type: application/json Response: 200 (Deleted), 404 (No such adaptor), 401 (Unauthorized)
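As a usage sketch (assuming the credentials are passed as plain username and password request headers, as indicated above, and using a placeholder host and spec file name), submitting a spec and listing adaptors with curl could look like this:

# Submit a new adaptor spec (placeholder host and file name)
curl -X POST http://apiserver.example.com/adaptor \
  -H 'username: uname' \
  -H 'password: password' \
  -H 'Content-Type: application/json' \
  --data @adaptor-spec.json

# List all adaptors and their running state
curl http://apiserver.example.com/adaptor \
  -H 'username: uname' \
  -H 'password: password'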
On submitting the adaptor pipeline spec file, the server will generate a JAR with all the dependencies and run the pipeline according to the configurations specified.
The entire API specification can be found here.
- Build all required images
  ./setup/build.sh
- Modify ./configs/config-example.json to create the server config, and modify ./configs/quartz.properties to create the Quartz config.
- Modify ./setup/*/docker-compose to take up the correct config files.
- Bring up the local environment
  ./setup/start_local_dev_env.sh
  This brings up Flink, RabbitMQ, the API server and a mock server.
- Use the APIs to submit the above example config.
- Spec validation
- Local playground for testing specs locally
- Support for diverse sources (AMQP, MQTT, GRPC) and sinks (Elasticsearch, Redis, Kafka)