Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add opentelemetry support #189

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ jobs:
- 2.13.0

runs-on: ubuntu-latest

steps:
- name: Set up Go 1.16
uses: actions/setup-go@v1
Expand All @@ -99,4 +99,50 @@ jobs:
- name: Integration Test
run: make integration-test
env:
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}
OVS_IMAGE_TAG: ${{ matrix.ovs_version }}

images:
needs: build
name: Build Image
runs-on: ubuntu-latest
strategy:
matrix:
cmd: [modelgen, print_schema, stress]

steps:
- name: Check Out Repo
uses: actions/checkout@v2

- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v1

- name: Cache Docker layers
uses: actions/cache@v2
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
restore-keys: |
${{ runner.os }}-buildx-

- name: Login to Docker Hub
if: ${{ contains(github.ref, 'refs/head/main') }}
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}

- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
context: .
target: ${{ matrix.cmd }}
builder: ${{ steps.buildx.outputs.name }}
push: ${{ contains(github.ref, 'refs/head/main') }}
tags: libovsdb/${{ matrix.cmd }}:latest
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache

- name: Image digest
run: echo ${{ steps.docker_build.outputs.digest }}
12 changes: 12 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM golang:1.16-alpine as base
COPY . /src
WORKDIR /src

FROM base as stress
RUN go install ./cmd/stress

FROM base as print_schema
RUN go install ./cmd/print_schema

FROM base as modelgen
RUN go install ./cmd/modelgen
49 changes: 43 additions & 6 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"bytes"
"context"
"crypto/sha256"
"encoding/gob"
"encoding/hex"
Expand All @@ -15,8 +16,13 @@ import (
"github.com/ovn-org/libovsdb/mapper"
"github.com/ovn-org/libovsdb/model"
"github.com/ovn-org/libovsdb/ovsdb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
)

var tracer = otel.Tracer("libovsdb.ovn.org/cache")

const (
updateEvent = "update"
addEvent = "add"
Expand Down Expand Up @@ -437,6 +443,7 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
}
}
}

return &TableCache{
cache: cache,
schema: schema,
Expand Down Expand Up @@ -480,20 +487,20 @@ func (t *TableCache) Tables() []string {

// Update implements the update method of the NotificationHandler interface
// this populates the cache with new updates
func (t *TableCache) Update(context interface{}, tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Update(ctx context.Context, monitorID interface{}, tableUpdates ovsdb.TableUpdates) {
if len(tableUpdates) == 0 {
return
}
t.Populate(tableUpdates)
t.Populate(ctx, tableUpdates)
}

// Update2 implements the update method of the NotificationHandler interface
// this populates the cache with new updates
func (t *TableCache) Update2(context interface{}, tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Update2(ctx context.Context, monitorID interface{}, tableUpdates ovsdb.TableUpdates2) {
if len(tableUpdates) == 0 {
return
}
t.Populate2(tableUpdates)
t.Populate2(ctx, tableUpdates)
}

// Locked implements the locked method of the NotificationHandler interface
Expand All @@ -512,8 +519,36 @@ func (t *TableCache) Echo([]interface{}) {
func (t *TableCache) Disconnected() {
}

// RegisterMetrics registers a metric for all known tables
func (t *TableCache) RegisterMetrics() error {
for _, tt := range t.Tables() {
if err := t.registerMetric(tt); err != nil {
return err
}
}
return nil
}

func (t *TableCache) registerMetric(table string) error {
meter := global.Meter("libovsdb.ovn.org/cache")
if _, ok := t.cache[table]; !ok {
return fmt.Errorf("table not found")
}
_, err := meter.NewInt64ValueObserver(
fmt.Sprintf("libovsdb.cache.%s.size", strings.ToLower(table)),
func(ctx context.Context, result metric.Int64ObserverResult) {
value := t.Table(table).Len()
result.Observe(int64(value))
},
metric.WithDescription(fmt.Sprintf("the size of the %s table in the cache", strings.ToLower(table))),
)
return err
}

// Populate adds data to the cache and places an event on the channel
func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
func (t *TableCache) Populate(ctx context.Context, tableUpdates ovsdb.TableUpdates) {
_, span := tracer.Start(ctx, "cache_populate")
defer span.End()
t.mutex.Lock()
defer t.mutex.Unlock()

Expand Down Expand Up @@ -560,7 +595,9 @@ func (t *TableCache) Populate(tableUpdates ovsdb.TableUpdates) {
}

// Populate2 adds data to the cache and places an event on the channel
func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
func (t *TableCache) Populate2(ctx context.Context, tableUpdates ovsdb.TableUpdates2) {
_, span := tracer.Start(ctx, "cache_populate2")
defer span.End()
t.mutex.Lock()
defer t.mutex.Unlock()
for table := range t.dbModel.Types() {
Expand Down
21 changes: 11 additions & 10 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"context"
"encoding/json"
"reflect"
"testing"
Expand Down Expand Up @@ -721,7 +722,7 @@ func TestTableCache_populate(t *testing.T) {
},
},
}
tc.Populate(updates)
tc.Populate(context.Background(), updates)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -733,7 +734,7 @@ func TestTableCache_populate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
tc.Populate(context.Background(), updates)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -744,7 +745,7 @@ func TestTableCache_populate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
tc.Populate(context.Background(), updates)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -786,7 +787,7 @@ func TestTableCachePopulate(t *testing.T) {
},
},
}
tc.Populate(updates)
tc.Populate(context.TODO(), updates)

got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)
Expand All @@ -798,7 +799,7 @@ func TestTableCachePopulate(t *testing.T) {
Old: &testRow,
New: &updatedRow,
}
tc.Populate(updates)
tc.Populate(context.TODO(), updates)

got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)
Expand All @@ -809,7 +810,7 @@ func TestTableCachePopulate(t *testing.T) {
New: nil,
}

tc.Populate(updates)
tc.Populate(context.TODO(), updates)

_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
Expand Down Expand Up @@ -851,7 +852,7 @@ func TestTableCachePopulate2(t *testing.T) {
}

t.Log("Initial")
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
got := tc.Table("Open_vSwitch").Row("test")
assert.Equal(t, testRowModel, got)

Expand All @@ -865,7 +866,7 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
got = tc.Table("Open_vSwitch").Row("test2")
assert.Equal(t, testRowModel2, got)

Expand All @@ -879,7 +880,7 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
got = tc.cache["Open_vSwitch"].cache["test"]
assert.Equal(t, updatedRowModel, got)

Expand All @@ -892,7 +893,7 @@ func TestTableCachePopulate2(t *testing.T) {
},
},
}
tc.Populate2(updates)
tc.Populate2(context.TODO(), updates)
_, ok := tc.cache["Open_vSwitch"].cache["test"]
assert.False(t, ok)
}
Expand Down
Loading