No array column but get "Array index out of range: 1048576" #378

Open
sky9611 opened this issue Oct 19, 2021 · 8 comments

@sky9611

sky9611 commented Oct 19, 2021

Environment

  • OS version: Ubuntu 20.04.3 LTS
  • JDK version: 1.8.0_272
  • ClickHouse Server version: 21.8.5.7
  • ClickHouse Native JDBC version: 2.5.6
  • (Optional) Spark version: 2.4
  • (Optional) Other components' version: N/A

Error logs

java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 1048576
	at net.jpountz.util.SafeUtils.checkRange(SafeUtils.java:24)
	at net.jpountz.util.SafeUtils.checkRange(SafeUtils.java:32)
	at net.jpountz.lz4.LZ4JavaSafeCompressor.compress(LZ4JavaSafeCompressor.java:141)
	at net.jpountz.lz4.LZ4Compressor.compress(LZ4Compressor.java:95)
	at com.github.housepower.jdbc.buffer.CompressedBuffedWriter.flushToTarget(CompressedBuffedWriter.java:75)
	at com.github.housepower.jdbc.serde.BinarySerializer.flushToTarget(BinarySerializer.java:112)
	at com.github.housepower.jdbc.connect.NativeClient.disconnect(NativeClient.java:153)
	at com.github.housepower.jdbc.ClickHouseConnection.close(ClickHouseConnection.java:141)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:714)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2158)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2158)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce

Using Spark to ingest a large dataframe (> 100M rows) with many columns (> 5000) into ClickHouse.

Here is the code I use:

df.write.format("jdbc").mode("append").
    option("driver","com.github.housepower.jdbc.ClickHouseDriver").
    option("url", jdbcUrl).
    option("user", "***").
    option("password", "***").
    option("dbtable", target_table).
    option("truncate", "true").
    option("batchsize", 100000).
    option("isolationLevel", "NONE").
    option("numPartitions", "128").save

Other descriptions

Based on this issue, the error could be caused by inserting an array column. But the dataframe I inserted contains only StringType, TimestampType and LongType columns.

I've also tried to do some investigation into the relevant source code, but unfortunately I still couldn't find the cause.

@sundy-li
Member

Good research. Can you try modifying the compression code from:

 int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH, compressedBuffer.length);

to

  int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH, maxLen);

I think this may be the reason.

@sky9611
Author

sky9611 commented Oct 19, 2021

Good research. Can you try modifying the compression code from:

 int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH, compressedBuffer.length);

to

  int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH, maxLen);

I think this may be the reason.

Thanks for the suggestion. I've thought about this as well.

The problem is, based on the Javadoc of LZ4Compressor.compress:

/**
   * Compresses <code>src[srcOff:srcOff+srcLen]</code> into
   * <code>dest[destOff:destOff+maxDestLen]</code> and returns the compressed
   * length.
   *
   * This method will throw a {@link LZ4Exception} if this compressor is unable
   * to compress the input into less than <code>maxDestLen</code> bytes. To
   * prevent this exception to be thrown, you should make sure that
   * <code>maxDestLen &gt;= maxCompressedLength(srcLen)</code>.
   *
   * @param src the source data
   * @param srcOff the start offset in src
   * @param srcLen the number of bytes to compress
   * @param dest the destination buffer
   * @param destOff the start offset in dest
   * @param maxDestLen the maximum number of bytes to write in dest
   * @throws LZ4Exception if maxDestLen is too small
   * @return the compressed size
   */
  public abstract int compress(byte[] src, int srcOff, int srcLen, byte[] dest, int destOff, int maxDestLen);

If the exception were caused by the last parameter (i.e. maxDestLen, which in our case is compressedBuffer.length or maxLen), the reason should be that it isn't large enough.

In CompressedBuffedWriter.java, we can find byte[] compressedBuffer = new byte[maxLen + COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH];, which means compressedBuffer.length > maxLen.

So I think that if passing compressedBuffer.length triggers the exception, passing maxLen would too.
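
For illustration, here is a minimal sketch of the size relationship (the concrete values are assumptions: COMPRESSION_HEADER_LENGTH = 9, CHECKSUM_LENGTH = 16 and a 1 MiB maxLen):

int maxLen = 1024 * 1024;                        // 1048576 (assumed)
int headerAndChecksum = 9 + 16;                  // COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH (assumed)
byte[] compressedBuffer = new byte[maxLen + headerAndChecksum];
// compressedBuffer.length == 1048601, always strictly larger than maxLen,
// so any bound that is too small as compressedBuffer.length would also be too small as maxLen.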

@sundy-li
Member

sundy-li commented Oct 19, 2021

Yes, compressedBuffer.length > maxLen for sure; I hadn't looked into the LZ4 code itself.

From the Javadoc, "@param maxDestLen the maximum number of bytes to write in dest",

but what if maxDestLen is interpreted relative to destOff?

Right now we pass a value that is too large for maxDestLen. If you change it to the smaller one, maxLen, is the issue still reproducible?
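
For illustration, a rough sketch of that hypothesis with assumed values (maxLen = 1024 * 1024, destOff = 9 + 16):

int maxLen = 1024 * 1024;                        // 1048576 (assumed)
int destOff = 9 + 16;                            // COMPRESSION_HEADER_LENGTH + CHECKSUM_LENGTH
byte[] dest = new byte[maxLen + destOff];        // length 1048601
int maxDestLen = dest.length;                    // what the current code passes
// If the compressor really treated the output window as dest[destOff : destOff + maxDestLen],
// it would extend to index 1048625, past the last valid index 1048600.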

@sky9611
Author

sky9611 commented Oct 19, 2021

Good point, let me give it a try

@sundy-li
Member

sundy-li commented Oct 19, 2021

 * Compresses <code>src[srcOff:srcOff+srcLen]</code> into
 * <code>dest[destOff:destOff+maxDestLen]</code> and returns the compressed
 * length.

Oh, I think that's the reason: maxDestLen must be maxLen.

@sky9611
Author

sky9611 commented Oct 20, 2021

I re-ran the pipeline overnight with the newly built jar, and the problem persists.

In fact, I found that the code of the latest release uses

int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, 9 + 16);

Based on LZ4Compressor.java, this is an overload of compress:

public final int compress(byte[] src, int srcOff, int srcLen, byte[] dest, int destOff) {
    return compress(src, srcOff, srcLen, dest, destOff, dest.length - destOff);
}

So it's the same as

int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, 9 + 16, compressedBuffer.length - (9 + 16));

Since byte[] compressedBuffer = new byte[maxLen + 9 + 16];, that bound expands to exactly maxLen. So yes, maxDestLen must be maxLen, and the code on the master branch may need to be modified.

But we still haven't found the root cause of "Array index out of range: 1048576".

@sky9611
Author

sky9611 commented Oct 20, 2021

I may have found the reason.

In the implementation of compress in LZ4JavaSafeCompressor.java:

  public final int compress(byte[] src, int srcOff, int srcLen, byte[] dest,
      int destOff, int maxDestLen) {
    checkRange(src, srcOff, srcLen);
    checkRange(dest, destOff, maxDestLen);
...

Considering

  public static void checkRange(byte[] buf, int off) {
    if (off < 0 || off >= buf.length) {
      throw new ArrayIndexOutOfBoundsException(off);
    }
  }

  public static void checkRange(byte[] buf, int off, int len) {
    checkLength(len);
    if (len > 0) {
      checkRange(buf, off);
      checkRange(buf, off + len - 1);
    }
  }

So this problem could also be caused by the array writtenBuf not being long enough. It is initialized by this.writtenBuf = new byte[capacity]; in the constructor of CompressedBuffedWriter.

The variable capacity is a constructor parameter, and in BinarySerializer.java:

public BinarySerializer(BuffedWriter writer, boolean enableCompress) {
    this.enableCompress = enableCompress;
    BuffedWriter compressBuffer = null;
    if (enableCompress) {
        compressBuffer = new CompressedBuffedWriter(ClickHouseDefines.SOCKET_SEND_BUFFER_BYTES, writer);
    }
    either = new Either<>(writer, compressBuffer);
}

The CompressedBuffedWriter is initialized with ClickHouseDefines.SOCKET_SEND_BUFFER_BYTES, which is defined in ClickHouseDefines.java as 1024 * 1024, i.e. exactly 1048576.

I'll try again with a bigger value for SOCKET_SEND_BUFFER_BYTES.
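
For illustration, here is a self-contained sketch of how these checks could produce exactly this index; the position value is a hypothetical, not one taken from an actual run:

public class CheckRangeSketch {
    // Simplified from net.jpountz.util.SafeUtils.checkRange (checkLength omitted),
    // inlined here to keep the sketch standalone.
    static void checkRange(byte[] buf, int off) {
        if (off < 0 || off >= buf.length) {
            throw new ArrayIndexOutOfBoundsException(off);
        }
    }

    static void checkRange(byte[] buf, int off, int len) {
        if (len > 0) {
            checkRange(buf, off);
            checkRange(buf, off + len - 1);
        }
    }

    public static void main(String[] args) {
        byte[] writtenBuf = new byte[1024 * 1024]; // capacity = SOCKET_SEND_BUFFER_BYTES = 1048576
        int position = 1024 * 1024 + 1;            // hypothetical: one byte more than the buffer holds
        // Mirrors checkRange(src, srcOff, srcLen) at the top of LZ4JavaSafeCompressor.compress:
        checkRange(writtenBuf, 0, position);       // throws "Array index out of range: 1048576"
    }
}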

@sundy-li
Member

sundy-li commented Oct 20, 2021

It seems the position is 1048577, but I haven't found a reason why yet.

 int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, 9 + 16);

You can modify the code to print a log when position is larger than 1048576.
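
For example, a minimal sketch of such a log, assuming it is added right before the existing compress call in CompressedBuffedWriter.flushToTarget (variable names as used earlier in this thread):

// Hypothetical diagnostic placed just before the existing compress call:
if (position > writtenBuf.length) {
    System.err.println("flushToTarget: position=" + position
            + " exceeds writtenBuf.length=" + writtenBuf.length);
}
int res = lz4Compressor.compress(writtenBuf, 0, position, compressedBuffer, 9 + 16);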
