Skip to content

Commit

Permalink
[concurrentbatchprocessor] Allow max_in_flight_size_mib to be set to …
Browse files Browse the repository at this point in the history
…zero to disable inflight limit (#243)

For OTel-Arrow, we now prefer to limit admitted bytes in the receiver.
Allow the function to be disabled in the CBP.
([Refer to this for reciever admission
control](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/otelarrowreceiver/README.md#arrow-specific-configuration).)
  • Loading branch information
jmacd authored Sep 4, 2024
1 parent b1ae3f4 commit 5ed5ffc
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 15 deletions.
3 changes: 2 additions & 1 deletion collector/processor/concurrentbatchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ The differences in this component, relative to that component are:
until the request returns with success or an error status code.
2. Maximim in-flight-bytes setting. This component measures the
in-memory size of each request it admits to the pipeline and
otherwise stalls requests until they timeout.
otherwise stalls requests until they timeout. This function is
disabled by `max_in_flight_size_mib: 0`.
3. Unlimited concurrency: this component will start as many goroutines
as needed to send batches through the pipeline.

Expand Down
19 changes: 14 additions & 5 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat

tp := set.TelemetrySettings.TracerProvider
if tp == nil {
tp = otel.GetTracerProvider()
tp = otel.GetTracerProvider()
}

bp := &batchProcessor{
Expand All @@ -189,9 +189,13 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
limitBytes: limitBytes,
sem: semaphore.NewWeighted(limitBytes),
tracer: tp,
}

if limitBytes != 0 {
bp.sem = semaphore.NewWeighted(limitBytes)
}

if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
} else {
Expand Down Expand Up @@ -385,7 +389,7 @@ func (b *shard) sendItems(trigger trigger) {
parent = contexts[0]
parent, parentSpan = b.tracer.Tracer("otel").Start(parent, "concurrent_batch_processor/export")
} else {
spans := parentSpans(contexts)
spans := parentSpans(contexts)

links := make([]trace.Link, len(spans))
for i, span := range spans {
Expand Down Expand Up @@ -454,7 +458,10 @@ func allSame(x []context.Context) bool {
}

func (bp *batchProcessor) countAcquire(ctx context.Context, bytes int64) error {
err := bp.sem.Acquire(ctx, bytes)
var err error
if bp.sem != nil {
err = bp.sem.Acquire(ctx, bytes)
}
if err == nil && bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(ctx, bytes, bp.telemetry.processorAttrOption)
}
Expand All @@ -465,7 +472,9 @@ func (bp *batchProcessor) countRelease(bytes int64) {
if bp.telemetry.batchInFlightBytes != nil {
bp.telemetry.batchInFlightBytes.Add(context.Background(), -bytes, bp.telemetry.processorAttrOption)
}
bp.sem.Release(bytes)
if bp.sem != nil {
bp.sem.Release(bytes)
}
}

func (b *shard) consumeAndWait(ctx context.Context, data any) error {
Expand Down
5 changes: 1 addition & 4 deletions collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Config struct {
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`

// MaxInFlightSizeMiB limits the number of bytes in queue waiting to be
// processed by the senders.
// processed by the senders. If zero, this functionality is disabled.
MaxInFlightSizeMiB uint32 `mapstructure:"max_in_flight_size_mib"`
}

Expand All @@ -68,8 +68,5 @@ func (cfg *Config) Validate() error {
if cfg.Timeout < 0 {
return errors.New("timeout must be greater or equal to 0")
}
if cfg.MaxInFlightSizeMiB <= 0 {
return errors.New("max_in_flight_size_mib must be greater than 0")
}
return nil
}
5 changes: 0 additions & 5 deletions collector/processor/concurrentbatchprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,3 @@ func TestValidateConfig_InvalidTimeout(t *testing.T) {
}
assert.Error(t, cfg.Validate())
}

func TestValidateConfig_InvalidZero(t *testing.T) {
cfg := &Config{}
assert.Error(t, cfg.Validate())
}

0 comments on commit 5ed5ffc

Please sign in to comment.