diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 0102b996a1a448b..fd3509333403879 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -140,7 +140,7 @@ public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { case DELETE: case HADOOP: case INSERT: - throw new DdlException("LoadManager only support create broker load job from stmt."); + throw new DdlException("LoadManager only support create broker load job and spark from stmt."); default: throw new DdlException("Unknown load job type."); } @@ -410,6 +410,11 @@ public static BulkLoadJob fromInsertStmt(InsertStmt insertStmt) throws DdlExcept insertStmt.getLoadLabel().getLabelName(), (BrokerDesc) insertStmt.getResourceDesc(), insertStmt.getOrigStmt(), insertStmt.getUserInfo()); break; + case SPARK_LOAD: + bulkLoadJob = new SparkLoadJob(db.getId(), insertStmt.getLoadLabel().getLabelName(), + insertStmt.getResourceDesc(), + insertStmt.getOrigStmt(), insertStmt.getUserInfo()); + break; default: throw new DdlException("Unknown load job type."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 764c871523056b2..85c4dde78154a67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -410,7 +410,7 @@ public class GsonUtils { .registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName()) .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName()) .registerSubtype( - TrinoConnectorExternalCatalog.class, TrinoConnectorExternalCatalog.class.getSimpleName()) + TrinoConnectorExternalCatalog.class, TrinoConnectorExternalCatalog.class.getSimpleName()) .registerSubtype(LakeSoulExternalCatalog.class, LakeSoulExternalCatalog.class.getSimpleName()) .registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName()); if (Config.isNotCloudMode()) { @@ -430,9 +430,9 @@ public class GsonUtils { .registerSubtype(KafkaDataSourceProperties.class, KafkaDataSourceProperties.class.getSimpleName()); private static RuntimeTypeAdapterFactory jobExecutorRuntimeTypeAdapterFactory - = RuntimeTypeAdapterFactory.of(org.apache.doris.job.base.AbstractJob.class, "clazz") - .registerSubtype(InsertJob.class, InsertJob.class.getSimpleName()) - .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName()); + = RuntimeTypeAdapterFactory.of(org.apache.doris.job.base.AbstractJob.class, "clazz") + .registerSubtype(InsertJob.class, InsertJob.class.getSimpleName()) + .registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName()); private static RuntimeTypeAdapterFactory mtmvSnapshotTypeAdapterFactory = RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz") @@ -562,25 +562,25 @@ public class GsonUtils { private static RuntimeTypeAdapterFactory jobBackupTypeAdapterFactory - = RuntimeTypeAdapterFactory.of(org.apache.doris.backup.AbstractJob.class, "clazz") - .registerSubtype(BackupJob.class, BackupJob.class.getSimpleName()) - .registerSubtype(RestoreJob.class, RestoreJob.class.getSimpleName()); + = RuntimeTypeAdapterFactory.of(org.apache.doris.backup.AbstractJob.class, "clazz") + .registerSubtype(BackupJob.class, BackupJob.class.getSimpleName()) + .registerSubtype(RestoreJob.class, RestoreJob.class.getSimpleName()); private static RuntimeTypeAdapterFactory loadJobTypeAdapterFactory - = RuntimeTypeAdapterFactory.of(LoadJob.class, "clazz") - .registerSubtype(BrokerLoadJob.class, BrokerLoadJob.class.getSimpleName()) - .registerSubtype(BulkLoadJob.class, BulkLoadJob.class.getSimpleName()) - .registerSubtype(CloudBrokerLoadJob.class, CloudBrokerLoadJob.class.getSimpleName()) - .registerSubtype(CopyJob.class, CopyJob.class.getSimpleName()) - .registerSubtype(InsertLoadJob.class, InsertLoadJob.class.getSimpleName()) - .registerSubtype(MiniLoadJob.class, MiniLoadJob.class.getSimpleName()) - .registerSubtype(SparkLoadJob.class, SparkLoadJob.class.getSimpleName()) - .registerSubtype(IngestionLoadJob.class, IngestionLoadJob.class.getSimpleName()); + = RuntimeTypeAdapterFactory.of(LoadJob.class, "clazz") + .registerSubtype(BrokerLoadJob.class, BrokerLoadJob.class.getSimpleName()) + .registerSubtype(BulkLoadJob.class, BulkLoadJob.class.getSimpleName()) + .registerSubtype(CloudBrokerLoadJob.class, CloudBrokerLoadJob.class.getSimpleName()) + .registerSubtype(CopyJob.class, CopyJob.class.getSimpleName()) + .registerSubtype(InsertLoadJob.class, InsertLoadJob.class.getSimpleName()) + .registerSubtype(MiniLoadJob.class, MiniLoadJob.class.getSimpleName()) + .registerSubtype(SparkLoadJob.class, SparkLoadJob.class.getSimpleName()) + .registerSubtype(IngestionLoadJob.class, IngestionLoadJob.class.getSimpleName()); private static RuntimeTypeAdapterFactory partitionItemTypeAdapterFactory - = RuntimeTypeAdapterFactory.of(PartitionItem.class, "clazz") - .registerSubtype(ListPartitionItem.class, ListPartitionItem.class.getSimpleName()) - .registerSubtype(RangePartitionItem.class, RangePartitionItem.class.getSimpleName()); + = RuntimeTypeAdapterFactory.of(PartitionItem.class, "clazz") + .registerSubtype(ListPartitionItem.class, ListPartitionItem.class.getSimpleName()) + .registerSubtype(RangePartitionItem.class, RangePartitionItem.class.getSimpleName()); // the builder of GSON instance. // Add any other adapters if necessary. @@ -893,7 +893,7 @@ private static class AtomicBooleanAdapter @Override public AtomicBoolean deserialize(JsonElement jsonElement, Type type, - JsonDeserializationContext jsonDeserializationContext) + JsonDeserializationContext jsonDeserializationContext) throws JsonParseException { JsonObject jsonObject = jsonElement.getAsJsonObject(); boolean value = jsonObject.get("boolean").getAsBoolean(); @@ -902,7 +902,7 @@ public AtomicBoolean deserialize(JsonElement jsonElement, Type type, @Override public JsonElement serialize(AtomicBoolean atomicBoolean, Type type, - JsonSerializationContext jsonSerializationContext) { + JsonSerializationContext jsonSerializationContext) { JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("boolean", atomicBoolean.get()); return jsonObject; @@ -912,7 +912,7 @@ public JsonElement serialize(AtomicBoolean atomicBoolean, Type type, public static final class ImmutableMapDeserializer implements JsonDeserializer> { @Override public ImmutableMap deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) throws JsonParseException { + final JsonDeserializationContext context) throws JsonParseException { final Type type2 = TypeUtils.parameterize(Map.class, ((ParameterizedType) type).getActualTypeArguments()); final Map map = context.deserialize(json, type2); return ImmutableMap.copyOf(map); @@ -922,7 +922,7 @@ public static final class ImmutableMapDeserializer implements JsonDeserializer> { @Override public ImmutableList deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) throws JsonParseException { + final JsonDeserializationContext context) throws JsonParseException { final Type type2 = TypeUtils.parameterize(List.class, ((ParameterizedType) type).getActualTypeArguments()); final List list = context.deserialize(json, type2); return ImmutableList.copyOf(list);