From 1c58afb8447bae281676200b852408729acad7b5 Mon Sep 17 00:00:00 2001 From: Charles Nguyen Date: Thu, 30 Oct 2025 01:41:10 -0400 Subject: [PATCH 1/2] Support BoundedTrie metric for Prism Runner --- .../prism/internal/jobservices/metrics.go | 286 ++++++++++++++++++ .../internal/jobservices/metrics_test.go | 40 +++ .../runners/portability/prism_runner_test.py | 2 +- 3 files changed, 327 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index bbbdfd1eba4f..8d66f3b34ac6 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -183,6 +183,15 @@ func buildUrnToOpsMap(mUrn2Spec map[string]*pipepb.MonitoringInfoSpec) map[strin getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE): func() metricAccumulator { return &stringSet{set: map[string]struct{}{}} }, + getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE): func() metricAccumulator { + return &boundedTrie{ + data: &boundedTrieData{ + singleton: []string{}, + root: nil, + bound: 100, // Default maximum size of the trie + }, + } + }, getMetTyp(pipepb.MonitoringInfoTypeUrns_PROGRESS_TYPE): func() metricAccumulator { return &progress{} }, } @@ -468,6 +477,283 @@ func (m *stringSet) toProto(key metricKey) *pipepb.MonitoringInfo { } } +type boundedTrie struct { + data *boundedTrieData +} + +func (b *boundedTrie) accumulate(pyld []byte) error { + bt := &pipepb.BoundedTrie{} + if err := proto.Unmarshal(pyld, bt); err != nil { + return err + } + + incoming := boundedTrieDataFromProto(bt) + if incoming.isEmpty() { + return nil + } + + if b.data.isEmpty() { + b.data = incoming + return nil + } + b.data = b.data.combine(incoming) + return nil +} + +func (b *boundedTrie) toProto(key metricKey) *pipepb.MonitoringInfo { + payload, err := proto.Marshal(b.data.toProto()) + if err != nil { + panic(fmt.Sprintf("error encoding bounded trie metric: %v", err)) + } + return &pipepb.MonitoringInfo{ + Urn: key.Urn(), + Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE), + Payload: payload, + Labels: key.Labels(), + } +} + +type boundedTrieNode struct { + size int32 + truncated bool + children map[string]*boundedTrieNode +} + +func newBoundedTrieNode() *boundedTrieNode { + return &boundedTrieNode{ + size: 1, + children: map[string]*boundedTrieNode{}, + } +} + +func boundedTrieNodeFromProto(protoNode *pipepb.BoundedTrieNode) *boundedTrieNode { + node := newBoundedTrieNode() + if protoNode.GetTruncated() { + node.truncated = true + return node + } + children := protoNode.GetChildren() + if len(children) == 0 { + return node + } + node.children = make(map[string]*boundedTrieNode, len(children)) + var total int32 = 0 + for prefix, child := range children { + c := boundedTrieNodeFromProto(child) + node.children[prefix] = c + total += c.size + } + if total > 1 { + node.size = total + } + return node +} + +func (n *boundedTrieNode) clone() *boundedTrieNode { + clone := &boundedTrieNode{ + size: n.size, + truncated: n.truncated, + } + clone.children = make(map[string]*boundedTrieNode, len(n.children)) + for k, child := range n.children { + clone.children[k] = child.clone() + } + return clone +} + +func (n *boundedTrieNode) add(segments []string) int32 { + if n.truncated || len(segments) == 0 { + return 0 + } + if n.children == nil { + n.children = make(map[string]*boundedTrieNode) + } + head := segments[0] + tail := segments[1:] + wasEmpty := len(n.children) == 0 + child, ok := n.children[head] + if !ok { + child = newBoundedTrieNode() + n.children[head] = child + } + var delta int32 = 0 + if !ok { + if wasEmpty { + delta = 0 + } else { + delta = 1 + } + } + if len(tail) > 0 { + delta += child.add(tail) + } + n.size += delta + return delta +} + +func (n *boundedTrieNode) merge(other *boundedTrieNode) int32 { + if n.truncated || other == nil { + return 0 + } + if other.truncated { + delta := 1 - n.size + n.truncated = true + n.children = nil + n.size += delta + return delta + } + if len(other.children) == 0 { + return 0 + } + if len(n.children) == 0 { + if n.children == nil { + n.children = make(map[string]*boundedTrieNode, len(other.children)) + } + delta := other.size - n.size + for prefix, child := range other.children { + n.children[prefix] = child.clone() + } + n.size += delta + return delta + } + var delta int32 = 0 + for prefix, otherChild := range other.children { + if n.children == nil { + n.children = make(map[string]*boundedTrieNode) + } + if selfChild, ok := n.children[prefix]; ok { + delta += selfChild.merge(otherChild.clone()) + } else { + cloned := otherChild.clone() + n.children[prefix] = cloned + delta += cloned.size + } + } + n.size += delta + return delta +} + +func (n *boundedTrieNode) trim() int32 { + if len(n.children) == 0 { + return 0 + } + var maxChild *boundedTrieNode + for _, child := range n.children { + if maxChild == nil || child.size > maxChild.size { + maxChild = child + } + } + if maxChild == nil { + return 0 + } + var delta int32 + if maxChild.size == 1 { + delta = 1 - n.size + n.truncated = true + n.children = nil + } else { + delta = maxChild.trim() + } + n.size += delta + return delta +} + +func (n *boundedTrieNode) toProto() *pipepb.BoundedTrieNode { + protoNode := &pipepb.BoundedTrieNode{ + Truncated: n.truncated, + } + if !n.truncated && len(n.children) > 0 { + protoNode.Children = make(map[string]*pipepb.BoundedTrieNode, len(n.children)) + for prefix, child := range n.children { + protoNode.Children[prefix] = child.toProto() + } + } + return protoNode +} + +type boundedTrieData struct { + singleton []string + root *boundedTrieNode + bound int32 +} + +func boundedTrieDataFromProto(protoData *pipepb.BoundedTrie) *boundedTrieData { + if protoData == nil { + return &boundedTrieData{} + } + + data := &boundedTrieData{ + bound: protoData.GetBound(), + } + if len(protoData.GetSingleton()) > 0 { + data.singleton = append(data.singleton, protoData.GetSingleton()...) + } + if protoData.GetRoot() != nil { + data.root = boundedTrieNodeFromProto(protoData.GetRoot()) + } + return data +} + +func (d *boundedTrieData) isEmpty() bool { + return d.root == nil && len(d.singleton) == 0 +} + +func (d *boundedTrieData) clone() *boundedTrieData { + clone := &boundedTrieData{ + bound: d.bound, + } + if len(d.singleton) > 0 { + clone.singleton = append(clone.singleton, d.singleton...) + } + if d.root != nil { + clone.root = d.root.clone() + } + return clone +} + +func (d *boundedTrieData) asTrie() *boundedTrieNode { + if d.root != nil { + return d.root + } + + root := newBoundedTrieNode() + if len(d.singleton) > 0 { + root.add(d.singleton) + } + return root +} + +func (d *boundedTrieData) combine(other *boundedTrieData) *boundedTrieData { + if d.isEmpty() { + return other.clone() + } + if other == nil || other.isEmpty() { + return d.clone() + } + + combined := d.asTrie().clone() + combined.merge(other.asTrie()) + bound := int32(math.Min(float64(d.bound), float64(other.bound))) + for combined.size > bound { + combined.trim() + } + return &boundedTrieData{ + root: combined, + bound: bound, + } +} + +func (d *boundedTrieData) toProto() *pipepb.BoundedTrie { + protoData := &pipepb.BoundedTrie{Bound: int32(d.bound)} + if len(d.singleton) > 0 { + protoData.Singleton = append([]string(nil), d.singleton...) + } + if d.root != nil { + protoData.Root = d.root.toProto() + } + return protoData +} + type durability int const ( diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go index 339d862292fd..17dccd8a42d2 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go @@ -53,6 +53,15 @@ func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipep return info } +func boundedTriePayload(t testing.TB, trie *pipepb.BoundedTrie) []byte { + t.Helper() + bytes, err := proto.Marshal(trie) + if err != nil { + t.Fatalf("failed to marshal bounded trie: %v", err) + } + return bytes +} + // This test validates that multiple contributions are correctly summed up and accumulated. func Test_metricsStore_ContributeMetrics(t *testing.T) { @@ -166,6 +175,37 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) { want: []*pipepb.MonitoringInfo{ makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SET_STRING, []byte{0, 0, 0, 1, 1, 63}), }, + }, { + name: "boundedTrie", + input: []map[string][]byte{ + {"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"a"}})}, + {"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"z", "z", "z"}})}, + }, + shortIDs: map[string]*pipepb.MonitoringInfo{ + "a": makeInfo(pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE), + }, + want: []*pipepb.MonitoringInfo{ + makeInfoWBytes( + pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE, + boundedTriePayload(t, &pipepb.BoundedTrie{ + Bound: 100, + Root: &pipepb.BoundedTrieNode{ + Children: map[string]*pipepb.BoundedTrieNode{ + "a": {}, + "z": { + Children: map[string]*pipepb.BoundedTrieNode{ + "z": { + Children: map[string]*pipepb.BoundedTrieNode{ + "z": {}, + }, + }, + }, + }, + }, + }, + }), + ), + }, }, } diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py b/sdks/python/apache_beam/runners/portability/prism_runner_test.py index a65f9a9960b4..67710eb8ace2 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py @@ -232,7 +232,7 @@ def test_custom_window_type(self): " https://github.com/apache/beam/issues/31921") def test_metrics(self): - super().test_metrics(check_bounded_trie=False) + super().test_metrics(check_bounded_trie=True) def construct_timestamped(k, t): return window.TimestampedValue((k, t), t) From a915c18955c9855836d007741260af1e1eaa3b73 Mon Sep 17 00:00:00 2001 From: Charles Nguyen Date: Sat, 1 Nov 2025 20:03:21 -0400 Subject: [PATCH 2/2] Fix test and refined implementation --- .../prism/internal/jobservices/metrics.go | 54 ++++++++----------- .../internal/jobservices/metrics_test.go | 2 +- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index 8d66f3b34ac6..fbe18ff44049 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -501,7 +501,7 @@ func (b *boundedTrie) accumulate(pyld []byte) error { } func (b *boundedTrie) toProto(key metricKey) *pipepb.MonitoringInfo { - payload, err := proto.Marshal(b.data.toProto()) + payload, err := proto.MarshalOptions{Deterministic: true}.Marshal(b.data.toProto()) if err != nil { panic(fmt.Sprintf("error encoding bounded trie metric: %v", err)) } @@ -521,8 +521,9 @@ type boundedTrieNode struct { func newBoundedTrieNode() *boundedTrieNode { return &boundedTrieNode{ - size: 1, - children: map[string]*boundedTrieNode{}, + size: 1, + truncated: false, + children: map[string]*boundedTrieNode{}, } } @@ -536,6 +537,7 @@ func boundedTrieNodeFromProto(protoNode *pipepb.BoundedTrieNode) *boundedTrieNod if len(children) == 0 { return node } + node.children = make(map[string]*boundedTrieNode, len(children)) var total int32 = 0 for prefix, child := range children { @@ -565,9 +567,8 @@ func (n *boundedTrieNode) add(segments []string) int32 { if n.truncated || len(segments) == 0 { return 0 } - if n.children == nil { - n.children = make(map[string]*boundedTrieNode) - } + + var delta int32 = 0 head := segments[0] tail := segments[1:] wasEmpty := len(n.children) == 0 @@ -575,15 +576,14 @@ func (n *boundedTrieNode) add(segments []string) int32 { if !ok { child = newBoundedTrieNode() n.children[head] = child - } - var delta int32 = 0 - if !ok { + if wasEmpty { delta = 0 } else { delta = 1 } } + if len(tail) > 0 { delta += child.add(tail) } @@ -592,41 +592,36 @@ func (n *boundedTrieNode) add(segments []string) int32 { } func (n *boundedTrieNode) merge(other *boundedTrieNode) int32 { - if n.truncated || other == nil { + if n.truncated { return 0 } if other.truncated { delta := 1 - n.size n.truncated = true - n.children = nil + n.children = map[string]*boundedTrieNode{} n.size += delta return delta } + if len(other.children) == 0 { return 0 } if len(n.children) == 0 { - if n.children == nil { - n.children = make(map[string]*boundedTrieNode, len(other.children)) - } delta := other.size - n.size for prefix, child := range other.children { - n.children[prefix] = child.clone() + n.children[prefix] = child } n.size += delta return delta } + var delta int32 = 0 for prefix, otherChild := range other.children { - if n.children == nil { - n.children = make(map[string]*boundedTrieNode) - } if selfChild, ok := n.children[prefix]; ok { - delta += selfChild.merge(otherChild.clone()) + delta += selfChild.merge(otherChild) } else { - cloned := otherChild.clone() - n.children[prefix] = cloned - delta += cloned.size + n.children[prefix] = otherChild + delta += otherChild.size } } n.size += delta @@ -637,20 +632,19 @@ func (n *boundedTrieNode) trim() int32 { if len(n.children) == 0 { return 0 } + var maxChild *boundedTrieNode for _, child := range n.children { if maxChild == nil || child.size > maxChild.size { maxChild = child } } - if maxChild == nil { - return 0 - } + var delta int32 if maxChild.size == 1 { delta = 1 - n.size n.truncated = true - n.children = nil + n.children = map[string]*boundedTrieNode{} } else { delta = maxChild.trim() } @@ -678,10 +672,6 @@ type boundedTrieData struct { } func boundedTrieDataFromProto(protoData *pipepb.BoundedTrie) *boundedTrieData { - if protoData == nil { - return &boundedTrieData{} - } - data := &boundedTrieData{ bound: protoData.GetBound(), } @@ -744,9 +734,9 @@ func (d *boundedTrieData) combine(other *boundedTrieData) *boundedTrieData { } func (d *boundedTrieData) toProto() *pipepb.BoundedTrie { - protoData := &pipepb.BoundedTrie{Bound: int32(d.bound)} + protoData := &pipepb.BoundedTrie{Bound: d.bound} if len(d.singleton) > 0 { - protoData.Singleton = append([]string(nil), d.singleton...) + protoData.Singleton = append(protoData.Singleton, d.singleton...) } if d.root != nil { protoData.Root = d.root.toProto() diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go index 17dccd8a42d2..58f98c79f4bf 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics_test.go @@ -55,7 +55,7 @@ func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipep func boundedTriePayload(t testing.TB, trie *pipepb.BoundedTrie) []byte { t.Helper() - bytes, err := proto.Marshal(trie) + bytes, err := proto.MarshalOptions{Deterministic: true}.Marshal(trie) if err != nil { t.Fatalf("failed to marshal bounded trie: %v", err) }