Skip to content

Commit ee4fc56

Browse files
authored
Improve error message when cycle is detected in component graph (open-telemetry#7206)
See open-telemetry#7045 (comment)
1 parent 02cceec commit ee4fc56

File tree

2 files changed

+90
-28
lines changed

2 files changed

+90
-28
lines changed

service/graph.go

+44-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ package service // import "go.opentelemetry.io/collector/service"
1616

1717
import (
1818
"context"
19+
"errors"
20+
"fmt"
1921
"net/http"
22+
"strings"
2023

2124
"go.uber.org/multierr"
2225
"gonum.org/v1/gonum/graph"
@@ -159,10 +162,7 @@ func (g *pipelinesGraph) createEdges() {
159162
func (g *pipelinesGraph) buildComponents(ctx context.Context, set pipelinesSettings) error {
160163
nodes, err := topo.Sort(g.componentGraph)
161164
if err != nil {
162-
// TODO When there is a cycle in the graph, there is enough information
163-
// within the error to construct a better error message that indicates
164-
// exactly the components that are in a cycle.
165-
return err
165+
return cycleErr(err, topo.DirectedCyclesIn(g.componentGraph))
166166
}
167167

168168
for i := len(nodes) - 1; i >= 0; i-- {
@@ -356,3 +356,43 @@ func (p *pipelineNodes) exporterIDs() []string {
356356
func (p *pipelineNodes) mutatesData() bool {
357357
return p.capabilitiesNode.getConsumer().Capabilities().MutatesData
358358
}
359+
360+
func cycleErr(err error, cycles [][]graph.Node) error {
361+
var topoErr topo.Unorderable
362+
if !errors.As(err, &topoErr) || len(cycles) == 0 || len(cycles[0]) == 0 {
363+
return err
364+
}
365+
366+
// There may be multiple cycles, but report only the first one.
367+
cycle := cycles[0]
368+
369+
// The last node is a duplicate of the first node.
370+
// Remove it because we may start from a different node.
371+
cycle = cycle[:len(cycle)-1]
372+
373+
// A cycle always contains a connector. For the sake of consistent
374+
// error messages report the cycle starting from a connector.
375+
for i := 0; i < len(cycle); i++ {
376+
if _, ok := cycle[i].(*connectorNode); ok {
377+
cycle = append(cycle[i:], cycle[:i]...)
378+
break
379+
}
380+
}
381+
382+
// Repeat the first node at the end to clarify the cycle
383+
cycle = append(cycle, cycle[0])
384+
385+
// Build the error message
386+
componentDetails := make([]string, 0, len(cycle))
387+
for _, node := range cycle {
388+
switch n := node.(type) {
389+
case *processorNode:
390+
componentDetails = append(componentDetails, fmt.Sprintf("processor %q in pipeline %q", n.componentID, n.pipelineID))
391+
case *connectorNode:
392+
componentDetails = append(componentDetails, fmt.Sprintf("connector %q (%s to %s)", n.componentID, n.exprPipelineType, n.rcvrPipelineType))
393+
default:
394+
continue // skip capabilities/fanout nodes
395+
}
396+
}
397+
return fmt.Errorf("cycle detected: %s", strings.Join(componentDetails, " -> "))
398+
}

service/graph_test.go

+46-24
Original file line numberDiff line numberDiff line change
@@ -824,14 +824,13 @@ func TestGraphBuildErrors(t *testing.T) {
824824
badConnectorFactory := newBadConnectorFactory()
825825

826826
tests := []struct {
827-
name string
828-
receiverCfgs map[component.ID]component.Config
829-
processorCfgs map[component.ID]component.Config
830-
exporterCfgs map[component.ID]component.Config
831-
connectorCfgs map[component.ID]component.Config
832-
pipelineCfgs map[component.ID]*PipelineConfig
833-
expected string
834-
expectedStartsWith string
827+
name string
828+
receiverCfgs map[component.ID]component.Config
829+
processorCfgs map[component.ID]component.Config
830+
exporterCfgs map[component.ID]component.Config
831+
connectorCfgs map[component.ID]component.Config
832+
pipelineCfgs map[component.ID]*PipelineConfig
833+
expected string
835834
}{
836835
{
837836
name: "not_supported_exporter_logs",
@@ -1217,7 +1216,10 @@ func TestGraphBuildErrors(t *testing.T) {
12171216
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
12181217
},
12191218
},
1220-
expectedStartsWith: "topo: no topological ordering: cyclic components:",
1219+
expected: `cycle detected: ` +
1220+
`connector "nop/conn" (traces to traces) -> ` +
1221+
`processor "nop" in pipeline "traces" -> ` +
1222+
`connector "nop/conn" (traces to traces)`,
12211223
},
12221224
{
12231225
name: "not_allowed_simple_cycle_metrics.yaml",
@@ -1240,7 +1242,10 @@ func TestGraphBuildErrors(t *testing.T) {
12401242
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
12411243
},
12421244
},
1243-
expectedStartsWith: "topo: no topological ordering: cyclic components:",
1245+
expected: `cycle detected: ` +
1246+
`connector "nop/conn" (metrics to metrics) -> ` +
1247+
`processor "nop" in pipeline "metrics" -> ` +
1248+
`connector "nop/conn" (metrics to metrics)`,
12441249
},
12451250
{
12461251
name: "not_allowed_simple_cycle_logs.yaml",
@@ -1263,7 +1268,10 @@ func TestGraphBuildErrors(t *testing.T) {
12631268
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
12641269
},
12651270
},
1266-
expectedStartsWith: "topo: no topological ordering: cyclic components:",
1271+
expected: `cycle detected: ` +
1272+
`connector "nop/conn" (logs to logs) -> ` +
1273+
`processor "nop" in pipeline "logs" -> ` +
1274+
`connector "nop/conn" (logs to logs)`,
12671275
},
12681276
{
12691277
name: "not_allowed_deep_cycle_traces.yaml",
@@ -1303,8 +1311,12 @@ func TestGraphBuildErrors(t *testing.T) {
13031311
Exporters: []component.ID{component.NewID("nop")},
13041312
},
13051313
},
1306-
expectedStartsWith: "topo: no topological ordering: cyclic components:",
1307-
// TODO rebuild cycle in order
1314+
expected: `cycle detected: ` +
1315+
`connector "nop/conn1" (traces to traces) -> ` +
1316+
`processor "nop" in pipeline "traces/2" -> ` +
1317+
`connector "nop/conn" (traces to traces) -> ` +
1318+
`processor "nop" in pipeline "traces/1" -> ` +
1319+
`connector "nop/conn1" (traces to traces)`,
13081320
},
13091321
{
13101322
name: "not_allowed_deep_cycle_metrics.yaml",
@@ -1344,8 +1356,12 @@ func TestGraphBuildErrors(t *testing.T) {
13441356
Exporters: []component.ID{component.NewID("nop")},
13451357
},
13461358
},
1347-
expectedStartsWith: "topo: no topological ordering: cyclic components:",
1348-
// TODO rebuild cycle in order
1359+
expected: `cycle detected: ` +
1360+
`connector "nop/conn1" (metrics to metrics) -> ` +
1361+
`processor "nop" in pipeline "metrics/2" -> ` +
1362+
`connector "nop/conn" (metrics to metrics) -> ` +
1363+
`processor "nop" in pipeline "metrics/1" -> ` +
1364+
`connector "nop/conn1" (metrics to metrics)`,
13491365
},
13501366
{
13511367
name: "not_allowed_deep_cycle_logs.yaml",
@@ -1385,8 +1401,12 @@ func TestGraphBuildErrors(t *testing.T) {
13851401
Exporters: []component.ID{component.NewID("nop")},
13861402
},
13871403
},
1388-
expectedStartsWith: "topo: no topological ordering: cyclic components:",
1389-
// TODO rebuild cycle in order
1404+
expected: `cycle detected: ` +
1405+
`connector "nop/conn1" (logs to logs) -> ` +
1406+
`processor "nop" in pipeline "logs/2" -> ` +
1407+
`connector "nop/conn" (logs to logs) -> ` +
1408+
`processor "nop" in pipeline "logs/1" -> ` +
1409+
`connector "nop/conn1" (logs to logs)`,
13901410
},
13911411
{
13921412
name: "not_allowed_deep_cycle_multi_signal.yaml",
@@ -1442,7 +1462,14 @@ func TestGraphBuildErrors(t *testing.T) {
14421462
Exporters: []component.ID{component.NewIDWithName("nop", "fork")}, // cannot loop back to "nop/fork"
14431463
},
14441464
},
1445-
expected: "topo: no topological ordering: 12 nodes in 1 cyclic components",
1465+
expected: `cycle detected: ` +
1466+
`connector "nop/rawlog" (traces to logs) -> ` +
1467+
`processor "nop" in pipeline "logs/raw" -> ` +
1468+
`connector "nop/fork" (logs to traces) -> ` +
1469+
`processor "nop" in pipeline "traces/copy2" -> ` +
1470+
`connector "nop/forkagain" (traces to traces) -> ` +
1471+
`processor "nop" in pipeline "traces/copy2b" -> ` +
1472+
`connector "nop/rawlog" (traces to logs)`,
14461473
},
14471474
{
14481475
name: "unknown_exporter_config",
@@ -1605,12 +1632,7 @@ func TestGraphBuildErrors(t *testing.T) {
16051632
PipelineConfigs: test.pipelineCfgs,
16061633
}
16071634
_, err := buildPipelinesGraph(context.Background(), set)
1608-
if test.expected != "" {
1609-
assert.EqualError(t, err, test.expected)
1610-
} else {
1611-
assert.Error(t, err)
1612-
assert.Contains(t, err.Error(), test.expectedStartsWith)
1613-
}
1635+
assert.EqualError(t, err, test.expected)
16141636
})
16151637
}
16161638
}

0 commit comments

Comments
 (0)