diff --git a/Camel/pom.xml b/Camel/pom.xml index aea1497..dc14ed2 100644 --- a/Camel/pom.xml +++ b/Camel/pom.xml @@ -27,11 +27,6 @@ org.springframework.boot spring-boot-starter-web - - org.apache.camel - camel-mongodb3-starter - 2.24.0 - org.apache.camel camel-spring-boot-starter diff --git a/Camel/src/main/java/com/akquinet/pipeline/streaming/Config/MongoConfig.java b/Camel/src/main/java/com/akquinet/pipeline/streaming/Config/MongoConfig.java deleted file mode 100644 index 12cbe1b..0000000 --- a/Camel/src/main/java/com/akquinet/pipeline/streaming/Config/MongoConfig.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.akquinet.pipeline.streaming.Config; - -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.ServerAddress; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration -public class MongoConfig { - @Value("${mongodb.host}") - private String mongodbHost; - - /** - * My MongoConfig. - * - * @see Bean - * Initialisastion of Mongo DB connection bean, host found in application.properties file - */ - @Bean("mongoBean") - public MongoClient getMongoClient() { - return new MongoClient(new ServerAddress(mongodbHost), new MongoClientOptions.Builder().build()); - } -} diff --git a/Camel/src/main/java/com/akquinet/pipeline/streaming/Route/MessageConsumer.java b/Camel/src/main/java/com/akquinet/pipeline/streaming/Route/MessageConsumer.java deleted file mode 100644 index 04a10bb..0000000 --- a/Camel/src/main/java/com/akquinet/pipeline/streaming/Route/MessageConsumer.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.akquinet.pipeline.streaming.Route; - -import com.akquinet.pipeline.streaming.Model.TaxiRide; -import org.apache.camel.LoggingLevel; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.jackson.JacksonDataFormat; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.stereotype.Component; - -@Component -public class MessageConsumer extends RouteBuilder { - - @Value("${camel.message-consumer.kafka}") - private String inputKafkaPath; - - @Value("${camel.message-consumer.error}") - private String outputErrorPath; - - @Value("${camel.message-consumer.datalake}") - private String outputDataLakePath; - - @Override - public final void configure() throws Exception { - - errorHandler(deadLetterChannel(outputErrorPath) - .useOriginalMessage() - .retryAttemptedLogLevel(LoggingLevel.WARN).log("An Error has occurred while processing " + - "${headers.CamelFileName} , please check the " + - "ErrorLog Folder to view the offending file")); - - from(inputKafkaPath).routeId("Write to Data Lake") - .streamCaching() - .unmarshal(new JacksonDataFormat(TaxiRide.class)) - .to(outputDataLakePath); - } -} diff --git a/Camel/src/main/resources/application.properties b/Camel/src/main/resources/application.properties index 66ee30d..fe2c958 100644 --- a/Camel/src/main/resources/application.properties +++ b/Camel/src/main/resources/application.properties @@ -4,10 +4,6 @@ camel.message-producer.from = file:../Data/?noop=true&maxMessagesPerPoll=1&delay camel.message-producer.error = file:../Error/Initial camel.message-producer.kafka = kafka:rawTaxi?brokers=localhost:9092 -camel.message-consumer.kafka = kafka:rawTaxi?brokers=localhost:9092 -camel.message-consumer.error = file:../Error/DataLake -camel.message-consumer.datalake = mongodb3://mongoBean?database=taxi&collection=nyc&operation=insert&createCollection=true - camel.elastic.kafka = kafka:cookedTaxi?brokers=localhost:9092 camel.elastic.error = file:../Error/Elastic camel.elastic.search = elasticsearch-rest://elasticsearch_aaron?operation=INDEX&indexName=taxi_nyc&indexType=trip @@ -18,7 +14,4 @@ spring.kafka.port=9092 camel.component.elasticsearch-rest.client=https://elasticsearch_aaron:9200 camel.component.elasticsearch-rest.host-addresses=localhost:9200 -mongodb.host = 127.0.0.1 -mongodb.port = 27017 - logging.level.org.apache.camel= INFO \ No newline at end of file diff --git a/README.md b/README.md index 94814e5..73c5a6c 100644 --- a/README.md +++ b/README.md @@ -77,12 +77,6 @@ Kibana (https://www.elastic.co/products/kibana) is an open source data visualiza This can be accessed when the app is running via entering: `https://localhost:5601` into your browser. -### MongoDB - -MongoDB (https://www.mongodb.com/) is a cross-platform document-oriented database program. Classified as a NoSQL -database program, MongoDB uses JSON-like documents with schemata. MongoDB is developed by MongoDB Inc. and licensed -under the Server Side Public License (SSPL). - ### Docker Docker (https://www.docker.com/) is a collection of interoperating software-as-a-service and platform-as-a-service offerings that employ operating-system-level virtualization to cultivate development and delivery of software inside standardized software packages called containers. diff --git a/docker-compose.yml b/docker-compose.yml index 8e45b75..74743a1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -83,25 +83,11 @@ services: camel.message-producer.from: file:/Data/?noop=true&maxMessagesPerPoll=1&delay=5000 camel.message-producer.kafka: kafka:rawTaxi?brokers=kafka-gateway:9092 camel.message-consumer.kafka: kafka:rawTaxi?brokers=kafka-gateway:9092 - mongodb.host: mongodb-gateway camel.elastic.kafka: kafka:cookedTaxi?brokers=kafka-gateway:9092 depends_on: - kafka-gateway networks: - streaming - mongodb-gateway: - container_name: mongo - image: mongo:3.6 - volumes: - - ./data/db:/var/micro-data/mongodb/data/db - environment: - - MONGODB_INITDB_DATABASE=taxi - - MONGODB_INITDB_COLLECTION=nyc - ports: - - 27017:27017 - networks: - - streaming - networks: streaming: