diff --git a/pkg/processor/runnable_processor.go b/pkg/processor/runnable_processor.go index 48484a1b7..5034fb5a0 100644 --- a/pkg/processor/runnable_processor.go +++ b/pkg/processor/runnable_processor.go @@ -94,6 +94,11 @@ func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Recor if len(keptRecords) > 0 { outRecs = p.proc.Process(ctx, keptRecords) + if len(outRecs) > len(keptRecords) { + return []sdk.ProcessedRecord{ + sdk.ErrorRecord{Error: cerrors.New("processor returned more records than input")}, + } + } } if err != nil { outRecs = append(outRecs, sdk.ErrorRecord{Error: err}) @@ -111,6 +116,8 @@ func (p *RunnableProcessor) Process(ctx context.Context, records []opencdc.Recor tmp := make([]sdk.ProcessedRecord, len(outRecs)+len(passthroughRecordIndexes)) prevIndex := -1 for i, index := range passthroughRecordIndexes { + // TODO index-i can be out of bounds if the processor returns + // fewer records than the input. copy(tmp[prevIndex+1:index], outRecs[prevIndex-i+1:index-i]) tmp[index] = sdk.SingleRecord(records[index]) prevIndex = index