Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add common features #64

Merged
merged 8 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions archery-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Zip4j Framework -->
<dependency>
<groupId>net.lingala.zip4j</groupId>
<artifactId>zip4j</artifactId>
<version>${zip4j.version}</version>
</dependency>
<!-- Logstash Framework -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>${logstash-logback.version}</version>
</dependency>
<!-- Test Framework -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.github.romualdrousseau.archery.commons.behavior;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class Scenario<R> {

@FunctionalInterface
public interface Function<U, V> {
V apply(final Scenario<U> scenario, U u) throws Exception;
}

@FunctionalInterface
public interface Supplier<V> {
V get(final Scenario<?> scenario) throws Exception;
}

@FunctionalInterface
public interface Consumer<U> {
void accept(final Scenario<U> scenario, U u) throws Exception;
}

public static Scenario<Void> noParameters() throws Exception {
return new Scenario<Void>();
}

public static Scenario<Void> withParameters(final Map<String, Object> parameters) throws Exception {
return new Scenario<Void>(parameters);
}

private final Map<String, Object> context;

private final R value;

private Scenario() {
this(Map.of(), null);
}

private Scenario(final Map<String, Object> context) {
this(context, null);
}

private Scenario(final Map<String, Object> context, final R value) {
this.context = new HashMap<>(context);
this.value = value;
}

@SuppressWarnings("unchecked")
public <T> Optional<T> get(final String key) {
return Optional.ofNullable((T) this.context.get(key));
}

public <T> Scenario<R> put(final String key, final T value) {
this.context.put(key, value);
return this;
}

public <T> Scenario<T> given(final Supplier<T> step) throws Exception {
return new Scenario<T>(this.context, step.get(this));
}

public <T> Scenario<T> when(final Function<R, T> step) throws Exception {
return new Scenario<T>(this.context, step.apply(this, this.value));
}

public Scenario<R> then(final Consumer<R> step) throws Exception {
step.accept(this, this.value);
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.Objects;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public abstract class BaseSink<C, R> implements Subscriber<C, R>{

private long countOfProcessedRows = 0;
private LoggerWithLabels logger;

@Override
public long getCountOfProcessedRows() {
return this.countOfProcessedRows;
}

@Override
public LoggerWithLabels getLogger() {
return Objects.requireNonNull(this.logger);
}

@Override
public Subscriber<C, R> setLogger(final LoggerWithLabels logger) {
this.logger = logger;
return this;
}

@Override
public void initialize(final C context) {
this.countOfProcessedRows = 0;
}

@Override
public void accept(final C context, final R record) {
this.countOfProcessedRows++;
}

@Override
public void finalize(final C context) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.Objects;
import java.util.Optional;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public abstract class BaseSource<C, R> implements Publisher<C, R>, Runnable {

private final Broker<C, R> broker = new Broker<>();
private long countOfProcessedRows = 0;
private LoggerWithLabels logger;

@Override
public BaseSource<C, R> downstream(final Subscriber<C, R> subscriber) {
this.broker.addSuscriber(subscriber);
return this;
}

@Override
public long getCountOfProcessedRows() {
return this.countOfProcessedRows;
}

@Override
public LoggerWithLabels getLogger() {
return Objects.requireNonNull(Optional.ofNullable(this.logger).orElseGet(this::getDefaultLogger));
}

public BaseSource<C, R> setLogger(final LoggerWithLabels logger) {
this.logger = logger;
return this;
}

@Override
public void publishStart(final C context) {
this.countOfProcessedRows = 0;
this.broker.forwardSetLogger(this.getLogger());
this.broker.forwardStart(context);
}

@Override
public void publishRow(final C context, final R row) {
this.broker.forwardRow(context, row);
this.countOfProcessedRows++;
}

@Override
public void publishEnd(final C context) {
this.broker.forwardEnd(context);
}

protected abstract LoggerWithLabels getDefaultLogger();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.Objects;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public abstract class BaseTask<C, I, O> implements Publisher<C, O>, Subscriber<C, I> {

private final Broker<C, O> broker = new Broker<>();
private long countOfProcessedRows = 0;
private LoggerWithLabels logger;

@Override
public BaseTask<C, I, O> downstream(final Subscriber<C, O> subscriber) {
this.broker.addSuscriber(subscriber);
return this;
}

@Override
public long getCountOfProcessedRows() {
return this.countOfProcessedRows;
}

@Override
public LoggerWithLabels getLogger() {
return Objects.requireNonNull(this.logger);
}

@Override
public Subscriber<C, I> setLogger(final LoggerWithLabels logger) {
this.logger = logger;
return this;
}

@Override
public void publishStart(final C context) {
this.countOfProcessedRows = 0;
this.broker.forwardSetLogger(this.getLogger());
this.broker.forwardStart(context);
}

@Override
public void publishRow(final C context, final O row) {
this.broker.forwardRow(context, row);
this.countOfProcessedRows++;
}

@Override
public void publishEnd(final C context) {
this.broker.forwardEnd(context);
}

@Override
public void initialize(final C context) {
this.publishStart(context);
}

@Override
public void finalize(final C context) {
this.publishEnd(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.ArrayList;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public class Broker<C, R> {

private ArrayList<Subscriber<C, R>> subscribers = new ArrayList<>();

public void addSuscriber(Subscriber<C, R> subscriber) {
this.subscribers.add(subscriber);
}

public void forwardSetLogger(final LoggerWithLabels logger) {
this.subscribers.forEach(subscriber -> subscriber.setLogger(logger));
}

public void forwardStart(C context) {
this.subscribers.forEach(subscriber -> subscriber.initialize(context));
}

public void forwardRow(C context, R row) {
this.subscribers.forEach(subscriber -> subscriber.accept(context, row));
}

public void forwardEnd(C context) {
this.subscribers.forEach(subscriber -> subscriber.finalize(context));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.romualdrousseau.archery.commons.flow;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public interface Publisher<C, R> {

Publisher<C, R> downstream(Subscriber<C, R> subscriber);

long getCountOfProcessedRows();

LoggerWithLabels getLogger();

void publishStart(C context);

void publishRow(C context, R row);

void publishEnd(C context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.github.romualdrousseau.archery.commons.flow;

import java.util.function.BiConsumer;

import com.github.romualdrousseau.archery.commons.logging.LoggerWithLabels;

public interface Subscriber<C, R> extends BiConsumer<C, R> {

long getCountOfProcessedRows();

LoggerWithLabels getLogger();

Subscriber<C, R> setLogger(final LoggerWithLabels logger);

void initialize(C context);

void finalize(C context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.github.romualdrousseau.archery.commons.function;

import java.util.Optional;
import java.util.function.Function;

public class Either<L, R> {
public static <L, R> Either<L, R> ofLeft(final L l) {
return new Either<L, R>(l, null);
}
public static <L, R> Either<L, R> ofRight(final R r) {
return new Either<L, R>(null, r);
}

private final Optional<L> left;

private final Optional<R> right;

protected Either(final L l, final R r) {
this.left = Optional.ofNullable(l);
this.right = Optional.ofNullable(r);
}

public Optional<L> getLeft() {
return this.left;
}

public Optional<R> getRight() {
return this.right;
}

public <Y> Either<L, Y> map(final Function<R, Y> func) {
if (this.right.isPresent()) {
return Either.<L, Y>ofRight(func.apply(this.right.get()));
}
else if (this.left.isPresent()) {
return Either.<L, Y>ofLeft(this.left.get());
} else {
throw new RuntimeException("Either is in illegal state as both sides are not present");
}
}

public <Y> Either<L, Y> flatMap(final Function<R, Either<L, Y>> func) {
if (this.right.isPresent()) {
return func.apply(this.right.get());
}
else if (this.left.isPresent()) {
return Either.<L, Y>ofLeft(this.left.get());
} else {
throw new RuntimeException("Either is in illegal state as both sides are not present");
}
}

public R orElseGet(final Function<L, R> func) {
if (this.right.isPresent()) {
return this.right.get();
}
else if (this.left.isPresent()) {
return func.apply(this.left.get());
} else {
throw new RuntimeException("Either is in illegal state as both sides are not present");
}
}
}
Loading
Loading