diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java index 63ede3829ee6..bb08c87a9aa2 100644 --- a/bindings/java/src/main/java/org/apache/opendal/Operator.java +++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java @@ -72,6 +72,10 @@ public OperatorOutputStream createOutputStream(String path) { return new OperatorOutputStream(this, path); } + public OperatorOutputStream createOutputStream(String path, int maxBytes) { + return new OperatorOutputStream(this, path, maxBytes); + } + public byte[] read(String path) { return read(nativeHandle, path); } diff --git a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java index 0f315174f037..05afac416681 100644 --- a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java +++ b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java @@ -35,31 +35,38 @@ protected void disposeInternal(long handle) { } } - private static final int MAX_BYTES = 16384; + private static final int DEFAULT_MAX_BYTES = 16384; private final Writer writer; - private final byte[] bytes = new byte[MAX_BYTES]; + private final byte[] bytes; + private final int maxBytes; private int offset = 0; public OperatorOutputStream(Operator operator, String path) { + this(operator, path, DEFAULT_MAX_BYTES); + } + + public OperatorOutputStream(Operator operator, String path, int maxBytes) { final long op = operator.nativeHandle; this.writer = new Writer(constructWriter(op, path)); + this.maxBytes = maxBytes; + this.bytes = new byte[maxBytes]; } @Override public void write(int b) throws IOException { bytes[offset++] = (byte) b; - if (offset >= MAX_BYTES) { + if (offset >= maxBytes) { flush(); } } @Override public void flush() throws IOException { - if (offset > MAX_BYTES) { - throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES); - } else if (offset < MAX_BYTES) { + if (offset > maxBytes) { + throw new IOException("INTERNAL ERROR: " + offset + " > " + maxBytes); + } else if (offset < maxBytes) { final byte[] bytes = Arrays.copyOf(this.bytes, offset); writeBytes(writer.nativeHandle, bytes); } else { diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java index 8812f3b4bd6a..d48c645a485f 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java @@ -40,6 +40,7 @@ public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, TestWatcher { private String testName; + public String scheme; public AsyncOperator asyncOperator; public Operator operator; @@ -67,6 +68,7 @@ public void beforeAll(ExtensionContext context) { this.asyncOperator = op.layer(RetryLayer.builder().build()); this.operator = this.asyncOperator.blocking(); + this.scheme = scheme; this.testName = String.format("%s(%s)", context.getDisplayName(), scheme); log.info( "\n================================================================================" @@ -94,6 +96,7 @@ public void afterAll(ExtensionContext context) { operator = null; } + this.scheme = null; this.testName = null; } diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java index 78a138cf098f..8380ed6e09f2 100644 --- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java @@ -46,6 +46,10 @@ protected Operator op() { return behaviorExtension.operator; } + protected String scheme() { + return behaviorExtension.scheme; + } + /** * Generates a byte array of random content. */ @@ -57,6 +61,16 @@ public static byte[] generateBytes() { return content; } + /** + * Generates a byte array of random content with a specific size. + */ + public static byte[] generateBytes(int size) { + final Random random = new Random(); + final byte[] content = new byte[size]; + random.nextBytes(content); + return content; + } + /** * Calculate SHA256 digest of input bytes * diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java new file mode 100644 index 000000000000..2a9721b69473 --- /dev/null +++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.opendal.test.behavior; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; +import java.util.UUID; +import org.apache.opendal.OperatorOutputStream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class RegressionTest extends BehaviorTestBase { + // @see https://github.com/apache/opendal/issues/5421 + @Test + public void testAzblobLargeFile() throws Exception { + assumeTrue(scheme() != null && scheme().equalsIgnoreCase("azblob")); + + final String path = UUID.randomUUID().toString(); + final int size = 16384 * 10; // 10 x OperatorOutputStream.DEFAULT_MAX_BYTES (10 flushes per write) + final byte[] content = generateBytes(size); + + try (OperatorOutputStream operatorOutputStream = op().createOutputStream(path, size)) { + for (int i = 0; i < 20000; i++) { + // More iterations in case BlockCountExceedsLimit doesn't pop up exactly after 100K blocks. + operatorOutputStream.write(content); + } + } + } +}