Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update tags autocreate #1587

Merged
merged 1 commit into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions cmd/statshouse/statshouse-tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,13 @@ func mainPublishTagDrafts() {
metadataNet string
metadataAddr string
metadataActorID int64
dryRun bool
)
flag.Int64Var(&metadataActorID, "metadata-actor-id", 0, "")
flag.StringVar(&metadataAddr, "metadata-addr", "127.0.0.1:2442", "")
flag.StringVar(&metadataNet, "metadata-net", "tcp4", "")
flag.StringVar(&argv.aesPwdFile, "aes-pwd-file", "", "path to AES password file, will try to read "+defaultPathToPwd+" if not set")
flag.BoolVar(&dryRun, "dry-run", true, "do not publish changes")
build.FlagParseShowVersionHelp()
flag.Parse()
client := tlmetadata.Client{
Expand All @@ -625,53 +627,62 @@ func mainPublishTagDrafts() {
work = make(map[int32]map[int32]format.MetricMetaValue)
workCond = sync.NewCond(&workMu)
)
storage = metajournal.MakeMetricsStorage("", nil, nil, func(newEntries []tlmetadata.Event) {
var n int
for _, e := range newEntries {
switch e.EventType {
case format.MetricEvent:
meta := format.MetricMetaValue{}
err := meta.UnmarshalBinary([]byte(e.Data))
storage = metajournal.MakeMetricsStorage("", nil,
func(configID int32, configString string) {
switch configID {
case format.KnownTagsConfigID:
v, err := aggregator.ParseKnownTags([]byte(configString), storage)
fmt.Fprintln(os.Stderr, configString)
if err != nil {
fmt.Fprintln(os.Stderr, e.Data)
fmt.Fprintln(os.Stderr, err)
continue
}
if meta.NamespaceID == 0 || meta.NamespaceID == format.BuiltinNamespaceIDDefault {
continue
}
if len(meta.TagsDraft) == 0 {
continue
}
workCond.L.Lock()
if m := work[meta.NamespaceID]; m != nil {
m[meta.MetricID] = meta
} else {
work[meta.NamespaceID] = map[int32]format.MetricMetaValue{meta.MetricID: meta}
}
workCond.L.Unlock()
n++
case format.PromConfigEvent:
v, err := aggregator.ParseKnownTags([]byte(e.Data), storage)
fmt.Fprintln(os.Stderr, e.Data)
if err != nil {
fmt.Fprintln(os.Stderr, err)
continue
break
}
workCond.L.Lock()
config = v
workCond.L.Unlock()
n++
}
}
if n != 0 {
workCond.Signal()
}
})
},
func(newEntries []tlmetadata.Event) {
var n int
for _, e := range newEntries {
switch e.EventType {
case format.MetricEvent:
meta := format.MetricMetaValue{}
err := meta.UnmarshalBinary([]byte(e.Data))
if err != nil {
fmt.Fprintln(os.Stderr, e.Data)
fmt.Fprintln(os.Stderr, err)
continue
}
if meta.NamespaceID == 0 || meta.NamespaceID == format.BuiltinNamespaceIDDefault {
continue
}
if len(meta.TagsDraft) == 0 {
continue
}
// log.Printf("FOUND tag draft %s\n", meta.Name)
workCond.L.Lock()
if m := work[meta.NamespaceID]; m != nil {
m[meta.MetricID] = meta
} else {
work[meta.NamespaceID] = map[int32]format.MetricMetaValue{meta.MetricID: meta}
}
workCond.L.Unlock()
n++
}
}
if n != 0 {
workCond.Signal()
}
})
storage.Journal().Start(nil, nil, loader.LoadJournal)
fmt.Println("Press <Enter> to start publishing tag drafts")
if dryRun {
fmt.Println("DRY RUN!")
}
fmt.Println()
bufio.NewReader(os.Stdin).ReadString('\n')
fmt.Println("Publishing tag drafts")
fmt.Println("Start publishing tag drafts!")
for {
var meta format.MetricMetaValue
workCond.L.Lock()
Expand Down Expand Up @@ -708,6 +719,9 @@ func mainPublishTagDrafts() {
continue
}
fmt.Println(meta.NamespaceID, meta.Name, meta.Version)
if dryRun {
continue
}
var err error
meta, err = loader.SaveMetric(context.Background(), meta, "")
if err != nil {
Expand Down
203 changes: 84 additions & 119 deletions internal/aggregator/autocreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -41,25 +43,16 @@ type autoCreate struct {
running bool // guard against double "run"
}

type KnownTags map[int32]namespaceKnownTags
type KnownTags map[int32][]SelectorTags // by namespace ID

type namespaceKnownTags struct {
knownTags // namespace level
groups map[int32]knownTags // group level tags, by group ID
}

type knownTags struct {
p string // metric name prefix
m map[string]KnownTag // name to tag mapping
}

type KnownTagsJSON struct {
Namespace map[string]KnownTag `json:"known_tags,omitempty"` // tag name -> tag
Groups map[string]map[string]KnownTag `json:"groups,omitempty"` // group name -> tag name -> tag
type SelectorTags struct {
Selector string `json:"selector,omitempty"`
Tags []KnownTag `json:"tags,omitempty"`
}

type KnownTag struct {
ID string `json:"id"`
Name string `json:"name"`
ID string `json:"id,omitempty"`
Description string `json:"description,omitempty"`
RawKind string `json:"raw_kind,omitempty"`
SkipMapping bool `json:"skip_mapping,omitempty"`
Expand Down Expand Up @@ -308,23 +301,25 @@ func (ac *autoCreate) createMetric(args tlstatshouse.AutoCreateBytes) error {
return nil
}

func (ac *autoCreate) publishDraftTags(meta *format.MetricMetaValue) int {
func (ac *autoCreate) publishDraftTags(meta *format.MetricMetaValue) (n int) {
ac.configMu.RLock()
defer ac.configMu.RUnlock()
if len(ac.knownTags) == 0 {
return 0
if ac.knownTags != nil {
n = ac.knownTags.PublishDraftTags(meta)
}
return ac.knownTags.PublishDraftTags(meta)
return n
}

func (ac *autoCreate) namespaceAllowed(namespaceID int32) bool {
func (ac *autoCreate) namespaceAllowed(namespaceID int32) (ok bool) {
defaultNamespace := namespaceID == 0 || namespaceID == format.BuiltinNamespaceIDDefault
if defaultNamespace {
return ac.defaultNamespaceAllowed
}
ac.configMu.RLock()
defer ac.configMu.RUnlock()
_, ok := ac.knownTags[namespaceID]
if ac.knownTags != nil {
_, ok = ac.knownTags[namespaceID]
}
return ok
}

Expand All @@ -338,128 +333,98 @@ func (ac *autoCreate) done() bool {
}

func (m KnownTags) PublishDraftTags(meta *format.MetricMetaValue) int {
if meta.NamespaceID == 0 ||
meta.NamespaceID == format.BuiltinNamespaceIDDefault ||
meta.NamespaceID == format.BuiltinNamespaceIDMissing {
return 0
}
c, ok := m[meta.NamespaceID]
if !ok {
return 0
}
var n int
if len(c.m) != 0 {
n = publishDraftTags(meta, c.knownTags)
}
if len(c.groups) == 0 ||
meta.GroupID == 0 ||
meta.GroupID == format.BuiltinGroupIDDefault {
return n
}
if v := c.groups[meta.GroupID]; len(v.m) != 0 {
return n + publishDraftTags(meta, v)
if v, ok := m[meta.NamespaceID]; ok {
for i := range v {
if strings.HasPrefix(meta.Name, v[i].Selector) {
n += publishDraftTags(meta, v[i].Tags)
}
}
}
return n
}

func publishDraftTags(meta *format.MetricMetaValue, t knownTags) int {
func publishDraftTags(meta *format.MetricMetaValue, knownTags []KnownTag) int {
var n int
for k, draftTag := range meta.TagsDraft {
knownTag, ok := t.m[k]
if !ok || knownTag.ID == "" {
for _, knownTag := range knownTags {
if knownTag.Name == "" {
continue
}
if knownTag.ID == format.StringTopTagID {
if meta.StringTopName == "" {
meta.StringTopName = k
if draftTag, ok := meta.TagsDraft[knownTag.Name]; ok {
if knownTag.ID == format.StringTopTagID {
if meta.StringTopName == "" {
meta.StringTopName = knownTag.Name
if knownTag.Description != "" {
meta.StringTopDescription = knownTag.Description
}
log.Printf("autocreate tag %s[_s] %s\n", meta.Name, knownTag.Name)
delete(meta.TagsDraft, knownTag.Name)
n++
}
} else {
var x int
if knownTag.ID != "" {
x = format.TagIndex(knownTag.ID)
} else {
// search for an unnamed tag
for x = 1; x < len(meta.Tags) && meta.Tags[x].Name != ""; x++ {
// pass
}
}
if x < 1 || format.NewMaxTags <= x || (x < len(meta.Tags) && meta.Tags[x].Name != "") {
continue
}
draftTag.Name = knownTag.Name
if knownTag.Description != "" {
meta.StringTopDescription = knownTag.Description
draftTag.Description = knownTag.Description
}
delete(meta.TagsDraft, k)
n++
}
} else if x := format.TagIndex(knownTag.ID); 0 <= x && x < format.NewMaxTags && (len(meta.Tags) <= x || meta.Tags[x].Name == "") {
allow := len(knownTag.Whitelist) == 0 // empty whitelist allows all
if !allow && strings.HasPrefix(meta.Name, t.p) {
name := meta.Name[len(t.p):]
for i := 0; i < len(knownTag.Whitelist) && !allow; i++ {
allow = knownTag.Whitelist[i] == name
if knownTag.RawKind != "" {
rawKind := knownTag.RawKind
if rawKind == "int" {
// The raw attribute is stored separately from the type string in metric meta,
// empty type implies "int" which is not allowed
rawKind = ""
}
if format.ValidRawKind(rawKind) {
draftTag.Raw = true
draftTag.RawKind = rawKind
}
}
}
if !allow {
continue
}
draftTag.Name = k
if knownTag.Description != "" {
draftTag.Description = knownTag.Description
}
if knownTag.RawKind != "" {
rawKind := knownTag.RawKind
if rawKind == "int" {
// The raw attribute is stored separately from the type string in metric meta,
// empty type implies "int" which is not allowed
rawKind = ""
if knownTag.SkipMapping {
draftTag.SkipMapping = true
}
if format.ValidRawKind(rawKind) {
draftTag.Raw = true
draftTag.RawKind = rawKind
if len(meta.Tags) <= x {
meta.Tags = append(make([]format.MetricMetaTag, 0, x+1), meta.Tags...)
meta.Tags = meta.Tags[:x+1]
}
meta.Tags[x] = draftTag
log.Printf("autocreate tag %s[%d] %s\n", meta.Name, x, draftTag.Name)
delete(meta.TagsDraft, knownTag.Name)
n++
}
if knownTag.SkipMapping {
draftTag.SkipMapping = true
}
if len(meta.Tags) <= x {
meta.Tags = append(make([]format.MetricMetaTag, 0, x+1), meta.Tags...)
meta.Tags = meta.Tags[:x+1]
}
meta.Tags[x] = draftTag
delete(meta.TagsDraft, k)
n++
}
}
return n
}

func ParseKnownTags(configS []byte, meta format.MetaStorageInterface) (KnownTags, error) {
var s map[string]KnownTagsJSON
err := json.Unmarshal(configS, &s)
if err != nil {
var s []SelectorTags
if err := json.Unmarshal(configS, &s); err != nil {
return nil, err
}
res := make(map[int32]namespaceKnownTags)
for namespaceName, v := range s {
if namespaceName == "" {
return nil, fmt.Errorf("namespace not set")
}
namespace := meta.GetNamespaceByName(namespaceName)
if namespace == nil {
return nil, fmt.Errorf("namespace not found %q", namespaceName)
}
if namespace.ID == format.BuiltinNamespaceIDDefault {
return nil, fmt.Errorf("namespace can not be __default")
}
knownTagsG := make(map[int32]knownTags, len(v.Groups))
for groupName, g := range v.Groups {
groupName := namespaceName + format.NamespaceSeparator + groupName
group := meta.GetGroupByName(groupName)
if group == nil {
return nil, fmt.Errorf("group not found %q", groupName)
}
if group.ID == format.BuiltinGroupIDDefault {
return nil, fmt.Errorf("scrape group can not be __default")
}
knownTagsG[group.ID] = knownTags{
p: namespace.Name + format.NamespaceSeparator + groupName,
m: g,
res := make(KnownTags)
for i := 0; i < len(s); i++ {
sel := s[i].Selector
if n := strings.Index(sel, format.NamespaceSeparator); n != -1 {
if v := meta.GetNamespaceByName(sel[:n]); v != nil {
res[v.ID] = append(res[v.ID], s[i])
}
}
res[namespace.ID] = namespaceKnownTags{
knownTags: knownTags{
p: namespace.Name + format.NamespaceSeparator,
m: v.Namespace,
},
groups: knownTagsG,
}
}
for _, v := range res {
sort.Slice(v, func(i, j int) bool {
return v[i].Selector < v[j].Selector
})
}
return res, nil
}
Loading
Loading