Skip to content

Commit

Permalink
Feature/add common features (#63)
Browse files Browse the repository at this point in the history
* feat:  Add structured logging

* feat: Add redux

* feat: Add flow

* feat: Add error handling

* feat: Add misceallenous

* feat: Revise tests

* chore: Update archery commons

---------

Co-authored-by: Romuald Rousseau <[email protected]>
  • Loading branch information
RomualdRousseau and Romuald Rousseau authored Nov 29, 2024
1 parent 88e8433 commit fc24cfc
Show file tree
Hide file tree
Showing 43 changed files with 2,188 additions and 141 deletions.
12 changes: 12 additions & 0 deletions archery-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Zip4j Framework -->
<dependency>
<groupId>net.lingala.zip4j</groupId>
<artifactId>zip4j</artifactId>
<version>${zip4j.version}</version>
</dependency>
<!-- Logstash Framework -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>${logstash-logback.version}</version>
</dependency>
<!-- Test Framework -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.github.romualdrousseau.archery.commons.behavior;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class Scenario<R> {

@FunctionalInterface
public interface Function<U, V> {
V apply(final Scenario<U> scenario, U u) throws Exception;
}

@FunctionalInterface
public interface Supplier<V> {
V get(final Scenario<?> scenario) throws Exception;
}

@FunctionalInterface
public interface Consumer<U> {
void accept(final Scenario<U> scenario, U u) throws Exception;
}

public static Scenario<Void> noParameters() throws Exception {
return new Scenario<Void>();
}

public static Scenario<Void> withParameters(final Map<String, Object> parameters) throws Exception {
return new Scenario<Void>(parameters);
}

private final Map<String, Object> context;

private final R value;

private Scenario() {
this(Map.of(), null);
}

private Scenario(final Map<String, Object> context) {
this(context, null);
}

private Scenario(final Map<String, Object> context, final R value) {
this.context = new HashMap<>(context);
this.value = value;
}

@SuppressWarnings("unchecked")
public <T> Optional<T> get(final String key) {
return Optional.ofNullable((T) this.context.get(key));
}

public <T> Scenario<R> put(final String key, final T value) {
this.context.put(key, value);
return this;
}

public <T> Scenario<T> given(final Supplier<T> step) throws Exception {
return new Scenario<T>(this.context, step.get(this));
}

public <T> Scenario<T> when(final Function<R, T> step) throws Exception {
return new Scenario<T>(this.context, step.apply(this, this.value));
}

public Scenario<R> then(final Consumer<R> step) throws Exception {
step.accept(this, this.value);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.Objects;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public abstract class BaseSink<C, R> implements Subscriber<C, R>{

private long countOfProcessedRows = 0;
private LoggerWithLabels logger;

@Override
public long getCountOfProcessedRows() {
return this.countOfProcessedRows;
}

@Override
public LoggerWithLabels getLogger() {
return Objects.requireNonNull(this.logger);
}

@Override
public Subscriber<C, R> setLogger(final LoggerWithLabels logger) {
this.logger = logger;
return this;
}

@Override
public void initialize(final C context) {
this.countOfProcessedRows = 0;
}

@Override
public void accept(final C context, final R record) {
this.countOfProcessedRows++;
}

@Override
public void finalize(final C context) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.Objects;
import java.util.Optional;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public abstract class BaseSource<C, R> implements Publisher<C, R>, Runnable {

private final Broker<C, R> broker = new Broker<>();
private long countOfProcessedRows = 0;
private LoggerWithLabels logger;

@Override
public BaseSource<C, R> downstream(final Subscriber<C, R> subscriber) {
this.broker.addSuscriber(subscriber);
return this;
}

@Override
public long getCountOfProcessedRows() {
return this.countOfProcessedRows;
}

@Override
public LoggerWithLabels getLogger() {
return Objects.requireNonNull(Optional.ofNullable(this.logger).orElseGet(this::getDefaultLogger));
}

public BaseSource<C, R> setLogger(final LoggerWithLabels logger) {
this.logger = logger;
return this;
}

@Override
public void publishStart(final C context) {
this.countOfProcessedRows = 0;
this.broker.forwardSetLogger(this.getLogger());
this.broker.forwardStart(context);
}

@Override
public void publishRow(final C context, final R row) {
this.broker.forwardRow(context, row);
this.countOfProcessedRows++;
}

@Override
public void publishEnd(final C context) {
this.broker.forwardEnd(context);
}

protected abstract LoggerWithLabels getDefaultLogger();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.Objects;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public abstract class BaseTask<C, I, O> implements Publisher<C, O>, Subscriber<C, I> {

private final Broker<C, O> broker = new Broker<>();
private long countOfProcessedRows = 0;
private LoggerWithLabels logger;

@Override
public BaseTask<C, I, O> downstream(final Subscriber<C, O> subscriber) {
this.broker.addSuscriber(subscriber);
return this;
}

@Override
public long getCountOfProcessedRows() {
return this.countOfProcessedRows;
}

@Override
public LoggerWithLabels getLogger() {
return Objects.requireNonNull(this.logger);
}

@Override
public Subscriber<C, I> setLogger(final LoggerWithLabels logger) {
this.logger = logger;
return this;
}

@Override
public void publishStart(final C context) {
this.countOfProcessedRows = 0;
this.broker.forwardSetLogger(this.getLogger());
this.broker.forwardStart(context);
}

@Override
public void publishRow(final C context, final O row) {
this.broker.forwardRow(context, row);
this.countOfProcessedRows++;
}

@Override
public void publishEnd(final C context) {
this.broker.forwardEnd(context);
}

@Override
public void initialize(final C context) {
this.publishStart(context);
}

@Override
public void finalize(final C context) {
this.publishEnd(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.ArrayList;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public class Broker<C, R> {

private ArrayList<Subscriber<C, R>> subscribers = new ArrayList<>();

public void addSuscriber(Subscriber<C, R> subscriber) {
this.subscribers.add(subscriber);
}

public void forwardSetLogger(final LoggerWithLabels logger) {
this.subscribers.forEach(subscriber -> subscriber.setLogger(logger));
}

public void forwardStart(C context) {
this.subscribers.forEach(subscriber -> subscriber.initialize(context));
}

public void forwardRow(C context, R row) {
this.subscribers.forEach(subscriber -> subscriber.accept(context, row));
}

public void forwardEnd(C context) {
this.subscribers.forEach(subscriber -> subscriber.finalize(context));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.romualdrousseau.archery.commons.flow;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public interface Publisher<C, R> {

Publisher<C, R> downstream(Subscriber<C, R> subscriber);

long getCountOfProcessedRows();

LoggerWithLabels getLogger();

void publishStart(C context);

void publishRow(C context, R row);

void publishEnd(C context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.function.BiConsumer;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public interface Subscriber<C, R> extends BiConsumer<C, R> {

long getCountOfProcessedRows();

LoggerWithLabels getLogger();

Subscriber<C, R> setLogger(final LoggerWithLabels logger);

void initialize(C context);

void finalize(C context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.github.romualdrousseau.archery.commons.function;

import java.util.Optional;
import java.util.function.Function;

public class Either<L, R> {
public static <L, R> Either<L, R> ofLeft(final L l) {
return new Either<L, R>(l, null);
}
public static <L, R> Either<L, R> ofRight(final R r) {
return new Either<L, R>(null, r);
}

private final Optional<L> left;

private final Optional<R> right;

protected Either(final L l, final R r) {
this.left = Optional.ofNullable(l);
this.right = Optional.ofNullable(r);
}

public Optional<L> getLeft() {
return this.left;
}

public Optional<R> getRight() {
return this.right;
}

public <Y> Either<L, Y> map(final Function<R, Y> func) {
if (this.right.isPresent()) {
return Either.<L, Y>ofRight(func.apply(this.right.get()));
}
else if (this.left.isPresent()) {
return Either.<L, Y>ofLeft(this.left.get());
} else {
throw new RuntimeException("Either is in illegal state as both sides are not present");
}
}

public <Y> Either<L, Y> flatMap(final Function<R, Either<L, Y>> func) {
if (this.right.isPresent()) {
return func.apply(this.right.get());
}
else if (this.left.isPresent()) {
return Either.<L, Y>ofLeft(this.left.get());
} else {
throw new RuntimeException("Either is in illegal state as both sides are not present");
}
}

public R orElseGet(final Function<L, R> func) {
if (this.right.isPresent()) {
return this.right.get();
}
else if (this.left.isPresent()) {
return func.apply(this.left.get());
} else {
throw new RuntimeException("Either is in illegal state as both sides are not present");
}
}
}
Loading

0 comments on commit fc24cfc

Please sign in to comment.