diff --git a/accumulators/README.md b/accumulators/README.md index b682ff5..430dc60 100644 --- a/accumulators/README.md +++ b/accumulators/README.md @@ -11,6 +11,5 @@ All metrics include the PCF meta data. Only PCFContainerMetric and PCFLogMessage | PCFContainerMetric | ContainerMetric | Application specific metrics | [`accumulators/container/container.go`](container/container.go) | PCFValueMetric | ValueMetric | PCF System metrics of multiple metric types | [`accumulators/value/value.go`](value/value.go) | PCFCounterEvent | CounterEvent | PCF System metrics as counter types only | [`accumulators/counter/counter.go`](counter/counter.go) -| PCFCapacity | ValueMetric | PCF System metric, derived from Total and Remaining samples in order to provide percent used. | [`accumulators/capacity/capacity.go`](capacity/capacity.go) | PCFLogMessage | LogMessage | PCF Logs | [`accumulators/logmessage/logmessage.go`](logmessage/logmessage.go) | PCFHttpStartStop | HttpStartStop | PCF HTTP request details | [`accumulators/http/http.go`](http/http.go) \ No newline at end of file diff --git a/accumulators/capacity/capacity.go b/accumulators/capacity/capacity.go deleted file mode 100644 index 073069c..0000000 --- a/accumulators/capacity/capacity.go +++ /dev/null @@ -1,239 +0,0 @@ -// Copyright 2020 New Relic Corporation. All rights reserved. 
-// SPDX-License-Identifier: Apache-2.0 - -package capacity - -import ( - "fmt" - "strings" - "sync" - "time" - - "code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2" - "github.com/fatih/camelcase" - "github.com/newrelic/newrelic-pcf-nozzle-tile/app" - "github.com/newrelic/newrelic-pcf-nozzle-tile/newrelic/accumulators" - "github.com/newrelic/newrelic-pcf-nozzle-tile/newrelic/entities" - "github.com/newrelic/newrelic-pcf-nozzle-tile/newrelic/insights" - "github.com/newrelic/newrelic-pcf-nozzle-tile/newrelic/metrics" - "github.com/newrelic/newrelic-pcf-nozzle-tile/newrelic/nrpcf" -) - -type entityID string -type metricKeyword string - -func (k metricKeyword) String() string { - return string(k) -} - -func (k metricKeyword) ToLower() string { - return strings.ToLower(string(k)) -} - -type capacityMetrics struct { - Total *metrics.Metric - Remaining *metrics.Metric -} - -type capacityMap map[metricKeyword]*capacityMetrics - -func (cMap capacityMap) Has(s metricKeyword) (cMetrics *capacityMetrics, found bool) { - cMetrics, found = cMap[s] - return -} - -// CapacityData ... -type capacityData map[*entities.Entity]capacityMap - -// CapacityUpdateTime ... -type capacityUpdateTime map[*entities.Entity]time.Time - -// Metrics extends metric.Accumulator for -// Firehose ContainerMetric Envelope Event Types -type Metrics struct { - accumulators.Accumulator - capacityData capacityData - capacityUpdateTime capacityUpdateTime - sync *sync.RWMutex -} - -// New satisfies metric.Accumulator -func (m Metrics) New() accumulators.Interface { - i := Metrics{ - Accumulator: accumulators.NewAccumulator( - // This isn't a v2 envelope type, but the router will route matching Gauge envelopes here. 
- "ValueMetric", - ), - capacityData: capacityData{}, - capacityUpdateTime: capacityUpdateTime{}, - sync: &sync.RWMutex{}, - } - return i -} - -// Update satisfies metric.Accumulator -func (m Metrics) Update(e *loggregator_v2.Envelope) { - - if strings.Contains(m.GetTag(e, "job"), "diego_cell") == false { - return - } - - entity := m.GetEntity(e, nrpcf.GetPCFAttributes(e)) - - g := e.GetGauge() - // A single v2 gauge envelope can contain multiple metrics. - for key, met := range g.Metrics { - target := strings.ToLower(key) - if strings.Contains(target, "capacity") == false { - continue - } - if strings.Contains(target, "allocated") == true { - continue - } - - splits := camelcase.Split(key) - - metric := entity. - NewSample( - key, - metrics.Types.Gauge, - met.GetUnit(), - met.GetValue(), - ). - Done() - - var cMap capacityMap - var cMetrics *capacityMetrics - var found bool - - // Lock before making changes to m.capacityData to avoid race conditions - m.sync.Lock() - if cMap, found = m.capacityData[entity]; !found { - cMap = capacityMap{} - m.capacityData[entity] = cMap - } - - keyword := metricKeyword(splits[len(splits)-1]) - - if cMetrics, found = cMap.Has(keyword); !found { - cMetrics = &capacityMetrics{nil, nil} - cMap[keyword] = cMetrics - } - - metricType := splits[len(splits)-2] - - switch metricType { - case "Total": - cMetrics.Total = metric - case "Remaining": - cMetrics.Remaining = metric - } - - m.capacityUpdateTime[entity] = time.Now() - // Unlock - done making changes to m.capacityData - m.sync.Unlock() - } -} - -// Drain overrides Accumulator Drain for deriving metrics here -func (m Metrics) Drain() (c []*entities.Entity) { - // Lock before making changes to m.capacityData to avoid race conditions - m.sync.Lock() - // Copying data into another map to reduce the amount of time the lock is needed. 
- myCapacityData := capacityData{} - // If new metric data hasn't been received in over the threshold defined in CAPACITY_ENTITY_AGE_MINS, drop this entity and its metrics before draining - ageThreshold := app.Get().Config.GetDuration("CAPACITY_ENTITY_AGE_MINS") - for k, v := range m.capacityUpdateTime { - if time.Since(v) >= ageThreshold*time.Minute { - delete(m.capacityUpdateTime, k) - delete(m.capacityData, k) - continue - } - myCapacityData[k] = m.capacityData[k] - } - - // Unlock - done making changes to m.capacityData - m.sync.Unlock() - - for entity, cMap := range myCapacityData { - - newEntity := entities.NewEntity(entity.Attributes()) - - c = append(c, newEntity) - - for metricKeyword, ms := range cMap { - - if ms.Remaining == nil || ms.Total == nil { - app.Get().Log.Debugf("\nCapacity metrics do not match for %s\n", newEntity.Signature()) - app.Get().Log.Debugf("\nMetric keyword: %v\n", metricKeyword) - app.Get().Log.Debugf("\nMS: %v\n", ms) - app.Get().Log.Debugf("\ncMap: %v", cMap) - continue - } - - metric := newEntity.NewSample( - fmt.Sprintf("%s.used", metricKeyword.ToLower()), - metrics.Types.Gauge, - "percent", - 100-((ms.Remaining.LastValue/ms.Total.LastValue)*100), - ) - - metric.SetAttribute( - "metric.source.unit", - ms.Total.Unit, - ) - metric.SetAttribute( - "metric.source.remaining", - ms.Remaining.Name, - ) - metric.SetAttribute( - "metric.source.remaining.value", - ms.Remaining.LastValue, - ) - metric.SetAttribute( - "metric.source.total", - ms.Total.Name, - ) - metric.SetAttribute( - "metric.source.total.value", - ms.Total.LastValue, - ) - - metric.Done() - - } - } - return c -} - -// HarvestMetrics ... 
-func (m Metrics) HarvestMetrics( - entity *entities.Entity, - metric *metrics.Metric, -) { - - metric.SetAttribute( - "eventType", - "PCFCapacity", - ) - - metric.SetAttribute("agent.subscription", m.Config().GetString("FIREHOSE_ID")) - - metric.Attributes().AppendAll(entity.Attributes()) - - // Get a client with the insert key and RPM account ID from the config. - client := insights.New().Get(app.Get().Config.GetNewRelicConfig()) - client.EnqueueEvent(metric.Marshal()) - -} - -// GetTag ... -func (m Metrics) GetTag( - e *loggregator_v2.Envelope, - ta string, -) string { - if tv, ok := e.Tags[ta]; ok { - return tv - } - return "" -} diff --git a/cfclient/cfapps/cache.go b/cfclient/cfapps/cache.go index eb8f73d..1a756bd 100644 --- a/cfclient/cfapps/cache.go +++ b/cfclient/cfapps/cache.go @@ -15,13 +15,14 @@ type Cache struct { Collection map[string]*CFApp WriteBuffer chan *CFApp sync *sync.RWMutex + isUpdating bool } // NewCache ... func NewCache() *Cache { cache := &Cache{ Collection: map[string]*CFApp{}, - WriteBuffer: make(chan *CFApp, 1024), + WriteBuffer: make(chan *CFApp, app.Get().Config.GetInt("FIREHOSE_CACHE_WRITE_BUFFER_SIZE")), sync: &sync.RWMutex{}, } cache.Start() @@ -32,7 +33,8 @@ func NewCache() *Cache { func (c *Cache) Start() { go func() { cacheDuration := app.Get().Config.GetDuration("FIREHOSE_CACHE_DURATION_MINS") - update := time.NewTicker(30 * time.Second).C + cacheUpdate := app.Get().Config.GetDuration("FIREHOSE_CACHE_UPDATE_INTERVAL_SECS") + update := time.NewTicker(cacheUpdate * time.Second).C timeoutCache := time.NewTicker(cacheDuration * time.Minute).C for { @@ -53,12 +55,9 @@ func (c *Cache) Start() { c.sync.Unlock() case <-update: - GetInstance().app.Log.Debug("Updating status of applications") - c.sync.RLock() - for _, v := range c.Collection { - v.UpdateInstances() + if !c.isUpdating { + go c.updateInstances() } - c.sync.RUnlock() case app := <-c.WriteBuffer: c.sync.Lock() @@ -69,6 +68,26 @@ func (c *Cache) Start() { }() } +func 
(c *Cache) updateInstances() { + c.isUpdating = true + GetInstance().app.Log.Debug("Updating status of applications") + now := time.Now() + + var apps []*CFApp + c.sync.RLock() + for _, v := range c.Collection { + apps = append(apps, v) + } + c.sync.RUnlock() + + for _, a := range apps { + a.UpdateInstances() + } + + c.isUpdating = false + GetInstance().app.Log.Debugf("Finish cache updating %s", time.Since(now)) +} + // Get ... func (c *Cache) Get(id string) (app *CFApp, found bool) { c.sync.RLock() diff --git a/config/config.go b/config/config.go index f0526ad..4b52a73 100644 --- a/config/config.go +++ b/config/config.go @@ -74,6 +74,10 @@ func set() *Config { // Cache purge threshold in minutes v.SetDefault("FIREHOSE_CACHE_DURATION_MINS", 30) + // Cache instance update interval in seconds + v.SetDefault("FIREHOSE_CACHE_UPDATE_INTERVAL_SECS", 60) + // Cache write buffer size in number of app entries + v.SetDefault("FIREHOSE_CACHE_WRITE_BUFFER_SIZE", 2048) // Rate limiter burst limit v.SetDefault("FIREHOSE_RATE_BURST", 5) // Rate limiter timeout in seconds. @@ -92,8 +96,6 @@ func set() *Config { v.SetDefault("NEWRELIC_DRAIN_INTERVAL", "59s") v.SetDefault("NEWRELIC_ENQUEUE_TIMEOUT", "1s") - v.SetDefault("CAPACITY_ENTITY_AGE_MINS", 5) - v.SetDefault(NewRelicEventTypeContainer, "PCFContainerMetric") v.SetDefault(NewRelicEventTypeValueMetric, "PCFValueMetric") v.SetDefault(NewRelicEventTypeCounterEvent, "PCFCounterEvent") @@ -128,7 +130,6 @@ func set() *Config { // Filtering capabilities for envelope types - | separated values. // By default, all message types are enabled. User configurations will override this behavior. - // Capacity accumulator is enabled by default when ValueMetric is enabled. v.SetDefault("ENABLED_ENVELOPE_TYPES", "ContainerMetric|CounterEvent|HttpStartStop|LogMessage|ValueMetric") // Default account location will be US unless set to EU by cf push or tile. 
diff --git a/dashboard/dashboard.json b/dashboard/dashboard.json index d21354f..6be5b49 100644 --- a/dashboard/dashboard.json +++ b/dashboard/dashboard.json @@ -19,7 +19,7 @@ }, "data": [ { - "nrql": "SELECT count(*) AS 'Event Count' FROM PCFCapacity, PCFContainerMetric, PCFCounterEvent, PCFHttpStartStop, PCFLogMessage, PCFValueMetric facet pcf.domain" + "nrql": "SELECT count(*) AS 'Event Count' FROM PCFContainerMetric, PCFCounterEvent, PCFHttpStartStop, PCFLogMessage, PCFValueMetric facet pcf.domain" } ], "presentation": { @@ -365,7 +365,7 @@ }, "data": [ { - "nrql": "SELECT count(*) as 'All Events' FROM PCFCapacity, PCFContainerMetric, PCFCounterEvent, PCFHttpStartStop, PCFLogMessage, PCFValueMetric TIMESERIES" + "nrql": "SELECT count(*) as 'All Events' FROM PCFContainerMetric, PCFCounterEvent, PCFHttpStartStop, PCFLogMessage, PCFValueMetric TIMESERIES" } ], "presentation": { @@ -376,7 +376,6 @@ ], "filter": { "event_types": [ - "PCFCapacity", "PCFContainerMetric", "PCFCounterEvent", "PCFHttpStartStop", diff --git a/newrelic/registry/registry.go b/newrelic/registry/registry.go index 9cad18a..a83432e 100644 --- a/newrelic/registry/registry.go +++ b/newrelic/registry/registry.go @@ -4,7 +4,6 @@ package registry import ( - "github.com/newrelic/newrelic-pcf-nozzle-tile/accumulators/capacity" "github.com/newrelic/newrelic-pcf-nozzle-tile/accumulators/container" "github.com/newrelic/newrelic-pcf-nozzle-tile/accumulators/counter" "github.com/newrelic/newrelic-pcf-nozzle-tile/accumulators/http" @@ -21,7 +20,6 @@ var Register = &Accumulators{ counter.Metrics{}, container.Metrics{}, value.Metrics{}, - capacity.Metrics{}, logmessage.Nrevents{}, http.Nrevents{}, } diff --git a/tests/integration/nozzle_test.go b/tests/integration/nozzle_test.go index 62fa3b0..3b82b8d 100644 --- a/tests/integration/nozzle_test.go +++ b/tests/integration/nozzle_test.go @@ -129,63 +129,6 @@ func TestValueMetric(t *testing.T) { } -func TestCapacityMetric(t *testing.T) { - t.Parallel() - m := 
runNozzleAndMocks() - m.firehose.AddEvent(loggregator_v2.Envelope{ - Tags: map[string]string{ - "job": "diego_cell", - }, - SourceId: "c70684e2-4443-4ed5-8dc8-28b7cf7d97ed", - InstanceId: "c70684e2-4443-4ed5-8dc8-28b7cf7d97ed", - Message: &loggregator_v2.Envelope_Gauge{ - Gauge: &loggregator_v2.Gauge{ - Metrics: map[string]*loggregator_v2.GaugeValue{ - "CapacityRemainingContainers": &loggregator_v2.GaugeValue{ - Unit: "bytes", - Value: float64(25), - }, - "CapacityTotalContainers": &loggregator_v2.GaugeValue{ - Unit: "bytes", - Value: float64(100), - }, - }, - }, - }, - }) - m.firehose.PublishBatch() - - rCapacity := make(map[string]interface{}) -ReadingFromInsights: - for { - select { - case rc := <-m.insights.ReceivedContents: - r := make([]map[string]interface{}, 10) - json.Unmarshal([]byte(rc), &r) - for i, rr := range r { - et := rr["eventType"].(string) - if et == "PCFCapacity" { - rCapacity = r[i] - break ReadingFromInsights - } - } - case <-time.After(10 * time.Second): - break ReadingFromInsights - } - } - closeNozzleAndMocks(m) - - assert.EqualValues(t, "PCFCapacity", rCapacity["eventType"]) - assert.EqualValues(t, 75, rCapacity["metric.sample.last.value"]) - assert.EqualValues(t, "CapacityRemainingContainers", rCapacity["metric.source.remaining"]) - assert.EqualValues(t, 25, rCapacity["metric.source.remaining.value"]) - assert.EqualValues(t, "CapacityTotalContainers", rCapacity["metric.source.total"]) - assert.EqualValues(t, 100, rCapacity["metric.source.total.value"]) - assert.EqualValues(t, "containers.used", rCapacity["metric.name"]) - assert.EqualValues(t, 75, rCapacity["metric.sample.last.value"]) - -} - func TestLogMessage(t *testing.T) { t.Parallel() m := runNozzleAndMocks() diff --git a/tile.yml b/tile.yml index c4b31b2..19034ef 100644 --- a/tile.yml +++ b/tile.yml @@ -162,6 +162,22 @@ forms: constraints: min: 6000 configurable: true + - name: nrf_firehose_cache_update_interval_secs + type: integer + default: 60 + label: Cache update interval + 
description: Interval, in seconds, at which all apps in the cache are updated. + constraints: + min: 30 + configurable: true + - name: nrf_firehose_cache_write_buffer_size + type: integer + default: 2048 + label: Cache Write Buffer Size + description: Size of the cache write buffer, in number of app entries. + constraints: + min: 1024 + configurable: true - name: nrf_log_level type: dropdown_select default: INFO