Skip to content

Commit

Permalink
refactor: remove the queue in LanceArrowWriter to reduce memory usage…
Browse files Browse the repository at this point in the history
… for spark sink (#3110)

Remove the queue in LanceArrowWriter since it may cache all rows in
queue and that will require a lot of jvm memory.

Use mutex to control the write rate of sinker. Writer will wait util the
reader take the batch.

And more I had moved the `maven-shade-plugin` into a new profile which
is diabled by default because `jar-with-dependencie` was conflict with
many jars in spark dependencie

---------

Co-authored-by: Lei Xu <[email protected]>
  • Loading branch information
SaintBacchus and eddyxu authored Nov 13, 2024
1 parent 219ebcf commit f60283e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/java-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ jobs:
echo "use-agent" >> ~/.gnupg/gpg.conf
echo "pinentry-mode loopback" >> ~/.gnupg/gpg.conf
export GPG_TTY=$(tty)
mvn --batch-mode -DskipTests -Drust.release.build=true -DpushChanges=false -Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} deploy -P deploy-to-ossrh
mvn --batch-mode -DskipTests -Drust.release.build=true -DpushChanges=false -Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} deploy -P deploy-to-ossrh -P shade-jar
env:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_TOKEN: ${{ secrets.SONATYPE_TOKEN }}
79 changes: 42 additions & 37 deletions java/spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,48 @@
<scala.compat.version>2.13</scala.compat.version>
</properties>
</profile>
<profile>
<id>shade-jar</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>uber-jar</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<finalName>${project.artifactId}-${scala.compat.version}-${project.version}-jar-with-dependencies</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

<dependencies>
Expand All @@ -53,41 +95,4 @@
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>uber-jar</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<finalName>${project.artifactId}-${scala.compat.version}-${project.version}-jar-with-dependencies</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -41,6 +43,9 @@ public class LanceArrowWriter extends ArrowReader {

private final AtomicLong totalBytesRead = new AtomicLong();
private ArrowWriter arrowWriter = null;
private final AtomicInteger count = new AtomicInteger(0);
private final Semaphore writeToken;
private final Semaphore loadToken;

public LanceArrowWriter(BufferAllocator allocator, Schema schema, int batchSize) {
super(allocator);
Expand All @@ -49,60 +54,63 @@ public LanceArrowWriter(BufferAllocator allocator, Schema schema, int batchSize)
this.schema = schema;
// TODO(lu) batch size as config?
this.batchSize = batchSize;
this.writeToken = new Semaphore(0);
this.loadToken = new Semaphore(0);
}

void write(InternalRow row) {
Preconditions.checkNotNull(row);
synchronized (monitor) {
// TODO(lu) wait if too much elements in rowQueue
rowQueue.offer(row);
monitor.notify();
try {
// wait util prepareLoadNextBatch to release write token,
writeToken.acquire();
arrowWriter.write(row);
if (count.incrementAndGet() == batchSize) {
// notify loadNextBatch to take the batch
loadToken.release();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

void setFinished() {
synchronized (monitor) {
finished = true;
monitor.notify();
}
loadToken.release();
finished = true;
}

@Override
protected void prepareLoadNextBatch() throws IOException {
public void prepareLoadNextBatch() throws IOException {
super.prepareLoadNextBatch();
// Do not use ArrowWriter.reset since it does not work well with Arrow JNI
arrowWriter = ArrowWriter.create(this.getVectorSchemaRoot());
// release batch size token for write
writeToken.release(batchSize);
}

@Override
public boolean loadNextBatch() throws IOException {
prepareLoadNextBatch();
int rowCount = 0;
synchronized (monitor) {
while (rowCount < batchSize) {
while (rowQueue.isEmpty() && !finished) {
try {
monitor.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for data", e);
}
}
if (rowQueue.isEmpty() && finished) {
break;
}
InternalRow row = rowQueue.poll();
if (row != null) {
arrowWriter.write(row);
rowCount++;
try {
if (finished && count.get() == 0) {
return false;
}
// wait util batch if full or finished
loadToken.acquire();
arrowWriter.finish();
if (!finished) {
count.set(0);
return true;
} else {
// true if it has some rows and return false if there is no record
if (count.get() > 0) {
count.set(0);
return true;
} else {
return false;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (rowCount == 0) {
return false;
}
arrowWriter.finish();
return true;
}

@Override
Expand Down

0 comments on commit f60283e

Please sign in to comment.