Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 276 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,15 @@ func buildUrnToOpsMap(mUrn2Spec map[string]*pipepb.MonitoringInfoSpec) map[strin
getMetTyp(pipepb.MonitoringInfoTypeUrns_SET_STRING_TYPE): func() metricAccumulator {
return &stringSet{set: map[string]struct{}{}}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE): func() metricAccumulator {
return &boundedTrie{
data: &boundedTrieData{
singleton: []string{},
root: nil,
bound: 100, // Default maximum size of the trie
},
}
},
getMetTyp(pipepb.MonitoringInfoTypeUrns_PROGRESS_TYPE): func() metricAccumulator { return &progress{} },
}

Expand Down Expand Up @@ -468,6 +477,273 @@ func (m *stringSet) toProto(key metricKey) *pipepb.MonitoringInfo {
}
}

type boundedTrie struct {
data *boundedTrieData
}

func (b *boundedTrie) accumulate(pyld []byte) error {
bt := &pipepb.BoundedTrie{}
if err := proto.Unmarshal(pyld, bt); err != nil {
return err
}

incoming := boundedTrieDataFromProto(bt)
if incoming.isEmpty() {
return nil
}

if b.data.isEmpty() {
b.data = incoming
return nil
}
b.data = b.data.combine(incoming)
return nil
}

func (b *boundedTrie) toProto(key metricKey) *pipepb.MonitoringInfo {
payload, err := proto.MarshalOptions{Deterministic: true}.Marshal(b.data.toProto())
if err != nil {
panic(fmt.Sprintf("error encoding bounded trie metric: %v", err))
}
return &pipepb.MonitoringInfo{
Urn: key.Urn(),
Type: getMetTyp(pipepb.MonitoringInfoTypeUrns_BOUNDED_TRIE_TYPE),
Payload: payload,
Labels: key.Labels(),
}
}

type boundedTrieNode struct {
size int32
truncated bool
children map[string]*boundedTrieNode
}

func newBoundedTrieNode() *boundedTrieNode {
return &boundedTrieNode{
size: 1,
truncated: false,
children: map[string]*boundedTrieNode{},
}
}

func boundedTrieNodeFromProto(protoNode *pipepb.BoundedTrieNode) *boundedTrieNode {
node := newBoundedTrieNode()
if protoNode.GetTruncated() {
node.truncated = true
return node
}
children := protoNode.GetChildren()
if len(children) == 0 {
return node
}

node.children = make(map[string]*boundedTrieNode, len(children))
var total int32 = 0
for prefix, child := range children {
c := boundedTrieNodeFromProto(child)
node.children[prefix] = c
total += c.size
}
if total > 1 {
node.size = total
}
return node
}

func (n *boundedTrieNode) clone() *boundedTrieNode {
clone := &boundedTrieNode{
size: n.size,
truncated: n.truncated,
}
clone.children = make(map[string]*boundedTrieNode, len(n.children))
for k, child := range n.children {
clone.children[k] = child.clone()
}
return clone
}

func (n *boundedTrieNode) add(segments []string) int32 {
if n.truncated || len(segments) == 0 {
return 0
}

var delta int32 = 0
head := segments[0]
tail := segments[1:]
wasEmpty := len(n.children) == 0
child, ok := n.children[head]
if !ok {
child = newBoundedTrieNode()
n.children[head] = child

if wasEmpty {
delta = 0
} else {
delta = 1
}
}

if len(tail) > 0 {
delta += child.add(tail)
}
n.size += delta
return delta
}

func (n *boundedTrieNode) merge(other *boundedTrieNode) int32 {
if n.truncated {
return 0
}
if other.truncated {
delta := 1 - n.size
n.truncated = true
n.children = map[string]*boundedTrieNode{}
n.size += delta
return delta
}

if len(other.children) == 0 {
return 0
}
if len(n.children) == 0 {
delta := other.size - n.size
for prefix, child := range other.children {
n.children[prefix] = child
}
n.size += delta
return delta
}

var delta int32 = 0
for prefix, otherChild := range other.children {
if selfChild, ok := n.children[prefix]; ok {
delta += selfChild.merge(otherChild)
} else {
n.children[prefix] = otherChild
delta += otherChild.size
}
}
n.size += delta
return delta
}

func (n *boundedTrieNode) trim() int32 {
if len(n.children) == 0 {
return 0
}

var maxChild *boundedTrieNode
for _, child := range n.children {
if maxChild == nil || child.size > maxChild.size {
maxChild = child
}
}

var delta int32
if maxChild.size == 1 {
delta = 1 - n.size
n.truncated = true
n.children = map[string]*boundedTrieNode{}
} else {
delta = maxChild.trim()
}
n.size += delta
return delta
}

