Skip to content

Commit

Permalink
IGNITE-23114 Draft streams
Browse files Browse the repository at this point in the history
  • Loading branch information
skorotkov committed Sep 11, 2024
1 parent bc1036f commit c06b7ab
Show file tree
Hide file tree
Showing 63 changed files with 506 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.jdbc.thin;

import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.Reader;
import java.io.Serializable;
import java.math.BigDecimal;
Expand All @@ -36,6 +36,7 @@
import java.sql.Types;
import java.util.Arrays;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
Expand Down Expand Up @@ -886,6 +887,53 @@ public void testBlob() throws Exception {
assertFalse(rs.next());
}

/**
* @throws Exception If failed.
*/
@Test
public void testBlobInputStream() throws Exception {
final int BLOB_SIZE = 123000;

byte[] bytes = new byte[BLOB_SIZE];
new Random().nextBytes(bytes);

try {
try (PreparedStatement stmt = conn.prepareStatement("insert into TestObject(_key, id, blobVal) values (?, ?, ?)")) {
ByteArrayInputStream stream1 = new ByteArrayInputStream(bytes);

stmt.setInt(1, 3);
stmt.setInt(2, 3);
stmt.setBlob(3, stream1, BLOB_SIZE);
int inserted = stmt.executeUpdate();

assertEquals(1, inserted);
}

try (PreparedStatement stmt = conn.prepareStatement(SQL_PART + " where blobVal is not distinct from ?")) {
ByteArrayInputStream stream2 = new ByteArrayInputStream(bytes);

stmt.setBlob(1, stream2);

ResultSet rs = stmt.executeQuery();

int cnt = 0;

while (rs.next()) {
if (cnt == 0)
assert rs.getInt("id") == 3;

cnt++;
}

assertEquals(1, cnt);
}

}
finally {
grid(0).cache(DEFAULT_CACHE_NAME).remove(3);
}
}

/**
* @throws Exception If failed.
*/
Expand Down Expand Up @@ -1089,36 +1137,6 @@ public void testNotSupportedTypes() throws Exception {
}
});

checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setBinaryStream(1, null);
}
});

checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setBinaryStream(1, null, 0);
}
});

checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setBinaryStream(1, null, 0L);
}
});

checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setBlob(1, (InputStream)null);
}
});

checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setBlob(1, null, 0L);
}
});

