Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
DPAAS-301 send structured events to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
sodre90 committed Mar 13, 2018
1 parent 871f064 commit 52d61e7
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 17 deletions.
2 changes: 2 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ dependencies {
compile group: 'io.springfox', name: 'springfox-core', version: '2.5.0'
compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '2.5.0'


compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.2.RELEASE'
compile group: 'io.swagger', name: 'swagger-jersey2-jaxrs', version: swaggerVersion
compile group: 'com.google.http-client', name: 'google-http-client-jackson2', version: '1.22.0'
compile (group: 'com.google.oauth-client', name: 'google-oauth-client-jetty', version: '1.22.0') { exclude module: 'servlet-api' }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.sequenceiq.cloudbreak.conf;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaSenderConfig {

@Value("${cb.kafka.bootstrap.servers:}")
private String bootstrapServers;

@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);

return props;
}

public boolean isKafkaConfigured() {
return StringUtils.isNotEmpty(bootstrapServers);
}

@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
package com.sequenceiq.cloudbreak.structuredevent;

import javax.inject.Inject;

import com.sequenceiq.cloudbreak.conf.KafkaSenderConfig;
import com.sequenceiq.cloudbreak.structuredevent.event.StructuredEvent;
import com.sequenceiq.cloudbreak.structuredevent.reactor.KafkaStructuredEventAsyncNotifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.sequenceiq.cloudbreak.structuredevent.event.StructuredEvent;
import javax.inject.Inject;

@Service
public class DefaultStructuredEventClient implements StructuredEventClient {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStructuredEventClient.class);

@Inject
private KafkaSenderConfig kafkaSenderConfig;

@Inject
private StructuredEventService structuredEventService;

@Inject
private KafkaStructuredEventAsyncNotifier kafkaAsyncSender;

@Override
public void sendStructuredEvent(StructuredEvent structuredEvent) {
structuredEventService.storeStructuredEvent(structuredEvent);
if (kafkaSenderConfig.isKafkaConfigured()) {
kafkaAsyncSender.sendEventLogMessage(structuredEvent);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.sequenceiq.cloudbreak.structuredevent.reactor;

import com.sequenceiq.cloudbreak.cloud.reactor.ErrorHandlerAwareReactorEventFactory;
import com.sequenceiq.cloudbreak.structuredevent.event.StructuredEvent;
import org.springframework.stereotype.Component;
import reactor.bus.Event;
import reactor.bus.EventBus;

import javax.inject.Inject;

@Component
public class KafkaStructuredEventAsyncNotifier {

public static final String KAFKA_EVENT_LOG_MESSAGE = "KAFKA_EVENT_LOG_MESSAGE";

@Inject
private ErrorHandlerAwareReactorEventFactory eventFactory;

@Inject
private EventBus eventBus;

public void sendEventLogMessage(StructuredEvent structuredEvent) {
sendAsyncEvent(KAFKA_EVENT_LOG_MESSAGE, eventFactory.createEvent(structuredEvent));
}

private void sendAsyncEvent(String selector, Event<?> event) {
eventBus.notify(selector, event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.sequenceiq.cloudbreak.structuredevent.reactor;

import java.util.concurrent.ExecutionException;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sequenceiq.cloudbreak.reactor.handler.ReactorEventHandler;
import com.sequenceiq.cloudbreak.structuredevent.event.StructuredEvent;

import reactor.bus.Event;

@Component
public class KafkaStructuredEventHandler<T extends StructuredEvent> implements ReactorEventHandler<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStructuredEventHandler.class);

@Value("${cb.kafka.structured.events.topic:StructuredEvents}")
private String structuredEventsTopic;

private ObjectMapper objectMapper = new ObjectMapper();

@Inject
private KafkaTemplate<String, String> kafkaTemplate;

@Override
public String selector() {
return KafkaStructuredEventAsyncNotifier.KAFKA_EVENT_LOG_MESSAGE;
}

@Override
public void accept(Event<T> structuredEvent) {
try {
ListenableFuture<SendResult<String, String>> sendResultFuture =
kafkaTemplate.send(structuredEventsTopic, objectMapper.writeValueAsString(structuredEvent));
SendResult<String, String> sendResult = sendResultFuture.get();
LOGGER.debug("Structured event sent to kafka: {}", sendResult.getProducerRecord());
} catch (JsonProcessingException e) {
LOGGER.error("Structured event json processing error", e);
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Error happened in message sending to kafka", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public StructuredFlowErrorEvent(OperationDetails operation, FlowDetails flow, St

public StructuredFlowErrorEvent(OperationDetails operation, FlowDetails flow, StackDetails stack,
ClusterDetails cluster, BlueprintDetails blueprint, String exception) {
super(operation, flow, stack, cluster, blueprint);
super(StructuredFlowErrorEvent.class.getSimpleName(), operation, flow, stack, cluster, blueprint);
this.exception = exception;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ public StructuredFlowEvent(OperationDetails operation, FlowDetails flow, StackDe

public StructuredFlowEvent(OperationDetails operation, FlowDetails flow, StackDetails stack, ClusterDetails cluster,
BlueprintDetails blueprint) {
super(StructuredFlowEvent.class.getSimpleName(), operation);
this(StructuredFlowEvent.class.getSimpleName(), operation, flow, stack, cluster, blueprint);
}

public StructuredFlowEvent(String type, OperationDetails operation, FlowDetails flow, StackDetails stack,
ClusterDetails cluster, BlueprintDetails blueprint) {
super(type, operation);
this.flow = flow;
this.stack = stack;
this.cluster = cluster;
Expand All @@ -32,30 +37,30 @@ public FlowDetails getFlow() {
return flow;
}

public StackDetails getStack() {
return stack;
}

public ClusterDetails getCluster() {
return cluster;
}

public BlueprintDetails getBlueprint() {
return blueprint;
}

public void setFlow(FlowDetails flow) {
this.flow = flow;
}

public StackDetails getStack() {
return stack;
}

public void setStack(StackDetails stack) {
this.stack = stack;
}

public ClusterDetails getCluster() {
return cluster;
}

public void setCluster(ClusterDetails cluster) {
this.cluster = cluster;
}

public BlueprintDetails getBlueprint() {
return blueprint;
}

public void setBlueprint(BlueprintDetails blueprint) {
this.blueprint = blueprint;
}
Expand Down

0 comments on commit 52d61e7

Please sign in to comment.