Skip to content

Commit

Permalink
update negentropy
Browse files Browse the repository at this point in the history
  • Loading branch information
hoytech committed Dec 6, 2023
1 parent 50d66d2 commit b9f40e6
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 7 deletions.
13 changes: 11 additions & 2 deletions src/apps/mesh/cmd_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <negentropy.h>
#include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h>
#include <negentropy/storage/SubRange.h>

#include "golpe.h"

Expand Down Expand Up @@ -98,7 +99,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {

if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);

const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));

Negentropy ne(subStorage, frameSizeLimit);
neMsg = ne.initiate();
} else {
Negentropy ne(storageVector, frameSizeLimit);
Expand Down Expand Up @@ -147,7 +152,11 @@ void cmd_sync(const std::vector<std::string> &subArgs) {

if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
Negentropy ne(storageBtree, frameSizeLimit);

const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));

Negentropy ne(subStorage, frameSizeLimit);
ne.setInitiator();
neMsg = ne.reconcile(inputMsg, have, need);
} else {
Expand Down
16 changes: 12 additions & 4 deletions src/apps/relay/RelayNegentropy.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <negentropy.h>
#include <negentropy/storage/Vector.h>
#include <negentropy/storage/BTreeLMDB.h>
#include <negentropy/storage/SubRange.h>

#include "RelayServer.h"
#include "QueryScheduler.h"
Expand Down Expand Up @@ -97,7 +98,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
Negentropy ne(storage, 500'000);
resp = ne.reconcile(msg);
} catch (std::exception &e) {
LI << "[" << connId << "] Error parsing negentropy initial message: " << e.what();
LI << "[" << connId << "] Error parsing negentropy message: " << e.what();

sendToConn(connId, tao::json::to_string(tao::json::value::array({
"NEG-ERR",
Expand Down Expand Up @@ -189,7 +190,10 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {

if (msg->sub.filterGroup.isFullDbQuery()) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(connId, subId, storage, msg->negPayload);

const auto &f = msg->sub.filterGroup.filters.at(0);
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
handleReconcile(connId, subId, subStorage, msg->negPayload);

if (!views.addStatelessView(connId, subId, std::move(msg->sub))) {
queries.removeSub(connId, subId);
Expand Down Expand Up @@ -225,9 +229,13 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
continue;
}
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (std::get_if<NegentropyViews::StatelessView>(userView)) {
} else if (auto *view = std::get_if<NegentropyViews::StatelessView>(userView)) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
handleReconcile(msg->connId, msg->subId, storage, msg->negPayload);

const auto &f = view->sub.filterGroup.filters.at(0);
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));

handleReconcile(msg->connId, msg->subId, subStorage, msg->negPayload);
}
} else if (auto msg = std::get_if<MsgNegentropy::NegClose>(&newMsg.msg)) {
queries.removeSub(msg->connId, msg->subId);
Expand Down
52 changes: 52 additions & 0 deletions test/runSyncTests.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env perl

use strict;

## Full DB sync tests
{
my $f = '{}';
test(qq{ 1 0 0 '$f' });
test(qq{ 0 1 0 '$f' });
test(qq{ 0 0 1 '$f' });
test(qq{ 1 1 1000 '$f' });
}

## Vector DB sync tests
{
my $f = '{"kinds":[1]}';
test(qq{ 1 0 0 '$f' });
test(qq{ 0 1 0 '$f' });
test(qq{ 0 0 1 '$f' });
test(qq{ 1 1 1000 '$f' });
}

## Full DB sync tests with time bounds
{
my $f = '{"since":1652985767,"until":1662969916}';
test(qq{ 1 1 1000 '$f' }, 100000);
test(qq{ 0 0 1 '$f' }, 100000);

$f = '{"since":1652985767}';
test(qq{ 1 1 1100 '$f' }, 100000);

$f = '{"until":1662969916}';
test(qq{ 1 1 1200 '$f' }, 100000);
}


print "All OK\n";

sub test {
my $params = shift;
my $num = shift // 1000;

print "---------------------------\n";
print "TEST: params = $params num = $num\n";

my $redir = $ENV{VERBOSE} ? '' : '2>/dev/null';

my $cmd = qq{ zstdcat ../nostr-dumps/nostr-wellorder-early-500k-v1.jsonl.zst | head -$num | perl test/syncTest.pl $params $redir};
print "CMD: $cmd\n";
system($cmd) && die "failed";
print "\n";
}

0 comments on commit b9f40e6

Please sign in to comment.