Skip to content

Commit

Permalink
use custom packing for indexable data: PackedEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
hoytech committed Aug 30, 2024
1 parent 32a3677 commit 61781f3
Show file tree
Hide file tree
Showing 18 changed files with 212 additions and 208 deletions.
28 changes: 0 additions & 28 deletions fbs/nostr-index.fbs

This file was deleted.

45 changes: 16 additions & 29 deletions golpe.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@ features:
onAppStartup: true
db: true
customLMDBSetup: true
flatbuffers: true
websockets: true
templar: true

flatBuffers: |
include "../fbs/nostr-index.fbs";
includes: |
inline std::string_view sv(const NostrIndex::Fixed32Bytes *f) {
return std::string_view((const char *)f->val()->data(), 32);
}
#include "PackedEvent.h"
tables:
## DB meta-data. Single entry, with id = 1
Expand All @@ -29,9 +23,8 @@ tables:
Event:
fields:
- name: receivedAt # microseconds
- name: flat
- name: packed
type: ubytes
nestedFlat: NostrIndex.Event
- name: sourceType
- name: sourceInfo
type: ubytes
Expand Down Expand Up @@ -61,36 +54,30 @@ tables:
multi: true

indexPrelude: |
auto *flat = v.flat_nested();
created_at = flat->created_at();
PackedEventView packed(v.packed());
created_at = packed.created_at();
uint64_t indexTime = *created_at;
receivedAt = v.receivedAt();
id = makeKey_StringUint64(sv(flat->id()), indexTime);
pubkey = makeKey_StringUint64(sv(flat->pubkey()), indexTime);
kind = makeKey_Uint64Uint64(flat->kind(), indexTime);
pubkeyKind = makeKey_StringUint64Uint64(sv(flat->pubkey()), flat->kind(), indexTime);
for (const auto &tagPair : *(flat->tagsGeneral())) {
auto tagName = (char)tagPair->key();
auto tagVal = sv(tagPair->val());
id = makeKey_StringUint64(packed.id(), indexTime);
pubkey = makeKey_StringUint64(packed.pubkey(), indexTime);
kind = makeKey_Uint64Uint64(packed.kind(), indexTime);
pubkeyKind = makeKey_StringUint64Uint64(packed.pubkey(), packed.kind(), indexTime);
packed.foreachTag([&](char tagName, std::string_view tagVal){
tag.push_back(makeKey_StringUint64(std::string(1, tagName) + std::string(tagVal), indexTime));
if (tagName == 'd' && replace.size() == 0) {
replace.push_back(makeKey_StringUint64(std::string(sv(flat->pubkey())) + std::string(tagVal), flat->kind()));
replace.push_back(makeKey_StringUint64(std::string(packed.pubkey()) + std::string(tagVal), packed.kind()));
} else if (tagName == 'e' && packed.kind() == 5) {
deletion.push_back(std::string(tagVal) + std::string(packed.pubkey()));
}
}
for (const auto &tagPair : *(flat->tagsFixed32())) {
auto tagName = (char)tagPair->key();
auto tagVal = sv(tagPair->val());
tag.push_back(makeKey_StringUint64(std::string(1, tagName) + std::string(tagVal), indexTime));
if (flat->kind() == 5 && tagName == 'e') deletion.push_back(std::string(tagVal) + std::string(sv(flat->pubkey())));
}
return true;
});
if (flat->expiration() != 0) {
expiration.push_back(flat->expiration());
if (packed.expiration() != 0) {
expiration.push_back(packed.expiration());
}
CompressionDictionary:
Expand Down
24 changes: 9 additions & 15 deletions src/ActiveMonitors.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ struct ActiveMonitors : NonCopyable {
if (item.latestEventId >= ev.primaryKeyId || item.mon->sub.latestEventId >= ev.primaryKeyId) continue;
item.latestEventId = ev.primaryKeyId;

if (f->doesMatch(ev.flat_nested())) {
if (f->doesMatch(PackedEventView(ev.packed()))) {
recipients.emplace_back(item.mon->sub.connId, item.mon->sub.subId);
item.mon->sub.latestEventId = ev.primaryKeyId;
continue;
Expand Down Expand Up @@ -124,38 +124,32 @@ struct ActiveMonitors : NonCopyable {
}
};

auto *flat = ev.flat_nested();
auto packed = PackedEventView(ev.packed());

{
auto id = std::string(sv(flat->id()));
auto id = std::string(packed.id());
processMonitorsPrefix(allIds, id, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return id.starts_with(val);
}));
}

{
auto pubkey = std::string(sv(flat->pubkey()));
auto pubkey = std::string(packed.pubkey());
processMonitorsPrefix(allAuthors, pubkey, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return pubkey.starts_with(val);
}));
}

