diff --git a/TODO b/TODO index 2225760..2d4e67b 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,7 @@ -subscription polling -execution plan for created_at query -note kind index rebuild migration -(A) filter from json -tags index migration + +# Replaceable Events + +- [ ] delete note +- [ ] delete note indices +- [ ] when replacing kind0, delete profile as well +- [ ] delete profile index diff --git a/src/nostrdb.c b/src/nostrdb.c index ce6e129..701fac7 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -136,6 +136,7 @@ enum ndb_writer_msgtype { NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched NDB_WRITER_BLOCKS, // write parsed note blocks NDB_WRITER_MIGRATE, // migrate the database + NDB_WRITER_REPLACE_NOTE, // replace a note in the db }; // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META) @@ -1352,6 +1353,11 @@ static void ndb_make_search_key(struct ndb_search_key *key, unsigned char *id, key->search[sizeof(key->search) - 1] = '\0'; } +int ndb_delete_profile(struct ndb_txn *txn, unsigned char *pubkey) +{ + struct ndb_search_index +} + static int ndb_write_profile_search_index(struct ndb_txn *txn, struct ndb_search_key *index_key, uint64_t profile_key) @@ -1823,6 +1829,11 @@ struct ndb_writer_note { size_t note_len; }; +struct ndb_writer_replace_note { + struct ndb_writer_note note; + uint64_t note_to_delete; +} + struct ndb_writer_profile { struct ndb_writer_note note; struct ndb_profile_record_builder record; @@ -1859,6 +1870,7 @@ struct ndb_writer_msg { enum ndb_writer_msgtype type; union { struct ndb_writer_note note; + struct ndb_writer_replace_note replace_note; struct ndb_writer_profile profile; struct ndb_writer_ndb_meta ndb_meta; struct ndb_writer_last_fetch last_fetch; @@ -2363,6 +2375,10 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, struct ndb_ingester *ingester) { enum ndb_ingest_filter_action action; + struct ndb_replace_action replace_action; + uint64_t note_to_delete; + + note_to_delete = 0; action = NDB_INGEST_ACCEPT; if (ingester->filter) @@ -2384,6 +2400,27 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, } } + // check to see if we have a replaceable event, because we can bail + // early if so + if (is_replaceable_kind(note->kind)) { + ndb_process_replaceable_event(txn, note, &replace_action); + + switch (replace_action->action) { + case RA_ERROR: + ndb_debug("error processing replaceable event %d\n", note->kind); + return 0; + case RA_REPLACE: + note_to_delete = replace_action->note_to_delete; + break; + case RA_SKIP: + // this is old or we already have it, skip + return 0; + case RA_NEW: + // new event, write it like normal + break; + } + } + // we didn't find anything. let's send it // to the writer thread note = realloc(note, note_size); @@ -2406,9 +2443,16 @@ static int ndb_ingester_process_note(secp256k1_context *ctx, ndb_note_content_length(note), 0); } - out->type = NDB_WRITER_NOTE; - out->note.note = note; - out->note.note_len = note_size; + if (note_to_delete != 0) { + out->type = NDB_WRITER_REPLACE_NOTE; + out->replace_note.note_to_delete = note_to_delete; + out->replace_note.write_note.note = note; + out->replace_note.write_note.note_len = note_size; + } else { + out->type = NDB_WRITER_NOTE; + out->note.note = note; + out->note.note_len = note_size; + } return 1; } @@ -4341,6 +4385,61 @@ static int ndb_write_new_blocks(struct ndb_txn *txn, struct ndb_note *note, return 1; } +enum replacement_action { + RA_ERROR, + RA_REPLACE, + RA_SKIP, + RA_NEW, +} + +struct ndb_replace_action { + uint64_t note_to_delete; + enum replacement_action action; +} + +// Find the note to replace, generate the diff, and replace +void ndb_process_replaceable_event(struct ndb_txn *txn, + struct ndb_note *note, + struct ndb_replace_action *action) +{ + struct ndb_filter filter, *f = &filter; + struct ndb_query_result result; + int result_count; + + // we only need one page to malloc, since our filter is small + ndb_filter_init_with(f, 1); + + ndb_filter_start_field(f, NDB_FILTER_KINDS); + ndb_filter_add_int_element(f, note->kind); + ndb_filter_end_field(f); + + ndb_filter_start_field(f, NDB_FILTER_AUTHORS); + ndb_filter_add_id_element(f, note->pubkey); + ndb_filter_end_field(f); + + ndb_filter_end(f); + + if (!ndb_query(txn, &filter, 1, &result, 1, &result_count)) { + // something went wrong? + action->action = RA_ERROR; + } else if (result_count == 0) { + // no note found, so let's just insert + action->action = RA_NEW; + } else { + // result_count == 1 + if (result.note->created_at >= note->created_at) { + // this event is newer, make sure we delete this one + result.note_to_delete = result.note_id; + action->action = RA_REPLACE; + } else { + action->action = RA_SKIP; + } + } + + ndb_filter_destroy(f); + return action; +} + static uint64_t ndb_write_note(struct ndb_txn *txn, struct ndb_writer_note *note, unsigned char *scratch, size_t scratch_size, @@ -4545,6 +4644,22 @@ static int ndb_run_migrations(struct ndb_txn *txn) return 1; } +int ndb_delete_note(struct ndb_txn *txn, uint64_t note_key *key) +{ + // delete note + MDB_val k; + + k.mv_data = key; + k.mv_size = sizeof(*key); + + if (mdb_del(txn->mdb_txn, lmdb->dbs[NDB_DB_NOTE], &k, NULL)) { + // error + ndb_debug("error deleting note\n"); + return 0; + } + + // delete note indices +} static void *ndb_writer_thread(void *data) { @@ -4576,12 +4691,15 @@ static void *ndb_writer_thread(void *data) for (i = 0 ; i < popped; i++) { msg = &msgs[i]; switch (msg->type) { - case NDB_WRITER_NOTE: needs_commit = 1; break; - case NDB_WRITER_PROFILE: needs_commit = 1; break; - case NDB_WRITER_DBMETA: needs_commit = 1; break; - case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break; - case NDB_WRITER_BLOCKS: needs_commit = 1; break; - case NDB_WRITER_MIGRATE: needs_commit = 1; break; + case NDB_WRITER_NOTE: + case NDB_WRITER_REPLACE_NOTE: + case NDB_WRITER_PROFILE: + case NDB_WRITER_DBMETA: + case NDB_WRITER_PROFILE_LAST_FETCH: + case NDB_WRITER_BLOCKS: + case NDB_WRITER_MIGRATE: + needs_commit = 1; + break; case NDB_WRITER_QUIT: break; } } @@ -4622,6 +4740,23 @@ static void *ndb_writer_thread(void *data) ndb_debug("failed to write note\n"); } break; + case NDB_WRITER_REPLACE_NOTE: + if (!ndb_delete_note(&txn, msg->replace_note.note_to_delete)) { + ndb_debug("failed to delete note\n"); + } + + note_nkey = ndb_write_note(&txn, &msg->replace_note.note, + scratch, + scratch_size, + writer->ndb_flags); + + if (note_nkey > 0) { + written_notes[num_notes++] = (struct written_note){ + .note_id = note_nkey, + .note = &msg->note, + }; + } + break; case NDB_WRITER_NOTE: note_nkey = ndb_write_note(&txn, &msg->note, scratch,