Skip to content

Commit 449789a

Browse files
author
Ayman Khalil
authored
Allow optional UDT attributes (#93)
* Allow optional UDT attributes * Increase CI action timeout to 90 min
1 parent d78e351 commit 449789a

File tree

3 files changed

+34
-7
lines changed

3 files changed

+34
-7
lines changed

.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ on:
1717
jobs:
1818
build:
1919
runs-on: ubuntu-latest
20-
timeout-minutes: 60
20+
timeout-minutes: 90
2121
name: Build
2222
steps:
2323
- uses: actions/checkout@v2
@@ -33,7 +33,7 @@ jobs:
3333
needs: build
3434
name: Test
3535
runs-on: ubuntu-latest
36-
timeout-minutes: 60
36+
timeout-minutes: 90
3737
strategy:
3838
fail-fast: false
3939
matrix:

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ Schema dataTypeSchema(KeyspaceMetadata ksm, DataType dataType) {
170170
case ProtocolConstants.DataType.TIME:
171171
return CqlLogicalTypes.timeMicrosType;
172172
case ProtocolConstants.DataType.UDT:
173-
return buildUDTSchema(ksm, dataType.asCql(false, true), false);
173+
return buildUDTSchema(ksm, dataType.asCql(false, true), true);
174174
case ProtocolConstants.DataType.UUID:
175175
case ProtocolConstants.DataType.TIMEUUID:
176176
return CqlLogicalTypes.uuidType;

connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,15 +455,19 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
455455
dataSpecMap.get("map").cqlValue
456456
);
457457

458+
// force udt values to be null by populating 1 item, using zudt.newValue() without explicitly setting
459+
// any field to non-null value will cause the udt column itself to be null in the C* table
460+
UdtValue zudtOptionalValues = zudt.newValue(dataSpecMap.get("text").cqlValue);
461+
458462
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table3 (" +
459463
"xtext text, xascii ascii, xboolean boolean, xblob blob, xtimestamp timestamp, xtime time, xdate date, xuuid uuid, xtimeuuid timeuuid, xtinyint tinyint, xsmallint smallint, xint int, xbigint bigint, xvarint varint, xdecimal decimal, xdouble double, xfloat float, xinet4 inet, xinet6 inet, " +
460-
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
464+
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
461465
"primary key (xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6)) " +
462466
"WITH CLUSTERING ORDER BY (xascii ASC, xboolean DESC, xblob ASC, xtimestamp DESC, xtime DESC, xdate ASC, xuuid DESC, xtimeuuid ASC, xtinyint DESC, xsmallint ASC, xint DESC, xbigint ASC, xvarint DESC, xdecimal ASC, xdouble DESC, xfloat ASC, xinet4 ASC, xinet6 DESC) AND cdc=true");
463467
cqlSession.execute("INSERT INTO " + ksName + ".table3 (" +
464468
"xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6, " +
465-
"ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, ylist, yset, ymap, ylistofmap, ysetofudt" +
466-
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?, ?,?,?,?,?)",
469+
"ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ylistofmap, ysetofudt" +
470+
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?)",
467471
dataSpecMap.get("text").cqlValue,
468472
dataSpecMap.get("ascii").cqlValue,
469473
dataSpecMap.get("boolean").cqlValue,
@@ -506,6 +510,7 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
506510

507511
dataSpecMap.get("duration").cqlValue,
508512
zudtValue,
513+
zudtOptionalValues,
509514

510515
dataSpecMap.get("list").cqlValue,
511516
dataSpecMap.get("set").cqlValue,
@@ -615,7 +620,7 @@ void assertGenericArray(String field, GenericArray ga) {
615620

616621
void assertField(String fieldName, Object value) {
617622
String vKey = fieldName.substring(1);
618-
if (!vKey.equals("udt") && ! vKey.equals("setofudt")) {
623+
if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt")) {
619624
Assert.assertTrue("Unknown field " + vKey, dataSpecMap.containsKey(vKey));
620625
}
621626
if (value instanceof GenericRecord) {
@@ -672,6 +677,17 @@ void assertGenericRecords(String field, GenericRecord gr) {
672677
}
673678
}
674679
return;
680+
case "udtoptional": {
681+
for (Field f : gr.getFields()) {
682+
if (f.getName().equals("ztext")){
683+
assertField(f.getName(), gr.getField(f.getName()));
684+
}
685+
else {
686+
assertNull(gr.getField(f.getName()));
687+
}
688+
}
689+
}
690+
return;
675691
}
676692
Assert.assertTrue("Unexpected field="+field, false);
677693
}
@@ -781,6 +797,17 @@ void assertJsonNode(String field, JsonNode node) {
781797
}
782798
}
783799
return;
800+
case "udtoptional": {
801+
for (Iterator<Map.Entry<String, JsonNode>> it = node.fields(); it.hasNext(); ) {
802+
Map.Entry<String, JsonNode> f = it.next();
803+
if (f.getKey().equals("ztext")) {
804+
assertField(f.getKey(), f.getValue());
805+
} else {
806+
assertNull(f.getValue());
807+
}
808+
}
809+
}
810+
return;
784811
}
785812
Assert.assertTrue("Unexpected field="+field, false);
786813
}

0 commit comments

Comments
 (0)