for (const auto &tag : *flat->tagsFixed32()) {
auto &tagSpec = getTagSpec(tag->key(), sv(tag->val()));
packed.foreachTag([&](char tagName, std::string_view tagVal){
auto &tagSpec = getTagSpec(tagName, tagVal);
processMonitorsExact(allTags, tagSpec, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return tagSpec == val;
}));
}

for (const auto &tag : *flat->tagsGeneral()) {
auto &tagSpec = getTagSpec(tag->key(), sv(tag->val()));
processMonitorsExact(allTags, tagSpec, static_cast<std::function<bool(const std::string&)>>([&](const std::string &val){
return tagSpec == val;
}));
}
return true;
});

{
auto kind = flat->kind();
auto kind = packed.kind();
processMonitorsExact(allKinds, kind, static_cast<std::function<bool(const uint64_t&)>>([&](const uint64_t &val){
return kind == val;
}));
Expand Down
2 changes: 1 addition & 1 deletion src/DBQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ struct DBScan : NonCopyable {
} else {
approxWork += 10;
auto view = env.lookup_Event(txn, levId);
if (view && f.doesMatch(view->flat_nested())) doSend = true;
if (view && f.doesMatch(PackedEventView(view->packed()))) doSend = true;
}

if (doSend) {
Expand Down
91 changes: 91 additions & 0 deletions src/PackedEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#pragma once

#include <string_view>

#include "golpe.h"


// PackedEvent (summary of indexable data in a nostr event)
// 0: id (32)
// 32: pubkey (32)
// 64: created_at (8)
// 72: kind (8)
// 80: expiration (8)
// 88: tags[] (variable)
//
// each tag:
// 0: tag char (1)
// 1: length (1)
// 2: value (variable)

struct PackedEventView {
std::string_view buf;

PackedEventView(const std::string &str) : buf(std::string_view(str)) {
if (buf.size() < 88) throw hoytech::error("PackedEventView too short");
}

PackedEventView(std::string_view sv) : buf(sv) {
if (buf.size() < 88) throw hoytech::error("PackedEventView too short");
}

std::string_view id() const {
return buf.substr(0, 32);
}

std::string_view pubkey() const {
return buf.substr(32, 32);
}

uint64_t created_at() const {
return lmdb::from_sv<uint64_t>(buf.substr(64, 8));
}

uint64_t kind() const {
return lmdb::from_sv<uint64_t>(buf.substr(72, 8));
}

uint64_t expiration() const {
return lmdb::from_sv<uint64_t>(buf.substr(80, 8));
}

void foreachTag(const std::function<bool(char, std::string_view)> &cb) {
std::string_view b = buf.substr(88);

while (b.size()) {
bool done = cb(b[0], b.substr(2, (size_t)b[1]));
if (done) break;
b = b.substr(2 + b[1]);
}
}
};

struct PackedEventTagBuilder {
std::string buf;

void add(char tagKey, std::string_view tagVal) {
if (tagVal.size() > 255) throw hoytech::error("tagVal too long");

buf += tagKey;
buf += (unsigned char) tagVal.size();
buf += tagVal;
}
};

struct PackedEventBuilder {
std::string buf;

PackedEventBuilder(std::string_view id, std::string_view pubkey, uint64_t created_at, uint64_t kind, uint64_t expiration, const PackedEventTagBuilder &tagBuilder) {
if (id.size() != 32) throw hoytech::error("unexpected id size");
if (pubkey.size() != 32) throw hoytech::error("unexpected pubkey size");

buf.reserve(88 + tagBuilder.buf.size());

buf += id;
buf += pubkey;
buf += lmdb::to_sv<uint64_t>(created_at);
buf += lmdb::to_sv<uint64_t>(kind);
buf += lmdb::to_sv<uint64_t>(expiration);
buf += tagBuilder.buf;
}
};
12 changes: 6 additions & 6 deletions src/WriterPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,19 @@ struct WriterPipeline {
return;
}

std::string flatStr;
std::string packedStr;
std::string jsonStr;

try {
parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, flatStr, jsonStr);
parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, packedStr, jsonStr);
} catch (std::exception &e) {
if (verboseReject) LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
numLive--;
totalRejected++;
continue;
}

