Skip to content

Commit

Permalink
switch import to use WriterPipeline
Browse files Browse the repository at this point in the history
- This makes it do the verification and writing in parallel
- Also, it will now flush periodically (default 1s) even if it has read
  fewer than N (default 10k) records from stdin. This lets import be used
  as a general-purpose non-relay event ingester. To do so, users must
  ensure that the stdout of the process they pipe into import is
  line-buffered.
  • Loading branch information
hoytech committed Sep 26, 2023
1 parent cec2ba9 commit b3e5956
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 58 deletions.
2 changes: 1 addition & 1 deletion golpe
36 changes: 30 additions & 6 deletions src/WriterPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,22 @@ struct WriterPipelineInput {

struct WriterPipeline {
public:
// Params:

uint64_t debounceDelayMilliseconds = 1'000;
uint64_t writeBatchSize = 1'000;
bool verifyMsg = true;
bool verifyTime = true;
bool verboseReject = true;
bool verboseCommit = true;
std::function<void(uint64_t)> onCommit;

// For logging:

std::atomic<uint64_t> totalProcessed = 0;
std::atomic<uint64_t> totalWritten = 0;
std::atomic<uint64_t> totalRejected = 0;
std::atomic<uint64_t> totalDups = 0;

private:
hoytech::protected_queue<WriterPipelineInput> validatorInbox;
Expand Down Expand Up @@ -57,10 +71,11 @@ struct WriterPipeline {
std::string jsonStr;

try {
parseAndVerifyEvent(m.eventJson, secpCtx, true, true, flatStr, jsonStr);
parseAndVerifyEvent(m.eventJson, secpCtx, verifyMsg, verifyTime, flatStr, jsonStr);
} catch (std::exception &e) {
LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
if (verboseReject) LW << "Rejected event: " << m.eventJson << " reason: " << e.what();
numLive--;
totalRejected++;
continue;
}

Expand Down Expand Up @@ -117,6 +132,7 @@ struct WriterPipeline {
auto *flat = flatStrToFlatEvent(event.flatStr);
if (lookupEventById(txn, sv(flat->id()))) {
dups++;
totalDups++;
continue;
}

Expand All @@ -132,13 +148,19 @@ struct WriterPipeline {
}

for (auto &ev : newEventsToProc) {
if (ev.status == EventWriteStatus::Written) written++;
else dups++;
// FIXME: log rejected stats too
if (ev.status == EventWriteStatus::Written) {
written++;
totalWritten++;
} else {
dups++;
totalDups++;
}
}

if (onCommit) onCommit(written);
}

if (written || dups) LI << "Writer: added: " << written << " dups: " << dups;
if (verboseCommit && (written || dups)) LI << "Writer: added: " << written << " dups: " << dups;

if (shutdownComplete) {
flushInbox.push_move(true);
Expand All @@ -158,6 +180,8 @@ struct WriterPipeline {
}

void write(WriterPipelineInput &&inp) {
if (inp.eventJson.is_null()) return;
totalProcessed++;
numLive++;
validatorInbox.push_move(std::move(inp));
}
Expand Down
78 changes: 27 additions & 51 deletions src/apps/dbutils/cmd_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
#include <docopt.h>
#include "golpe.h"

#include "events.h"
#include "filters.h"
#include "WriterPipeline.h"


static const char USAGE[] =
R"(
Usage:
import [--show-rejected] [--no-verify]
import [--show-rejected] [--no-verify] [--debounce-millis=<debounce-millis>] [--write-batch=<write-batch>]
)";


Expand All @@ -19,71 +18,48 @@ void cmd_import(const std::vector<std::string> &subArgs) {

bool showRejected = args["--show-rejected"].asBool();
bool noVerify = args["--no-verify"].asBool();
uint64_t debounceMillis = 1'000;
if (args["--debounce-millis"]) debounceMillis = args["--debounce-millis"].asLong();
uint64_t writeBatch = 10'000;
if (args["--write-batch"]) writeBatch = args["--write-batch"].asLong();

if (noVerify) LW << "not verifying event IDs or signatures!";

auto txn = env.txn_rw();

secp256k1_context *secpCtx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY);

std::string line;
uint64_t processed = 0, added = 0, rejected = 0, dups = 0;
std::vector<EventToWrite> newEvents;

auto logStatus = [&]{
LI << "Processed " << processed << " lines. " << added << " added, " << rejected << " rejected, " << dups << " dups";
};

auto flushChanges = [&]{
writeEvents(txn, newEvents, 0);

uint64_t numCommits = 0;

for (auto &newEvent : newEvents) {
if (newEvent.status == EventWriteStatus::Written) {
added++;
numCommits++;
} else if (newEvent.status == EventWriteStatus::Duplicate) {
dups++;
} else {
rejected++;
}
}

logStatus();
LI << "Committing " << numCommits << " records";

txn.commit();

txn = env.txn_rw();
newEvents.clear();
WriterPipeline writer;

writer.debounceDelayMilliseconds = debounceMillis;
writer.writeBatchSize = writeBatch;
writer.verifyMsg = !noVerify;
writer.verifyTime = false;
writer.verboseReject = showRejected;
writer.verboseCommit = false;
writer.onCommit = [&](uint64_t numCommitted){
LI << "Committed " << numCommitted
<< ". Processed " << writer.totalProcessed << " lines. " << writer.totalWritten << " added, " << writer.totalRejected << " rejected, " << writer.totalDups << " dups";
};

std::string line;
uint64_t currLine = 0;

while (std::cin) {
currLine++;
std::getline(std::cin, line);
if (!line.size()) continue;

processed++;

std::string flatStr;
std::string jsonStr;
tao::json::value evJson;

try {
auto origJson = tao::json::from_string(line);
parseAndVerifyEvent(origJson, secpCtx, !noVerify, false, flatStr, jsonStr);
evJson = tao::json::from_string(line);
} catch (std::exception &e) {
if (showRejected) LW << "Line " << processed << " rejected: " << e.what();
rejected++;
LW << "Unable to parse JSON on line " << currLine;
continue;
}

newEvents.emplace_back(std::move(flatStr), std::move(jsonStr), hoytech::curr_time_us(), EventSourceType::Import, "");

if (newEvents.size() >= 10'000) flushChanges();
writer.write({ std::move(evJson), EventSourceType::Import, "" });
writer.wait();
}

flushChanges();
writer.flush();

txn.commit();
LI << "Done. Processed " << writer.totalProcessed << " lines. " << writer.totalWritten << " added, " << writer.totalRejected << " rejected, " << writer.totalDups << " dups";
}

0 comments on commit b3e5956

Please sign in to comment.