diff --git a/cmd/bigsky/main.go b/cmd/bigsky/main.go index 08d638c3c..6e3a7bdf6 100644 --- a/cmd/bigsky/main.go +++ b/cmd/bigsky/main.go @@ -482,7 +482,7 @@ func runBigsky(cctx *cli.Context) error { if err := ix.HandleRepoEvent(ctx, evt); err != nil { slog.Error("failed to handle repo event", "err", err) } - }, false) + }, true) prodHR, err := api.NewProdHandleResolver(100_000, cctx.String("resolve-address"), cctx.Bool("force-dns-udp")) if err != nil { diff --git a/repomgr/repomgr.go b/repomgr/repomgr.go index c7cacef01..faff3ff25 100644 --- a/repomgr/repomgr.go +++ b/repomgr/repomgr.go @@ -312,6 +312,23 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec } rpath := collection + "/" + rkey + + op := RepoOp{ + Kind: EvtKindDeleteRecord, + Collection: collection, + Rkey: rkey, + } + + if rm.hydrateRecords && rm.events != nil { + ocid, rec, err := r.GetRecord(ctx, rpath) + if err != nil { + return err + } + + op.Record = rec + op.RecCid = &ocid + } + if err := r.DeleteRecord(ctx, rpath); err != nil { return err } @@ -333,16 +350,12 @@ func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collec if rm.events != nil { rm.events(ctx, &RepoEvent{ - User: user, - OldRoot: oldroot, - NewRoot: nroot, - Rev: nrev, - Since: &rev, - Ops: []RepoOp{{ - Kind: EvtKindDeleteRecord, - Collection: collection, - Rkey: rkey, - }}, + User: user, + OldRoot: oldroot, + NewRoot: nroot, + Rev: nrev, + Since: &rev, + Ops: []RepoOp{op}, RepoSlice: rslice, }) } @@ -615,11 +628,23 @@ func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pds evtops = append(evtops, rop) case EvtKindDeleteRecord: - evtops = append(evtops, RepoOp{ + rop := RepoOp{ Kind: EvtKindDeleteRecord, Collection: parts[0], Rkey: parts[1], - }) + } + + if rm.hydrateRecords { + cid, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("reading changed record from car slice: %w", err) + } + + rop.RecCid = &cid + rop.Record = rec + } + + evtops = append(evtops, rop) default: return fmt.Errorf("unrecognized external user event kind: %q", op.Action) } @@ -738,11 +763,23 @@ func (rm *RepoManager) handleExternalUserEventArchive(ctx context.Context, pdsid evtops = append(evtops, rop) case EvtKindDeleteRecord: - evtops = append(evtops, RepoOp{ + rop := RepoOp{ Kind: EvtKindDeleteRecord, Collection: parts[0], Rkey: parts[1], - }) + } + + if rm.hydrateRecords { + cid, rec, err := r.GetRecord(ctx, op.Path) + if err != nil { + return fmt.Errorf("reading changed record from car slice: %w", err) + } + + rop.RecCid = &cid + rop.Record = rec + } + + evtops = append(evtops, rop) default: return fmt.Errorf("unrecognized external user event kind: %q", op.Action) } @@ -851,15 +888,27 @@ func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes [ case w.RepoApplyWrites_Delete != nil: d := w.RepoApplyWrites_Delete + op := RepoOp{ + Kind: EvtKindDeleteRecord, + Collection: d.Collection, + Rkey: d.Rkey, + } + + if rm.hydrateRecords { + cc, rec, err := r.GetRecord(ctx, d.Collection+"/"+d.Rkey) + if err != nil { + return err + } + + op.RecCid = &cc + op.Record = rec + } + if err := r.DeleteRecord(ctx, d.Collection+"/"+d.Rkey); err != nil { return err } - ops = append(ops, RepoOp{ - Kind: EvtKindDeleteRecord, - Collection: d.Collection, - Rkey: d.Rkey, - }) + ops = append(ops, op) default: return fmt.Errorf("no operation set in write enum") } @@ -1028,12 +1077,32 @@ func (rm *RepoManager) processOp(ctx context.Context, bs blockstore.Blockstore, return outop, nil case "del": - return &RepoOp{ + outop := &RepoOp{ Kind: EvtKindDeleteRecord, Collection: parts[0], Rkey: parts[1], - RecCid: nil, - }, nil + RecCid: &op.OldCid, + } + + if hydrateRecords { + blk, err := bs.Get(ctx, op.OldCid) + if err != nil { + return nil, err + } + + rec, err := lexutil.CborDecodeValue(blk.RawData()) + if err != nil { + if !errors.Is(err, lexutil.ErrUnrecognizedType) { + return nil, err + } + + rm.log.Warn("failed processing repo diff", "err", err) + } else { + outop.Record = rec + } + } + + return outop, nil default: return nil, fmt.Errorf("diff returned invalid op type: %q", op.Op)