The system consists of a dispatcher, one or more consumer heads running on separate machines, and a manager command-line interface used to send messages that interactively start the ingestion and processing pipeline for a source.
The dispatcher is preconfigured with one or more pipeline definitions that describe how documents for a source are harvested/ingested and then processed/enhanced/transformed for indexing. The dispatcher listens to the oplog capped collection of MongoDB for database update events and, based on the preconfigured pipeline definitions, creates and sends messages to the corresponding message queues. The messages are consumed by consumers coordinated by one or more consumer heads; each step in a pipeline is handled by a specific consumer. When a consumer finishes processing a document, it sets the document status to a preconfigured value indicative of the next step in the pipeline and saves the document back to MongoDB. This creates oplog activity, which triggers the dispatcher. If the pipeline definition has a route from the current document status to a message queue, indicating further pipeline processing, the dispatcher generates another message and puts it on the message queue indicated by the route. This triggers the corresponding consumer on one of the consumer heads. The persistent message queues deliver each message only once, so if more than one consumer head is listening on the same message queue, only one of them will process the document identified by the message, as desired.
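To make the routing idea concrete, the following sketch shows a minimal status-to-queue route lookup of the kind the dispatcher performs. It is illustrative only; the class, method, status, and queue names are hypothetical and are not taken from the Foundry-ES dispatcher code.

// Illustrative only: a minimal status-to-queue route lookup in the spirit of the
// dispatcher described above. All names here are hypothetical.
import java.util.HashMap;
import java.util.Map;

class StatusRouteSketch {
    // pipeline definition reduced to: document status -> message queue name
    private final Map<String, String> routes = new HashMap<>();

    void addRoute(String fromStatus, String toQueue) {
        routes.put(fromStatus, toQueue);
    }

    // called for each oplog update event carrying the document id and its new status
    void onStatusChange(String docId, String newStatus) {
        String queue = routes.get(newStatus);
        if (queue != null) {
            sendMessage(queue, docId); // exactly one consumer head will receive this
        }
    }

    private void sendMessage(String queue, String docId) {
        // stand-in for the actual message queue send
        System.out.printf("send {docId: %s} to queue %s%n", docId, queue);
    }
}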
A consumer head coordinates a set of consumers. A consumer listens to a preconfigured message queue, applies a predefined operation to the document indicated by the received message, and on success sets the processing status of the document to a preconfigured status.
A class diagram showing the most important classes and interfaces for the consumer head is shown below.
The main class of the consumer head is org.neuinfo.foundry.consumers.coordinator.ConsumerCoordinator. Each consumer implements the org.neuinfo.foundry.consumers.coordinator.IConsumer interface. Ingestors are a specific type of consumer; each ingestor implements the org.neuinfo.foundry.consumers.plugin.Ingestor interface. At startup, the ConsumerCoordinator reads a consumer configuration file (consumers-cfg.xml) and, using the ConsumerFactory singleton, creates the corresponding consumers. There is also a generic JavaPluginConsumer that abstracts away common consumer functionality, including message processing, and uses a plugin mechanism for the specific functionality. Most of the current consumers are implemented as plugins to this generic consumer. A consumer plugin implements the org.neuinfo.foundry.consumers.plugin.IPlugin interface.
Each consumer runs in its own thread (ConsumerWorker). Consumers must be stateless.
All ingestors are implemented as plugins to the org.neuinfo.foundry.consumers.jms.consumers.GenericIngestionConsumer class. The currently available ingestor plugins are located in the org.neuinfo.foundry.consumers.jms.consumers.ingestors package.
The Ingestor interface has lifecycle methods to initialize the parameters received in the message body and to start the ingestion process. These parameters include the harvest URL and ingestion-type-specific parameters defined in the harvest description JSON file stored in MongoDB in the sources collection. The startup() method is generally used to get the data to an intermediate storage. An ingestor plugin acts like an iterator: its hasNext() method returns true if there are more records to process, and its prepPayload() method returns a JSON representation of the original harvested record. The GenericIngestionConsumer is responsible for duplicate checking and document wrapper generation.
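The sketch below illustrates this iterator-style contract and how a driving consumer might use it. It is a simplified, hypothetical sketch rather than the actual org.neuinfo.foundry.consumers.plugin.Ingestor interface or GenericIngestionConsumer code; the real interface also declares the initialization lifecycle methods mentioned above, and the sketch names (IngestorSketch, IngestionDriverSketch) are invented for illustration.

import org.json.JSONObject;

// Simplified sketch of the iterator-style contract described above; the real
// org.neuinfo.foundry.consumers.plugin.Ingestor interface declares additional
// lifecycle methods (e.g. initialization from the harvest description parameters).
interface IngestorSketch {
    void startup() throws Exception;   // fetch the source data to intermediate storage
    boolean hasNext();                 // are there more records to process?
    JSONObject prepPayload();          // JSON representation of the next original record
}

// Illustrative driver loop, not the actual GenericIngestionConsumer code.
class IngestionDriverSketch {
    void ingest(IngestorSketch ingestor) throws Exception {
        ingestor.startup();
        while (ingestor.hasNext()) {
            JSONObject payload = ingestor.prepPayload();
            // GenericIngestionConsumer would perform duplicate checking here and
            // wrap the record in a document wrapper before saving it to MongoDB.
        }
    }
}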
All enhancers need to implement the org.neuinfo.foundry.consumers.plugin.IPlugin interface, which is located in the consumer-plugin-api subproject of Foundry-ES. To develop a new enhancer, you need to include the Foundry-ES common and consumer-plugin-api libraries in your Maven pom.xml after you have built Foundry-ES via mvn -Pdev clean install, in addition to all the dependencies from common/pom.xml of Foundry-ES.
<dependency>
    <groupId>org.neuinfo</groupId>
    <artifactId>foundry-es-common</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.neuinfo</groupId>
    <artifactId>foundry-es-consumer-plugin-api</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
The org.neuinfo.foundry.consumers.plugin.IPlugin interface is shown below:
public interface IPlugin {
    public void setDocumentIngestionService(DocumentIngestionService dis);
    public void setGridFSService(GridFSService gridFSService);
    public void initialize(Map<String, String> options) throws Exception;
    public Result handle(DBObject docWrapper);
    public String getPluginName();
}
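For orientation, the skeleton below shows what a minimal enhancer plugin implementing this interface could look like: it stores the injected services and initialization options in fields and leaves the actual enhancement work to handle(), which is discussed next. The class name, field handling, and the assumed package of Result are illustrative, not taken from an existing Foundry-ES plugin.

import java.util.Map;
import com.mongodb.DBObject;
import org.neuinfo.foundry.common.ingestion.DocumentIngestionService;
import org.neuinfo.foundry.common.ingestion.GridFSService;
import org.neuinfo.foundry.consumers.plugin.IPlugin;
// Result is assumed here to live alongside IPlugin in the consumer-plugin-api subproject.
import org.neuinfo.foundry.consumers.plugin.Result;

// Illustrative skeleton of an enhancer plugin; the class name and option handling are hypothetical.
public class MyEnhancerSketch implements IPlugin {
    private DocumentIngestionService dis;
    private GridFSService gridFSService;
    private Map<String, String> options;

    @Override
    public void setDocumentIngestionService(DocumentIngestionService dis) {
        this.dis = dis;
    }

    @Override
    public void setGridFSService(GridFSService gridFSService) {
        this.gridFSService = gridFSService;
    }

    @Override
    public void initialize(Map<String, String> options) throws Exception {
        this.options = options; // enhancer-specific options from the pipeline configuration
    }

    @Override
    public Result handle(DBObject docWrapper) {
        // The actual enhancement work goes here; see the fragments below for
        // retrieving the original/transformed document and returning a Result.
        return new Result(docWrapper, Result.Status.OK_WITH_CHANGE);
    }

    @Override
    public String getPluginName() {
        return "my-enhancer-sketch";
    }
}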
The enhancement is done in the handle(DBObject docWrapper) method, which takes a MongoDB DBObject corresponding to the currently processed document wrapper from the MongoDB database. If the original document is small (less than 1 MB), it is stored inline in the document wrapper; otherwise it is stored in the MongoDB GridFS storage with a pointer in the document wrapper. Below is a code fragment to get the original document converted to JSON from the docWrapper, regardless of where the original document is stored.
BasicDBObject data = (BasicDBObject) docWrapper.get("Data");
String originalFileIDstr = (String) docWrapper.get("originalFileId");
JSONObject json;
if (!Utils.isEmpty(originalFileIDstr)) {
    // large file stored in GridFS, referenced by its ObjectId
    Assertion.assertNotNull(this.gridFSService);
    json = gridFSService.findJSONFile(new ObjectId(originalFileIDstr));
} else {
    // small file stored inline in the document wrapper
    DBObject originalDoc = (DBObject) docWrapper.get("OriginalDoc");
    json = JSONUtils.toJSON((BasicDBObject) originalDoc, false);
}
The GridFS service org.neuinfo.foundry.common.ingestion.GridFSService and the MongoDB document service org.neuinfo.foundry.common.ingestion.DocumentIngestionService are injected into the plugin by Foundry-ES using the setGridFSService(GridFSService gridFSService) and setDocumentIngestionService(DocumentIngestionService dis) methods of the IPlugin interface.
If you want to update or add to the transformed data prepared by previous enhancers in the pipeline, which is stored under the Data.transformedRec branch of the document wrapper, you can use the following code snippet to retrieve the transformed document:
BasicDBObject data = (BasicDBObject) docWrapper.get("Data");
BasicDBObject trDBO = (BasicDBObject) data.get("transformedRec");
JSONObject json = JSONUtils.toJSON(trDBO, false);
After you are done with the enhancement, you can put the enhanced JSON object back and return the result using the following Java code snippet:
BasicDBObject data = (BasicDBObject) docWrapper.get("Data");
BasicDBObject trDBO = (BasicDBObject) data.get("transformedRec");
JSONObject json = JSONUtils.toJSON(trDBO, false);
// enhancement of json
data.put("transformedRec", JSONUtils.encode(json, true));
return new Result(docWrapper, Result.Status.OK_WITH_CHANGE);
If an error occurs during the enhancement, you need to return an error result as shown below:
Result r = new Result(docWrapper, Result.Status.ERROR);
r.setErrMessage(errorMessage);
return r;
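Putting these pieces together, a complete handle() implementation in an enhancer plugin could look like the following sketch, which combines the fragments above inside a try/catch and leaves the actual enhancement step as a placeholder.

public Result handle(DBObject docWrapper) {
    try {
        // retrieve the transformed record prepared by previous pipeline steps
        BasicDBObject data = (BasicDBObject) docWrapper.get("Data");
        BasicDBObject trDBO = (BasicDBObject) data.get("transformedRec");
        JSONObject json = JSONUtils.toJSON(trDBO, false);

        // ... enhancement of json goes here (placeholder) ...

        // put the enhanced JSON back into the document wrapper and report the change
        data.put("transformedRec", JSONUtils.encode(json, true));
        return new Result(docWrapper, Result.Status.OK_WITH_CHANGE);
    } catch (Exception e) {
        Result r = new Result(docWrapper, Result.Status.ERROR);
        r.setErrMessage(e.getMessage());
        return r;
    }
}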