Skip to content

Commit

Permalink
Hydrate DeleteRecords events with deleted Record
Browse files Browse the repository at this point in the history
To enable Firehose users to act on deleted records without necessarily
storing the records being deleted (e.g. by decrementing counters, etc.),
we include the record being deleted in the Firehose.

This only happens when `hydrateRecords` is enabled, the same as for
`UpdateRecords`.

Currently, BigSky has `hydrateRecords` set to `false`. We set this to
`true` so that the Firehose provided by BigSky hydrates records for
`update` and `delete` events.

Closes bluesky-social#927
  • Loading branch information
nicktelford committed Feb 3, 2025
1 parent fd270fb commit 2e5cc4a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cmd/bigsky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func runBigsky(cctx *cli.Context) error {
if err := ix.HandleRepoEvent(ctx, evt); err != nil {
slog.Error("failed to handle repo event", "err", err)
}
}, false)
}, true)

prodHR, err := api.NewProdHandleResolver(100_000, cctx.String("resolve-address"), cctx.Bool("force-dns-udp"))
if err != nil {
Expand Down
113 changes: 91 additions & 22 deletions repomgr/repomgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec
}

rpath := collection + "/" + rkey

op := RepoOp{
Kind: EvtKindDeleteRecord,
Collection: collection,
Rkey: rkey,
}

if rm.hydrateRecords && rm.events != nil {
ocid, rec, err := r.GetRecord(ctx, rpath)
if err != nil {
return err
}

op.Record = rec
op.RecCid = &ocid
}

if err := r.DeleteRecord(ctx, rpath); err != nil {
return err
}
Expand All @@ -333,16 +350,12 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec

if rm.events != nil {
rm.events(ctx, &RepoEvent{
User: user,
OldRoot: oldroot,
NewRoot: nroot,
Rev: nrev,
Since: &rev,
Ops: []RepoOp{{
Kind: EvtKindDeleteRecord,
Collection: collection,
Rkey: rkey,
}},
User: user,
OldRoot: oldroot,
NewRoot: nroot,
Rev: nrev,
Since: &rev,
Ops: []RepoOp{op},
RepoSlice: rslice,
})
}
Expand Down Expand Up @@ -615,11 +628,23 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds

evtops = append(evtops, rop)
case EvtKindDeleteRecord:
evtops = append(evtops, RepoOp{
rop := RepoOp{
Kind: EvtKindDeleteRecord,
Collection: parts[0],
Rkey: parts[1],
})
}

if rm.hydrateRecords {
cid, rec, err := r.GetRecord(ctx, op.Path)
if err != nil {
return fmt.Errorf("reading changed record from car slice: %w", err)
}

rop.RecCid = &cid
rop.Record = rec
}

evtops = append(evtops, rop)
default:
return fmt.Errorf("unrecognized external user event kind: %q", op.Action)
}
Expand Down Expand Up @@ -738,11 +763,23 @@ func (rm *RepoManager) handleExternalUserEventArchive(ctx context.Context, pdsid

evtops = append(evtops, rop)
case EvtKindDeleteRecord:
evtops = append(evtops, RepoOp{
rop := RepoOp{
Kind: EvtKindDeleteRecord,
Collection: parts[0],
Rkey: parts[1],
})
}

if rm.hydrateRecords {
cid, rec, err := r.GetRecord(ctx, op.Path)
if err != nil {
return fmt.Errorf("reading changed record from car slice: %w", err)
}

rop.RecCid = &cid
rop.Record = rec
}

evtops = append(evtops, rop)
default:
return fmt.Errorf("unrecognized external user event kind: %q", op.Action)
}
Expand Down Expand Up @@ -851,15 +888,27 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [
case w.RepoApplyWrites_Delete != nil:
d := w.RepoApplyWrites_Delete

op := RepoOp{
Kind: EvtKindDeleteRecord,
Collection: d.Collection,
Rkey: d.Rkey,
}

if rm.hydrateRecords {
cc, rec, err := r.GetRecord(ctx, d.Collection+"/"+d.Rkey)
if err != nil {
return err
}

op.RecCid = &cc
op.Record = rec
}

if err := r.DeleteRecord(ctx, d.Collection+"/"+d.Rkey); err != nil {
return err
}

ops = append(ops, RepoOp{
Kind: EvtKindDeleteRecord,
Collection: d.Collection,
Rkey: d.Rkey,
})
ops = append(ops, op)
default:
return fmt.Errorf("no operation set in write enum")
}
Expand Down Expand Up @@ -1028,12 +1077,32 @@ func (rm *RepoManager) processOp(ctx context.Context, bs blockstore.Blockstore,

return outop, nil
case "del":
return &RepoOp{
outop := &RepoOp{
Kind: EvtKindDeleteRecord,
Collection: parts[0],
Rkey: parts[1],
RecCid: nil,
}, nil
RecCid: &op.OldCid,
}

if hydrateRecords {
blk, err := bs.Get(ctx, op.OldCid)
if err != nil {
return nil, err
}

rec, err := lexutil.CborDecodeValue(blk.RawData())
if err != nil {
if !errors.Is(err, lexutil.ErrUnrecognizedType) {
return nil, err
}

rm.log.Warn("failed processing repo diff", "err", err)
} else {
outop.Record = rec
}
}

return outop, nil

default:
return nil, fmt.Errorf("diff returned invalid op type: %q", op.Op)
Expand Down

0 comments on commit 2e5cc4a

Please sign in to comment.