func (n *boundedTrieNode) toProto() *pipepb.BoundedTrieNode {
protoNode := &pipepb.BoundedTrieNode{
Truncated: n.truncated,
}
if !n.truncated && len(n.children) > 0 {
protoNode.Children = make(map[string]*pipepb.BoundedTrieNode, len(n.children))
for prefix, child := range n.children {
protoNode.Children[prefix] = child.toProto()
}
}
return protoNode
}

type boundedTrieData struct {
singleton []string
root *boundedTrieNode
bound int32
}

func boundedTrieDataFromProto(protoData *pipepb.BoundedTrie) *boundedTrieData {
data := &boundedTrieData{
bound: protoData.GetBound(),
}
if len(protoData.GetSingleton()) > 0 {
data.singleton = append(data.singleton, protoData.GetSingleton()...)
}
if protoData.GetRoot() != nil {
data.root = boundedTrieNodeFromProto(protoData.GetRoot())
}
return data
}

func (d *boundedTrieData) isEmpty() bool {
return d.root == nil && len(d.singleton) == 0
}

func (d *boundedTrieData) clone() *boundedTrieData {
clone := &boundedTrieData{
bound: d.bound,
}
if len(d.singleton) > 0 {
clone.singleton = append(clone.singleton, d.singleton...)
}
if d.root != nil {
clone.root = d.root.clone()
}
return clone
}

func (d *boundedTrieData) asTrie() *boundedTrieNode {
if d.root != nil {
return d.root
}

root := newBoundedTrieNode()
if len(d.singleton) > 0 {
root.add(d.singleton)
}
return root
}

func (d *boundedTrieData) combine(other *boundedTrieData) *boundedTrieData {
if d.isEmpty() {
return other.clone()
}
if other == nil || other.isEmpty() {
return d.clone()
}

combined := d.asTrie().clone()
combined.merge(other.asTrie())
bound := int32(math.Min(float64(d.bound), float64(other.bound)))
for combined.size > bound {
combined.trim()
}
return &boundedTrieData{
root: combined,
bound: bound,
}
}

func (d *boundedTrieData) toProto() *pipepb.BoundedTrie {
protoData := &pipepb.BoundedTrie{Bound: d.bound}
if len(d.singleton) > 0 {
protoData.Singleton = append(protoData.Singleton, d.singleton...)
}
if d.root != nil {
protoData.Root = d.root.toProto()
}
return protoData
}

type durability int

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ func makeInfoWBytes(enum pipepb.MonitoringInfoSpecs_Enum, payload []byte) *pipep
return info
}

func boundedTriePayload(t testing.TB, trie *pipepb.BoundedTrie) []byte {
t.Helper()
bytes, err := proto.MarshalOptions{Deterministic: true}.Marshal(trie)
if err != nil {
t.Fatalf("failed to marshal bounded trie: %v", err)
}
return bytes
}

// This test validates that multiple contributions are correctly summed up and accumulated.
func Test_metricsStore_ContributeMetrics(t *testing.T) {

Expand Down Expand Up @@ -166,6 +175,37 @@ func Test_metricsStore_ContributeMetrics(t *testing.T) {
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(pipepb.MonitoringInfoSpecs_USER_SET_STRING, []byte{0, 0, 0, 1, 1, 63}),
},
}, {
name: "boundedTrie",
input: []map[string][]byte{
{"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"a"}})},
{"a": boundedTriePayload(t, &pipepb.BoundedTrie{Bound: 100, Singleton: []string{"z", "z", "z"}})},
},
shortIDs: map[string]*pipepb.MonitoringInfo{
"a": makeInfo(pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE),
},
want: []*pipepb.MonitoringInfo{
makeInfoWBytes(
pipepb.MonitoringInfoSpecs_USER_BOUNDED_TRIE,
boundedTriePayload(t, &pipepb.BoundedTrie{
Bound: 100,
Root: &pipepb.BoundedTrieNode{
Children: map[string]*pipepb.BoundedTrieNode{
"a": {},
"z": {
Children: map[string]*pipepb.BoundedTrieNode{
"z": {
Children: map[string]*pipepb.BoundedTrieNode{
"z": {},
},
},
},
},
},
},
}),
),
},
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def test_custom_window_type(self):
" https://github.com/apache/beam/issues/31921")

def test_metrics(self):
super().test_metrics(check_bounded_trie=False)
super().test_metrics(check_bounded_trie=True)

def construct_timestamped(k, t):
return window.TimestampedValue((k, t), t)
Expand Down
Loading