Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip replaceable events #72

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions TODO
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
subscription polling
execution plan for created_at query
note kind index rebuild migration
(A) filter from json
tags index migration

# Replaceable Events

- [ ] delete note
- [ ] delete note indices
- [ ] when replacing kind0, delete profile as well
- [ ] delete profile index
153 changes: 144 additions & 9 deletions src/nostrdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ enum ndb_writer_msgtype {
NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
NDB_WRITER_BLOCKS, // write parsed note blocks
NDB_WRITER_MIGRATE, // migrate the database
NDB_WRITER_REPLACE_NOTE, // replace a note in the db
};

// keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
Expand Down Expand Up @@ -1352,6 +1353,11 @@ static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id,
key->search[sizeof(key->search) - 1] = '\0';
}

int ndb_delete_profile(struct ndb_txn *txn, unsigned char *pubkey)
{
struct ndb_search_index
}

static int ndb_write_profile_search_index(struct ndb_txn *txn,
struct ndb_search_key *index_key,
uint64_t profile_key)
Expand Down Expand Up @@ -1823,6 +1829,11 @@ struct ndb_writer_note {
size_t note_len;
};

struct ndb_writer_replace_note {
struct ndb_writer_note note;
uint64_t note_to_delete;
}

struct ndb_writer_profile {
struct ndb_writer_note note;
struct ndb_profile_record_builder record;
Expand Down Expand Up @@ -1859,6 +1870,7 @@ struct ndb_writer_msg {
enum ndb_writer_msgtype type;
union {
struct ndb_writer_note note;
struct ndb_writer_replace_note replace_note;
struct ndb_writer_profile profile;
struct ndb_writer_ndb_meta ndb_meta;
struct ndb_writer_last_fetch last_fetch;
Expand Down Expand Up @@ -2363,6 +2375,10 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
struct ndb_ingester *ingester)
{
enum ndb_ingest_filter_action action;
struct ndb_replace_action replace_action;
uint64_t note_to_delete;

note_to_delete = 0;
action = NDB_INGEST_ACCEPT;

if (ingester->filter)
Expand All @@ -2384,6 +2400,27 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
}
}

// check to see if we have a replaceable event, because we can bail
// early if so
if (is_replaceable_kind(note->kind)) {
ndb_process_replaceable_event(txn, note, &replace_action);

switch (replace_action->action) {
case RA_ERROR:
ndb_debug("error processing replaceable event %d\n", note->kind);
return 0;
case RA_REPLACE:
note_to_delete = replace_action->note_to_delete;
break;
case RA_SKIP:
// this is old or we already have it, skip
return 0;
case RA_NEW:
// new event, write it like normal
break;
}
}

// we didn't find anything. let's send it
// to the writer thread
note = realloc(note, note_size);
Expand All @@ -2406,9 +2443,16 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
ndb_note_content_length(note), 0);
}

out->type = NDB_WRITER_NOTE;
out->note.note = note;
out->note.note_len = note_size;
if (note_to_delete != 0) {
out->type = NDB_WRITER_REPLACE_NOTE;
out->replace_note.note_to_delete = note_to_delete;
out->replace_note.write_note.note = note;
out->replace_note.write_note.note_len = note_size;
} else {
out->type = NDB_WRITER_NOTE;
out->note.note = note;
out->note.note_len = note_size;
}

return 1;
}
Expand Down Expand Up @@ -4341,6 +4385,61 @@ static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note,
return 1;
}

enum replacement_action {
RA_ERROR,
RA_REPLACE,
RA_SKIP,
RA_NEW,
}

struct ndb_replace_action {
uint64_t note_to_delete;
enum replacement_action action;
}

// Find the note to replace, generate the diff, and replace
void ndb_process_replaceable_event(struct ndb_txn *txn,
struct ndb_note *note,
struct ndb_replace_action *action)
{
struct ndb_filter filter, *f = &filter;
struct ndb_query_result result;
int result_count;

// we only need one page to malloc, since our filter is small
ndb_filter_init_with(f, 1);

ndb_filter_start_field(f, NDB_FILTER_KINDS);
ndb_filter_add_int_element(f, note->kind);
ndb_filter_end_field(f);

ndb_filter_start_field(f, NDB_FILTER_AUTHORS);
ndb_filter_add_id_element(f, note->pubkey);
ndb_filter_end_field(f);

ndb_filter_end(f);

if (!ndb_query(txn, &filter, 1, &result, 1, &result_count)) {
// something went wrong?
action->action = RA_ERROR;
} else if (result_count == 0) {
// no note found, so let's just insert
action->action = RA_NEW;
} else {
// result_count == 1
if (result.note->created_at >= note->created_at) {
// this event is newer, make sure we delete this one
result.note_to_delete = result.note_id;
action->action = RA_REPLACE;
} else {
action->action = RA_SKIP;
}
}

ndb_filter_destroy(f);
return action;
}

static uint64_t ndb_write_note(struct ndb_txn *txn,
struct ndb_writer_note *note,
unsigned char *scratch, size_t scratch_size,
Expand Down Expand Up @@ -4545,6 +4644,22 @@ static int ndb_run_migrations(struct ndb_txn *txn)
return 1;
}

int ndb_delete_note(struct ndb_txn *txn, uint64_t note_key *key)
{
// delete note
MDB_val k;

k.mv_data = key;
k.mv_size = sizeof(*key);

if (mdb_del(txn->mdb_txn, lmdb->dbs[NDB_DB_NOTE], &k, NULL)) {
// error
ndb_debug("error deleting note\n");
return 0;
}

// delete note indices
}

static void *ndb_writer_thread(void *data)
{
Expand Down Expand Up @@ -4576,12 +4691,15 @@ static void *ndb_writer_thread(void *data)
for (i = 0 ; i < popped; i++) {
msg = &msgs[i];
switch (msg->type) {
case NDB_WRITER_NOTE: needs_commit = 1; break;
case NDB_WRITER_PROFILE: needs_commit = 1; break;
case NDB_WRITER_DBMETA: needs_commit = 1; break;
case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break;
case NDB_WRITER_BLOCKS: needs_commit = 1; break;
case NDB_WRITER_MIGRATE: needs_commit = 1; break;
case NDB_WRITER_NOTE:
case NDB_WRITER_REPLACE_NOTE:
case NDB_WRITER_PROFILE:
case NDB_WRITER_DBMETA:
case NDB_WRITER_PROFILE_LAST_FETCH:
case NDB_WRITER_BLOCKS:
case NDB_WRITER_MIGRATE:
needs_commit = 1;
break;
case NDB_WRITER_QUIT: break;
}
}
Expand Down Expand Up @@ -4622,6 +4740,23 @@ static void *ndb_writer_thread(void *data)
ndb_debug("failed to write note\n");
}
break;
case NDB_WRITER_REPLACE_NOTE:
if (!ndb_delete_note(&txn, msg->replace_note.note_to_delete)) {
ndb_debug("failed to delete note\n");
}

note_nkey = ndb_write_note(&txn, &msg->replace_note.note,
scratch,
scratch_size,
writer->ndb_flags);

if (note_nkey > 0) {
written_notes[num_notes++] = (struct written_note){
.note_id = note_nkey,
.note = &msg->note,
};
}
break;
case NDB_WRITER_NOTE:
note_nkey = ndb_write_note(&txn, &msg->note,
scratch,
Expand Down
Loading