Skip to content

Commit

Permalink
Update tags autocreate
Browse files Browse the repository at this point in the history
Do not require group to exist
  • Loading branch information
alpinskiy committed Nov 29, 2024
1 parent 3389ce1 commit 1b59d37
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 157 deletions.
88 changes: 51 additions & 37 deletions cmd/statshouse/statshouse-tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,13 @@ func mainPublishTagDrafts() {
metadataNet string
metadataAddr string
metadataActorID int64
dryRun bool
)
flag.Int64Var(&metadataActorID, "metadata-actor-id", 0, "")
flag.StringVar(&metadataAddr, "metadata-addr", "127.0.0.1:2442", "")
flag.StringVar(&metadataNet, "metadata-net", "tcp4", "")
flag.StringVar(&argv.aesPwdFile, "aes-pwd-file", "", "path to AES password file, will try to read "+defaultPathToPwd+" if not set")
flag.BoolVar(&dryRun, "dry-run", true, "do not publish changes")
build.FlagParseShowVersionHelp()
flag.Parse()
client := tlmetadata.Client{
Expand All @@ -625,53 +627,62 @@ func mainPublishTagDrafts() {
work = make(map[int32]map[int32]format.MetricMetaValue)
workCond = sync.NewCond(&workMu)
)
storage = metajournal.MakeMetricsStorage("", nil, nil, func(newEntries []tlmetadata.Event) {
var n int
for _, e := range newEntries {
switch e.EventType {
case format.MetricEvent:
meta := format.MetricMetaValue{}
err := meta.UnmarshalBinary([]byte(e.Data))
storage = metajournal.MakeMetricsStorage("", nil,
func(configID int32, configString string) {
switch configID {
case format.KnownTagsConfigID:
v, err := aggregator.ParseKnownTags([]byte(configString), storage)
fmt.Fprintln(os.Stderr, configString)
if err != nil {
fmt.Fprintln(os.Stderr, e.Data)
fmt.Fprintln(os.Stderr, err)
continue
}
if meta.NamespaceID == 0 || meta.NamespaceID == format.BuiltinNamespaceIDDefault {
continue
}
if len(meta.TagsDraft) == 0 {
continue
}
workCond.L.Lock()
if m := work[meta.NamespaceID]; m != nil {
m[meta.MetricID] = meta
} else {
work[meta.NamespaceID] = map[int32]format.MetricMetaValue{meta.MetricID: meta}
}
workCond.L.Unlock()
n++
case format.PromConfigEvent:
v, err := aggregator.ParseKnownTags([]byte(e.Data), storage)
fmt.Fprintln(os.Stderr, e.Data)
if err != nil {
fmt.Fprintln(os.Stderr, err)
continue
break
}
workCond.L.Lock()
config = v
workCond.L.Unlock()
n++
}
}
if n != 0 {
workCond.Signal()
}
})
},
func(newEntries []tlmetadata.Event) {
var n int
for _, e := range newEntries {
switch e.EventType {
case format.MetricEvent:
meta := format.MetricMetaValue{}
err := meta.UnmarshalBinary([]byte(e.Data))
if err != nil {
fmt.Fprintln(os.Stderr, e.Data)
fmt.Fprintln(os.Stderr, err)
continue
}
if meta.NamespaceID == 0 || meta.NamespaceID == format.BuiltinNamespaceIDDefault {
continue
}
if len(meta.TagsDraft) == 0 {
continue
}
// log.Printf("FOUND tag draft %s\n", meta.Name)
workCond.L.Lock()
if m := work[meta.NamespaceID]; m != nil {
m[meta.MetricID] = meta
} else {
work[meta.NamespaceID] = map[int32]format.MetricMetaValue{meta.MetricID: meta}
}
workCond.L.Unlock()
n++
}
}
if n != 0 {
workCond.Signal()
}
})
storage.Journal().Start(nil, nil, loader.LoadJournal)
fmt.Println("Press <Enter> to start publishing tag drafts")
if dryRun {
fmt.Println("DRY RUN!")
}
fmt.Println()
bufio.NewReader(os.Stdin).ReadString('\n')
fmt.Println("Publishing tag drafts")
fmt.Println("Start publishing tag drafts!")
for {
var meta format.MetricMetaValue
workCond.L.Lock()
Expand Down Expand Up @@ -708,6 +719,9 @@ func mainPublishTagDrafts() {
continue
}
fmt.Println(meta.NamespaceID, meta.Name, meta.Version)
if dryRun {
continue
}
var err error
meta, err = loader.SaveMetric(context.Background(), meta, "")
if err != nil {
Expand Down
203 changes: 84 additions & 119 deletions internal/aggregator/autocreate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -41,25 +43,16 @@ type autoCreate struct {
running bool // guard against double "run"
}

type KnownTags map[int32]namespaceKnownTags
type KnownTags map[int32][]SelectorTags // by namespace ID

type namespaceKnownTags struct {
knownTags // namespace level
groups map[int32]knownTags // group level tags, by group ID
}

type knownTags struct {
p string // metric name prefix
m map[string]KnownTag // name to tag mapping
}

type KnownTagsJSON struct {
Namespace map[string]KnownTag `json:"known_tags,omitempty"` // tag name -> tag
Groups map[string]map[string]KnownTag `json:"groups,omitempty"` // group name -> tag name -> tag
type SelectorTags struct {
Selector string `json:"selector,omitempty"`
Tags []KnownTag `json:"tags,omitempty"`
}

type KnownTag struct {
ID string `json:"id"`
Name string `json:"name"`
ID string `json:"id,omitempty"`
Description string `json:"description,omitempty"`
RawKind string `json:"raw_kind,omitempty"`
SkipMapping bool `json:"skip_mapping,omitempty"`
Expand Down Expand Up @@ -308,23 +301,25 @@ func (ac *autoCreate) createMetric(args tlstatshouse.AutoCreateBytes) error {
return nil
}

func (ac *autoCreate) publishDraftTags(meta *format.MetricMetaValue) int {
func (ac *autoCreate) publishDraftTags(meta *format.MetricMetaValue) (n int) {
ac.configMu.RLock()
defer ac.configMu.RUnlock()
if len(ac.knownTags) == 0 {
return 0
if ac.knownTags != nil {
n = ac.knownTags.PublishDraftTags(meta)
}
return ac.knownTags.PublishDraftTags(meta)
return n
}

func (ac *autoCreate) namespaceAllowed(namespaceID int32) bool {
func (ac *autoCreate) namespaceAllowed(namespaceID int32) (ok bool) {
defaultNamespace := namespaceID == 0 || namespaceID == format.BuiltinNamespaceIDDefault
if defaultNamespace {
return ac.defaultNamespaceAllowed
}
ac.configMu.RLock()
defer ac.configMu.RUnlock()
_, ok := ac.knownTags[namespaceID]
if ac.knownTags != nil {
_, ok = ac.knownTags[namespaceID]
}
return ok
}

Expand All @@ -338,128 +333,98 @@ func (ac *autoCreate) done() bool {
}

func (m KnownTags) PublishDraftTags(meta *format.MetricMetaValue) int {
if meta.NamespaceID == 0 ||
meta.NamespaceID == format.BuiltinNamespaceIDDefault ||
meta.NamespaceID == format.BuiltinNamespaceIDMissing {
return 0
}
c, ok := m[meta.NamespaceID]
if !ok {
return 0
}
var n int
if len(c.m) != 0 {
n = publishDraftTags(meta, c.knownTags)
}
if len(c.groups) == 0 ||
meta.GroupID == 0 ||
meta.GroupID == format.BuiltinGroupIDDefault {
return n
}
if v := c.groups[meta.GroupID]; len(v.m) != 0 {
return n + publishDraftTags(meta, v)
if v, ok := m[meta.NamespaceID]; ok {
for i := range v {
if strings.HasPrefix(meta.Name, v[i].Selector) {
n += publishDraftTags(meta, v[i].Tags)
}
}
}
return n
}

func publishDraftTags(meta *format.MetricMetaValue, t knownTags) int {
func publishDraftTags(meta *format.MetricMetaValue, knownTags []KnownTag) int {
var n int
for k, draftTag := range meta.TagsDraft {
knownTag, ok := t.m[k]
if !ok || knownTag.ID == "" {
for _, knownTag := range knownTags {
if knownTag.Name == "" {
continue
}
if knownTag.ID == format.StringTopTagID {
if meta.StringTopName == "" {
meta.StringTopName = k
if draftTag, ok := meta.TagsDraft[knownTag.Name]; ok {
if knownTag.ID == format.StringTopTagID {
if meta.StringTopName == "" {
meta.StringTopName = knownTag.Name
if knownTag.Description != "" {
meta.StringTopDescription = knownTag.Description
}
log.Printf("autocreate tag %s[_s] %s\n", meta.Name, knownTag.Name)
delete(meta.TagsDraft, knownTag.Name)
n++
}
} else {
var x int
if knownTag.ID != "" {
x = format.TagIndex(knownTag.ID)
} else {
// search for an unnamed tag
for x = 1; x < len(meta.Tags) && meta.Tags[x].Name != ""; x++ {
// pass
}
}
if x < 1 || format.NewMaxTags <= x || (x < len(meta.Tags) && meta.Tags[x].Name != "") {
continue
}
draftTag.Name = knownTag.Name
if knownTag.Description != "" {
meta.StringTopDescription = knownTag.Description
draftTag.Description = knownTag.Description
}
delete(meta.TagsDraft, k)
n++
}
} else if x := format.TagIndex(knownTag.ID); 0 <= x && x < format.NewMaxTags && (len(meta.Tags) <= x || meta.Tags[x].Name == "") {
allow := len(knownTag.Whitelist) == 0 // empty whitelist allows all
if !allow && strings.HasPrefix(meta.Name, t.p) {
name := meta.Name[len(t.p):]
for i := 0; i < len(knownTag.Whitelist) && !allow; i++ {
allow = knownTag.Whitelist[i] == name
if knownTag.RawKind != "" {
rawKind := knownTag.RawKind
if rawKind == "int" {
// The raw attribute is stored separately from the type string in metric meta,
// empty type implies "int" which is not allowed
rawKind = ""
}
if format.ValidRawKind(rawKind) {
draftTag.Raw = true
draftTag.RawKind = rawKind
}
}
}
if !allow {
continue
}
draftTag.Name = k
if knownTag.Description != "" {
draftTag.Description = knownTag.Description
}
if knownTag.RawKind != "" {
rawKind := knownTag.RawKind
if rawKind == "int" {
// The raw attribute is stored separately from the type string in metric meta,
// empty type implies "int" which is not allowed
rawKind = ""
if knownTag.SkipMapping {
draftTag.SkipMapping = true
}
if format.ValidRawKind(rawKind) {
draftTag.Raw = true
draftTag.RawKind = rawKind
if len(meta.Tags) <= x {
meta.Tags = append(make([]format.MetricMetaTag, 0, x+1), meta.Tags...)
meta.Tags = meta.Tags[:x+1]
}
meta.Tags[x] = draftTag
log.Printf("autocreate tag %s[%d] %s\n", meta.Name, x, draftTag.Name)
delete(meta.TagsDraft, knownTag.Name)
n++
}
if knownTag.SkipMapping {
draftTag.SkipMapping = true
}
if len(meta.Tags) <= x {
meta.Tags = append(make([]format.MetricMetaTag, 0, x+1), meta.Tags...)
meta.Tags = meta.Tags[:x+1]
}
meta.Tags[x] = draftTag
delete(meta.TagsDraft, k)
n++
}
}
return n
}

func ParseKnownTags(configS []byte, meta format.MetaStorageInterface) (KnownTags, error) {
var s map[string]KnownTagsJSON
err := json.Unmarshal(configS, &s)
if err != nil {
var s []SelectorTags
if err := json.Unmarshal(configS, &s); err != nil {
return nil, err
}
res := make(map[int32]namespaceKnownTags)
for namespaceName, v := range s {
if namespaceName == "" {
return nil, fmt.Errorf("namespace not set")
}
namespace := meta.GetNamespaceByName(namespaceName)
if namespace == nil {
return nil, fmt.Errorf("namespace not found %q", namespaceName)
}
if namespace.ID == format.BuiltinNamespaceIDDefault {
return nil, fmt.Errorf("namespace can not be __default")
}
knownTagsG := make(map[int32]knownTags, len(v.Groups))
for groupName, g := range v.Groups {
groupName := namespaceName + format.NamespaceSeparator + groupName
group := meta.GetGroupByName(groupName)
if group == nil {
return nil, fmt.Errorf("group not found %q", groupName)
}
if group.ID == format.BuiltinGroupIDDefault {
return nil, fmt.Errorf("scrape group can not be __default")
}
knownTagsG[group.ID] = knownTags{
p: namespace.Name + format.NamespaceSeparator + groupName,
m: g,
res := make(KnownTags)
for i := 0; i < len(s); i++ {
sel := s[i].Selector
if n := strings.Index(sel, format.NamespaceSeparator); n != -1 {
if v := meta.GetNamespaceByName(sel[:n]); v != nil {
res[v.ID] = append(res[v.ID], s[i])
}
}
res[namespace.ID] = namespaceKnownTags{
knownTags: knownTags{
p: namespace.Name + format.NamespaceSeparator,
m: v.Namespace,
},
groups: knownTagsG,
}
}
for _, v := range res {
sort.Slice(v, func(i, j int) bool {
return v[i].Selector < v[j].Selector
})
}
return res, nil
}
Loading

0 comments on commit 1b59d37

Please sign in to comment.