Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated dependencies and Improvements to CI process #188

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Dependabot configuration (version 2 schema).
# Checks both Go module dependencies and GitHub Actions versions daily.
version: 2

updates:
  # Keep Go module dependencies current.
  - package-ecosystem: "gomod"
    directory: "/"
    schedule:
      interval: "daily"
  # Keep the action versions used in workflows current.
  - package-ecosystem: "github-actions"
    directory: "/"
    schedule:
      interval: "daily" # quoted for consistency with the gomod entry above

57 changes: 57 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Build the vFlow container image on every push/PR to master, on manual
# dispatch, and on release; push to ghcr.io except for pull requests.
name: docker

on:
  push:
    branches:
      - 'master'
  pull_request:
    branches:
      - 'master'
  workflow_dispatch:
  release:
    types: [published, edited]

jobs:
  build-and-push-images:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3

      # Derive image tags and labels from the git ref / event type.
      - name: Docker meta
        id: meta
        uses: docker/metadata-action@v4
        with:
          images: |
            ghcr.io/edgio/vflow
          tags: |
            type=raw,value=latest,enable=${{ endsWith(github.ref, github.event.repository.default_branch) }}
            type=ref,event=branch
            type=ref,event=pr
            type=semver,pattern={{version}}
            type=semver,pattern={{major}}
            type=semver,pattern={{major}}.{{minor}}
            type=sha

      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2

      # Pushing is skipped for pull requests, so no registry login is needed there.
      - name: Login to GitHub Container Registry
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v2
        with:
          registry: ghcr.io
          username: ${{ github.repository_owner }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Build and push
        uses: docker/build-push-action@v4
        with:
          context: .
          push: ${{ github.event_name != 'pull_request' }}
          platforms: linux/amd64 # Other platforms can be added here
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
27 changes: 22 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,34 @@ on:
jobs:
test:
name: Test
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.19.x, 1.20.x]
platform: [ubuntu-latest, windows-latest]
runs-on: ${{ matrix.platform }}
steps:
- name: Set up Golang
uses: actions/setup-go@v2
uses: actions/setup-go@v4
with:
go-version: ^1.15
go-version: ${{ matrix.go-version }}

- name: Setup golang cache
uses: actions/cache@v3
with:
path: |
~/go/pkg/mod
~/.cache/go-build
~/Library/Caches/go-build
~\AppData\Local\go-build
key: ${{ runner.os }}-go-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-${{ matrix.go-version }}-

- name: Check out code
uses: actions/checkout@v2
uses: actions/checkout@v3

- name: Test
run: go test -v ./... -timeout 1m
run: make test

- name: Build
run: make build
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# build vFlow in the first stage
FROM golang:1.15.3 as builder
FROM golang:1.19 as builder
WORKDIR /go/src/

RUN mkdir -p github.com/EdgeCast/vflow
Expand Down
85 changes: 56 additions & 29 deletions consumers/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
package main

import (
"context"
"encoding/json"
"flag"
"log"
"sync"
"time"

cluster "github.com/bsm/sarama-cluster"
"github.com/Shopify/sarama"
)

type options struct {
Expand Down Expand Up @@ -68,57 +69,83 @@ func init() {
func main() {
var wg sync.WaitGroup

config := cluster.NewConfig()
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Consumer.Group.Session.Timeout = 10 * time.Second
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Version = sarama.V2_1_0_0

wg.Add(opts.Workers)

for i := 0; i < opts.Workers; i++ {
go func(ti int) {
var objmap ipfix

brokers := []string{opts.Broker}
topics := []string{opts.Topic}
consumer, err := cluster.NewConsumer(brokers, "mygroup", topics, config)
consumerGroup, err := sarama.NewConsumerGroup(brokers, "mygroup", config)

if err != nil {
panic(err)
}
defer consumer.Close()
defer consumerGroup.Close()

pCount := 0
count := 0
tik := time.Tick(10 * time.Second)

for {
select {
case <-tik:
if opts.Debug {
log.Printf("partition GroupId#%d, rate=%d\n", ti, (count-pCount)/10)
}
pCount = count
case msg, more := <-consumer.Messages():
if more {
if err := json.Unmarshal(msg.Value, &objmap); err != nil {
log.Println(err)
} else {
for _, data := range objmap.DataSets {
for _, dd := range data {
if dd.I == opts.Id && dd.V == opts.Value {
log.Printf("%#v\n", data)
}
}
err := consumerGroup.Consume(context.Background(), topics, consumerGroupHandler{ti: ti, debug: opts.Debug, id: opts.Id, value: opts.Value, pCount: &pCount, count: &count, tik: tik})
if err != nil {
log.Printf("Error from consumer: %v", err)
}
}
}(i)
}

wg.Wait()
}

// consumerGroupHandler implements the sarama consumer-group handler callbacks
// (Setup/Cleanup/ConsumeClaim) and carries per-worker state used while
// consuming messages. pCount, count, and tik are shared with the launching
// worker goroutine in main.
type consumerGroupHandler struct {
	ti     int    // worker index, used only in the debug rate log line
	debug  bool   // when true, log this worker's consume rate on each tick
	id     int    // matched against dd.I of decoded ipfix data sets
	value  string // matched against dd.V of decoded ipfix data sets
	pCount *int   // shared: message count observed at the previous tick
	count  *int   // shared: total messages consumed so far
	tik    <-chan time.Time // shared ticker (10s in main) driving the rate report
}

// Setup is run by sarama at the beginning of a new consumer-group session,
// before any ConsumeClaim call. No per-session initialization is needed here.
func (c consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

// Cleanup is run by sarama at the end of a session, after all ConsumeClaim
// goroutines have exited. No per-session teardown is needed here.
func (c consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (c consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
var objmap ipfix

for {
select {
case <-c.tik:
if c.debug {
log.Printf("partition GroupId#%d, rate=%d\n", c.ti, (*c.count-*c.pCount)/10)
}
*c.pCount = *c.count
case msg, more := <-claim.Messages():
if more {
if err := json.Unmarshal(msg.Value, &objmap); err != nil {
log.Println(err)
} else {
for _, data := range objmap.DataSets {
for _, dd := range data {
if dd.I == c.id && dd.V == c.value {
log.Printf("%#v\n", data)
}
}

consumer.MarkOffset(msg, "")
count++
}
}

sess.MarkMessage(msg, "")
*c.count++
}
}(i)
}
}

wg.Wait()
return nil
}
2 changes: 1 addition & 1 deletion consumers/clickhouse/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
## Clickhouse, Apache Kafka
ClickHouse is an open source column-oriented database management system capable of real time generation of analytical data reports using SQL queries. ClickHouse's performance exceeds comparable column-oriented DBMS currently available on the market. It processes hundreds of millions to more than a billion rows and tens of gigabytes of data per single server per second. ClickHouse uses all available hardware to it's full potential to process each query as fast as possible. The peak processing performance for a single query (after decompression, only used columns) stands at more than 2 terabytes per second. (https://clickhouse.yandex/)
ClickHouse is an open source column-oriented database management system capable of real-time generation of analytical data reports using SQL queries. ClickHouse's performance exceeds comparable column-oriented DBMS currently available on the market. It processes hundreds of millions to more than a billion rows and tens of gigabytes of data per single server per second. ClickHouse uses all available hardware to its full potential to process each query as fast as possible. The peak processing performance for a single query (after decompression, only used columns) stands at more than 2 terabytes per second. (https://clickhouse.com/)
![Alt text](/docs/imgs/clickhouse.jpeg?raw=true "vFlow")
The below clickhouse setup needs a zookeeper server, replica server is optional.

Expand Down
Loading