checkNotSupported(new RunnableX() {
@Override public void runx() throws Exception {
stmt.setCharacterStream(1, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.UUID;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.odbc.SqlBinaryWriter;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcProtocolContext;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRawBinarylizable;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -110,7 +110,7 @@ public Set<Integer> cacheIds() {

/** {@inheritDoc} */
@Override public void writeBinary(
BinaryWriterExImpl writer,
SqlBinaryWriter writer,
JdbcProtocolContext protoCtx
)
throws BinaryObjectException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import org.apache.ignite.internal.processors.odbc.SqlInputStreamWrapper;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaParamsRequest;
Expand All @@ -59,6 +60,15 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep
/** Parameters metadata. */
private JdbcThinParameterMetadata metaData;

/**
* The maximum size of array to allocate.
* Some VMs reserve some header words in an array.
* Attempts to allocate larger arrays may result in
* OutOfMemoryError: Requested array size exceeds VM limit
* @see java.util.ArrayList#MAX_ARRAY_SIZE
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* Creates new prepared statement.
*
Expand Down Expand Up @@ -215,7 +225,10 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep
@Override public void setBinaryStream(int paramIdx, InputStream x, int length) throws SQLException {
ensureNotClosed();

throw new SQLFeatureNotSupportedException("Streams are not supported.");
if (length < 0)
throw new SQLException("Invalid argument. Length should be greater than 0.");

setArgument(paramIdx, new SqlInputStreamWrapper(x, length));
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -411,9 +424,7 @@ private void executeWithArguments(JdbcStatementType stmtType) throws SQLExceptio

/** {@inheritDoc} */
@Override public void setBlob(int paramIdx, InputStream inputStream, long length) throws SQLException {
ensureNotClosed();

throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
setBinaryStream(paramIdx, inputStream, length);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -446,7 +457,10 @@ private void executeWithArguments(JdbcStatementType stmtType) throws SQLExceptio
@Override public void setBinaryStream(int paramIdx, InputStream x, long length) throws SQLException {
ensureNotClosed();

throw new SQLFeatureNotSupportedException("Streams are not supported.");
if (length > MAX_ARRAY_SIZE)
throw new SQLFeatureNotSupportedException("Invalid argument. InputStreams with length > " + length + " are not supported.");

setBinaryStream(paramIdx, x, Math.toIntExact(length));
}

/** {@inheritDoc} */
Expand All @@ -467,7 +481,7 @@ private void executeWithArguments(JdbcStatementType stmtType) throws SQLExceptio
@Override public void setBinaryStream(int paramIdx, InputStream x) throws SQLException {
ensureNotClosed();

throw new SQLFeatureNotSupportedException("Streams are not supported.");
setArgument(paramIdx, new SqlInputStreamWrapper(x));
}

/** {@inheritDoc} */
Expand All @@ -493,9 +507,7 @@ private void executeWithArguments(JdbcStatementType stmtType) throws SQLExceptio

/** {@inheritDoc} */
@Override public void setBlob(int paramIdx, InputStream inputStream) throws SQLException {
ensureNotClosed();

throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
setBinaryStream(paramIdx, inputStream);
}

/** {@inheritDoc} */
Expand All @@ -518,6 +530,19 @@ private void executeWithArguments(JdbcStatementType stmtType) throws SQLExceptio
return iface != null && iface.isAssignableFrom(JdbcThinPreparedStatement.class);
}

/** {@inheritDoc} */
@Override public void closeImpl() {
try {
for (Object arg : args) {
if (arg instanceof AutoCloseable)
((AutoCloseable)arg).close();
}
}
catch (Exception ignored) {
// No-op.
}
}

/**
* Sets query argument value.
*
Expand All @@ -528,7 +553,7 @@ private void executeWithArguments(JdbcStatementType stmtType) throws SQLExceptio
private void setArgument(int paramIdx, Object val) throws SQLException {
ensureNotClosed();

if (val != null && !SqlListenerUtils.isPlainType(val.getClass()))
if (val != null && !SqlListenerUtils.isPlainType(val.getClass()) && !(val instanceof SqlInputStreamWrapper))
ensureCustomObjectsSupported();

if (paramIdx < 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,22 @@ private JdbcResult sendFile(JdbcBulkLoadAckResult cmdRes, JdbcThinTcpIo stickyIO
closeResults();

conn.closeStatement(this);

closeImpl();
}
finally {
closed = true;
}
}

/**
* This is guaranteed to be called exactly once even in case of concurrent {@link #close()} calls.
* @throws SQLException in case of error
*/
protected void closeImpl() throws SQLException {
// No-op.
}

/**
* Close results.
* @throws SQLException On error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.SqlBinaryWriter;
import org.apache.ignite.internal.processors.odbc.SqlStateCode;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcProtocolContext;
Expand Down Expand Up @@ -561,7 +562,7 @@ else if (req instanceof JdbcQueryFetchRequest)
private void sendRequestRaw(JdbcRequest req) throws IOException {
int cap = guessCapacity(req);

BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, new BinaryHeapOutputStream(cap),
SqlBinaryWriter writer = new SqlBinaryWriter(ctx, new BinaryHeapOutputStream(cap),
BinaryThreadLocalContext.get().schemaHolder(), null);

req.writeBinary(writer, protoCtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.jdbc2;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.sql.Blob;
Expand All @@ -36,6 +37,9 @@ public class JdbcBlob implements Blob {
/** Byte array. */
private byte[] arr;

/** Output stream. */
private OutputStream stream;

/**
* @param arr Byte array.
*/
Expand Down Expand Up @@ -187,4 +191,38 @@ private void ensureNotClosed() throws SQLException {
if (arr == null)
throw new SQLException("Blob instance can't be used after free() has been called.");
}

/**
*
*/
private static class BlobOutputStream extends OutputStream {
/** */
private byte[] arr;

/** */
private int idx;

/** */
private int len;

/**
* @param arr Array.
* @param len Length.
*/
public BlobOutputStream(byte[] arr, int len) {
this.arr = arr;
this.len = len;
}

/** {@inheritDoc} */
@Override public void write(int b) throws IOException {
if (idx < len)
arr[idx++] = (byte)b;
}

/** {@inheritDoc} */
@Override public void write(byte b[], int off, int len) {

}
}
}
Loading

0 comments on commit c06b7ab

Please sign in to comment.