Skip to content

Commit

Permalink
feature tid order
Browse files Browse the repository at this point in the history
  • Loading branch information
spasserby authored and tugraph committed Sep 25, 2023
1 parent ddeabf5 commit 3496458
Show file tree
Hide file tree
Showing 23 changed files with 222 additions and 72 deletions.
1 change: 1 addition & 0 deletions include/fma-common/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace fma_common {
#define LOG() FMA_LOG()
#define WARN() FMA_WARN()
#define ERR() FMA_ERR()
#define FATAL() FMA_FATAL()
#define CHECK(pred) FMA_CHECK(pred)
#define CHECK_EQ(a, b) FMA_CHECK_EQ(a, b)
#define CHECK_NEQ(a, b) FMA_CHECK_NEQ(a, b)
Expand Down
24 changes: 22 additions & 2 deletions include/lgraph/lgraph_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum class AccessLevel {
FULL = 3
};

[[maybe_unused]]
inline static std::string to_string(const AccessLevel& v) {
switch (v) {
case AccessLevel::NONE: return "NONE";
Expand All @@ -57,6 +58,7 @@ enum class FieldAccessLevel {
WRITE = 2
};

[[maybe_unused]]
inline static std::string to_string(const FieldAccessLevel& v) {
switch (v) {
case FieldAccessLevel::NONE: return "NONE";
Expand All @@ -71,6 +73,7 @@ enum class GraphQueryType {
GQL = 1
};

[[maybe_unused]]
inline static std::string to_string(const GraphQueryType& v) {
switch (v) {
case GraphQueryType::CYPHER: return "CYPHER";
Expand Down Expand Up @@ -106,6 +109,23 @@ struct EdgeOptions : LabelOptions {
// edge temporal field, edge will be stored in the order of this field
// Default: empty
std::string temporal_field;
// order of edge temporal field
// Default: ASC
enum class TemporalFieldOrder {
ASC = 0,
DESC = 1,
} temporal_field_order = TemporalFieldOrder::ASC;

inline static std::string to_string(const TemporalFieldOrder& v) {
switch (v) {
case TemporalFieldOrder::ASC:
return "ASC";
case TemporalFieldOrder::DESC:
return "DESC";
default:
throw std::runtime_error("Unknown TemporalFieldOrder");
}
}

EdgeOptions() = default;
explicit EdgeOptions(const EdgeConstraints& edge_constraints)
Expand All @@ -123,8 +143,8 @@ struct EdgeOptions : LabelOptions {
constraints = "[" + constraints + "]";

return "detach_property: " + std::to_string(detach_property) +
", edge_constraints: " + constraints +
", temporal_field: " + temporal_field;
", edge_constraints: " + constraints + ", temporal_field: " + temporal_field +
", temporal_field_order: " + to_string(temporal_field_order);
}
void clear() {
detach_property = false;
Expand Down
2 changes: 2 additions & 0 deletions src/core/data_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ typedef lgraph_api::EdgeConstraints EdgeConstraints;
typedef lgraph_api::LabelOptions LabelOptions;
typedef lgraph_api::VertexOptions VertexOptions;
typedef lgraph_api::EdgeOptions EdgeOptions;
typedef lgraph_api::EdgeOptions::TemporalFieldOrder TemporalFieldOrder;


typedef int64_t VertexId;
typedef int64_t EdgeId;
Expand Down
2 changes: 2 additions & 0 deletions src/core/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ void Schema::ClearFields() {

void Schema::SetSchema(bool is_vertex, size_t n_fields, const FieldSpec* fields,
const std::string& primary, const std::string& temporal,
const TemporalFieldOrder& temporal_order,
const EdgeConstraints& edge_constraints) {
if (_F_UNLIKELY(n_fields > _detail::MAX_NUM_FIELDS)) throw TooManyFieldsException();
fields_.clear();
Expand All @@ -433,6 +434,7 @@ void Schema::SetSchema(bool is_vertex, size_t n_fields, const FieldSpec* fields,
is_vertex_ = is_vertex;
primary_field_ = primary;
temporal_field_ = temporal;
temporal_order_ = temporal_order;
edge_constraints_ = edge_constraints;
RefreshLayout();
}
Expand Down
18 changes: 14 additions & 4 deletions src/core/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Schema {
std::string primary_field_{};
// temporal_field_ should be int64.
std::string temporal_field_{};
TemporalFieldOrder temporal_order_{};
// When Schema is VERTEX type, `edge_constraints_` is always empty.
EdgeConstraints edge_constraints_;
std::unordered_set<size_t> fulltext_fields_;
Expand Down Expand Up @@ -141,11 +142,14 @@ class Schema {
*/
void SetSchema(bool is_vertex, size_t n_fields, const FieldSpec* fields,
const std::string& primary, const std::string& temporal,
const TemporalFieldOrder& temporal_order,
const EdgeConstraints& edge_constraints);

void SetSchema(bool is_vertex, const std::vector<FieldSpec>& fields, const std::string& primary,
const std::string& temporal, const EdgeConstraints& edge_constraints) {
SetSchema(is_vertex, fields.size(), fields.data(), primary, temporal, edge_constraints);
const std::string& temporal, const TemporalFieldOrder& temporal_order,
const EdgeConstraints& edge_constraints) {
SetSchema(is_vertex, fields.size(), fields.data(), primary, temporal, temporal_order,
edge_constraints);
}

void SetEdgeConstraintsLids(std::unordered_map<LabelId, std::unordered_set<LabelId>> lids) {
Expand Down Expand Up @@ -175,6 +179,7 @@ class Schema {
const std::string& GetPrimaryField() const { return primary_field_; }
bool DetachProperty() const { return detach_property_; }
bool HasTemporalField() const { return !temporal_field_.empty(); }
TemporalFieldOrder GetTemporalOrder() const { return temporal_order_; }
const EdgeConstraints& GetEdgeConstraints() const { return edge_constraints_; }
void SetEdgeConstraints(const EdgeConstraints& edge_constraints) {
edge_constraints_ = edge_constraints;
Expand Down Expand Up @@ -476,13 +481,17 @@ class Schema {
s = BinaryRead(buf, temporal_field_);
if (!s) return 0;
bytes_read += s;
s = BinaryRead(buf, temporal_order_);
if (!s) return 0;
bytes_read += s;
s = BinaryRead(buf, edge_constraints_);
if (!s) return 0;
bytes_read += s;
s = BinaryRead(buf, detach_property_);
if (!s) return 0;
bytes_read += s;
SetSchema(is_vertex_, fds, primary_field_, temporal_field_, edge_constraints_);
SetSchema(is_vertex_, fds, primary_field_, temporal_field_, temporal_order_,
edge_constraints_);
return bytes_read;
}

Expand All @@ -492,7 +501,8 @@ class Schema {
BinaryWrite(buf, label_in_record_) + BinaryWrite(buf, deleted_) +
BinaryWrite(buf, GetFieldSpecs()) + BinaryWrite(buf, is_vertex_) +
BinaryWrite(buf, primary_field_) + BinaryWrite(buf, temporal_field_) +
BinaryWrite(buf, edge_constraints_) + BinaryWrite(buf, detach_property_);
BinaryWrite(buf, temporal_order_) + BinaryWrite(buf, edge_constraints_) +
BinaryWrite(buf, detach_property_);
}

std::string ToString() const { return fma_common::ToString(GetFieldSpecsAsMap()); }
Expand Down
5 changes: 4 additions & 1 deletion src/core/schema_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class SchemaManager {
ls->SetLabelId((LabelId)(schemas_.size() - 1));
}
std::string primary, temporal;
TemporalFieldOrder temporal_order;
if (is_vertex) {
primary = dynamic_cast<const VertexOptions&>(options).primary_field;
}
Expand All @@ -212,8 +213,10 @@ class SchemaManager {
edge_constraints = dynamic_cast<const EdgeOptions&>(options).edge_constraints;
primary = dynamic_cast<const EdgeOptions&>(options).temporal_field;
temporal = dynamic_cast<const EdgeOptions&>(options).temporal_field;
temporal_order = dynamic_cast<const EdgeOptions&>(options).temporal_field_order;
}
ls->SetSchema(is_vertex, n_fields, fields, primary, temporal, edge_constraints);
ls->SetSchema(is_vertex, n_fields, fields, primary, temporal, temporal_order,
edge_constraints);
ls->SetLabel(label);
ls->SetDetachProperty(options.detach_property);
name_to_idx_.emplace_hint(it, label, ls->GetLabelId());
Expand Down
23 changes: 18 additions & 5 deletions src/core/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

#include "core/blob_manager.h"
#include "core/data_type.h"
#include "core/field_data_helper.h"
#include "core/index_manager.h"
#include "core/lightning_graph.h"
Expand Down Expand Up @@ -1197,13 +1198,25 @@ Transaction::AddVertex(const LabelT& label, size_t n_fields, const FieldT* field
return newvid;
}

TemporalId Transaction::ParseTemporalId(const FieldData& fd) { return fd.AsInt64(); }
TemporalId Transaction::ParseTemporalId(const FieldData& fd,
const TemporalFieldOrder& temporal_order) {
if (temporal_order == TemporalFieldOrder::ASC) {
return fd.AsInt64();
} else {
return std::numeric_limits<int64_t>::max() ^ fd.AsInt64();
}
}

TemporalId Transaction::ParseTemporalId(const std::string& str) {
TemporalId Transaction::ParseTemporalId(const std::string& str,
const TemporalFieldOrder& temporal_order) {
TemporalId tid = 0;
if (fma_common::TextParserUtils::ParseT(str, tid) != str.size())
throw InputError(FMA_FMT("Incorrect tid format: {}", str));
return tid;
if (temporal_order == TemporalFieldOrder::ASC) {
return tid;
} else {
return std::numeric_limits<int64_t>::max() ^ tid;
}
}

/**
Expand Down Expand Up @@ -1240,7 +1253,7 @@ Transaction::AddEdge(VertexId src, VertexId dst, const LabelT& label, size_t n_f
if (schema->HasTemporalField()) {
int pos = schema->GetTemporalPos(n_fields, fields);
if (pos != -1) {
tid = ParseTemporalId(values[pos]);
tid = ParseTemporalId(values[pos], schema->GetTemporalOrder());
}
}
const auto& constraints = schema->GetEdgeConstraintsLids();
Expand Down Expand Up @@ -1275,7 +1288,7 @@ Transaction::UpsertEdge(VertexId src, VertexId dst, const LabelT& label, size_t
// NOTE: if one edge has primary id, different primary id will be insert rather than update.
int pos = schema->GetTemporalPos(n_fields, fields);
if (pos != -1) {
tid = ParseTemporalId(values[pos]);
tid = ParseTemporalId(values[pos], schema->GetTemporalOrder());
}
}
graph::OutEdgeIterator it =
Expand Down
9 changes: 6 additions & 3 deletions src/core/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,12 @@ class Transaction {

void GetStartAndEndVid(VertexId& start, VertexId& end);

static TemporalId ParseTemporalId(const FieldData& fd,
const TemporalFieldOrder& temporal_order);

static TemporalId ParseTemporalId(const std::string& fd,
const TemporalFieldOrder& temporal_order);

private:
void CloseAllIterators() {
/** NOTE: it->Close() will remove the it from iterators_. So we
Expand All @@ -1077,9 +1083,6 @@ class Transaction {
EdgeIndex* GetEdgeIndex(const std::string& label, const std::string& field);
EdgeIndex* GetEdgeIndex(size_t label, size_t field);

TemporalId ParseTemporalId(const FieldData& fd);
TemporalId ParseTemporalId(const std::string& fd);

void EnterTxn();
void LeaveTxn();

Expand Down
32 changes: 29 additions & 3 deletions src/import/import_config_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "fma-common/fma_stream.h"
#include "lgraph/lgraph_types.h"
#include "core/data_type.h"
#include "core/field_data_helper.h"
#include "core/lightning_graph.h"
#include "import/import_exception.h"
Expand Down Expand Up @@ -54,7 +55,9 @@ enum KeyWord {
SKIP,
HEADER,
OPTIONAL_, // OPTIONAL is a macro in windows SDK
FORMAT
FORMAT,
ASC, // TemporalFieldOrder::ASC
DESC // TemporalFieldOrder::DESC
};

class KeyWordFunc {
Expand Down Expand Up @@ -83,6 +86,8 @@ class KeyWordFunc {
{KeyWord::HEADER, "HEADER"},
{KeyWord::OPTIONAL_, "OPTIONAL"},
{KeyWord::FORMAT, "FORMAT"},
{KeyWord::ASC, "ASC"},
{KeyWord::DESC, "DESC"},
};
return m;
}
Expand Down Expand Up @@ -164,6 +169,17 @@ class KeyWordFunc {
return ft;
}

static TemporalFieldOrder GetTemporalFieldOrderFromStr(const std::string& s) {
KeyWord kw = GetKeyWordFromStr(s);
if (kw == KeyWord::ASC) {
return TemporalFieldOrder::ASC;
} else if (kw == KeyWord::DESC) {
return TemporalFieldOrder::DESC;
} else {
throw std::runtime_error(FMA_FMT("keyword [{}] is not a TemporalFieldOrder", s));
}
}

static const std::string& GetStrFromKeyWord(const KeyWord& kw) {
return KeyWordToStrMap().at(kw);
}
Expand All @@ -182,6 +198,7 @@ struct ColumnSpec {
bool global = false;
bool primary = false;
bool temporal = false;
TemporalFieldOrder temporal_order = TemporalFieldOrder::ASC;
bool fulltext = false;

inline bool CheckValid() const {
Expand All @@ -207,10 +224,13 @@ struct ColumnSpec {
global(false),
primary(false),
temporal(false),
temporal_order(TemporalFieldOrder::ASC),
fulltext(false) {}
ColumnSpec(std::string name_, FieldType type_, bool is_id_, bool optional_ = false,
bool index_ = false, bool unique_ = false, bool global_ = false,
bool temporal_ = false, bool fulltext_ = false)
bool temporal_ = false,
TemporalFieldOrder temporal_order_ = TemporalFieldOrder::ASC,
bool fulltext_ = false)
: name(std::move(name_)),
type(type_),
optional(optional_),
Expand All @@ -219,6 +239,7 @@ struct ColumnSpec {
global(global_),
primary(is_id_),
temporal(temporal_),
temporal_order(temporal_order_),
fulltext(fulltext_) {}

bool operator==(const ColumnSpec& rhs) const {
Expand Down Expand Up @@ -342,7 +363,6 @@ struct LabelDesc {
return ret;
}


std::vector<FieldSpec> GetFieldSpecs(std::vector<std::string>& names) const {
std::vector<FieldSpec> fs;
for (const auto& n : names) {
Expand Down Expand Up @@ -812,6 +832,12 @@ class ImportConfParser {
}
if (s.contains("temporal") && cs.name == s["temporal"]) {
cs.temporal = true;
if (s.contains("temporal_order")) {
cs.temporal_order =
KeyWordFunc::GetTemporalFieldOrderFromStr(s["temporal_order"]);
} else {
cs.temporal_order = TemporalFieldOrder::ASC;
}
}
if (p.contains("index")) {
cs.index = p["index"];
Expand Down
7 changes: 5 additions & 2 deletions src/import/import_online.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,11 @@ std::string lgraph::import_v2::ImportOnline::HandleOnlineSchema(std::string&& de
} else {
auto eo = std::make_unique<EdgeOptions>();
eo->edge_constraints = v.edge_constraints;
if (v.HasTemporalField())
eo->temporal_field = v.GetTemporalField().name;
if (v.HasTemporalField()) {
auto tf = v.GetTemporalField();
eo->temporal_field = tf.name;
eo->temporal_field_order = tf.temporal_order;
}
options = std::move(eo);
}
options->detach_property = v.detach_property;
Expand Down
Loading

0 comments on commit 3496458

Please sign in to comment.