Commit
refacto: Remove dependencies to ShuJu
Romuald Rousseau committed Sep 26, 2024
1 parent 76df272 commit 9a1bdd7
Showing 103 changed files with 3,359 additions and 170 deletions.
58 changes: 58 additions & 0 deletions any2json-commons/pom.xml
@@ -0,0 +1,58 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.github.romualdrousseau</groupId>
<artifactId>any2json-monorepo</artifactId>
<version>2.45-SNAPSHOT</version>
</parent>

<groupId>com.github.romualdrousseau</groupId>
<artifactId>any2json-commons</artifactId>
<version>2.45-SNAPSHOT</version>
<packaging>jar</packaging>

<name>any2json-commons</name>
<description>
Any2Json commons library providing shared utilities for chunked big data storage and serialization.
</description>
<url>https://github.com/romualdrousseau/any2json-monorepo</url>

<dependencies>
<!-- Serialization -->
<dependency>
<groupId>org.furyio</groupId>
<artifactId>fury-core</artifactId>
<version>${fury.version}</version>
</dependency>
<dependency>
<groupId>org.furyio</groupId>
<artifactId>fury-format</artifactId>
<version>${fury.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>${snappy.version}</version>
</dependency>
<!-- Reflections Framework -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<!-- Test Framework -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
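
The ${fury.version}, ${snappy.version}, ${reflections.version} and ${log4j.version} placeholders above are resolved from the parent any2json-monorepo POM, which is not part of this hunk. As a rough sketch only, the parent would need a properties block along these lines; the comments stand in for the actual version numbers, which this diff does not show:

    <properties>
        <fury.version><!-- version managed by the parent POM --></fury.version>
        <snappy.version><!-- version managed by the parent POM --></snappy.version>
        <reflections.version><!-- version managed by the parent POM --></reflections.version>
        <log4j.version><!-- version managed by the parent POM --></log4j.version>
    </properties>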
42 changes: 42 additions & 0 deletions …/bigdata/Chunk.java
@@ -0,0 +1,42 @@
package com.github.romualdrousseau.any2json.commons.bigdata;

import java.util.ArrayList;
import java.util.List;

public class Chunk {

private final int batchSize;
private final List<ChunkMetaData> batches;

private Row[] rows;

public Chunk(final int batchSize) {
this.batchSize = batchSize;
this.batches = new ArrayList<>();
this.rows = new Row[this.batchSize];
}

public int getBatchSize() {
return this.batchSize;
}

public List<ChunkMetaData> getBatches() {
return this.batches;
}

public Row[] getRows() {
return this.rows;
}

public void setRows(final Row[] rows) {
this.rows = rows;
}

public void setRow(final int idx, final Row row) {
this.rows[idx] = row;
}

public Row getRow(final int idx) {
return this.rows[idx];
}
}
24 changes: 24 additions & 0 deletions …/bigdata/ChunkMetaData.java
@@ -0,0 +1,24 @@
package com.github.romualdrousseau.any2json.commons.bigdata;

public class ChunkMetaData {

private final long position;
private final int length;

private ChunkMetaData(final long position, final int length) {
this.position = position;
this.length = length;
}

public long position() {
return position;
}

public int length() {
return length;
}

public static ChunkMetaData of(final long position, final int length) {
return new ChunkMetaData(position, length);
}
}
10 changes: 10 additions & 0 deletions …/bigdata/ChunkSerializer.java
@@ -0,0 +1,10 @@
package com.github.romualdrousseau.any2json.commons.bigdata;

import java.io.IOException;

public interface ChunkSerializer {

byte[] serialize(Row[] batch) throws IOException;

Row[] deserialize(byte[] bytes) throws IOException;
}
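
The concrete implementations ChunkSerializerJava and ChunkSerializerFury, referenced by the factory below, are added elsewhere in this commit and are not shown here. Purely for orientation, a minimal implementation of this interface based on plain java.io serialization could look like the sketch below; it assumes Row implements java.io.Serializable, which this diff does not confirm, and the class name is hypothetical:

    package com.github.romualdrousseau.any2json.commons.bigdata.serializer;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import com.github.romualdrousseau.any2json.commons.bigdata.ChunkSerializer;
    import com.github.romualdrousseau.any2json.commons.bigdata.Row;

    // Illustrative sketch only: assumes Row is Serializable.
    public class ChunkSerializerJavaSketch implements ChunkSerializer {

        @Override
        public byte[] serialize(final Row[] batch) throws IOException {
            final var bytes = new ByteArrayOutputStream();
            try (final var out = new ObjectOutputStream(bytes)) {
                out.writeObject(batch); // write the whole batch as one object graph
            }
            return bytes.toByteArray();
        }

        @Override
        public Row[] deserialize(final byte[] data) throws IOException {
            try (final var in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (Row[]) in.readObject();
            } catch (final ClassNotFoundException x) {
                throw new IOException(x);
            }
        }
    }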
114 changes: 114 additions & 0 deletions …/bigdata/ChunkSerializerFactory.java
@@ -0,0 +1,114 @@
package com.github.romualdrousseau.any2json.commons.bigdata;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.romualdrousseau.any2json.commons.bigdata.serializer.ChunkSerializerFury;
import com.github.romualdrousseau.any2json.commons.bigdata.serializer.ChunkSerializerJava;

public class ChunkSerializerFactory {

public enum SerializerType {
DEFAULT, // DEFAULT IS FURY
JAVA,
FURY
}

private static final Logger LOGGER = LoggerFactory.getLogger(ChunkSerializerFactory.class);

private static final ThreadLocal<ChunkSerializer> CONTEXT = new ThreadLocal<>();

public static ChunkSerializer newInstance() {
return ChunkSerializerFactory.newInstance(SerializerType.DEFAULT);
}

public static ChunkSerializer newInstance(final SerializerType type) {
if (CONTEXT.get() == null) {
CONTEXT.set(new ChunkSerializerFactory(type).createSerializerInstance());
}
return CONTEXT.get();
}

private final SerializerType type;

private ChunkSerializerFactory(final SerializerType type) {
try {
if (type.equals(SerializerType.DEFAULT)) {
final var prop = new Properties();
prop.load(this.openDefaultPropertiesInputStream());
final var typeVal = prop.getProperty("serializer");
if (typeVal != null) {
this.type = Enum.valueOf(SerializerType.class, typeVal);
} else {
this.type = type;
}
} else {
this.type = type;
}
LOGGER.info("ChunkSerializerFactor set to {}", this.type);
} catch (final IOException x) {
LOGGER.error("Error during ChunkSerializerFactor initialization: {}", x.getMessage());
throw new UncheckedIOException(x);
}
}

private ChunkSerializer createSerializerInstance() {
switch (this.type) {
case JAVA:
return new ChunkSerializerJava();
case FURY:
return new ChunkSerializerFury();
default:
return new ChunkSerializerFury();
}
}

private InputStream openDefaultPropertiesInputStream() throws IOException {
return this.openPropertiesInputStream("chunk-serializer.properties")
.or(() -> this.openPropertiesInputStream("batch-serializer.properties"))
.orElseGet(InputStream::nullInputStream);
}

private Optional<InputStream> openPropertiesInputStream(final String fileName) {
final var userDir = System.getProperty("user.dir");
return this.getPathIfExists(Path.of(userDir, fileName))
.or(() -> this.getPathIfExists(Path.of(userDir, "classes", fileName)))
.flatMap(this::pathToStream)
.or(() -> this.resolveResourceAsStream(fileName));
}

private Optional<InputStream> pathToStream(final Path x) {
try {
return Optional.of(Files.newInputStream(x));
} catch (final IOException e) {
return Optional.empty();
}
}

private Optional<InputStream> resolveResourceAsStream(final String resourceName) {
final InputStream resource = this.getClass().getClassLoader().getResourceAsStream(resourceName);
if (resource == null) {
LOGGER.debug("module: {} not found", resourceName);
return Optional.empty();
}
LOGGER.debug("module: {} found at {}", resourceName, this.getClass().getClassLoader().getResource(resourceName));
return Optional.of(resource);
}

private Optional<Path> getPathIfExists(final Path path) {
if (!path.toFile().exists()) {
LOGGER.debug("module: {} not found at {}", path.getFileName(), path);
return Optional.empty();
}
LOGGER.debug("module: {} found at {}", path.getFileName(), path);
return Optional.of(path);
}
}
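
The properties lookup above recognizes the single key "serializer", whose value must be one of the SerializerType constants; when no properties file is found, FURY remains the default. A minimal, hypothetical chunk-serializer.properties placed in the working directory (or on the classpath) that switches the factory to plain Java serialization could look like this:

    # chunk-serializer.properties
    serializer=JAVA

Callers then obtain the serializer through the factory, which caches one instance per thread in the ThreadLocal CONTEXT:

    final ChunkSerializer serializer = ChunkSerializerFactory.newInstance();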
131 changes: 131 additions & 0 deletions …/bigdata/DataFrame.java
@@ -0,0 +1,131 @@
package com.github.romualdrousseau.any2json.commons.bigdata;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataFrame implements Closeable, Iterable<Row> {
private final Logger logger = LoggerFactory.getLogger(DataFrame.class);

private final ChunkSerializer serializer = ChunkSerializerFactory.newInstance();

private final Chunk chunk;
private final Path storePath;
private final int rowCount;
private final int columnCount;
private final FileChannel fileChannel;
private final MappedByteBuffer mappedBuffer;

private int currentChunkIdx;
private boolean isClosed;

public DataFrame(final Chunk chunk, final Path storePath, final int rowCount, final int columnCount)
throws IOException {
this.chunk = chunk;
this.storePath = storePath;
this.rowCount = rowCount;
this.columnCount = columnCount;
this.fileChannel = (FileChannel) Files.newByteChannel(this.storePath,
EnumSet.of(StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE));
if (this.fileChannel.size() <= Integer.MAX_VALUE) {
this.mappedBuffer = this.fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, this.fileChannel.size());
} else {
this.mappedBuffer = null;
}

this.currentChunkIdx = -1;
this.isClosed = false;

this.logger.info("DataFrame initialized with Mapped Buffer: {}", this.isMappedBuffer());
}

@Override
public void close() throws IOException {
if (this.isClosed) {
return;
}
this.fileChannel.close();
this.isClosed = true;
}

public DataView view(final int rowStart, final int columnStart, final int rowCount, final int columnCount) {
Objects.checkFromToIndex(rowStart, rowStart + rowCount - 1, this.rowCount);
Objects.checkFromToIndex(columnStart, columnStart + columnCount - 1, this.columnCount);
return new DataView(this, rowStart, columnStart, rowCount, columnCount);
}

public int getRowCount() {
return this.rowCount;
}

public int getColumnCount() {
return this.columnCount;
}

public int getColumnCount(final int row) {
Objects.checkIndex(row, this.rowCount);
final var r = this.getRow(row);
return r.size();
}

public Row getRow(final int row) {
Objects.checkIndex(row, this.rowCount);
final int idx = row / this.chunk.getBatchSize();
if (this.currentChunkIdx != idx) {
this.chunk.setRows(this.loadOneBatch(this.chunk.getBatches().get(idx)));
this.currentChunkIdx = idx;
}
return this.chunk.getRow(row % this.chunk.getBatchSize());
}

public String getCell(final int row, final int column) {
Objects.checkIndex(row, this.rowCount);
Objects.checkIndex(column, this.columnCount);
return this.getRow(row).get(column);
}

@Override
public Iterator<Row> iterator() {
return new DataFrameIterator(this);
}

private Row[] loadOneBatch(final ChunkMetaData batch) {
final long startTime = System.currentTimeMillis();
try {
if (this.isMappedBuffer()) {
final var bytes = new byte[batch.length()];
this.mappedBuffer.position((int) batch.position());
this.mappedBuffer.get(bytes);
return serializer.deserialize(bytes);
} else {
final var bytes = ByteBuffer.allocate(batch.length());
this.fileChannel.position(batch.position());
this.fileChannel.read(bytes);
return serializer.deserialize(bytes.array());
}
} catch (final IOException x) {
throw new UncheckedIOException(x);
} finally {
final var stopTime = System.currentTimeMillis();
final var executionTimeInMS = (int) (stopTime - startTime);
this.logger.debug("Loaded a chunk into memory, offset: {}, length: {}. Took {}ms", batch.position(), batch.length(), executionTimeInMS);
}
}

private boolean isMappedBuffer() {
return this.mappedBuffer != null;
}
}
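
The Row, DataView and DataFrameIterator classes referenced above are added elsewhere in this commit and are not shown in this hunk. As an orientation sketch only, and assuming the store file at storePath was produced beforehand with the same Chunk batch size and ChunkSerializer, reading a DataFrame back could look like this (imports and the surrounding class omitted; dumpFirstColumn is a hypothetical helper, not part of the commit):

    // Illustrative sketch: chunk, storePath, rowCount and columnCount must
    // describe a previously written store file.
    static void dumpFirstColumn(final Chunk chunk, final Path storePath,
            final int rowCount, final int columnCount) throws IOException {
        try (final var df = new DataFrame(chunk, storePath, rowCount, columnCount)) {
            for (final var row : df) {            // iteration goes through DataFrameIterator
                System.out.println(row.get(0));   // Row.get(int), as used by getCell() above
            }
        }
    }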