Skip to content

Commit

Permalink
Add opentelemetry support
Browse files Browse the repository at this point in the history
1. Added context.Context to most API methods for span propagation
2. Spans are added to the parent (from context) when an API is called
3. Simple cache metrics can be registered with cache.RegisterMetrics()

Signed-off-by: Dave Tucker <[email protected]>
  • Loading branch information
dave-tucker committed Oct 12, 2021
1 parent c59dafc commit 08f8620
Show file tree
Hide file tree
Showing 19 changed files with 660 additions and 106 deletions.
50 changes: 48 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ jobs:
- 2.13.0

runs-on: ubuntu-latest

steps:
- name: Set up Go 1.16
uses: actions/setup-go@v1
Expand All @@ -99,4 +99,50 @@ jobs:
- name: Integration Test
run: make integration-test
env:
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}

images:
needs: build
name: Build Image
runs-on: ubuntu-latest
strategy:
matrix:
cmd: [modelgen, print_schema, stress]

steps:
- name: Check Out Repo
uses: actions/checkout@v2

- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v1

- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-
- name: Login to Docker Hub
if: ${{ contains(github.ref, 'refs/head/main') }}
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}

- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
context: .
target: ${{ matrix.cmd }}
builder: ${{ steps.buildx.outputs.name }}
push: ${{ contains(github.ref, 'refs/head/main') }}
tags: libovsdb/${{ matrix.cmd }}:latest
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache

- name: Image digest
run: echo ${{ steps.docker_build.outputs.digest }}
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM golang:1.16-alpine as base
COPY . /src
WORKDIR /src

FROM base as stress
RUN go install ./cmd/stress

FROM base as print_schema
RUN go install ./cmd/print_schema

FROM base as modelgen
RUN go install ./cmd/modelgen
49 changes: 43 additions & 6 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"bytes"
"context"
"crypto/sha256"
"encoding/gob"
"encoding/hex"
Expand All @@ -15,8 +16,13 @@ import (
"github.com/ovn-org/libovsdb/mapper"
"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
)

var tracer = otel.Tracer("libovsdb.ovn.org/cache")

const (
updateEvent = "update"
addEvent = "add"
Expand Down Expand Up @@ -437,6 +443,7 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
}
}
}

return &TableCache{
cache: cache,
schema: schema,
Expand Down Expand Up @@ -480,20 +487,20 @@ func (t *TableCache) Tables() []string {

// Update implements the update method of the NotificationHandler interface
// this populates the cache with new updates
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Update(ctx context.Context, monitorID interface{}, tableUpdates ovsdb.TableUpdates) {
if len(tableUpdates) == 0 {
return
}
t.Populate(tableUpdates)
t.Populate(ctx, tableUpdates)
}

// Update2 implements the update method of the NotificationHandler interface
// this populates the cache with new updates
func (t *TableCache) Update2(context interface{}, tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Update2(ctx context.Context, monitorID interface{}, tableUpdates ovsdb.TableUpdates2) {
if len(tableUpdates) == 0 {
return
}
t.Populate2(tableUpdates)
t.Populate2(ctx, tableUpdates)
}

// Locked implements the locked method of the NotificationHandler interface
Expand All @@ -512,8 +519,36 @@ func (t *TableCache) Echo([]interface{}) {
func (t *TableCache) Disconnected() {
}

// RegisterMetrics registers a metric for all known tables
func (t *TableCache) RegisterMetrics() error {
for _, tt := range t.Tables() {
if err := t.registerMetric(tt); err != nil {
return err
}
}
return nil
}

func (t *TableCache) registerMetric(table string) error {
meter := global.Meter("libovsdb.ovn.org/cache")
if _, ok := t.cache[table]; !ok {
return fmt.Errorf("table not found")
}
_, err := meter.NewInt64ValueObserver(
fmt.Sprintf("libovsdb.cache.%s.size", strings.ToLower(table)),
func(ctx context.Context, result metric.Int64ObserverResult) {
value := t.Table(table).Len()
result.Observe(int64(value))
},
metric.WithDescription(fmt.Sprintf("the size of the %s table in the cache", strings.ToLower(table))),
)
return err
}

// Populate adds data to the cache and places an event on the channel
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Populate(ctx context.Context, tableUpdates ovsdb.TableUpdates) {
_, span := tracer.Start(ctx, "cache_populate")
defer span.End()
t.mutex.Lock()
defer t.mutex.Unlock()

Expand Down Expand Up @@ -560,7 +595,9 @@ func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
}

// Populate2 adds data to the cache and places an event on the channel
func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Populate2(ctx context.Context, tableUpdates ovsdb.TableUpdates2) {
_, span := tracer.Start(ctx, "cache_populate2")
defer span.End()
t.mutex.Lock()
defer t.mutex.Unlock()
for table := range t.dbModel.Types() {
Expand Down
21 changes: 11 additions & 10 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"context"
"encoding/json"
"reflect"
"testing"
Expand Down Expand Up @@ -721,7 +722,7 @@ func TestTableCache_populate(t *testing.T) {
},
},
}
tc.Populate(updates)
tc.Populate(context.Background(), updates)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -733,7 +734,7 @@ func TestTableCache_populate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
tc.Populate(context.Background(), updates)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -744,7 +745,7 @@ func TestTableCache_populate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
tc.Populate(context.Background(), updates)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -786,7 +787,7 @@ func TestTableCachePopulate(t *testing.T) {
},
},
}
tc.Populate(updates)
tc.Populate(context.TODO(), updates)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -798,7 +799,7 @@ func TestTableCachePopulate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
tc.Populate(context.TODO(), updates)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -809,7 +810,7 @@ func TestTableCachePopulate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
tc.Populate(context.TODO(), updates)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -851,7 +852,7 @@ func TestTableCachePopulate2(t *testing.T) {
}

t.Log("Initial")
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)

Expand All @@ -865,7 +866,7 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
got = tc.Table("Open_vSwitch").Row("test2")
assert.Equal(t, testRowModel2, got)

Expand All @@ -879,7 +880,7 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)

Expand All @@ -892,7 +893,7 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
}
Expand Down
Loading

0 comments on commit 08f8620

Please sign in to comment.