writerInbox.push_move({ std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) });
writerInbox.push_move({ std::move(packedStr), std::move(jsonStr), hoytech::curr_time_us(), m.sourceType, std::move(m.sourceInfo) });
}
}
});
Expand Down Expand Up @@ -122,15 +122,15 @@ struct WriterPipeline {
auto event = std::move(newEvents.front());
newEvents.pop_front();

if (event.flatStr.size() == 0) {
if (event.packedStr.size() == 0) {
shutdownComplete = true;
break;
}

numLive--;

auto *flat = flatStrToFlatEvent(event.flatStr);
if (lookupEventById(txn, sv(flat->id()))) {
PackedEventView packed(event.packedStr);
if (lookupEventById(txn, packed.id())) {
dups++;
totalDups++;
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/apps/mesh/cmd_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ struct Router {

void outgoingEvent(lmdb::txn &txn, defaultDb::environment::View_Event &ev, std::string &responseStr, tao::json::value &evJson) {
if (dir == "down") return;
if (!filterCompiled.doesMatch(ev.flat_nested())) return;
if (!filterCompiled.doesMatch(PackedEventView(ev.packed()))) return;

if (responseStr.size() == 0) {
auto evStr = getEventJson(txn, router->decomp, ev.primaryKeyId);
Expand Down
2 changes: 1 addition & 1 deletion src/apps/mesh/cmd_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void cmd_stream(const std::vector<std::string> &subArgs) {
env.foreach_Event(txn, [&](auto &ev){
currEventId = ev.primaryKeyId;

auto id = std::string(sv(ev.flat_nested()->id()));
auto id = std::string(PackedEventView(ev.packed()).id());
if (downloadedIds.find(id) != downloadedIds.end()) {
downloadedIds.erase(id);
return true;
Expand Down
3 changes: 2 additions & 1 deletion src/apps/mesh/cmd_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {

for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, ne.idSize));
PackedEventView packed(ev.packed());
ne.addItem(packed.created_at(), packed.id().substr(0, ne.idSize));
}

LI << "Filter matches " << numEvents << " events";
Expand Down
2 changes: 1 addition & 1 deletion src/apps/relay/RelayCron.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void RelayServer::runCron() {
if (expiration == 1) { // Ephemeral event
auto view = env.lookup_Event(txn, levId);
if (!view) throw herr("missing event from index, corrupt DB?");
uint64_t created = view->flat_nested()->created_at();
uint64_t created = PackedEventView(view->packed()).created_at();

if (created <= ephemeralCutoff) {
numEphemeral++;
Expand Down
14 changes: 7 additions & 7 deletions src/apps/relay/RelayIngester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,33 @@ void RelayServer::runIngester(ThreadPool<MsgIngester>::Thread &thr) {
}

void RelayServer::ingesterProcessEvent(lmdb::txn &txn, uint64_t connId, std::string ipAddr, secp256k1_context *secpCtx, const tao::json::value &origJson, std::vector<MsgWriter> &output) {
std::string flatStr, jsonStr;
std::string packedStr, jsonStr;

parseAndVerifyEvent(origJson, secpCtx, true, true, flatStr, jsonStr);
parseAndVerifyEvent(origJson, secpCtx, true, true, packedStr, jsonStr);

auto *flat = flatbuffers::GetRoot<NostrIndex::Event>(flatStr.data());
PackedEventView packed(packedStr);

{
for (const auto &tagArr : origJson.at("tags").get_array()) {
auto tag = tagArr.get_array();
if (tag.size() == 1 && tag.at(0).get_string() == "-") {
LI << "Protected event, skipping";
sendOKResponse(connId, to_hex(sv(flat->id())), false, "blocked: event marked as protected");
sendOKResponse(connId, to_hex(packed.id()), false, "blocked: event marked as protected");
return;
}
}
}

{
auto existing = lookupEventById(txn, sv(flat->id()));
auto existing = lookupEventById(txn, packed.id());
if (existing) {
LI << "Duplicate event, skipping";
sendOKResponse(connId, to_hex(sv(flat->id())), true, "duplicate: have this event");
sendOKResponse(connId, to_hex(packed.id()), true, "duplicate: have this event");
return;
}
}

output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(flatStr), std::move(jsonStr)}});
output.emplace_back(MsgWriter{MsgWriter::AddEvent{connId, std::move(ipAddr), hoytech::curr_time_us(), std::move(packedStr), std::move(jsonStr)}});
}

void RelayServer::ingesterProcessReq(lmdb::txn &txn, uint64_t connId, const tao::json::value &arr) {
Expand Down
3 changes: 2 additions & 1 deletion src/apps/relay/RelayNegentropy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
for (auto levId : view->levIds) {
try {
auto ev = lookupEventByLevId(txn, levId);
view->ne.addItem(ev.flat_nested()->created_at(), sv(ev.flat_nested()->id()).substr(0, view->ne.idSize));
auto packed = PackedEventView(ev.packed());
view->ne.addItem(packed.created_at(), packed.id().substr(0, view->ne.idSize));
} catch (std::exception &) {
// levId was deleted when query was paused
}
Expand Down
Loading

0 comments on commit 61781f3

Please sign in to comment.