Skip to content

Commit

Permalink
Merge branch 'main' into multi-ed-xp-path
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 authored Nov 13, 2023
2 parents 1d72960 + d386676 commit 8c52f7f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,7 @@ where
debug!("scan_vertex worker_partitions: {:?}", worker_partitions);
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let row_filter = params.filter.clone();

Expand Down Expand Up @@ -160,28 +154,31 @@ where
) -> GraphProxyResult<Option<Vertex>> {
// get_vertex_id_by_primary_keys() is a global query function, that is,
// you can query vertices (with only vertex id) by pks on any graph partitions (not matter locally or remotely).
// To guarantee the correctness (i.e., avoid duplication results), only one worker (for now, it is worker 0) is going to search for it.
let worker_id = pegasus::get_current_worker_checked()
.map(|worker| worker.index)
.unwrap_or(0);
if worker_id == 0 {
let store_label_id = encode_storage_label(label_id)?;
let store_indexed_values = match primary_key {
OneOrMany::One(pkv) => {
vec![encode_store_prop_val(pkv[0].1.clone())]
}
OneOrMany::Many(pkvs) => pkvs
.iter()
.map(|(_pk, value)| encode_store_prop_val(value.clone()))
.collect(),
};
debug!("index_scan_vertex store_indexed_values {:?}", store_indexed_values);
if let Some(vid) = self
// To guarantee the correctness,
// 1. all workers are going to search for gid, and compute which partition this vertex belongs;
// 2. the worker assigned for this partition will further confirm the result by calling get_vertex() to see if this vertex exists
let store_label_id = encode_storage_label(label_id)?;
let store_indexed_values = match primary_key {
OneOrMany::One(pkv) => {
vec![encode_store_prop_val(pkv[0].1.clone())]
}
OneOrMany::Many(pkvs) => pkvs
.iter()
.map(|(_pk, value)| encode_store_prop_val(value.clone()))
.collect(),
};
debug!("index_scan_vertex store_indexed_values {:?}", store_indexed_values);
if let Some(vid) = self
.partition_manager
.get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref())
{
debug!("index_scan_vertex vid {:?}", vid);
let partition_id = self
.partition_manager
.get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref())
{
debug!("index_scan_vertex vid {:?}", vid);
Ok(Some(Vertex::new(vid as ID, Some(label_id.clone()), DynDetails::default())))
.get_partition_id(vid as VertexId) as PartitionId;
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if worker_partitions.contains(&partition_id) {
Ok(self.get_vertex(&[vid as ID], _params)?.next())
} else {
Ok(None)
}
Expand All @@ -194,13 +191,7 @@ where
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let row_filter = params.filter.clone();

Expand Down Expand Up @@ -245,13 +236,7 @@ where
&self, ids: &[ID], params: &QueryParams,
) -> GraphProxyResult<Box<dyn Iterator<Item = Vertex> + Send>> {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);

let column_filter_pushdown = self.column_filter_pushdown;
// also need props in filter, because `filter_limit!`
Expand Down Expand Up @@ -294,13 +279,7 @@ where
let limit = params.limit.clone();
let store = self.store.clone();
let partition_manager = self.partition_manager.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);
let edge_label_ids = encode_storage_labels(params.labels.as_ref())?;

let stmt = from_fn(move |v: ID| {
Expand Down Expand Up @@ -361,13 +340,7 @@ where
&self, direction: Direction, params: &QueryParams,
) -> GraphProxyResult<Box<dyn Statement<ID, Edge>>> {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);

let partition_manager = self.partition_manager.clone();
let row_filter = params.filter.clone();
Expand Down Expand Up @@ -486,13 +459,7 @@ where
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let count =
store.count_all_vertices(si, label_ids.as_ref(), None, worker_partitions.as_ref());
Expand All @@ -510,13 +477,7 @@ where
let worker_partitions = assign_worker_partitions(&self.server_partitions, &self.cluster_info)?;
if !worker_partitions.is_empty() {
let store = self.store.clone();
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
let si = get_snapshot_id(params);
let label_ids = encode_storage_labels(params.labels.as_ref())?;
let count = store.count_all_edges(si, label_ids.as_ref(), None, worker_partitions.as_ref());
Ok(count)
Expand All @@ -527,6 +488,17 @@ where
}
}

fn get_snapshot_id(params: &QueryParams) -> SnapshotId {
let si = params
.get_extra_param(SNAPSHOT_ID)
.map(|s| {
s.parse::<SnapshotId>()
.unwrap_or(DEFAULT_SNAPSHOT_ID)
})
.unwrap_or(DEFAULT_SNAPSHOT_ID);
si
}

#[inline]
fn to_runtime_vertex<V>(v: V, prop_keys: Option<Vec<NameOrId>>) -> Vertex
where
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl GraphPartitionManager for GlobalGraph {

fn get_vertex_id_by_primary_key(&self, _label_id: u32, _key: &String) -> Option<(u32, i64)> {
// TODO check
None
unimplemented!("get vertex id by primary key is not implemented")
}

fn get_vertex_id_by_primary_keys(&self, label_id: LabelId, pks: &[Property]) -> Option<VertexId> {
Expand Down

0 comments on commit 8c52f7f

Please sign in to comment.