From 24583a643d90a0b7b15a6af69209d7d3bb3e450b Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Mon, 13 Nov 2023 22:26:52 +0800 Subject: [PATCH] [3.0][cdc-common] Introduce more data structures and generic implements --- .../ververica/cdc/common/data/ArrayData.java | 242 ++++++++++++ .../cdc/common/data/DecimalData.java | 233 ++++++++++++ .../cdc/common/data/GenericArrayData.java | 343 ++++++++++++++++++ .../cdc/common/data/GenericMapData.java | 132 +++++++ .../{event => data}/GenericRecordData.java | 77 +++- .../cdc/common/data/GenericStringData.java | 78 ++++ .../common/data/LocalZonedTimestampData.java | 180 +++++++++ .../ververica/cdc/common/data/MapData.java | 51 +++ .../ververica/cdc/common/data/RecordData.java | 155 ++++++++ .../ververica/cdc/common/data/StringData.java | 37 ++ .../cdc/common/data/TimestampData.java | 150 ++++++++ .../cdc/common/data/ZonedTimestampData.java | 165 +++++++++ .../cdc/common/event/DataChangeEvent.java | 2 + .../cdc/common/event/RecordData.java | 63 ---- .../ververica/cdc/common/schema/Schema.java | 4 +- .../cdc/common/types/DataTypeChecks.java | 204 ++++++++++- .../ververica/cdc/common/types/DataTypes.java | 76 +++- .../ververica/cdc/common/types/RowType.java | 12 +- .../cdc/common/utils/StringUtf8Utils.java | 265 ++++++++++++++ .../cdc/common/utils/TypeCheckUtils.java | 3 +- 20 files changed, 2381 insertions(+), 91 deletions(-) create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ArrayData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/DecimalData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericArrayData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericMapData.java rename flink-cdc-common/src/main/java/com/ververica/cdc/common/{event => data}/GenericRecordData.java (66%) create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericStringData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/MapData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/RecordData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/StringData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ZonedTimestampData.java delete mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RecordData.java create mode 100644 flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ArrayData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ArrayData.java new file mode 100644 index 00000000000..b62b68d4e74 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ArrayData.java @@ -0,0 +1,242 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.ArrayType; +import com.ververica.cdc.common.types.DataType; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static com.ververica.cdc.common.types.DataTypeChecks.getFieldCount; +import static com.ververica.cdc.common.types.DataTypeChecks.getPrecision; +import static com.ververica.cdc.common.types.DataTypeChecks.getScale; + +/** + * Base interface of an internal data structure representing data of {@link ArrayType}. + * + *

Note: All elements of this data structure must be internal data structures and must be of the + * same type. See {@link RecordData} for more information about internal data structures. + * + *

Use {@link GenericArrayData} to construct instances of this interface from regular Java + * arrays. + */ +@PublicEvolving +public interface ArrayData { + + /** Returns the number of elements in this array. */ + int size(); + + // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ + + /** Returns true if the element is null at the given position. */ + boolean isNullAt(int pos); + + /** Returns the boolean value at the given position. */ + boolean getBoolean(int pos); + + /** Returns the byte value at the given position. */ + byte getByte(int pos); + + /** Returns the short value at the given position. */ + short getShort(int pos); + + /** Returns the integer value at the given position. */ + int getInt(int pos); + + /** Returns the long value at the given position. */ + long getLong(int pos); + + /** Returns the float value at the given position. */ + float getFloat(int pos); + + /** Returns the double value at the given position. */ + double getDouble(int pos); + + /** Returns the string value at the given position. */ + StringData getString(int pos); + + /** + * Returns the decimal value at the given position. + * + *

The precision and scale are required to determine whether the decimal value was stored in + * a compact representation (see {@link DecimalData}). + */ + DecimalData getDecimal(int pos, int precision, int scale); + + /** + * Returns the timestamp value at the given position. + * + *

The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link TimestampData}). + */ + TimestampData getTimestamp(int pos, int precision); + + /** + * Returns the local zoned timestamp value at the given position. + * + *

The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link LocalZonedTimestampData}). + */ + LocalZonedTimestampData getLocalZonedTimestamp(int pos, int precision); + + /** + * Returns the zoned timestamp value at the given position. + * + *

The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link ZonedTimestampData}). + */ + ZonedTimestampData getZonedTimestamp(int pos, int precision); + + /** Returns the binary value at the given position. */ + byte[] getBinary(int pos); + + /** Returns the array value at the given position. */ + ArrayData getArray(int pos); + + /** Returns the map value at the given position. */ + MapData getMap(int pos); + + /** + * Returns the record value at the given position. + * + *

The number of fields is required to correctly extract the row. + */ + RecordData getRecord(int pos, int numFields); + + // ------------------------------------------------------------------------------------------ + // Conversion Utilities + // ------------------------------------------------------------------------------------------ + + boolean[] toBooleanArray(); + + byte[] toByteArray(); + + short[] toShortArray(); + + int[] toIntArray(); + + long[] toLongArray(); + + float[] toFloatArray(); + + double[] toDoubleArray(); + + // ------------------------------------------------------------------------------------------ + // Access Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates an accessor for getting elements in an internal array data structure at the given + * position. + * + * @param elementType the element type of the array + */ + static ElementGetter createElementGetter(DataType elementType) { + final ElementGetter elementGetter; + // ordered by type root definition + switch (elementType.getTypeRoot()) { + case CHAR: + case VARCHAR: + elementGetter = ArrayData::getString; + break; + case BOOLEAN: + elementGetter = ArrayData::getBoolean; + break; + case BINARY: + case VARBINARY: + elementGetter = ArrayData::getBinary; + break; + case DECIMAL: + final int decimalPrecision = getPrecision(elementType); + final int decimalScale = getScale(elementType); + elementGetter = + (array, pos) -> array.getDecimal(pos, decimalPrecision, decimalScale); + break; + case TINYINT: + elementGetter = ArrayData::getByte; + break; + case SMALLINT: + elementGetter = ArrayData::getShort; + break; + case INTEGER: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + elementGetter = ArrayData::getInt; + break; + case BIGINT: + elementGetter = ArrayData::getLong; + break; + case FLOAT: + elementGetter = ArrayData::getFloat; + break; + case DOUBLE: + elementGetter = ArrayData::getDouble; + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int timestampPrecision = getPrecision(elementType); + elementGetter = (array, pos) -> array.getTimestamp(pos, timestampPrecision); + break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int timestampLtzPrecision = getPrecision(elementType); + elementGetter = + (array, pos) -> array.getLocalZonedTimestamp(pos, timestampLtzPrecision); + break; + case TIMESTAMP_WITH_TIME_ZONE: + final int timestampTzPrecision = getPrecision(elementType); + elementGetter = (array, pos) -> array.getZonedTimestamp(pos, timestampTzPrecision); + break; + case ARRAY: + elementGetter = ArrayData::getArray; + break; + case MAP: + elementGetter = ArrayData::getMap; + break; + case ROW: + final int rowFieldCount = getFieldCount(elementType); + elementGetter = (array, pos) -> array.getRecord(pos, rowFieldCount); + break; + default: + throw new IllegalArgumentException(); + } + if (!elementType.isNullable()) { + return elementGetter; + } + return (array, pos) -> { + if (array.isNullAt(pos)) { + return null; + } + return elementGetter.getElementOrNull(array, pos); + }; + } + + /** + * Accessor for getting the elements of an array during runtime. 
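+ *
+ * 
A minimal usage sketch (illustrative only; the {@code DataTypes.INT()} factory is assumed to
+ * exist in the companion {@code DataTypes} class):
+ *
+ * 
{@code
+ * // given an ArrayData instance named "array" whose element type is INT
+ * ArrayData.ElementGetter getter = ArrayData.createElementGetter(DataTypes.INT());
+ * for (int i = 0; i < array.size(); i++) {
+ *     Object element = getter.getElementOrNull(array, i); // Integer or null
+ * }
+ * }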
+ * + * @see #createElementGetter(DataType) + */ + @PublicEvolving + interface ElementGetter extends Serializable { + @Nullable + Object getElementOrNull(ArrayData array, int pos); + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/DecimalData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/DecimalData.java new file mode 100644 index 00000000000..f9cb8c0bb43 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/DecimalData.java @@ -0,0 +1,233 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.DecimalType; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; + +import static com.ververica.cdc.common.utils.Preconditions.checkArgument; + +/** + * An internal data structure representing data of {@link DecimalType}. + * + *

This data structure is immutable and might store decimal values in a compact representation + * (as a long value) if values are small enough. + */ +@PublicEvolving +public final class DecimalData implements Comparable { + + static final int MAX_COMPACT_PRECISION = 18; + + /** Maximum number of decimal digits an Int can represent. (1e9 < Int.MaxValue < 1e10) */ + static final int MAX_INT_DIGITS = 9; + + /** Maximum number of decimal digits a Long can represent. (1e18 < Long.MaxValue < 1e19) */ + static final int MAX_LONG_DIGITS = 18; + + static final long[] POW10 = new long[MAX_COMPACT_PRECISION + 1]; + + static { + POW10[0] = 1; + for (int i = 1; i < POW10.length; i++) { + POW10[i] = 10 * POW10[i - 1]; + } + } + + // The semantics of the fields are as follows: + // - `precision` and `scale` represent the precision and scale of SQL decimal type + // - If `decimalVal` is set, it represents the whole decimal value + // - Otherwise, the decimal value is longVal/(10^scale). + // + // Note that the (precision, scale) must be correct. + // if precision > MAX_COMPACT_PRECISION, + // `decimalVal` represents the value. `longVal` is undefined + // otherwise, (longVal, scale) represents the value + // `decimalVal` may be set and cached + + final int precision; + final int scale; + + final long longVal; + BigDecimal decimalVal; + + // this constructor does not perform any sanity check. + DecimalData(int precision, int scale, long longVal, BigDecimal decimalVal) { + this.precision = precision; + this.scale = scale; + this.longVal = longVal; + this.decimalVal = decimalVal; + } + + // ------------------------------------------------------------------------------------------ + // Public Interfaces + // ------------------------------------------------------------------------------------------ + + /** + * Returns the precision of this {@link DecimalData}. + * + *

The precision is the number of digits in the unscaled value. + */ + public int precision() { + return precision; + } + + /** Returns the scale of this {@link DecimalData}. */ + public int scale() { + return scale; + } + + /** Converts this {@link DecimalData} into an instance of {@link BigDecimal}. */ + public BigDecimal toBigDecimal() { + BigDecimal bd = decimalVal; + if (bd == null) { + decimalVal = bd = BigDecimal.valueOf(longVal, scale); + } + return bd; + } + + /** + * Returns a long describing the unscaled value of this {@link DecimalData}. + * + * @throws ArithmeticException if this {@link DecimalData} does not exactly fit in a long. + */ + public long toUnscaledLong() { + if (isCompact()) { + return longVal; + } else { + return toBigDecimal().unscaledValue().longValueExact(); + } + } + + /** + * Returns a byte array describing the unscaled value of this {@link DecimalData}. + * + * @return the unscaled byte array of this {@link DecimalData}. + */ + public byte[] toUnscaledBytes() { + return toBigDecimal().unscaledValue().toByteArray(); + } + + /** Returns whether the decimal value is small enough to be stored in a long. */ + public boolean isCompact() { + return precision <= MAX_COMPACT_PRECISION; + } + + /** Returns a copy of this {@link DecimalData} object. */ + public DecimalData copy() { + return new DecimalData(precision, scale, longVal, decimalVal); + } + + @Override + public int hashCode() { + return toBigDecimal().hashCode(); + } + + @Override + public int compareTo(@Nonnull DecimalData that) { + if (this.isCompact() && that.isCompact() && this.scale == that.scale) { + return Long.compare(this.longVal, that.longVal); + } + return this.toBigDecimal().compareTo(that.toBigDecimal()); + } + + @Override + public boolean equals(final Object o) { + if (!(o instanceof DecimalData)) { + return false; + } + DecimalData that = (DecimalData) o; + return this.compareTo(that) == 0; + } + + @Override + public String toString() { + return toBigDecimal().toPlainString(); + } + + // ------------------------------------------------------------------------------------------ + // Constructor Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates an instance of {@link DecimalData} from a {@link BigDecimal} and the given precision + * and scale. + * + *
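For example (a usage sketch relying only on the factory methods declared in this class):
+ *
+ * 
{@code
+ * DecimalData price = DecimalData.fromBigDecimal(new BigDecimal("123.456"), 10, 2);
+ * // price.toBigDecimal() -> 123.46 (rounded to scale 2); stored compactly because 10 <= 18
+ * DecimalData overflow = DecimalData.fromBigDecimal(new BigDecimal("12345678901"), 10, 2);
+ * // overflow == null because 13 significant digits do not fit into precision 10
+ * }
+ *
+ * 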

The returned decimal value may be rounded to have the desired scale. The precision will be + * checked. If the precision overflows, null will be returned. + */ + public static @Nullable DecimalData fromBigDecimal(BigDecimal bd, int precision, int scale) { + bd = bd.setScale(scale, RoundingMode.HALF_UP); + if (bd.precision() > precision) { + return null; + } + + long longVal = -1; + if (precision <= MAX_COMPACT_PRECISION) { + longVal = bd.movePointRight(scale).longValueExact(); + } + return new DecimalData(precision, scale, longVal, bd); + } + + /** + * Creates an instance of {@link DecimalData} from an unscaled long value and the given + * precision and scale. + */ + public static DecimalData fromUnscaledLong(long unscaledLong, int precision, int scale) { + checkArgument(precision > 0 && precision <= MAX_LONG_DIGITS); + return new DecimalData(precision, scale, unscaledLong, null); + } + + /** + * Creates an instance of {@link DecimalData} from an unscaled byte array value and the given + * precision and scale. + */ + public static DecimalData fromUnscaledBytes(byte[] unscaledBytes, int precision, int scale) { + BigDecimal bd = new BigDecimal(new BigInteger(unscaledBytes), scale); + return fromBigDecimal(bd, precision, scale); + } + + /** + * Creates an instance of {@link DecimalData} for a zero value with the given precision and + * scale. + * + *

The precision will be checked. If the precision overflows, null will be returned. + */ + public static @Nullable DecimalData zero(int precision, int scale) { + if (precision <= MAX_COMPACT_PRECISION) { + return new DecimalData(precision, scale, 0, null); + } else { + return fromBigDecimal(BigDecimal.ZERO, precision, scale); + } + } + + // ------------------------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------------------------ + + /** Returns whether the decimal value is small enough to be stored in a long. */ + public static boolean isCompact(int precision) { + return precision <= MAX_COMPACT_PRECISION; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericArrayData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericArrayData.java new file mode 100644 index 00000000000..45518bca110 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericArrayData.java @@ -0,0 +1,343 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.ArrayType; +import org.apache.commons.lang3.ArrayUtils; + +import java.util.Arrays; +import java.util.Objects; + +/** + * An internal data structure representing data of {@link ArrayType}. + * + *

Note: All elements of this data structure must be internal data structures and must be of the + * same type. See {@link RecordData} for more information about internal data structures. + * + *

{@link GenericArrayData} is a generic implementation of {@link ArrayData} which wraps regular + * Java arrays. + * + *

Every instance wraps a one-dimensional Java array. Non-primitive arrays can be used for + * representing element nullability. The Java array might be a primitive array such as {@code int[]} + * or an object array (i.e. instance of {@code Object[]}). Object arrays that contain boxed types + * (e.g. {@link Integer}) MUST be boxed arrays (i.e. {@code new Integer[]{1, 2, 3}}, not {@code new + * Object[]{1, 2, 3}}). For multidimensional arrays, an array of {@link GenericArrayData} MUST be + * passed. For example: + * + *

{@code
+ * // ARRAY < ARRAY < INT NOT NULL > >
+ * new GenericArrayData(
+ *   new GenericArrayData[]{
+ *     new GenericArrayData(new int[3]),
+ *     new GenericArrayData(new int[5])
+ *   }
+ * )
+ * }
+ */ +@PublicEvolving +public final class GenericArrayData implements ArrayData { + + private final Object array; + private final int size; + private final boolean isPrimitiveArray; + + /** + * Creates an instance of {@link GenericArrayData} using the given Java array. + * + *

Note: All elements of the array must be internal data structures. + */ + public GenericArrayData(Object[] array) { + this(array, array.length, false); + } + + public GenericArrayData(int[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + public GenericArrayData(long[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + public GenericArrayData(float[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + public GenericArrayData(double[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + public GenericArrayData(short[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + public GenericArrayData(byte[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + public GenericArrayData(boolean[] primitiveArray) { + this(primitiveArray, primitiveArray.length, true); + } + + private GenericArrayData(Object array, int size, boolean isPrimitiveArray) { + this.array = array; + this.size = size; + this.isPrimitiveArray = isPrimitiveArray; + } + + /** + * Returns true if this is a primitive array. + * + *

A primitive array is an array whose elements are of primitive type. + */ + public boolean isPrimitiveArray() { + return isPrimitiveArray; + } + + /** + * Converts this {@link GenericArrayData} into an array of Java {@link Object}. + * + *
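For instance (illustrative only):
+ *
+ * 
{@code
+ * GenericArrayData arrayData = new GenericArrayData(new int[] {1, 2, 3});
+ * Object[] boxed = arrayData.toObjectArray(); // Integer[] {1, 2, 3}
+ * }
+ *
+ * 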

The method will convert a primitive array into an object array. But it will not convert + * internal data structures into external data structures (e.g. {@link + * org.apache.flink.table.data.StringData} to {@link String}). + */ + public Object[] toObjectArray() { + if (isPrimitiveArray) { + Class arrayClass = array.getClass(); + if (int[].class.equals(arrayClass)) { + return ArrayUtils.toObject((int[]) array); + } else if (long[].class.equals(arrayClass)) { + return ArrayUtils.toObject((long[]) array); + } else if (float[].class.equals(arrayClass)) { + return ArrayUtils.toObject((float[]) array); + } else if (double[].class.equals(arrayClass)) { + return ArrayUtils.toObject((double[]) array); + } else if (short[].class.equals(arrayClass)) { + return ArrayUtils.toObject((short[]) array); + } else if (byte[].class.equals(arrayClass)) { + return ArrayUtils.toObject((byte[]) array); + } else if (boolean[].class.equals(arrayClass)) { + return ArrayUtils.toObject((boolean[]) array); + } + throw new RuntimeException("Unsupported primitive array: " + arrayClass); + } else { + return (Object[]) array; + } + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isNullAt(int pos) { + return !isPrimitiveArray && ((Object[]) array)[pos] == null; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + GenericArrayData that = (GenericArrayData) o; + return size == that.size + && isPrimitiveArray == that.isPrimitiveArray + && Objects.deepEquals(array, that.array); + } + + @Override + public int hashCode() { + int result = Objects.hash(size, isPrimitiveArray); + result = 31 * result + Arrays.deepHashCode(new Object[] {array}); + return result; + } + + // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ + + @Override + public boolean getBoolean(int pos) { + return isPrimitiveArray ? ((boolean[]) array)[pos] : (boolean) getObject(pos); + } + + @Override + public byte getByte(int pos) { + return isPrimitiveArray ? ((byte[]) array)[pos] : (byte) getObject(pos); + } + + @Override + public short getShort(int pos) { + return isPrimitiveArray ? ((short[]) array)[pos] : (short) getObject(pos); + } + + @Override + public int getInt(int pos) { + return isPrimitiveArray ? ((int[]) array)[pos] : (int) getObject(pos); + } + + @Override + public long getLong(int pos) { + return isPrimitiveArray ? ((long[]) array)[pos] : (long) getObject(pos); + } + + @Override + public float getFloat(int pos) { + return isPrimitiveArray ? ((float[]) array)[pos] : (float) getObject(pos); + } + + @Override + public double getDouble(int pos) { + return isPrimitiveArray ? 
((double[]) array)[pos] : (double) getObject(pos); + } + + @Override + public byte[] getBinary(int pos) { + return (byte[]) getObject(pos); + } + + @Override + public StringData getString(int pos) { + return (StringData) getObject(pos); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return (DecimalData) getObject(pos); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return (TimestampData) getObject(pos); + } + + @Override + public LocalZonedTimestampData getLocalZonedTimestamp(int pos, int precision) { + return (LocalZonedTimestampData) getObject(pos); + } + + @Override + public ZonedTimestampData getZonedTimestamp(int pos, int precision) { + return (ZonedTimestampData) getObject(pos); + } + + @Override + public RecordData getRecord(int pos, int numFields) { + return (RecordData) getObject(pos); + } + + @Override + public ArrayData getArray(int pos) { + return (ArrayData) getObject(pos); + } + + @Override + public MapData getMap(int pos) { + return (MapData) getObject(pos); + } + + private Object getObject(int pos) { + return ((Object[]) array)[pos]; + } + + // ------------------------------------------------------------------------------------------ + // Conversion Utilities + // ------------------------------------------------------------------------------------------ + + private boolean anyNull() { + for (Object element : (Object[]) array) { + if (element == null) { + return true; + } + } + return false; + } + + private void checkNoNull() { + if (anyNull()) { + throw new RuntimeException("Primitive array must not contain a null value."); + } + } + + @Override + public boolean[] toBooleanArray() { + if (isPrimitiveArray) { + return (boolean[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Boolean[]) array); + } + + @Override + public byte[] toByteArray() { + if (isPrimitiveArray) { + return (byte[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Byte[]) array); + } + + @Override + public short[] toShortArray() { + if (isPrimitiveArray) { + return (short[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Short[]) array); + } + + @Override + public int[] toIntArray() { + if (isPrimitiveArray) { + return (int[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Integer[]) array); + } + + @Override + public long[] toLongArray() { + if (isPrimitiveArray) { + return (long[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Long[]) array); + } + + @Override + public float[] toFloatArray() { + if (isPrimitiveArray) { + return (float[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Float[]) array); + } + + @Override + public double[] toDoubleArray() { + if (isPrimitiveArray) { + return (double[]) array; + } + checkNoNull(); + return ArrayUtils.toPrimitive((Double[]) array); + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericMapData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericMapData.java new file mode 100644 index 00000000000..a3c54fa9a27 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericMapData.java @@ -0,0 +1,132 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.MapType; + +import java.util.Map; +import java.util.Objects; + +/** + * An internal data structure representing data of {@link MapType}. + * +

{@link GenericMapData} is a generic implementation of {@link MapData} which wraps regular Java + * maps. + * +

Note: All keys and values of this data structure must be internal data structures. All keys + * must be of the same type; same for values. See {@link RecordData} for more information about + * internal data structures. + * +

Both keys and values can contain null for representing nullability. + */ +@PublicEvolving +public final class GenericMapData implements MapData { + + private final Map map; + + /** + * Creates an instance of {@link GenericMapData} using the given Java map. + * + *
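For example (an illustrative sketch; the keys and values below are internal structures from this
+ * package):
+ *
+ * 
{@code
+ * Map<StringData, Integer> javaMap = new java.util.HashMap<>();
+ * javaMap.put(GenericStringData.fromString("age"), 30);
+ * GenericMapData mapData = new GenericMapData(javaMap);
+ * mapData.get(GenericStringData.fromString("age")); // 30
+ * }
+ *
+ * 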

Note: All keys and values of the map must be internal data structures. + */ + public GenericMapData(Map map) { + this.map = map; + } + + /** + * Returns the value to which the specified key is mapped, or {@code null} if this map contains + * no mapping for the key. The returned value is in internal data structure. + */ + public Object get(Object key) { + return map.get(key); + } + + @Override + public int size() { + return map.size(); + } + + @Override + public org.apache.flink.table.data.ArrayData keyArray() { + Object[] keys = map.keySet().toArray(); + return new org.apache.flink.table.data.GenericArrayData(keys); + } + + @Override + public ArrayData valueArray() { + Object[] values = map.values().toArray(); + return new GenericArrayData(values); + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof GenericMapData)) { + return false; + } + // deepEquals for values of byte[] + return deepEquals(map, ((GenericMapData) o).map); + } + + private static boolean deepEquals(Map m1, Map m2) { + // copied from HashMap.equals but with deepEquals comparison + if (m1.size() != m2.size()) { + return false; + } + try { + for (Map.Entry e : m1.entrySet()) { + K key = e.getKey(); + V value = e.getValue(); + if (value == null) { + if (!(m2.get(key) == null && m2.containsKey(key))) { + return false; + } + } else { + if (!Objects.deepEquals(value, m2.get(key))) { + return false; + } + } + } + } catch (ClassCastException | NullPointerException unused) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int result = 0; + for (Object key : map.keySet()) { + // only include key because values can contain byte[] + result += 31 * Objects.hashCode(key); + } + return result; + } + + @Override + public String toString() { + return map.toString(); + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/GenericRecordData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericRecordData.java similarity index 66% rename from flink-cdc-common/src/main/java/com/ververica/cdc/common/event/GenericRecordData.java rename to flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericRecordData.java index 5f7e3d01a78..4f0ca0fbc2c 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/GenericRecordData.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericRecordData.java @@ -14,16 +14,36 @@ * limitations under the License. */ -package com.ververica.cdc.common.event; +package com.ververica.cdc.common.data; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.util.StringUtils; +import com.ververica.cdc.common.event.OperationType; +import com.ververica.cdc.common.types.ArrayType; +import com.ververica.cdc.common.types.MapType; +import com.ververica.cdc.common.types.RowType; + +import java.io.Serializable; import java.util.Arrays; -/** Class {@code GenericRecordData} describes the data of changed record in the external system. */ +/** + * An internal data structure representing data of {@link RowType} and other (possibly nested) + * structured types such as {@link MapType}, {@link ArrayType}. + * + *

{@link GenericRecordData} is a generic implementation of {@link RecordData} which is backed by + * an array of Java {@link Object}. A {@link GenericRecordData} can have an arbitrary number of + * fields of different types. The fields in a row can be accessed by position (0-based) using either + * the generic {@link #getField(int)} or type-specific getters (such as {@link #getInt(int)}). A + * field can be updated by the generic {@link #setField(int, Object)}. + * + *
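A minimal construction sketch (illustrative only):
+ *
+ * 
{@code
+ * // a record for a schema like (id INT, name STRING, ts TIMESTAMP(3))
+ * GenericRecordData record =
+ *         GenericRecordData.of(
+ *                 1,
+ *                 GenericStringData.fromString("Alice"),
+ *                 TimestampData.fromLocalDateTime(java.time.LocalDateTime.now()));
+ * record.getInt(0);          // 1
+ * record.getString(1);       // StringData holding "Alice"
+ * record.getTimestamp(2, 3); // TimestampData
+ * }
+ *
+ * 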

Note: All fields of this data structure must be internal data structures. See {@link + * RecordData} for more information about internal data structures. + * + *

The fields in {@link GenericRecordData} can be null for representing nullability. + */ @PublicEvolving -public final class GenericRecordData implements RecordData { +public final class GenericRecordData implements RecordData, Serializable { /** The array to store the actual internal format values. */ private final Object[] fields; @@ -113,13 +133,48 @@ public double getDouble(int pos) { } @Override - public String getString(int pos) { - return (String) this.fields[pos]; + public byte[] getBinary(int pos) { + return (byte[]) this.fields[pos]; } @Override - public byte[] getBinary(int pos) { - return (byte[]) this.fields[pos]; + public StringData getString(int pos) { + return (StringData) this.fields[pos]; + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return (DecimalData) this.fields[pos]; + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return (TimestampData) this.fields[pos]; + } + + @Override + public ZonedTimestampData getZonedTimestamp(int pos, int precision) { + return (ZonedTimestampData) this.fields[pos]; + } + + @Override + public LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision) { + return (LocalZonedTimestampData) this.fields[pos]; + } + + @Override + public ArrayData getArray(int pos) { + return (ArrayData) this.fields[pos]; + } + + @Override + public MapData getMap(int pos) { + return (MapData) this.fields[pos]; + } + + @Override + public RecordData getRow(int pos, int numFields) { + return (RecordData) this.fields[pos]; } @Override @@ -153,15 +208,13 @@ public String toString() { return sb.toString(); } - // ---------------------------------------------------------------------------------------- - // Utilities - // ---------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------------------------ + // Constructor Utilities + // ------------------------------------------------------------------------------------------ /** * Creates an instance of {@link GenericRecordData} with given field values. * - *

By default, the record describes a {@link OperationType#INSERT} in a changelog. - * *

Note: All fields of the record must be internal data structures. */ public static GenericRecordData of(Object... values) { diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericStringData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericStringData.java new file mode 100644 index 00000000000..94ff5423600 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/GenericStringData.java @@ -0,0 +1,78 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.CharType; +import com.ververica.cdc.common.types.VarCharType; +import com.ververica.cdc.common.utils.StringUtf8Utils; + +import javax.annotation.Nonnull; + +import java.util.Objects; + +/** A internal data structure representing data of {@link VarCharType} and {@link CharType}. */ +@PublicEvolving +public final class GenericStringData implements StringData { + + private final String javaStr; + + private GenericStringData(String javaStr) { + this.javaStr = javaStr; + } + + @Override + public byte[] toBytes() { + if (javaStr == null) { + return null; + } else { + return StringUtf8Utils.encodeUTF8(javaStr); + } + } + + @Override + public int compareTo(@Nonnull StringData o) { + GenericStringData other = (GenericStringData) o; + return javaStr.compareTo(other.javaStr); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof GenericStringData)) { + return false; + } + GenericStringData that = (GenericStringData) o; + return Objects.equals(javaStr, that.javaStr); + } + + @Override + public int hashCode() { + return Objects.hash(javaStr); + } + + // ------------------------------------------------------------------------------------------ + // Constructor Utilities + // ------------------------------------------------------------------------------------------ + + public static GenericStringData fromString(String javaStr) { + return new GenericStringData(javaStr); + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java new file mode 100644 index 00000000000..68497f43fae --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/LocalZonedTimestampData.java @@ -0,0 +1,180 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.util.Preconditions; + +import com.ververica.cdc.common.types.LocalZonedTimestampType; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +/** + * An internal data structure representing data of {@link LocalZonedTimestampType}. + * + *

This data structure is immutable and consists of a epoch milliseconds and epoch + * nanos-of-millisecond since epoch {@code 1970-01-01 00:00:00}. It might be stored in a compact + * representation (as a long value) if values are small enough. + */ +@PublicEvolving +public final class LocalZonedTimestampData implements Comparable { + + // the number of milliseconds in a day + private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 + + // this field holds the epoch second and the milli-of-second + private final long epochMillisecond; + + // this field holds the epoch nano-of-millisecond + private final int epochNanoOfMillisecond; + + private LocalZonedTimestampData(long epochMillisecond, int epochNanoOfMillisecond) { + Preconditions.checkArgument( + epochNanoOfMillisecond >= 0 && epochNanoOfMillisecond <= 999_999); + this.epochMillisecond = epochMillisecond; + this.epochNanoOfMillisecond = epochNanoOfMillisecond; + } + + /** Returns the number of epoch milliseconds since epoch {@code 1970-01-01 00:00:00}. */ + public long getEpochMillisecond() { + return epochMillisecond; + } + + /** + * Returns the number of epoch nanoseconds (the nanoseconds within the milliseconds). + * + *

The value range is from 0 to 999,999. + */ + public int getEpochNanoOfMillisecond() { + return epochNanoOfMillisecond; + } + + /** Converts this {@link LocalZonedTimestampData} object to a {@link Instant}. */ + public Instant toInstant() { + long epochSecond = epochMillisecond / 1000; + int milliOfSecond = (int) (epochMillisecond % 1000); + if (milliOfSecond < 0) { + --epochSecond; + milliOfSecond += 1000; + } + long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond; + return Instant.ofEpochSecond(epochSecond, nanoAdjustment); + } + + @Override + public int compareTo(LocalZonedTimestampData that) { + int cmp = Long.compare(this.epochMillisecond, that.epochMillisecond); + if (cmp == 0) { + cmp = this.epochNanoOfMillisecond - that.epochNanoOfMillisecond; + } + return cmp; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof LocalZonedTimestampData)) { + return false; + } + LocalZonedTimestampData that = (LocalZonedTimestampData) obj; + return this.epochMillisecond == that.epochMillisecond + && this.epochNanoOfMillisecond == that.epochNanoOfMillisecond; + } + + @Override + public String toString() { + return describeLocalZonedTimestampInUTC0().toString(); + } + + @Override + public int hashCode() { + int ret = (int) epochMillisecond ^ (int) (epochMillisecond >> 32); + return 31 * ret + epochNanoOfMillisecond; + } + + /** + * Describes this {@link LocalZonedTimestampData} object in {@link LocalDateTime} under UTC0 + * time zone. + */ + private LocalDateTime describeLocalZonedTimestampInUTC0() { + int date = (int) (epochMillisecond / MILLIS_PER_DAY); + int time = (int) (epochMillisecond % MILLIS_PER_DAY); + if (time < 0) { + --date; + time += MILLIS_PER_DAY; + } + long nanoOfDay = time * 1_000_000L + epochNanoOfMillisecond; + LocalDate localDate = LocalDate.ofEpochDay(date); + LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay); + return LocalDateTime.of(localDate, localTime); + } + + // ------------------------------------------------------------------------------------------ + // Constructor Utilities + // ------------------------------------------------------------------------------------------ + + /** + * Creates an instance of {@link LocalZonedTimestampData} from epoch milliseconds. + * + *
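For example (illustrative only):
+ *
+ * 
{@code
+ * // 2023-11-13T14:26:52Z expressed as epoch milliseconds
+ * LocalZonedTimestampData ts = LocalZonedTimestampData.fromEpochMillis(1699885612000L);
+ * ts.toInstant(); // 2023-11-13T14:26:52Z
+ * }
+ *
+ * 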

The nanos-of-millisecond field will be set to zero. + * + * @param millisecond the number of epoch milliseconds since epoch {@code 1970-01-01 00:00:00}; + * a negative number is the number of epoch milliseconds before epoch {@code 1970-01-01 + * 00:00:00} + */ + public static LocalZonedTimestampData fromEpochMillis(long millisecond) { + return new LocalZonedTimestampData(millisecond, 0); + } + + /** + * Creates an instance of {@link LocalZonedTimestampData} from epoch milliseconds and a epoch + * nanos-of-millisecond. + * + * @param millisecond the number of epoch milliseconds since epoch {@code 1970-01-01 00:00:00}; + * a negative number is the number of epoch milliseconds before epoch {@code 1970-01-01 + * 00:00:00} + * @param epochNanoOfMillisecond the epoch nanoseconds within the millisecond, from 0 to 999,999 + */ + public static LocalZonedTimestampData fromEpochMillis( + long millisecond, int epochNanoOfMillisecond) { + return new LocalZonedTimestampData(millisecond, epochNanoOfMillisecond); + } + + /** + * Creates an instance of {@link LocalZonedTimestampData} from an instance of {@link Instant}. + * + * @param instant an instance of {@link Instant} + */ + public static LocalZonedTimestampData fromInstant(Instant instant) { + long epochSecond = instant.getEpochSecond(); + int nanoSecond = instant.getNano(); + + long millisecond = epochSecond * 1_000 + nanoSecond / 1_000_000; + int nanoOfMillisecond = nanoSecond % 1_000_000; + + return new LocalZonedTimestampData(millisecond, nanoOfMillisecond); + } + + /** + * Returns whether the timestamp data is small enough to be stored in a long of milliseconds. + */ + public static boolean isCompact(int precision) { + return precision <= 3; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/MapData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/MapData.java new file mode 100644 index 00000000000..a84f1bf1b06 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/MapData.java @@ -0,0 +1,51 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.MapType; + +/** + * Base interface of an internal data structure representing data of {@link MapType}. + * + *

Note: All keys and values of this data structure must be internal data structures. All keys + * must be of the same type; same for values. See {@link RecordData} for more information about + * internal data structures. + * + *

Use {@link GenericMapData} to construct instances of this interface from regular Java maps. + */ +@PublicEvolving +public interface MapData { + + /** Returns the number of key-value mappings in this map. */ + int size(); + + /** + * Returns an array view of the keys contained in this map. + * + *

A key-value pair has the same index in the key array and value array. + */ + ArrayData keyArray(); + + /** + * Returns an array view of the values contained in this map. + * + *
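For example, a {@code MAP<STRING, INT>} value can be iterated through both views in parallel
+ * (a usage sketch):
+ *
+ * 
{@code
+ * // given a MapData instance named "map" describing a MAP<STRING, INT>
+ * ArrayData keys = map.keyArray();
+ * ArrayData values = map.valueArray();
+ * for (int i = 0; i < map.size(); i++) {
+ *     StringData key = keys.getString(i);
+ *     int value = values.getInt(i);
+ * }
+ * }
+ *
+ * 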

A key-value pair has the same index in the key array and value array. + */ + ArrayData valueArray(); +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/RecordData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/RecordData.java new file mode 100644 index 00000000000..0da3dda9566 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/RecordData.java @@ -0,0 +1,155 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Class {@code RecordData} describes the data of changed record (i.e. row) in the external system. + * + *

The mappings from external SQL data types to the internal data structures are listed in the + * following table: + * + *

+ * +--------------------------------+-----------------------------------------+
+ * | SQL Data Types                 | Internal Data Structures                |
+ * +--------------------------------+-----------------------------------------+
+ * | BOOLEAN                        | boolean                                 |
+ * +--------------------------------+-----------------------------------------+
+ * | CHAR / VARCHAR / STRING        | {@link StringData}                      |
+ * +--------------------------------+-----------------------------------------+
+ * | BINARY / VARBINARY / BYTES     | byte[]                                  |
+ * +--------------------------------+-----------------------------------------+
+ * | DECIMAL                        | {@link DecimalData}                     |
+ * +--------------------------------+-----------------------------------------+
+ * | TINYINT                        | byte                                    |
+ * +--------------------------------+-----------------------------------------+
+ * | SMALLINT                       | short                                   |
+ * +--------------------------------+-----------------------------------------+
+ * | INT                            | int                                     |
+ * +--------------------------------+-----------------------------------------+
+ * | BIGINT                         | long                                    |
+ * +--------------------------------+-----------------------------------------+
+ * | FLOAT                          | float                                   |
+ * +--------------------------------+-----------------------------------------+
+ * | DOUBLE                         | double                                  |
+ * +--------------------------------+-----------------------------------------+
+ * | DATE                           | int (number of days since epoch)        |
+ * +--------------------------------+-----------------------------------------+
+ * | TIME                           | int (number of milliseconds of the day) |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP                      | {@link TimestampData}                   |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP WITH LOCAL TIME ZONE | {@link LocalZonedTimestampData}         |
+ * +--------------------------------+-----------------------------------------+
+ * | TIMESTAMP WITH TIME ZONE       | {@link ZonedTimestampData}              |
+ * +--------------------------------+-----------------------------------------+
+ * | ROW                            | {@link RecordData}                      |
+ * +--------------------------------+-----------------------------------------+
+ * | ARRAY                          | {@link ArrayData}                       |
+ * +--------------------------------+-----------------------------------------+
+ * | MAP                            | {@link MapData}                         |
+ * +--------------------------------+-----------------------------------------+
+ * 
+ * + *
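For example, given the mappings above, a row with columns {@code id INT}, {@code name STRING} and
+ * {@code price DECIMAL(10, 2)} could be read as follows (illustrative sketch using the generic
+ * implementations in this package):
+ *
+ * 
{@code
+ * RecordData record =
+ *         GenericRecordData.of(
+ *                 1,
+ *                 GenericStringData.fromString("Bob"),
+ *                 DecimalData.fromBigDecimal(new java.math.BigDecimal("9.99"), 10, 2));
+ * int id = record.getInt(0);
+ * StringData name = record.getString(1);
+ * DecimalData price = record.getDecimal(2, 10, 2);
+ * }
+ *
+ * 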

Nullability is always handled by the container data structure. + */ +@PublicEvolving +public interface RecordData { + + /** Returns the number of fields in this record. */ + int getArity(); + + // ------------------------------------------------------------------------------------------ + // Read-only accessor methods + // ------------------------------------------------------------------------------------------ + + /** Returns true if the field is null at the given position. */ + boolean isNullAt(int pos); + + /** Returns the boolean value at the given position. */ + boolean getBoolean(int pos); + + /** Returns the byte value at the given position. */ + byte getByte(int pos); + + /** Returns the short value at the given position. */ + short getShort(int pos); + + /** Returns the integer value at the given position. */ + int getInt(int pos); + + /** Returns the long value at the given position. */ + long getLong(int pos); + + /** Returns the float value at the given position. */ + float getFloat(int pos); + + /** Returns the double value at the given position. */ + double getDouble(int pos); + + /** Returns the binary value at the given position. */ + byte[] getBinary(int pos); + + /** Returns the string value at the given position. */ + StringData getString(int pos); + + /** + * Returns the decimal value at the given position. + * + *

The precision and scale are required to determine whether the decimal value was stored in + * a compact representation (see {@link DecimalData}). + */ + DecimalData getDecimal(int pos, int precision, int scale); + + /** + * Returns the timestamp value at the given position. + * + *

The precision is required to determine whether the timestamp value was stored in a compact + * representation (see {@link TimestampData}). + */ + TimestampData getTimestamp(int pos, int precision); + + /** + * Returns the zoned timestamp value at the given position. + * + *

The precision is required to determine whether the zoned timestamp value was stored in a + * compact representation (see {@link ZonedTimestampData}). + */ + ZonedTimestampData getZonedTimestamp(int pos, int precision); + + /** + * Returns the local zoned timestamp value at the given position. + * + *

The precision is required to determine whether the local zoned timestamp value was stored + * in a compact representation (see {@link LocalZonedTimestampData}). + */ + LocalZonedTimestampData getLocalZonedTimestampData(int pos, int precision); + + /** Returns the array value at the given position. */ + ArrayData getArray(int pos); + + /** Returns the map value at the given position. */ + MapData getMap(int pos); + + /** + * Returns the row value at the given position. + * + *

The number of fields is required to correctly extract the record. + */ + RecordData getRow(int pos, int numFields); +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/StringData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/StringData.java new file mode 100644 index 00000000000..ca70130c022 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/StringData.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.CharType; +import com.ververica.cdc.common.types.VarCharType; + +/** An internal data structure representing data of {@link CharType} and {@link VarCharType}. */ +@PublicEvolving +public interface StringData extends Comparable { + + /** + * Converts this {@link StringData} object to a UTF-8 byte array. + * + *

Note: The returned byte array may be reused. + */ + byte[] toBytes(); + + /** Converts this {@link StringData} object to a {@link String}. */ + String toString(); +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java new file mode 100644 index 00000000000..cab2ec14de6 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/TimestampData.java @@ -0,0 +1,150 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.data; + +import org.apache.flink.annotation.PublicEvolving; + +import com.ververica.cdc.common.types.TimestampType; +import com.ververica.cdc.common.utils.Preconditions; + +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +/** + * An internal data structure representing data of {@link TimestampType}. + * + *
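A minimal construction sketch (illustrative only):
+ *
+ * 
{@code
+ * TimestampData ts = TimestampData.fromLocalDateTime(LocalDateTime.of(2023, 11, 13, 22, 26, 52));
+ * ts.toLocalDateTime(); // 2023-11-13T22:26:52
+ * }
+ *
+ * 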

+ * <p>This data structure is immutable and consists of a millisecond part and a
+ * nano-of-millisecond part counted since {@code 1970-01-01 00:00:00} in UTC+0. It might be stored
+ * in a compact representation (as a long value) if values are small enough.
+ */
+@PublicEvolving
+public final class TimestampData implements Comparable<TimestampData> {
+
+    // the number of milliseconds in a day
+    private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+
+    // this field holds the integral second and the milli-of-second
+    private final long millisecond;
+
+    // this field holds the nano-of-millisecond
+    private final int nanoOfMillisecond;
+
+    private TimestampData(long millisecond, int nanoOfMillisecond) {
+        Preconditions.checkArgument(nanoOfMillisecond >= 0 && nanoOfMillisecond <= 999_999);
+        this.millisecond = millisecond;
+        this.nanoOfMillisecond = nanoOfMillisecond;
+    }
+
+    /** Returns the number of milliseconds since {@code 1970-01-01 00:00:00}. */
+    public long getMillisecond() {
+        return millisecond;
+    }
+
+    /**
+     * Returns the number of nanoseconds (the nanoseconds within the millisecond).
+     *
+     * <p>

The value range is from 0 to 999,999. + */ + public int getNanoOfMillisecond() { + return nanoOfMillisecond; + } + + /** Converts this {@link TimestampData} object to a {@link Timestamp}. */ + public Timestamp toTimestamp() { + return Timestamp.valueOf(toLocalDateTime()); + } + + /** Converts this {@link TimestampData} object to a {@link LocalDateTime}. */ + public LocalDateTime toLocalDateTime() { + int date = (int) (millisecond / MILLIS_PER_DAY); + int time = (int) (millisecond % MILLIS_PER_DAY); + if (time < 0) { + --date; + time += MILLIS_PER_DAY; + } + long nanoOfDay = time * 1_000_000L + nanoOfMillisecond; + LocalDate localDate = LocalDate.ofEpochDay(date); + LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay); + return LocalDateTime.of(localDate, localTime); + } + + @Override + public int compareTo(TimestampData that) { + int cmp = Long.compare(this.millisecond, that.millisecond); + if (cmp == 0) { + cmp = this.nanoOfMillisecond - that.nanoOfMillisecond; + } + return cmp; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof TimestampData)) { + return false; + } + TimestampData that = (TimestampData) obj; + return this.millisecond == that.millisecond + && this.nanoOfMillisecond == that.nanoOfMillisecond; + } + + @Override + public String toString() { + return toLocalDateTime().toString(); + } + + @Override + public int hashCode() { + int ret = (int) millisecond ^ (int) (millisecond >> 32); + return 31 * ret + nanoOfMillisecond; + } + + // ------------------------------------------------------------------------------------------ + // Constructor Utilities + // ------------------------------------------------------------------------------------------ + /** + * Creates an instance of {@link TimestampData} from an instance of {@link LocalDateTime}. + * + * @param dateTime an instance of {@link LocalDateTime} + */ + public static TimestampData fromLocalDateTime(LocalDateTime dateTime) { + long epochDay = dateTime.toLocalDate().toEpochDay(); + long nanoOfDay = dateTime.toLocalTime().toNanoOfDay(); + + long millisecond = epochDay * MILLIS_PER_DAY + nanoOfDay / 1_000_000; + int nanoOfMillisecond = (int) (nanoOfDay % 1_000_000); + + return new TimestampData(millisecond, nanoOfMillisecond); + } + + /** + * Creates an instance of {@link TimestampData} from an instance of {@link Timestamp}. + * + * @param timestamp an instance of {@link Timestamp} + */ + public static TimestampData fromTimestamp(Timestamp timestamp) { + return fromLocalDateTime(timestamp.toLocalDateTime()); + } + + /** + * Returns whether the timestamp data is small enough to be stored in a long of milliseconds. + */ + public static boolean isCompact(int precision) { + return precision <= 3; + } +} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ZonedTimestampData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ZonedTimestampData.java new file mode 100644 index 00000000000..316fcb75b12 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/data/ZonedTimestampData.java @@ -0,0 +1,165 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.common.data;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import com.ververica.cdc.common.types.ZonedTimestampType;
+
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+
+/**
+ * An internal data structure representing data of {@link ZonedTimestampType}. It aims to convert
+ * various Java time representations into a date-time in a particular time zone.
+ *
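+ * <p>A minimal usage sketch of the factory and conversion methods defined below:
+ *
+ * <pre>{@code
+ * ZonedTimestampData zoned = ZonedTimestampData.fromZonedDateTime(ZonedDateTime.now());
+ * Instant instant = zoned.toInstant();
+ * Timestamp timestamp = zoned.toTimestamp();
+ * }</pre>
+ *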

+ * <p>The ISO date-time format is used by default; it includes the date, time (including
+ * fractional parts), and offset from UTC, such as '2011-12-03T10:15:30+01:00'.
+ */
+@PublicEvolving
+public final class ZonedTimestampData implements Comparable<ZonedTimestampData> {
+
+    /**
+     * The ISO date-time format includes the date, time (including fractional parts), and offset
+     * from UTC, such as '2011-12-03T10:15:30.030431+01:00'.
+     */
+    public static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
+
+    private final ZonedDateTime zonedDateTime;
+
+    private ZonedTimestampData(ZonedDateTime zonedDateTime) {
+        this.zonedDateTime = zonedDateTime;
+    }
+
+    /** Returns the zoned date-time with its time-zone. */
+    public ZonedDateTime getZonedDateTime() {
+        return zonedDateTime;
+    }
+
+    /** Converts this {@link ZonedTimestampData} object to a {@link Timestamp}. */
+    public Timestamp toTimestamp() {
+        return Timestamp.from(zonedDateTime.toInstant());
+    }
+
+    /** Converts this {@link ZonedTimestampData} object to an {@link Instant}. */
+    public Instant toInstant() {
+        return zonedDateTime.toInstant();
+    }
+
+    @Override
+    public int compareTo(ZonedTimestampData that) {
+        long epochSecond = this.toInstant().getEpochSecond();
+        int nanoOfSecond = this.toInstant().getNano();
+        long thatEpochSecond = that.toInstant().getEpochSecond();
+        int thatNanoOfSecond = that.toInstant().getNano();
+
+        int cmp = Long.compare(epochSecond, thatEpochSecond);
+        if (cmp == 0) {
+            cmp = nanoOfSecond - thatNanoOfSecond;
+        }
+        return cmp;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ZonedTimestampData)) {
+            return false;
+        }
+        ZonedTimestampData that = (ZonedTimestampData) obj;
+        return this.zonedDateTime.equals(that.zonedDateTime);
+    }
+
+    @Override
+    public String toString() {
+        return zonedDateTime.format(ISO_FORMATTER);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(zonedDateTime);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Constructor Utilities
+    // ------------------------------------------------------------------------------------------
+
+    /**
+     * Creates an instance of {@link ZonedTimestampData} from an instance of {@link
+     * ZonedDateTime}.
+     *
+     * @param zonedDateTime an instance of {@link ZonedDateTime}
+     */
+    public static ZonedTimestampData fromZonedDateTime(ZonedDateTime zonedDateTime) {
+        return new ZonedTimestampData(zonedDateTime);
+    }
+
+    /**
+     * Creates an instance of {@link ZonedTimestampData} from an instance of {@link
+     * OffsetDateTime}.
+     *
+     * @param offsetDateTime an instance of {@link OffsetDateTime}
+     */
+    public static ZonedTimestampData fromOffsetDateTime(OffsetDateTime offsetDateTime) {
+        return new ZonedTimestampData(offsetDateTime.toZonedDateTime());
+    }
+
+    /**
+     * Creates an instance of {@link ZonedTimestampData} from epoch milliseconds.
+     *
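+     * <p>For example (an illustrative sketch only):
+     *
+     * <pre>{@code
+     * ZonedTimestampData zoned = ZonedTimestampData.fromEpochMillis(1700000000000L);
+     * }</pre>
+     *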

+     * <p>The epoch nanos-of-millisecond field will be set to zero.
+     *
+     * @param epochMillisecond the number of epoch milliseconds since epoch {@code 1970-01-01
+     *     00:00:00}; a negative number is the number of epoch milliseconds before epoch {@code
+     *     1970-01-01 00:00:00}
+     */
+    public static ZonedTimestampData fromEpochMillis(long epochMillisecond) {
+        return fromEpochMillis(epochMillisecond, 0);
+    }
+
+    /**
+     * Creates an instance of {@link ZonedTimestampData} from epoch milliseconds and an epoch
+     * nanos-of-millisecond.
+     *
+     * @param epochMillisecond the number of epoch milliseconds since epoch {@code 1970-01-01
+     *     00:00:00}; a negative number is the number of epoch milliseconds before epoch {@code
+     *     1970-01-01 00:00:00}
+     * @param epochNanoOfMillisecond the nanoseconds within the epoch millisecond, from 0 to
+     *     999,999
+     */
+    public static ZonedTimestampData fromEpochMillis(
+            long epochMillisecond, int epochNanoOfMillisecond) {
+        long epochSecond = epochMillisecond / 1000;
+        int milliOfSecond = (int) (epochMillisecond % 1000);
+        if (milliOfSecond < 0) {
+            --epochSecond;
+            milliOfSecond += 1000;
+        }
+        long nanoAdjustment = milliOfSecond * 1_000_000 + epochNanoOfMillisecond;
+        return fromInstant(Instant.ofEpochSecond(epochSecond, nanoAdjustment));
+    }
+
+    /**
+     * Creates an instance of {@link ZonedTimestampData} from an instance of {@link Instant}.
+     *
+     * @param instant an instance of {@link Instant}
+     */
+    public static ZonedTimestampData fromInstant(Instant instant) {
+        // An Instant carries no time zone of its own, so ZonedDateTime.from(instant) would throw
+        // a DateTimeException; interpret the instant in UTC instead.
+        return new ZonedTimestampData(instant.atZone(ZoneOffset.UTC));
+    }
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java
index 25fe5b6546b..abe8a46c846 100644
--- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DataChangeEvent.java
@@ -18,6 +18,8 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import com.ververica.cdc.common.data.RecordData;
+
 import java.util.Map;
 
 /**
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RecordData.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RecordData.java
deleted file mode 100644
index 5501e050d15..00000000000
--- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/RecordData.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright 2023 Ververica Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.ververica.cdc.common.event;
-
-import org.apache.flink.annotation.PublicEvolving;
-
-/** Class {@code RecordData} describes the data of changed record in the external system. */
-@PublicEvolving
-public interface RecordData {
-
-    /** Returns the number of fields in this row.
*/ - int getArity(); - - // ------------------------------------------------------------------------------------------ - // Read-only accessor methods - // ------------------------------------------------------------------------------------------ - - /** Returns true if the field is null at the given position. */ - boolean isNullAt(int pos); - - /** Returns the boolean value at the given position. */ - boolean getBoolean(int pos); - - /** Returns the byte value at the given position. */ - byte getByte(int pos); - - /** Returns the short value at the given position. */ - short getShort(int pos); - - /** Returns the integer value at the given position. */ - int getInt(int pos); - - /** Returns the long value at the given position. */ - long getLong(int pos); - - /** Returns the float value at the given position. */ - float getFloat(int pos); - - /** Returns the double value at the given position. */ - double getDouble(int pos); - - /** Returns the string value at the given position. */ - String getString(int pos); - - /** Returns the binary value at the given position. */ - byte[] getBinary(int pos); - - // TODO: add more methods for other types -} diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java index dbb077e54d7..58a51dc66b8 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/schema/Schema.java @@ -39,8 +39,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static com.ververica.cdc.common.types.DataTypes.ROW; - /** Schema of a table or data collection. */ @PublicEvolving public class Schema implements Serializable { @@ -125,7 +123,7 @@ public DataType toRowDataType() { final DataField[] fields = columns.stream().map(Schema::columnToField).toArray(DataField[]::new); // the row should never be null - return ROW(fields).notNull(); + return DataTypes.ROW(fields).notNull(); } // ----------------------------------------------------------------------------------- diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypeChecks.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypeChecks.java index 770c6c92db2..7dd066ca8ac 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypeChecks.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypeChecks.java @@ -16,6 +16,11 @@ package com.ververica.cdc.common.types; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.OptionalInt; + /** * Utilities for checking {@link DataType} and avoiding a lot of type casting and repetitive work. 
*/ @@ -23,35 +28,220 @@ public final class DataTypeChecks { private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor(); + private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor(); + + private static final ScaleExtractor SCALE_EXTRACTOR = new ScaleExtractor(); + + private static final FieldCountExtractor FIELD_COUNT_EXTRACTOR = new FieldCountExtractor(); + + private static final FieldNamesExtractor FIELD_NAMES_EXTRACTOR = new FieldNamesExtractor(); + + private static final FieldTypesExtractor FIELD_TYPES_EXTRACTOR = new FieldTypesExtractor(); + + private static final NestedTypesExtractor NESTED_TYPES_EXTRACTOR = new NestedTypesExtractor(); + + /** + * Checks if the given type is a composite type. + * + *
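+     * <p>A minimal sketch of how a composite type is typically unpacked with the helpers in this
+     * class:
+     *
+     * <pre>{@code
+     * if (DataTypeChecks.isCompositeType(dataType)) {
+     *     List<String> names = DataTypeChecks.getFieldNames(dataType);
+     *     List<DataType> types = DataTypeChecks.getFieldTypes(dataType);
+     * }
+     * }</pre>
+     *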

Use {@link #getFieldCount(DataType)}, {@link #getFieldNames(DataType)}, {@link + * #getFieldTypes(DataType)} for unified handling of composite types. + * + * @return True if the type is composite type. + */ + public static boolean isCompositeType(DataType dataType) { + return dataType.getTypeRoot() == DataTypeRoot.ROW; + } + public static int getLength(DataType dataType) { return dataType.accept(LENGTH_EXTRACTOR); } + public static boolean hasLength(DataType dataType, int length) { + return getLength(dataType) == length; + } + + /** Returns the precision of all types that define a precision implicitly or explicitly. */ + public static int getPrecision(DataType dataType) { + return dataType.accept(PRECISION_EXTRACTOR); + } + + /** Checks the precision of a type that defines a precision implicitly or explicitly. */ + public static boolean hasPrecision(DataType dataType, int precision) { + return getPrecision(dataType) == precision; + } + + /** Returns the scale of all types that define a scale implicitly or explicitly. */ + public static int getScale(DataType dataType) { + return dataType.accept(SCALE_EXTRACTOR); + } + + /** Checks the scale of all types that define a scale implicitly or explicitly. */ + public static boolean hasScale(DataType dataType, int scale) { + return getScale(dataType) == scale; + } + + /** Returns the field count of row and structured types. Other types return 1. */ + public static int getFieldCount(DataType dataType) { + return dataType.accept(FIELD_COUNT_EXTRACTOR); + } + + /** Returns the field names of row and structured types. */ + public static List getFieldNames(DataType dataType) { + return dataType.accept(FIELD_NAMES_EXTRACTOR); + } + + /** Returns the field types of row and structured types. */ + public static List getFieldTypes(DataType dataType) { + return dataType.accept(FIELD_TYPES_EXTRACTOR); + } + + public static List getNestedTypes(DataType dataType) { + return dataType.accept(NESTED_TYPES_EXTRACTOR); + } + + /** + * Checks whether the given {@link DataType} has a well-defined string representation when + * calling {@link Object#toString()} on the internal data structure. The string representation + * would be similar in SQL or in a programming language. + * + *

Note: This method might not be necessary anymore, once we have implemented a utility that + * can convert any internal data structure to a well-defined string representation. + */ + public static boolean hasWellDefinedString(DataType dataType) { + switch (dataType.getTypeRoot()) { + case CHAR: + case VARCHAR: + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + return true; + default: + return false; + } + } + private DataTypeChecks() { // no instantiation } // -------------------------------------------------------------------------------------------- - /** Extracts an attribute of data types that define that attribute. */ + + /** Extracts an attribute of logical types that define that attribute. */ private static class Extractor extends DataTypeDefaultVisitor { @Override protected T defaultMethod(DataType dataType) { throw new IllegalArgumentException( String.format( - "Invalid use of extractor %s. Called on data type: %s", - this.getClass().getSimpleName(), dataType)); + "Invalid use of extractor %s. Called on logical type: %s", + this.getClass().getName(), dataType)); } } private static class LengthExtractor extends Extractor { @Override - public Integer visit(CharType charType) { - return charType.getLength(); + protected Integer defaultMethod(DataType dataType) { + OptionalInt length = DataTypes.getLength(dataType); + if (length.isPresent()) { + return length.getAsInt(); + } + throw new IllegalArgumentException( + String.format( + "Invalid use of extractor %s. Called on logical type: %s", + this.getClass().getName(), dataType)); + } + } + + private static class PrecisionExtractor extends Extractor { + + @Override + protected Integer defaultMethod(DataType dataType) { + OptionalInt precision = DataTypes.getPrecision(dataType); + if (precision.isPresent()) { + return precision.getAsInt(); + } + throw new IllegalArgumentException( + String.format( + "Invalid use of extractor %s. 
Called on logical type: %s", + this.getClass().getName(), dataType)); + } + } + + private static class ScaleExtractor extends Extractor { + + @Override + public Integer visit(DecimalType decimalType) { + return decimalType.getScale(); + } + + @Override + public Integer visit(TinyIntType tinyIntType) { + return 0; + } + + @Override + public Integer visit(SmallIntType smallIntType) { + return 0; + } + + @Override + public Integer visit(IntType intType) { + return 0; + } + + @Override + public Integer visit(BigIntType bigIntType) { + return 0; + } + } + + private static class FieldCountExtractor extends Extractor { + + @Override + public Integer visit(RowType rowType) { + return rowType.getFieldCount(); + } + + @Override + protected Integer defaultMethod(DataType dataType) { + return 1; + } + } + + private static class FieldNamesExtractor extends Extractor> { + + @Override + public List visit(RowType rowType) { + return rowType.getFieldNames(); + } + } + + private static class FieldTypesExtractor extends Extractor> { + + @Override + public List visit(RowType rowType) { + return rowType.getFieldTypes(); + } + } + + private static class NestedTypesExtractor extends Extractor> { + + @Override + public List visit(ArrayType arrayType) { + return Collections.singletonList(arrayType.getElementType()); + } + + @Override + public List visit(MapType mapType) { + return Arrays.asList(mapType.getKeyType(), mapType.getValueType()); } @Override - public Integer visit(BinaryType binaryType) { - return binaryType.getLength(); + public List visit(RowType rowType) { + return rowType.getFieldTypes(); } } } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java index 4302d839e19..108b17439e2 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/DataTypes.java @@ -19,6 +19,7 @@ import org.apache.flink.annotation.PublicEvolving; import java.util.Arrays; +import java.util.OptionalInt; /** * A {@link DataType} can be used to declare input and/or output types of operations. This class * @@ -413,9 +414,82 @@ public static RowType ROW(DataField... fields) { *

This is shortcut for {@link #ROW(DataField...)} where the field names will be generated * using {@code f0, f1, f2, ...}. * - *

Note: Flink CDC currently doesn't support defining nested row in columns. + *

Note: Flink CDC currently doesn't support defining nested record in columns. */ public static RowType ROW(DataType... fieldTypes) { return RowType.builder().fields(fieldTypes).build(); } + + public static OptionalInt getPrecision(DataType dataType) { + return dataType.accept(PRECISION_EXTRACTOR); + } + + public static OptionalInt getLength(DataType dataType) { + return dataType.accept(LENGTH_EXTRACTOR); + } + + private static final PrecisionExtractor PRECISION_EXTRACTOR = new PrecisionExtractor(); + + private static final LengthExtractor LENGTH_EXTRACTOR = new LengthExtractor(); + + private static class PrecisionExtractor extends DataTypeDefaultVisitor { + + @Override + public OptionalInt visit(DecimalType decimalType) { + return OptionalInt.of(decimalType.getPrecision()); + } + + @Override + public OptionalInt visit(TimeType timeType) { + return OptionalInt.of(timeType.getPrecision()); + } + + @Override + public OptionalInt visit(TimestampType timestampType) { + return OptionalInt.of(timestampType.getPrecision()); + } + + @Override + public OptionalInt visit(LocalZonedTimestampType localZonedTimestampType) { + return OptionalInt.of(localZonedTimestampType.getPrecision()); + } + + @Override + public OptionalInt visit(ZonedTimestampType zonedTimestampType) { + return OptionalInt.of(zonedTimestampType.getPrecision()); + } + + @Override + protected OptionalInt defaultMethod(DataType dataType) { + return OptionalInt.empty(); + } + } + + private static class LengthExtractor extends DataTypeDefaultVisitor { + + @Override + public OptionalInt visit(CharType charType) { + return OptionalInt.of(charType.getLength()); + } + + @Override + public OptionalInt visit(VarCharType varCharType) { + return OptionalInt.of(varCharType.getLength()); + } + + @Override + public OptionalInt visit(BinaryType binaryType) { + return OptionalInt.of(binaryType.getLength()); + } + + @Override + public OptionalInt visit(VarBinaryType varBinaryType) { + return OptionalInt.of(varBinaryType.getLength()); + } + + @Override + protected OptionalInt defaultMethod(DataType dataType) { + return OptionalInt.empty(); + } + } } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/RowType.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/RowType.java index 09028b50d8d..a61d7532064 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/RowType.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/types/RowType.java @@ -30,10 +30,10 @@ /** * Data type of a sequence of fields. A field consists of a field name, field type, and an optional - * description. The most specific type of a row of a table is a row type. In this case, each column - * of the row corresponds to the field of the row type that has the same ordinal position as the - * column. Compared to the SQL standard, an optional field description simplifies the handling with - * complex structures. + * description. The most specific type of a record of a table is a record type. In this case, each + * column of the record corresponds to the field of the record type that has the same ordinal + * position as the column. Compared to the SQL standard, an optional field description simplifies + * the handling with complex structures. 
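+ *
+ * <p>A brief usage sketch; {@code rowType} is assumed to be an existing {@code RowType} instance,
+ * for example one backing the result of {@code Schema#toRowDataType()}:
+ *
+ * <pre>{@code
+ * List<String> fieldNames = rowType.getFieldNames();
+ * List<DataType> fieldTypes = rowType.getFieldTypes();
+ * DataType firstFieldType = rowType.getTypeAt(0);
+ * }</pre>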
*/ @PublicEvolving public final class RowType extends DataType { @@ -66,6 +66,10 @@ public List getFieldNames() { return fields.stream().map(DataField::getName).collect(Collectors.toList()); } + public List getFieldTypes() { + return fields.stream().map(DataField::getType).collect(Collectors.toList()); + } + public DataType getTypeAt(int i) { return fields.get(i).getType(); } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java new file mode 100644 index 00000000000..6a2424bc463 --- /dev/null +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/StringUtf8Utils.java @@ -0,0 +1,265 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.common.utils; + +import org.apache.flink.annotation.Internal; + +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** Utilities for String UTF-8. */ +@Internal +public final class StringUtf8Utils { + + private static final int MAX_BYTES_PER_CHAR = 3; + private static final int MAX_BYTES_LENGTH = 1024 * 64; + private static final int MAX_CHARS_LENGTH = 1024 * 32; + private static final ThreadLocal BYTES_LOCAL = new ThreadLocal<>(); + private static final ThreadLocal CHARS_LOCAL = new ThreadLocal<>(); + + private StringUtf8Utils() { + // do not instantiate + } + + /** This method must have the same result with JDK's String.getBytes. */ + public static byte[] encodeUTF8(String str) { + byte[] bytes = allocateReuseBytes(str.length() * MAX_BYTES_PER_CHAR); + int len = encodeUTF8(str, bytes); + return Arrays.copyOf(bytes, len); + } + + public static int encodeUTF8(String str, byte[] bytes) { + int offset = 0; + int len = str.length(); + int sl = offset + len; + int dp = 0; + int dlASCII = dp + Math.min(len, bytes.length); + + // ASCII only optimized loop + while (dp < dlASCII && str.charAt(offset) < '\u0080') { + bytes[dp++] = (byte) str.charAt(offset++); + } + + while (offset < sl) { + char c = str.charAt(offset++); + if (c < 0x80) { + // Have at most seven bits + bytes[dp++] = (byte) c; + } else if (c < 0x800) { + // 2 bytes, 11 bits + bytes[dp++] = (byte) (0xc0 | (c >> 6)); + bytes[dp++] = (byte) (0x80 | (c & 0x3f)); + } else if (Character.isSurrogate(c)) { + final int uc; + int ip = offset - 1; + if (Character.isHighSurrogate(c)) { + if (sl - ip < 2) { + uc = -1; + } else { + char d = str.charAt(ip + 1); + if (Character.isLowSurrogate(d)) { + uc = Character.toCodePoint(c, d); + } else { + // for some illegal character + // the jdk will ignore the origin character and cast it to '?' + // this acts the same with jdk + return defaultEncodeUTF8(str, bytes); + } + } + } else { + if (Character.isLowSurrogate(c)) { + // for some illegal character + // the jdk will ignore the origin character and cast it to '?' 
+ // this acts the same with jdk + return defaultEncodeUTF8(str, bytes); + } else { + uc = c; + } + } + + if (uc < 0) { + bytes[dp++] = (byte) '?'; + } else { + bytes[dp++] = (byte) (0xf0 | ((uc >> 18))); + bytes[dp++] = (byte) (0x80 | ((uc >> 12) & 0x3f)); + bytes[dp++] = (byte) (0x80 | ((uc >> 6) & 0x3f)); + bytes[dp++] = (byte) (0x80 | (uc & 0x3f)); + offset++; // 2 chars + } + } else { + // 3 bytes, 16 bits + bytes[dp++] = (byte) (0xe0 | ((c >> 12))); + bytes[dp++] = (byte) (0x80 | ((c >> 6) & 0x3f)); + bytes[dp++] = (byte) (0x80 | (c & 0x3f)); + } + } + return dp; + } + + public static int defaultEncodeUTF8(String str, byte[] bytes) { + try { + byte[] buffer = str.getBytes("UTF-8"); + System.arraycopy(buffer, 0, bytes, 0, buffer.length); + return buffer.length; + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("encodeUTF8 error", e); + } + } + + public static String decodeUTF8(byte[] input, int offset, int byteLen) { + char[] chars = allocateReuseChars(byteLen); + int len = decodeUTF8Strict(input, offset, byteLen, chars); + if (len < 0) { + return defaultDecodeUTF8(input, offset, byteLen); + } + return new String(chars, 0, len); + } + + public static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) { + final int sl = sp + len; + int dp = 0; + int dlASCII = Math.min(len, da.length); + + // ASCII only optimized loop + while (dp < dlASCII && sa[sp] >= 0) { + da[dp++] = (char) sa[sp++]; + } + + while (sp < sl) { + int b1 = sa[sp++]; + if (b1 >= 0) { + // 1 byte, 7 bits: 0xxxxxxx + da[dp++] = (char) b1; + } else if ((b1 >> 5) == -2 && (b1 & 0x1e) != 0) { + // 2 bytes, 11 bits: 110xxxxx 10xxxxxx + if (sp < sl) { + int b2 = sa[sp++]; + if ((b2 & 0xc0) != 0x80) { // isNotContinuation(b2) + return -1; + } else { + da[dp++] = (char) (((b1 << 6) ^ b2) ^ (((byte) 0xC0 << 6) ^ ((byte) 0x80))); + } + continue; + } + return -1; + } else if ((b1 >> 4) == -2) { + // 3 bytes, 16 bits: 1110xxxx 10xxxxxx 10xxxxxx + if (sp + 1 < sl) { + int b2 = sa[sp++]; + int b3 = sa[sp++]; + if ((b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80) + || (b2 & 0xc0) != 0x80 + || (b3 & 0xc0) != 0x80) { // isMalformed3(b1, b2, b3) + return -1; + } else { + char c = + (char) + ((b1 << 12) + ^ (b2 << 6) + ^ (b3 + ^ (((byte) 0xE0 << 12) + ^ ((byte) 0x80 << 6) + ^ ((byte) 0x80)))); + if (Character.isSurrogate(c)) { + return -1; + } else { + da[dp++] = c; + } + } + continue; + } + return -1; + } else if ((b1 >> 3) == -2) { + // 4 bytes, 21 bits: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx + if (sp + 2 < sl) { + int b2 = sa[sp++]; + int b3 = sa[sp++]; + int b4 = sa[sp++]; + int uc = + ((b1 << 18) + ^ (b2 << 12) + ^ (b3 << 6) + ^ (b4 + ^ (((byte) 0xF0 << 18) + ^ ((byte) 0x80 << 12) + ^ ((byte) 0x80 << 6) + ^ ((byte) 0x80)))); + // isMalformed4 and shortest form check + if (((b2 & 0xc0) != 0x80 || (b3 & 0xc0) != 0x80 || (b4 & 0xc0) != 0x80) + || !Character.isSupplementaryCodePoint(uc)) { + return -1; + } else { + da[dp++] = Character.highSurrogate(uc); + da[dp++] = Character.lowSurrogate(uc); + } + continue; + } + return -1; + } else { + return -1; + } + } + return dp; + } + + public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) { + return new String(bytes, offset, len, StandardCharsets.UTF_8); + } + + /** + * Allocate bytes that is only for temporary usage, it should not be stored in somewhere else. + * Use a {@link ThreadLocal} to reuse bytes to avoid overhead of byte[] new and gc. + * + *
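+     * <p>A typical round trip inside this class (see {@code encodeUTF8(String)} above); {@code
+     * str} stands for any input string:
+     *
+     * <pre>{@code
+     * byte[] buffer = StringUtf8Utils.allocateReuseBytes(str.length() * 3);
+     * int len = StringUtf8Utils.encodeUTF8(str, buffer);
+     * byte[] copy = Arrays.copyOf(buffer, len); // copy out before the buffer is reused
+     * }</pre>
+     *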

+     * <p>If there are methods that can only accept a byte[] instead of a MemorySegment[]
+     * parameter, we can allocate a reusable byte[], copy the MemorySegment data into it, and then
+     * call the method, e.g. for String deserialization.
+     */
+    public static byte[] allocateReuseBytes(int length) {
+        byte[] bytes = BYTES_LOCAL.get();
+
+        if (bytes == null) {
+            if (length <= MAX_BYTES_LENGTH) {
+                bytes = new byte[MAX_BYTES_LENGTH];
+                BYTES_LOCAL.set(bytes);
+            } else {
+                bytes = new byte[length];
+            }
+        } else if (bytes.length < length) {
+            bytes = new byte[length];
+        }
+
+        return bytes;
+    }
+
+    public static char[] allocateReuseChars(int length) {
+        char[] chars = CHARS_LOCAL.get();
+
+        if (chars == null) {
+            if (length <= MAX_CHARS_LENGTH) {
+                chars = new char[MAX_CHARS_LENGTH];
+                CHARS_LOCAL.set(chars);
+            } else {
+                chars = new char[length];
+            }
+        } else if (chars.length < length) {
+            chars = new char[length];
+        }
+
+        return chars;
+    }
+}
diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/TypeCheckUtils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/TypeCheckUtils.java
index 3ee4751f980..d218bcec11a 100644
--- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/TypeCheckUtils.java
+++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/TypeCheckUtils.java
@@ -101,7 +101,8 @@ public static boolean isMutable(DataType type) {
         // ordered by type root definition
         switch (type.getTypeRoot()) {
             case CHAR:
-            case VARCHAR: // The internal representation of String is BinaryString which is mutable
+            case VARCHAR: // The internal representation of String is BinaryString which is
+                // mutable
             case ARRAY:
             case MAP:
             case ROW: