Skip to content

Commit

Permalink
Merge pull request #1 from swisspush/going_async
Browse files Browse the repository at this point in the history
Execute log transformation in "executeBlocking" code blocks
  • Loading branch information
floriankammermann authored Dec 6, 2016
2 parents fa14206 + d2f0fd4 commit 7bf87cd
Show file tree
Hide file tree
Showing 12 changed files with 425 additions and 142 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,22 @@ The [DoNothingTransformStrategy](src/main/java/org/swisspush/logtransformer/stra
```java
public class DoNothingTransformStrategy implements TransformStrategy {

private Vertx vertx;

public DoNothingTransformStrategy(Vertx vertx) {
this.vertx = vertx;
}

@Override
public List<String> transformLog(String logToTransform) {
return Collections.singletonList(logToTransform);
public void transformLog(String logToTransform, Handler<AsyncResult<List<String>>> resultHandler) {
vertx.executeBlocking(future -> future.complete(Collections.singletonList(logToTransform)), resultHandler);
}
}
```

By extending the [AbstractTransformStrategy](src/main/java/org/swisspush/logtransformer/strategy/AbstractTransformStrategy.java) class, basic functionality like error handling and JSON parsing are available.

When writing a new custom log transform strategy also extend the _**findTransformStrategy(MultiMap headers)**_ method of [TransformStrategyFinder](src/main/java/org/swisspush/logtransformer/strategy/TransformStrategyFinder.java) class to match the provided strategy name (_strategyHeader_) to the strategy implementation.
When writing a new custom log transform strategy also extend the _**findTransformStrategy(MultiMap headers)**_ method of [DefaultTransformStrategyFinder](src/main/java/org/swisspush/logtransformer/strategy/DefaultTransformStrategyFinder.java) class (or write a new one) to match the provided strategy name (_strategyHeader_) to the strategy implementation.

Example:
```java
Expand Down
39 changes: 29 additions & 10 deletions src/main/java/org/swisspush/logtransformer/LogTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import org.swisspush.logtransformer.logger.DefaultLogTransformLogger;
import org.swisspush.logtransformer.logger.LogTransformLogger;
import org.swisspush.logtransformer.strategy.TransformStrategy;
import org.swisspush.logtransformer.strategy.DefaultTransformStrategyFinder;
import org.swisspush.logtransformer.strategy.TransformStrategyFinder;
import org.swisspush.logtransformer.util.Configuration;

import java.util.List;

/**
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
Expand All @@ -22,14 +21,22 @@ public class LogTransformer extends AbstractVerticle {
private final Logger log = LoggerFactory.getLogger(LogTransformer.class);
private LogTransformLogger logTransformLogger;
private TransformStrategyFinder transformStrategyFinder;
private boolean useDefaultTransformLogger = false;

public LogTransformer(){
useDefaultTransformLogger = true;
this(null, null);
}

public LogTransformer(LogTransformLogger logTransformLogger) {
this(logTransformLogger, null);
}

public LogTransformer(TransformStrategyFinder transformStrategyFinder) {
this(null, transformStrategyFinder);
}

public LogTransformer(LogTransformLogger logTransformLogger, TransformStrategyFinder transformStrategyFinder) {
this.logTransformLogger = logTransformLogger;
this.transformStrategyFinder = transformStrategyFinder;
}

@Override
Expand All @@ -40,18 +47,30 @@ public void start(Future<Void> future) {
Configuration modConfig = Configuration.fromJsonObject(config());
log.info("Starting LogTransformer module with configuration: " + modConfig);

if(useDefaultTransformLogger){
this.logTransformLogger = new DefaultLogTransformLogger(modConfig.getLoggerName());
if(this.logTransformLogger == null){
this.logTransformLogger = new DefaultLogTransformLogger(vertx, modConfig.getLoggerName());
}

transformStrategyFinder = new TransformStrategyFinder(modConfig.getStrategyHeader());
if(this.transformStrategyFinder == null) {
this.transformStrategyFinder = new DefaultTransformStrategyFinder(vertx, modConfig.getStrategyHeader());
}

eb.consumer(modConfig.getAddress(), event -> {
TransformStrategy strategy = transformStrategyFinder.findTransformStrategy(event.headers());
log.info("About to transform log with strategy '" + strategy.getClass().getSimpleName() + "'");
List<String> transformedLogEntries = strategy.transformLog(event.body().toString());
logTransformLogger.doLog(transformedLogEntries);
event.reply(new JsonObject().put("status", "ok"));
strategy.transformLog(event.body().toString(), transformFuture -> {
if(transformFuture.succeeded()){
logTransformLogger.doLog(transformFuture.result(), logFuture -> {
if(logFuture.succeeded()){
event.reply(new JsonObject().put("status", "ok"));
} else {
event.fail(0, logFuture.cause().getMessage());
}
});
} else {
event.fail(0, transformFuture.cause().getMessage());
}
});
});

future.complete();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.swisspush.logtransformer.logger;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

Expand All @@ -11,15 +14,22 @@
public class DefaultLogTransformLogger implements LogTransformLogger {

private final Logger log;
private Vertx vertx;

public DefaultLogTransformLogger(String loggerName){
public DefaultLogTransformLogger(Vertx vertx, String loggerName) {
this.vertx = vertx;
this.log = LoggerFactory.getLogger(loggerName);
}

@Override
public void doLog(List<String> logEntries) {
for (String logEntry : logEntries) {
log.info(logEntry);
}
public void doLog(List<String> logEntries, Handler<AsyncResult<Void>> resultHandler) {
vertx.executeBlocking(future -> {
if (logEntries != null) {
for (String logEntry : logEntries) {
log.info(logEntry);
}
}
future.complete();
}, resultHandler);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.swisspush.logtransformer.logger;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;

import java.util.List;

/**
Expand All @@ -10,9 +13,9 @@
public interface LogTransformLogger {

/**
* Logs the provided log entries line-by-line
* Logs the provided log entries asynchronously line-by-line
*
* @param logEntries a list of log entries to log line-by-line
*/
void doLog(List<String> logEntries);
void doLog(List<String> logEntries, Handler<AsyncResult<Void>> resultHandler);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.swisspush.logtransformer.strategy;

import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

/**
* Default implementation of the {@link TransformStrategyFinder}
*
* @author https://github.com/mcweba [Marc-Andre Weber]
*/
public class DefaultTransformStrategyFinder implements TransformStrategyFinder {

private final String strategyHeader;
private final Logger log = LoggerFactory.getLogger(DefaultTransformStrategyFinder.class);

private Vertx vertx;

private DoNothingTransformStrategy doNothingTransformStrategy;
private SplitStorageExpandLogStrategy splitStorageExpandLogStrategy;

public DefaultTransformStrategyFinder(Vertx vertx, String strategyHeader) {
this.vertx = vertx;
this.strategyHeader = strategyHeader;
}

/**
* Returns the corresponding {@link TransformStrategy} implementation based on the provided header strategy property.
*
* @param headers the headers containing the name of the strategy
* @return returns the corresponding {@link TransformStrategy} implementation or {@link DoNothingTransformStrategy} when no matching strategy was found
*/
@Override
public TransformStrategy findTransformStrategy(MultiMap headers){
String strategy = headers.get(strategyHeader);

if(isEmpty(strategy)){
return getDoNothingTransformStrategy();
} else if("SplitStorageExpandLogStrategy".equalsIgnoreCase(strategy)){
return getSplitStorageExpandLogStrategy();
}

log.warn("No log transform strategy found for value '" + strategy + "'. Using DoNothingTransformStrategy instead");
return getDoNothingTransformStrategy();
}

private DoNothingTransformStrategy getDoNothingTransformStrategy(){
if(doNothingTransformStrategy == null){
doNothingTransformStrategy = new DoNothingTransformStrategy(vertx);
}
return doNothingTransformStrategy;
}

private SplitStorageExpandLogStrategy getSplitStorageExpandLogStrategy(){
if(splitStorageExpandLogStrategy == null){
splitStorageExpandLogStrategy = new SplitStorageExpandLogStrategy(vertx);
}
return splitStorageExpandLogStrategy;
}

private boolean isEmpty(String stringToTest){
if(stringToTest == null){
return true;
}
String trimmed = stringToTest.trim();
return trimmed.length() == 0;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.swisspush.logtransformer.strategy;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

import java.util.Collections;
import java.util.List;

Expand All @@ -10,8 +14,15 @@
*/
public class DoNothingTransformStrategy implements TransformStrategy {

private Vertx vertx;

public DoNothingTransformStrategy(Vertx vertx) {
this.vertx = vertx;
}

@Override
public List<String> transformLog(String logToTransform) {
return Collections.singletonList(logToTransform);
public void transformLog(String logToTransform, Handler<AsyncResult<List<String>>> resultHandler) {
vertx.executeBlocking(future -> future.complete(Collections.singletonList(logToTransform)), resultHandler);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.swisspush.logtransformer.strategy;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

import java.util.*;
Expand All @@ -19,23 +22,31 @@ public class SplitStorageExpandLogStrategy extends AbstractTransformStrategy {
public static final String PROP_BODY = "body";
public static final String PARAM_STORAGE_EXPAND = "?storageExpand=true";

private Vertx vertx;

public SplitStorageExpandLogStrategy(Vertx vertx) {
this.vertx = vertx;
}

@Override
public List<String> transformLog(String logToTransform) {
try {
List<String> logEntries = new ArrayList<>();
JsonObject storageExpandLog = verifyLogInput(logToTransform);
Map<String, JsonObject> subRequestsMap = extractSubLogEntries(storageExpandLog);
JsonObject skeleton = buildLogEntryJsonObjectSkeleton(storageExpandLog);
for (Map.Entry<String, JsonObject> subRequest : subRequestsMap.entrySet()) {
JsonObject subRequestObj = skeleton.copy();
subRequestObj.put(PROP_URL, subRequestObj.getString(PROP_URL).replace(PARAM_STORAGE_EXPAND, subRequest.getKey()));
subRequestObj.getJsonObject(PROP_RESPONSE).put(PROP_BODY, subRequest.getValue());
logEntries.add(subRequestObj.encode());
public void transformLog(String logToTransform, Handler<AsyncResult<List<String>>> resultHandler) {
vertx.executeBlocking(future -> {
try {
List<String> logEntries = new ArrayList<>();
JsonObject storageExpandLog = verifyLogInput(logToTransform);
Map<String, JsonObject> subRequestsMap = extractSubLogEntries(storageExpandLog);
JsonObject skeleton = buildLogEntryJsonObjectSkeleton(storageExpandLog);
for (Map.Entry<String, JsonObject> subRequest : subRequestsMap.entrySet()) {
JsonObject subRequestObj = skeleton.copy();
subRequestObj.put(PROP_URL, subRequestObj.getString(PROP_URL).replace(PARAM_STORAGE_EXPAND, subRequest.getKey()));
subRequestObj.getJsonObject(PROP_RESPONSE).put(PROP_BODY, subRequest.getValue());
logEntries.add(subRequestObj.encode());
}
future.complete(logEntries);
} catch (LogContentException ex) {
future.complete(doNothingInCaseOfError(logToTransform, ex.getMessage()));
}
return logEntries;
} catch (LogContentException ex) {
return doNothingInCaseOfError(logToTransform, ex.getMessage());
}
}, resultHandler);
}

private JsonObject verifyLogInput(String logToTransform) throws LogContentException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.swisspush.logtransformer.strategy;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;

import java.util.List;

/**
Expand All @@ -10,10 +13,11 @@
public interface TransformStrategy {

/**
* Transforms the provided logToTransform and returns a list of log entries.
* Transforms the provided logToTransform and returns a list of log entries asynchronous.
*
* @param logToTransform the log to transform
* @return a list of log entries
* @param resultHandler the handler containing the transformed log
*
*/
List<String> transformLog(String logToTransform);
void transformLog(String logToTransform, Handler<AsyncResult<List<String>>> resultHandler);
}
Loading

0 comments on commit 7bf87cd

Please sign in to comment.