Skip to content

Commit

Permalink
fix: properly override template storage implementations for remote (#…
Browse files Browse the repository at this point in the history
…1582)

### Motivation
When api-impl splitting the RemoteTemplateStorage impl was not done
correctly. Methods that handle file streams (in & out) were just called
as usual RPCs which is not possible.

### Modification
This PR overrides the specific method implementations and uses the
chunked file transfer api to transfer filestreams.

### Result
The template storage is working properly from the wrapper side.
  • Loading branch information
0utplay authored Feb 7, 2025
1 parent 51fc766 commit 6445995
Showing 1 changed file with 82 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.zip.ZipInputStream;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -86,11 +87,7 @@ public boolean deployDirectory(
@NonNull Path directory,
@Nullable Predicate<Path> filter
) {
try (var inputStream = ZipUtil.zipToStream(directory, filter)) {
return this.deploy(target, inputStream);
} catch (IOException exception) {
return false;
}
return TaskUtil.getOrDefault(this.deployDirectoryAsync(target, directory, filter), false);
}

/**
Expand Down Expand Up @@ -127,6 +124,39 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
return TaskUtil.getOrDefault(this.zipTemplateAsync(template), null);
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull OutputStream appendOutputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return this.openLocalOutputStream(template, path, FileUtil.createTempFile(), true);
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull OutputStream newOutputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return this.openLocalOutputStream(template, path, FileUtil.createTempFile(), false);
}

/**
* {@inheritDoc}
*/
@Override
public @Nullable InputStream newInputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return TaskUtil.getOrDefault(this.newInputStreamAsync(template, path), null);
}

@Override
public @NonNull CompletableFuture<InputStream> zipTemplateAsync(@NonNull ServiceTemplate template) {
return ChunkedFileQueryBuilder.create()
Expand All @@ -140,22 +170,63 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
* {@inheritDoc}
*/
@Override
public @NonNull OutputStream appendOutputStream(
public @NonNull CompletableFuture<ZipInputStream> openZipInputStreamAsync(@NonNull ServiceTemplate template) {
return this.zipTemplateAsync(template).thenApply(inputStream -> inputStream != null
? new ZipInputStream(inputStream)
: null);
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull CompletableFuture<OutputStream> appendOutputStreamAsync(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return this.openLocalOutputStream(template, path, FileUtil.createTempFile(), true);
) {
return TaskUtil.supplyAsync(() -> this.appendOutputStream(template, path));
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull OutputStream newOutputStream(
public @NonNull CompletableFuture<OutputStream> newOutputStreamAsync(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return this.openLocalOutputStream(template, path, FileUtil.createTempFile(), false);
) {
return TaskUtil.supplyAsync(() -> this.newOutputStream(template, path));
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull CompletableFuture<Boolean> deployDirectoryAsync(
@NonNull ServiceTemplate target,
@NonNull Path directory,
@Nullable Predicate<Path> filter
) {
return TaskUtil.supplyAsync(() -> {
try (var inputStream = ZipUtil.zipToStream(directory, filter)) {
return this.deploy(target, inputStream);
}
});
}

/**
* {@inheritDoc}
*/
@Override
public @NonNull CompletableFuture<InputStream> newInputStreamAsync(
@NonNull ServiceTemplate template,
@NonNull String path
) {
return ChunkedFileQueryBuilder.create()
.dataIdentifier("remote_templates_template_file")
.requestFromNode(this.componentInfo.nodeUniqueId())
.configureMessageBuffer(buffer -> buffer.writeString(this.name).writeObject(template).writeString(path))
.query();
}

/**
Expand Down Expand Up @@ -190,27 +261,4 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
.transferChunkedData()
.join());
}

/**
* {@inheritDoc}
*/
@Override
public @Nullable InputStream newInputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
return TaskUtil.getOrDefault(this.newInputStreamAsync(template, path), null);
}

@Override
public @NonNull CompletableFuture<InputStream> newInputStreamAsync(
@NonNull ServiceTemplate template,
@NonNull String path
) {
return ChunkedFileQueryBuilder.create()
.dataIdentifier("remote_templates_template_file")
.requestFromNode(this.componentInfo.nodeUniqueId())
.configureMessageBuffer(buffer -> buffer.writeString(this.name).writeObject(template).writeString(path))
.query();
}
}

0 comments on commit 6445995

Please sign in to comment.