From e1d1b339393adacabc32570d77597e23082e3d75 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Tue, 12 Mar 2024 10:49:28 +0800 Subject: [PATCH] fix(interactive): use pk hash to generate eid if has pk (#3621) --- .../groot/common/config/FrontendConfig.java | 3 --- .../write/DefaultEdgeIdGenerator.java | 6 ++--- .../groot/frontend/write/GraphWriter.java | 23 +++++-------------- .../graphscope/groot/servers/Frontend.java | 7 +----- 4 files changed, 10 insertions(+), 29 deletions(-) diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java index ed2fb1e211c5..f83c24687bd1 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/FrontendConfig.java @@ -29,9 +29,6 @@ public class FrontendConfig { "frontend.service.thread.count", Math.max(Math.min(Runtime.getRuntime().availableProcessors() / 2, 64), 4)); - public static final Config ENABLE_HASH_GENERATE_EID = - Config.boolConfig("enable.hash.generate.eid", false); - public static final Config WRITE_QUEUE_BUFFER_MAX_COUNT = Config.intConfig("write.queue.buffer.max.count", 1024000); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java index 55212f220d0f..e427d9eaf214 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/DefaultEdgeIdGenerator.java @@ -39,10 +39,10 @@ public long getNextId() { @Override public long getHashId(long srcId, long dstId, int labelId, List pks) { - if (pks != null && pks.size() > 0) { - return PkHashUtils.hash(srcId, dstId, labelId, pks); + if (pks == null || pks.size() == 0) { + throw new RuntimeException("Cannot get hash id when pk is empty"); } - return PkHashUtils.hash(srcId, dstId, labelId, System.nanoTime()); + return PkHashUtils.hash(srcId, dstId, labelId, pks); } private void allocateNewIds() { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index de7b753cc1d9..6b0657e24270 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -3,7 +3,6 @@ import com.alibaba.graphscope.groot.CompletionCallback; import com.alibaba.graphscope.groot.SnapshotCache; import com.alibaba.graphscope.groot.common.config.Configs; -import com.alibaba.graphscope.groot.common.config.FrontendConfig; import com.alibaba.graphscope.groot.common.exception.GrootException; import com.alibaba.graphscope.groot.common.exception.PropertyDefNotFoundException; import com.alibaba.graphscope.groot.common.schema.api.GraphElement; @@ -14,7 +13,6 @@ import com.alibaba.graphscope.groot.common.schema.wrapper.LabelId; import com.alibaba.graphscope.groot.common.schema.wrapper.PropertyValue; import com.alibaba.graphscope.groot.common.util.*; -import com.alibaba.graphscope.groot.meta.MetaService; import com.alibaba.graphscope.groot.metrics.MetricsAgent; import com.alibaba.graphscope.groot.metrics.MetricsCollector; import com.alibaba.graphscope.groot.operation.EdgeId; @@ -49,15 +47,10 @@ public class GraphWriter implements MetricsAgent { private volatile long ingestorBlockTimeAvgMs; private volatile long lastUpdateIngestorBlockTimeNano; private AtomicInteger pendingWriteCount; - /** - * true: enable use hash64(srcId, dstId, edgeLabelId, edgePks) to generate eid; - */ - private boolean enableHashEid; - private SnapshotCache snapshotCache; - private EdgeIdGenerator edgeIdGenerator; - private MetaService metaService; - private AtomicLong lastWrittenSnapshotId = new AtomicLong(0L); + private final SnapshotCache snapshotCache; + private final EdgeIdGenerator edgeIdGenerator; + private final AtomicLong lastWrittenSnapshotId = new AtomicLong(0L); private final KafkaAppender kafkaAppender; private ScheduledExecutorService scheduler; @@ -65,18 +58,13 @@ public class GraphWriter implements MetricsAgent { public GraphWriter( SnapshotCache snapshotCache, EdgeIdGenerator edgeIdGenerator, - MetaService metaService, MetricsCollector metricsCollector, KafkaAppender appender, Configs configs) { this.snapshotCache = snapshotCache; this.edgeIdGenerator = edgeIdGenerator; - this.metaService = metaService; initMetrics(); metricsCollector.register(this, this::updateMetrics); - // default for increment eid generate - this.enableHashEid = FrontendConfig.ENABLE_HASH_GENERATE_EID.get(configs); - this.kafkaAppender = appender; } @@ -410,8 +398,9 @@ private long getEdgeInnerId( GraphSchema schema, DataRecord dataRecord) { long edgeInnerId; - if (this.enableHashEid) { - GraphElement edgeDef = schema.getElement(edgeRecordKey.getLabel()); + GraphElement edgeDef = schema.getElement(edgeRecordKey.getLabel()); + List pks = edgeDef.getPrimaryKeyList(); + if (pks != null && pks.size() > 0) { Map edgePkVals = parseRawProperties(edgeDef, dataRecord.getProperties()); List edgePkBytes = getPkBytes(edgePkVals, edgeDef); diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java index 1219dcd04850..6207d4ca7918 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/Frontend.java @@ -131,12 +131,7 @@ public Frontend(Configs configs) { KafkaAppender kafkaAppender = new KafkaAppender(configs, metaService, logService); this.graphWriter = new GraphWriter( - snapshotCache, - edgeIdGenerator, - this.metaService, - metricsCollector, - kafkaAppender, - configs); + snapshotCache, edgeIdGenerator, metricsCollector, kafkaAppender, configs); ClientWriteService clientWriteService = new ClientWriteService(graphWriter); RoleClients backupClients =