Observability (#22)
* observability platform initial work
wardviaene authored Oct 15, 2024

1 parent fd641af commit fd9790b
Showing 34 changed files with 2,035 additions and 52 deletions.
142 changes: 142 additions & 0 deletions cmd/cloudwatch-ingestion/main.go
@@ -0,0 +1,142 @@
package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"os"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

func main() {
var (
logGroupName string
)
flag.StringVar(&logGroupName, "log-group", "", "log group to ingest")
flag.Parse()

if logGroupName == "" {
flag.PrintDefaults()
os.Exit(1)
}

cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
log.Fatalf("unable to load SDK config, %v", err)
}

svc := cloudwatchlogs.NewFromConfig(cfg)

logStreams, err := fetchLogStreams(svc, logGroupName)
if err != nil {
log.Fatalf("failed to fetch log streams: %v", err)
}

for _, stream := range logStreams {
err := fetchLogEvents(svc, logGroupName, *stream.LogStreamName)
if err != nil {
log.Printf("failed to fetch log events for stream %s: %v", *stream.LogStreamName, err)
return
}
}
}

func fetchLogStreams(svc *cloudwatchlogs.Client, logGroupName string) ([]types.LogStream, error) {
var allStreams []types.LogStream
nextToken := ""

for {
input := &cloudwatchlogs.DescribeLogStreamsInput{
LogGroupName: aws.String(logGroupName),
}

if nextToken != "" {
input.NextToken = aws.String(nextToken)
}

result, err := svc.DescribeLogStreams(context.TODO(), input)
if err != nil {
return nil, err
}

allStreams = append(allStreams, result.LogStreams...)

if result.NextToken == nil {
break
}

nextToken = *result.NextToken
}

return allStreams, nil
}

func fetchLogEvents(svc *cloudwatchlogs.Client, logGroupName, logStreamName string) error {
nextToken := ""
messages := []map[string]any{}
logStreamNameSplit := strings.Split(logStreamName, "/")
logStreamWithoutRandom := strings.Join(logStreamNameSplit[:len(logStreamNameSplit)-1], "/")

for {
input := &cloudwatchlogs.GetLogEventsInput{
LogGroupName: aws.String(logGroupName),
LogStreamName: aws.String(logStreamName),
StartFromHead: aws.Bool(true),
}

if nextToken != "" {
input.NextToken = aws.String(nextToken)
}

result, err := svc.GetLogEvents(context.TODO(), input)
if err != nil {
return err
}

for _, event := range result.Events {
seconds := float64(*event.Timestamp / 1000)
microseconds := float64(*event.Timestamp%1000) * 1000
messages = append(messages, map[string]any{
"date": seconds + (microseconds / 1e6),
"log": *event.Message,
"log-group": logGroupName,
"log-stream": logStreamWithoutRandom,
})
}

if result.NextForwardToken == nil || nextToken == *result.NextForwardToken {
break
}

nextToken = *result.NextForwardToken
}

if len(messages) == 0 {
return nil
}

out, err := json.Marshal(messages)
if err != nil {
return err
}
resp, err := http.Post("http://localhost/api/observability/ingestion/json", "application/json", bytes.NewBuffer(out))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return fmt.Errorf("response code is not 200 (got %d)", resp.StatusCode)
}

fmt.Printf("Ingested log-group %s, stream %s: %d messages\n", logGroupName, logStreamWithoutRandom, len(messages))

return nil
}
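
For reference, the tool above posts a JSON array of flat objects to the ingestion endpoint. A minimal hand-rolled client sending one record of the same shape (endpoint and field names taken from the code above; the values are placeholders) could look like this:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// One record in the same shape fetchLogEvents builds: a fractional-second
	// timestamp plus arbitrary string fields.
	payload := []map[string]any{
		{
			"date":       1720613813.197045,
			"log":        "example log line",
			"log-group":  "/example/log-group", // placeholder log group
			"log-stream": "example-stream",     // placeholder stream name
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		log.Fatalf("marshal error: %v", err)
	}
	resp, err := http.Post("http://localhost/api/observability/ingestion/json", "application/json", bytes.NewReader(body))
	if err != nil {
		log.Fatalf("post error: %v", err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.StatusCode)
}
```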
20 changes: 20 additions & 0 deletions go.mod
@@ -18,8 +18,28 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.31.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.33 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.32 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.40.2
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 // indirect
github.com/aws/smithy-go v1.21.0 // indirect
github.com/beevik/etree v1.4.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jonboulle/clockwork v0.4.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect
52 changes: 52 additions & 0 deletions go.sum
@@ -1,3 +1,51 @@
github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g=
github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
github.com/aws/aws-sdk-go-v2 v1.31.0 h1:3V05LbxTSItI5kUqNwhJrrrY1BAXxXt0sN0l72QmG5U=
github.com/aws/aws-sdk-go-v2 v1.31.0/go.mod h1:ztolYtaEUtdpf9Wftr31CJfLVjOnD/CVRkKOOYgF8hA=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5 h1:xDAuZTn4IMm8o1LnBZvmrL8JA1io4o3YWNXgohbf20g=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.5/go.mod h1:wYSv6iDS621sEFLfKvpPE2ugjTuGlAG7iROg0hLOkfc=
github.com/aws/aws-sdk-go-v2/config v1.27.33 h1:Nof9o/MsmH4oa0s2q9a0k7tMz5x/Yj5k06lDODWz3BU=
github.com/aws/aws-sdk-go-v2/config v1.27.33/go.mod h1:kEqdYzRb8dd8Sy2pOdEbExTTF5v7ozEXX0McgPE7xks=
github.com/aws/aws-sdk-go-v2/credentials v1.17.32 h1:7Cxhp/BnT2RcGy4VisJ9miUPecY+lyE9I8JvcZofn9I=
github.com/aws/aws-sdk-go-v2/credentials v1.17.32/go.mod h1:P5/QMF3/DCHbXGEGkdbilXHsyTBX5D3HSwcrSc9p20I=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13 h1:pfQ2sqNpMVK6xz2RbqLEL0GH87JOwSxPV2rzm8Zsb74=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.13/go.mod h1:NG7RXPUlqfsCLLFfi0+IpKN4sCB9D9fw/qTaSB+xRoU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18 h1:kYQ3H1u0ANr9KEKlGs/jTLrBFPo8P8NaH/w7A01NeeM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.18/go.mod h1:r506HmK5JDUh9+Mw4CfGJGSSoqIiLCndAuqXuhbv67Y=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18 h1:Z7IdFUONvTcvS7YuhtVxN99v2cCoHRXOS4mTr0B/pUc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.18/go.mod h1:DkKMmksZVVyat+Y+r1dEOgJEfUeA7UngIHWeKsi0yNc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17 h1:Roo69qTpfu8OlJ2Tb7pAYVuF0CpuUMB0IYWwYP/4DZM=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.17/go.mod h1:NcWPxQzGM1USQggaTVwz6VpqMZPX1CvDJLDh6jnOCa4=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.40.2 h1:q5+hHt4JBA+8K6uAvfLWpUs7ErVR0GNW0Xf5KTOl84c=
github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.40.2/go.mod h1:3p7NzlLlJesNGovq7Vqx8+0UibawzodrBRQAbaza6pI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19 h1:FLMkfEiRjhgeDTCjjLoc3URo/TBkgeQbocA78lfkzSI=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.19/go.mod h1:Vx+GucNSsdhaxs3aZIKfSUjKVGsxN25nX2SRcdhuw08=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19 h1:rfprUlsdzgl7ZL2KlXiUAoJnI/VxfHCvDFr2QDFj6u4=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.19/go.mod h1:SCWkEdRq8/7EK60NcvvQ6NXKuTcchAD4ROAsC37VEZE=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17 h1:u+EfGmksnJc/x5tq3A+OD7LrMbSSR/5TrKLvkdy/fhY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.17/go.mod h1:VaMx6302JHax2vHJWgRo+5n9zvbacs3bLU/23DNQrTY=
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2 h1:Kp6PWAlXwP1UvIflkIP6MFZYBNDCa4mFCGtxrpICVOg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.61.2/go.mod h1:5FmD/Dqq57gP+XwaUnd5WFPipAuzrf0HmupX27Gvjvc=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.7 h1:pIaGg+08llrP7Q5aiz9ICWbY8cqhTkyy+0SHvfzQpTc=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.7/go.mod h1:eEygMHnTKH/3kNp9Jr1n3PdejuSNcgwLe1dWgQtO0VQ=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7 h1:/Cfdu0XV3mONYKaOt1Gr0k1KvQzkzPyiKUdlWJqy+J4=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.7/go.mod h1:bCbAxKDqNvkHxRaIMnyVPXPo+OaPRwvmgzMxbz1VKSA=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.7 h1:NKTa1eqZYw8tiHSRGpP0VtTdub/8KNk8sDkNPFaOKDE=
github.com/aws/aws-sdk-go-v2/service/sts v1.30.7/go.mod h1:NXi1dIAGteSaRLqYgarlhP/Ij0cFT+qmCwiJqWh/U5o=
github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aws/smithy-go v1.21.0 h1:H7L8dtDRk0P1Qm6y0ji7MCYMQObJ5R9CRpyPhRUkLYA=
github.com/aws/smithy-go v1.21.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/beevik/etree v1.4.0 h1:oz1UedHRepuY3p4N5OjE0nK1WLCqtzHf25bxplKOHLs=
github.com/beevik/etree v1.4.0/go.mod h1:cyWiXwGoasx60gHvtnEh5x8+uIjUVnjWqBvEnhnqKDA=
@@ -19,6 +67,9 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopacket/gopacket v1.3.0 h1:MouZCc+ej0vnqzB0WeiaO/6+tGvb+KU7UczxoQ+X0Yc=
github.com/gopacket/gopacket v1.3.0/go.mod h1:WnFrU1Xkf5lWKV38uKNR9+yYtppn+ZYzOyNqMeH4oNE=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
@@ -85,6 +136,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
178 changes: 178 additions & 0 deletions pkg/observability/buffer.go
@@ -0,0 +1,178 @@
package observability

import (
"bytes"
"fmt"
"io"
"path"
"strconv"
"strings"
"time"

"github.com/in4it/wireguard-server/pkg/logging"
"github.com/in4it/wireguard-server/pkg/storage"
)

func (o *Observability) WriteBufferToStorage(n int64) error {
o.ActiveBufferWriters.Add(1)
defer o.ActiveBufferWriters.Done()
o.WriteLock.Lock()
defer o.WriteLock.Unlock()
logging.DebugLog(fmt.Errorf("writing buffer to file. Buffer has: %d bytes", n))
// copy first to temporary buffer (storage might have latency)
tempBuf := bytes.NewBuffer(make([]byte, 0, n))
_, err := io.CopyN(tempBuf, o.Buffer, n)
if err != nil && err != io.EOF {
return fmt.Errorf("write error from buffer to temporary buffer: %s", err)
}
prefix := o.Buffer.ReadPrefix(n)
o.LastFlushed = time.Now()

for _, bufferPosAndPrefix := range mergeBufferPosAndPrefix(prefix) {
now := time.Now()
filename := bufferPosAndPrefix.prefix + "/data-" + strconv.FormatInt(now.Unix(), 10) + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)
err = ensurePath(o.Storage, filename)
if err != nil {
return fmt.Errorf("ensure path error: %s", err)
}
file, err := o.Storage.OpenFileForWriting(filename)
if err != nil {
return fmt.Errorf("open file for writing error: %s", err)
}
_, err = io.CopyN(file, tempBuf, int64(bufferPosAndPrefix.offset))
if err != nil && err != io.EOF {
return fmt.Errorf("file write error: %s", err)
}
logging.DebugLog(fmt.Errorf("wrote file: %s", filename))
err = file.Close()
if err != nil {
return fmt.Errorf("file close error: %s", err)
}
}
return nil
}

func (o *Observability) monitorBuffer() {
for {
time.Sleep(FLUSH_TIME_MAX_MINUTES * time.Minute)
if time.Since(o.LastFlushed) >= (FLUSH_TIME_MAX_MINUTES*time.Minute) && o.Buffer.Len() > 0 {
if o.FlushOverflow.CompareAndSwap(false, true) {
err := o.WriteBufferToStorage(int64(o.Buffer.Len()))
o.FlushOverflow.Swap(false) // release the flag so later overflow flushes can run
if err != nil {
logging.ErrorLog(fmt.Errorf("write log buffer to storage error: %s", err))
continue
}
}
o.LastFlushed = time.Now()
}
}
}

func (o *Observability) Ingest(data io.ReadCloser) error {
defer data.Close()
msgs, err := Decode(data)
if err != nil {
return fmt.Errorf("decode error: %s", err)
}
logging.DebugLog(fmt.Errorf("messages ingested: %d", len(msgs)))
if len(msgs) == 0 {
return nil // no messages to ingest
}
_, err = o.Buffer.Write(encodeMessage(msgs), FloatToDate(msgs[0].Date).Format(DATE_PREFIX))
if err != nil {
return fmt.Errorf("write error: %s", err)
}
if o.Buffer.Len() >= o.MaxBufferSize {
if o.FlushOverflow.CompareAndSwap(false, true) {
go func() { // write to storage
if n := o.Buffer.Len(); n >= o.MaxBufferSize {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
logging.ErrorLog(fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err))
}
}
o.FlushOverflow.Swap(false)
}()
}
}
return nil
}

func (o *Observability) Flush() error {
// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
return fmt.Errorf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
return nil
}

func (c *ConcurrentRWBuffer) Write(p []byte, prefix string) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
c.prefix = append(c.prefix, BufferPosAndPrefix{prefix: prefix, offset: len(p)})
return c.buffer.Write(p)
}
func (c *ConcurrentRWBuffer) Read(p []byte) (n int, err error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.buffer.Read(p)
}
func (c *ConcurrentRWBuffer) ReadPrefix(n int64) []BufferPosAndPrefix {
c.mu.Lock()
defer c.mu.Unlock()
totalOffset := 0
for k, v := range c.prefix {
if int64(totalOffset+v.offset) == n {
part1 := c.prefix[:k+1]
part2 := make([]BufferPosAndPrefix, len(c.prefix[k+1:]))
copy(part2, c.prefix[k+1:])
c.prefix = part2
return part1
}
totalOffset += v.offset
}
return nil
}
func (c *ConcurrentRWBuffer) Len() int {
return c.buffer.Len()
}
func (c *ConcurrentRWBuffer) Cap() int {
return c.buffer.Cap()
}

func ensurePath(storage storage.Iface, filename string) error {
base := path.Dir(filename)
baseSplit := strings.Split(base, "/")
fullPath := ""
for _, v := range baseSplit {
fullPath = path.Join(fullPath, v)
err := storage.EnsurePath(fullPath)
if err != nil {
return err
}
}
return nil
}

func mergeBufferPosAndPrefix(a []BufferPosAndPrefix) []BufferPosAndPrefix {
bufferPosAndPrefix := []BufferPosAndPrefix{}
for i := 0; i < len(a); i++ {
offset := a[i].offset
for y := i; y+1 < len(a) && a[y].prefix == a[y+1].prefix; y++ {
offset += a[y+1].offset
i++
}
bufferPosAndPrefix = append(bufferPosAndPrefix, BufferPosAndPrefix{
prefix: a[i].prefix,
offset: offset,
})
}
return bufferPosAndPrefix
}
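
To make the storage layout concrete: Ingest tags each buffered chunk with the message date formatted as DATE_PREFIX, and WriteBufferToStorage turns that into a key of the form <YYYY/MM/DD>/data-<unix-seconds>-<flush-sequence>. A standalone sketch of the naming (constant inlined here for illustration):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// DATE_PREFIX from constants.go: the directory comes from the date of the
	// first message in the flushed chunk.
	const datePrefix = "2006/01/02"

	msgDate := time.Unix(1720613813, 0) // message timestamp (seconds)
	var flushSequence uint64 = 1        // o.FlushOverflowSequence.Add(1) in the real code

	key := msgDate.Format(datePrefix) + "/data-" +
		strconv.FormatInt(time.Now().Unix(), 10) + "-" +
		strconv.FormatUint(flushSequence, 10)

	fmt.Println(key) // e.g. 2024/07/10/data-1728986400-1
}
```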
307 changes: 307 additions & 0 deletions pkg/observability/buffer_test.go
@@ -0,0 +1,307 @@
package observability

import (
"bytes"
"encoding/json"
"fmt"
"io"
"slices"
"strconv"
"testing"

"github.com/in4it/wireguard-server/pkg/logging"
memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory"
)

func TestIngestion(t *testing.T) {
logging.Loglevel = logging.LOG_DEBUG
totalMessagesToGenerate := 20
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, 20)
o.Storage = storage
payloads := IncomingData{}
for i := 0; i < totalMessagesToGenerate/10; i++ {
payloads = append(payloads, map[string]any{
"date": 1720613813.197045,
"log": "this is string: " + strconv.Itoa(i),
})
}

for i := 0; i < totalMessagesToGenerate/len(payloads); i++ {
payloadBytes, err := json.Marshal(payloads)
if err != nil {
t.Fatalf("marshal error: %s", err)
}
data := io.NopCloser(bytes.NewReader(payloadBytes))
err = o.Ingest(data)
if err != nil {
t.Fatalf("ingest error: %s", err)
}
}

err := o.Flush()
if err != nil {
t.Fatalf("flush error: %s", err)
}

dirlist, err := storage.ReadDir("")
if err != nil {
t.Fatalf("read dir error: %s", err)
}

totalMessages := 0
for _, file := range dirlist {
messages, err := storage.ReadFile(file)
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessages(messages)
totalMessages += len(decodedMessages)
}
if len(dirlist) == 0 {
t.Fatalf("expected multiple files in directory, got %d", len(dirlist))
}

if totalMessages != totalMessagesToGenerate {
t.Fatalf("Tried to generate total message count of: %d; got: %d", totalMessagesToGenerate, totalMessages)
}
}

func TestIngestionMoreMessages(t *testing.T) {
t.Skip() // we can skip this for general unit testing
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string: ",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal error: %s", err)
}

for i := 0; i < totalMessagesToGenerate; i++ {
data := io.NopCloser(bytes.NewReader(payloadBytes))
err := o.Ingest(data)
if err != nil {
t.Fatalf("ingest error: %s", err)
}
}

err = o.Flush()
if err != nil {
t.Fatalf("flush error: %s", err)
}

dirlist, err := storage.ReadDir("")
if err != nil {
t.Fatalf("read dir error: %s", err)
}

totalMessages := 0
for _, file := range dirlist {
messages, err := storage.ReadFile(file)
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessages(messages)
totalMessages += len(decodedMessages)
}
if len(dirlist) == 0 {
t.Fatalf("expected multiple files in directory, got %d", len(dirlist))
}

if totalMessages != totalMessagesToGenerate {
t.Fatalf("Tried to generate total message count of: %d; got: %d", totalMessagesToGenerate, totalMessages)
}
fmt.Printf("Buffer size (read+unread): %d in %d files\n", o.Buffer.Cap(), len(dirlist))

}

func BenchmarkIngest10000000(b *testing.B) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
b.Fatalf("marshal error: %s", err)
}

for i := 0; i < totalMessagesToGenerate; i++ {
data := io.NopCloser(bytes.NewReader(payloadBytes))
err := o.Ingest(data)
if err != nil {
b.Fatalf("ingest error: %s", err)
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
b.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
}

func BenchmarkIngest100000000(b *testing.B) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
o.Storage = storage
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is string",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
b.Fatalf("marshal error: %s", err)
}

for i := 0; i < totalMessagesToGenerate; i++ {
data := io.NopCloser(bytes.NewReader(payloadBytes))
err := o.Ingest(data)
if err != nil {
b.Fatalf("ingest error: %s", err)
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
b.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}
}

func TestEnsurePath(t *testing.T) {
storage := &memorystorage.MockMemoryStorage{}
err := ensurePath(storage, "a/b/c/filename.txt")
if err != nil {
t.Fatalf("error: %s", err)
}
}

func TestMergeBufferPosAndPrefix(t *testing.T) {
testCase1 := []BufferPosAndPrefix{
{
prefix: "abc",
offset: 3,
},
{
prefix: "abc",
offset: 9,
},
{
prefix: "abc",
offset: 2,
},
{
prefix: "abc2",
offset: 3,
},
{
prefix: "abc2",
offset: 2,
},
{
prefix: "abc3",
offset: 2,
},
}
expected1 := []BufferPosAndPrefix{
{
prefix: "abc",
offset: 14,
},
{
prefix: "abc2",
offset: 5,
},
{
prefix: "abc3",
offset: 2,
},
}
res := mergeBufferPosAndPrefix(testCase1)
if !slices.Equal(res, expected1) {
t.Fatalf("test case is not equal to expected\nGot: %+v\nExpected:%+v\n", res, expected1)
}
}

func TestReadPrefix(t *testing.T) {
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
o.Buffer.prefix = []BufferPosAndPrefix{
{
prefix: "abc",
offset: 3,
},
{
prefix: "abc",
offset: 9,
},
{
prefix: "abc",
offset: 2,
},
{
prefix: "abc2",
offset: 3,
},
{
prefix: "abc2",
offset: 2,
},
{
prefix: "abc3",
offset: 2,
},
}
expected1 := []BufferPosAndPrefix{
{
prefix: "abc",
offset: 3,
},
{
prefix: "abc",
offset: 9,
},
{
prefix: "abc",
offset: 2,
},
}
expected2 := []BufferPosAndPrefix{
{
prefix: "abc2",
offset: 3,
},
}
res := o.Buffer.ReadPrefix(int64(o.Buffer.prefix[0].offset + o.Buffer.prefix[1].offset + o.Buffer.prefix[2].offset))
if !slices.Equal(res, expected1) {
t.Fatalf("test case is not equal to expected\nGot: %+v\nExpected:%+v\n", res, expected1)
}
res2 := o.Buffer.ReadPrefix(3)
if !slices.Equal(res2, expected2) {
t.Fatalf("test case is not equal to expected\nGot: %+v\nExpected:%+v\n", res, expected2)
}
}
8 changes: 8 additions & 0 deletions pkg/observability/constants.go
@@ -0,0 +1,8 @@
package observability

const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB
const FLUSH_TIME_MAX_MINUTES = 1 // should have 5 as default at release

const TIMESTAMP_FORMAT = "2006-01-02T15:04:05"

const DATE_PREFIX = "2006/01/02"
101 changes: 91 additions & 10 deletions pkg/observability/decoding.go
@@ -1,9 +1,13 @@
package observability

import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"math"
"reflect"
"strconv"
)

func Decode(r io.Reader) ([]FluentBitMessage, error) {
@@ -19,20 +23,97 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) {
if len(m1) == 0 {
return result, fmt.Errorf("empty array")
}
-switch m2 := m1[0].(type) {
-case map[string]interface{}:
-var fluentBitMessage FluentBitMessage
-val, ok := m2["date"]
-if ok {
-fluentBitMessage.Date = val.(float64)
for _, m1Element := range m1 {
switch m2 := m1Element.(type) {
case map[string]interface{}:
var fluentBitMessage FluentBitMessage
fluentBitMessage.Data = make(map[string]string)
val, ok := m2["date"]
if ok {
fluentBitMessage.Date = val.(float64)
}
for key, value := range m2 {
if key != "date" {
switch valueTyped := value.(type) {
case string:
fluentBitMessage.Data[key] = valueTyped
case float64:
fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64)
case []byte:
fluentBitMessage.Data[key] = string(valueTyped)
default:
fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped))
}
}
}
result = append(result, fluentBitMessage)
default:
return result, fmt.Errorf("invalid type: no map found in array")
}
-fluentBitMessage.Data = m2
-result = append(result, fluentBitMessage)
-default:
-return result, fmt.Errorf("invalid type: no map found in array")
}
default:
return result, fmt.Errorf("invalid type: no array found")
}
return result, nil
}

func decodeMessages(msgs []byte) []FluentBitMessage {
res := []FluentBitMessage{}
recordOffset := 0
for k := 0; k < len(msgs); k++ {
if k > recordOffset+8 && msgs[k] == 0xff && msgs[k-1] == 0xff {
res = append(res, decodeMessage(msgs[recordOffset:k]))
recordOffset = k + 1
}
}
return res
}
func decodeMessage(data []byte) FluentBitMessage {
bits := binary.LittleEndian.Uint64(data[0:8])
msg := FluentBitMessage{
Date: math.Float64frombits(bits),
Data: map[string]string{},
}
isKey := false
key := ""
start := 8
for kk := start; kk < len(data); kk++ {
if data[kk] == 0xff {
if isKey {
isKey = false
msg.Data[key] = string(data[start+1 : kk])
start = kk + 1
} else {
isKey = true
key = string(data[start:kk])
start = kk
}
}
}
// if last record didn't end with the terminator
if data[len(data)-1] != 0xff {
msg.Data[key] = string(data[start+1:])
}
return msg
}

func scanMessage(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
for i := 1; i < len(data); i++ { // start at 1: the check below looks back at data[i-1]
if data[i] == 0xff && data[i-1] == 0xff {
return i + 1, data[0 : i-1], nil
}
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
if len(data) > 1 && data[len(data)-1] == 0xff && data[len(data)-2] == 0xff {
return len(data[0 : len(data)-2]), data, nil
} else {
return len(data), data, nil
}
}
// Request more data.
return 0, nil, nil
}
199 changes: 196 additions & 3 deletions pkg/observability/decoding_test.go
@@ -2,12 +2,22 @@ package observability

import (
"bytes"
"encoding/json"
"testing"
)

func TestDecoding(t *testing.T) {
data := `[{"date":1720613813.197045,"rand_value":5523152494216581654}]`
messages, err := Decode(bytes.NewBuffer([]byte(data)))
payload := IncomingData{
{
"date": 1720613813.197045,
"rand_value": "rand",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("json marshal error: %s", err)
}
messages, err := Decode(bytes.NewBuffer(payloadBytes))
if err != nil {
t.Fatalf("error: %s", err)
}
@@ -21,7 +31,190 @@ func TestDecoding(t *testing.T) {
if !ok {
t.Fatalf("rand_value key not found")
}
if val.(float64) != 5523152494216581654 {
if string(val) != "rand" {
t.Fatalf("wrong data returned: %s", val)
}
}

func TestDecodingMultiMessage(t *testing.T) {
payload := IncomingData{
{
"date": 1727119152.0,
"container_name": "/fluentbit-nginx-1",
"source": "stdout",
"log": "/docker-entrypoint.sh: /docker-entrypoint.d/ is not empty, will attempt to perform configuration",
"container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186",
},
{
"date": 1727119152.0,
"source": "stdout",
"log": "/docker-entrypoint.sh: Looking for shell scripts in /docker-entrypoint.d/",
"container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186",
"container_name": "/fluentbit-nginx-1",
},
{
"date": 1727119152.0,
"container_id": "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186",
"container_name": "/fluentbit-nginx-1",
"source": "stdout",
"log": "/docker-entrypoint.sh: Launching /docker-entrypoint.d/10-listen-on-ipv6-by-default.sh",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("json marshal error: %s", err)
}
messages, err := Decode(bytes.NewBuffer(payloadBytes))
if err != nil {
t.Fatalf("error: %s", err)
}
if len(messages) != len(payload) {
t.Fatalf("incorrect messages returned. Got %d, expected %d", len(messages), len(payload))
}
val, ok := messages[2].Data["container_id"]
if !ok {
t.Fatalf("container_id key not found")
}
if string(val) != "7a9c8ae0ca6c5434b778fa0a2e74e038710b3f18dedb3478235291832f121186" {
t.Fatalf("wrong data returned: %s", val)
}
}

func TestDecodeMessages(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197045,
Data: map[string]string{
"mykey": "this is myvalue",
"second key": "this is my second value",
"third key": "this is my third value",
},
},
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
decoded := decodeMessages(encoded)

if len(msgs) != len(decoded) {
t.Fatalf("length doesn't match")
}
for k := range decoded {
if msgs[k].Date != decoded[k].Date {
t.Fatalf("date doesn't match")
}
if len(msgs[k].Data) != len(decoded[k].Data) {
t.Fatalf("length of data doesn't match")
}
for kk := range decoded[k].Data {
if msgs[k].Data[kk] != decoded[k].Data[kk] {
t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, msgs[k].Data[kk], decoded[k].Data[kk])
}
}
}
}

func TestDecodeMessage(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
message := decodeMessage(encoded)

if msgs[0].Date != message.Date {
t.Fatalf("date doesn't match")
}
if len(msgs[0].Data) != len(message.Data) {
t.Fatalf("length of data doesn't match")
}
for kk := range message.Data {
if msgs[0].Data[kk] != message.Data[kk] {
t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, message.Data[kk], message.Data[kk])
}
}
}
func TestDecodeMessageWithoutTerminator(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
message := decodeMessage(bytes.TrimSuffix(encoded, []byte{0xff, 0xff}))

if msgs[0].Date != message.Date {
t.Fatalf("date doesn't match")
}
if len(msgs[0].Data) != len(message.Data) {
t.Fatalf("length of data doesn't match: got: '%s', expected '%s'", message.Data, msgs[0].Data)
}
for kk := range message.Data {
if msgs[0].Data[kk] != message.Data[kk] {
t.Fatalf("key/value pair in data doesn't match: key: %s. Data: %s vs %s", kk, message.Data[kk], msgs[0].Data[kk])
}
}
}

func TestScanMessage(t *testing.T) {
msgs := []FluentBitMessage{
{
Date: 1720613813.197045,
Data: map[string]string{
"mykey": "this is myvalue",
"second key": "this is my second value",
"third key": "this is my third value",
},
},
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},
}
encoded := encodeMessage(msgs)
// first record
advance, record1, err := scanMessage(encoded, false)
if err != nil {
t.Fatalf("scan lines error: %s", err)
}
firstRecord := decodeMessages(append(record1, []byte{0xff, 0xff}...))
if len(firstRecord) == 0 {
t.Fatalf("first record is empty")
}
if firstRecord[0].Data["mykey"] != "this is myvalue" {
t.Fatalf("wrong data returned")
}
// second record
advance2, record2, err := scanMessage(encoded[advance:], false)
if err != nil {
t.Fatalf("scan lines error: %s", err)
}
secondRecord := decodeMessages(append(record2, []byte{0xff, 0xff}...))
if len(secondRecord) == 0 {
t.Fatalf("first record is empty")
}
if secondRecord[0].Data["second data set"] != "my value" {
t.Fatalf("wrong data returned in second record")
}
// third call
advance3, record3, err := scanMessage(encoded[advance+advance2:], false)
if err != nil {
t.Fatalf("scan lines error: %s", err)
}
if advance3 != 0 {
t.Fatalf("third record should be empty. Got: %+v", record3)
}
}
24 changes: 24 additions & 0 deletions pkg/observability/encoding.go
@@ -0,0 +1,24 @@
package observability

import (
"bytes"
"encoding/binary"
"math"
)

func encodeMessage(msgs []FluentBitMessage) []byte {
out := bytes.Buffer{}
for _, msg := range msgs {
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], math.Float64bits(msg.Date))
out.Write(buf[:])
for key, msgData := range msg.Data {
out.Write([]byte(key))
out.Write([]byte{0xff})
out.Write([]byte(msgData))
out.Write([]byte{0xff})
}
out.Write([]byte{0xff})
}
return out.Bytes()
}
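
The buffer therefore holds records in a small custom framing rather than JSON: an 8-byte little-endian float64 date, then each key and value terminated by 0xff, and a final 0xff, so a record boundary is the 0xff 0xff pair that decodeMessages and scanMessage look for. A standalone sketch that builds one such record by hand (placeholder key and value) to make the layout visible:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"math"
)

func main() {
	// Mirrors encodeMessage for a single record:
	//   [8-byte little-endian float64 date] [key 0xff value 0xff]... [0xff]
	var out bytes.Buffer

	var dateBuf [8]byte
	binary.LittleEndian.PutUint64(dateBuf[:], math.Float64bits(1720613813.197045))
	out.Write(dateBuf[:])

	out.WriteString("log") // key
	out.WriteByte(0xff)
	out.WriteString("hello") // value
	out.WriteByte(0xff)

	// Record terminator; together with the value's 0xff this forms the
	// 0xff 0xff boundary the decoder scans for.
	out.WriteByte(0xff)

	fmt.Println(hex.Dump(out.Bytes()))
}
```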
81 changes: 78 additions & 3 deletions pkg/observability/handlers.go
@@ -1,21 +1,96 @@
package observability

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
)

func (o *Observability) observabilityHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}

func (o *Observability) ingestionHandler(w http.ResponseWriter, r *http.Request) {
-msgs, err := Decode(r.Body)
-if err != nil {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusBadRequest)
return
}

if err := o.Ingest(r.Body); err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("error: %s", err)
return
}
fmt.Printf("Got msgs: %+v\n", msgs)
w.WriteHeader(http.StatusOK)
}

func (o *Observability) logsHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusBadRequest)
return
}
if r.FormValue("fromDate") == "" {
o.returnError(w, fmt.Errorf("no from date supplied"), http.StatusBadRequest)
return
}
fromDate, err := time.Parse("2006-01-02", r.FormValue("fromDate"))
if err != nil {
o.returnError(w, fmt.Errorf("invalid date: %s", err), http.StatusBadRequest)
return
}
if r.FormValue("endDate") == "" {
o.returnError(w, fmt.Errorf("no end date supplied"), http.StatusBadRequest)
return
}
endDate, err := time.Parse("2006-01-02", r.FormValue("endDate"))
if err != nil {
o.returnError(w, fmt.Errorf("invalid date: %s", err), http.StatusBadRequest)
return
}
offset := 0
if r.FormValue("offset") != "" {
i, err := strconv.Atoi(r.FormValue("offset"))
if err == nil {
offset = i
}
}
maxLines := 0
if r.FormValue("maxLines") != "" {
i, err := strconv.Atoi(r.FormValue("maxLines"))
if err == nil {
maxLines = i
}
}
pos := int64(0)
if r.FormValue("pos") != "" {
i, err := strconv.ParseInt(r.FormValue("pos"), 10, 64)
if err == nil {
pos = i
}
}
displayTags := strings.Split(r.FormValue("display-tags"), ",")
filterTagsSplit := strings.Split(r.FormValue("filter-tags"), ",")
filterTags := []KeyValue{}
for _, tag := range filterTagsSplit {
kv := strings.Split(tag, "=")
if len(kv) == 2 {
filterTags = append(filterTags, KeyValue{Key: kv[0], Value: kv[1]})
}
}
out, err := o.getLogs(fromDate, endDate, pos, maxLines, offset, r.FormValue("search"), displayTags, filterTags)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("get logs error: %s", err)
return
}
outBytes, err := json.Marshal(out)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Printf("marshal error: %s", err)
return
}
w.Write(outBytes)
}
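
A rough sketch of a client call against the logs endpoint, using the query parameters the handler reads above (fromDate, endDate, maxLines, offset, pos, search, display-tags, filter-tags); the host and tag names are placeholders:

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
)

func main() {
	params := url.Values{}
	params.Set("fromDate", "2024-10-01") // parsed with layout 2006-01-02
	params.Set("endDate", "2024-10-15")
	params.Set("maxLines", "100")
	params.Set("offset", "0")          // minute offset added to each timestamp
	params.Set("pos", "0")             // resume position; the response returns nextPos
	params.Set("search", "entrypoint") // substring match on the log line
	params.Set("display-tags", "container_name,source") // placeholder tag keys
	params.Set("filter-tags", "source=stdout")          // key=value pairs

	resp, err := http.Get("http://localhost/api/observability/logs?" + params.Encode())
	if err != nil {
		log.Fatalf("request error: %v", err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON LogEntryResponse: enabled, logEntries, tags, nextPos
}
```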
66 changes: 66 additions & 0 deletions pkg/observability/handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package observability

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory"
)

func TestIngestionHandler(t *testing.T) {
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, 20)
o.Storage = storage
payload := IncomingData{
{
"date": 1720613813.197045,
"log": "this is a string",
},
}

payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal error: %s", err)
}
req := httptest.NewRequest(http.MethodPost, "/api/observability/ingestion/json", bytes.NewReader(payloadBytes))
w := httptest.NewRecorder()
o.ingestionHandler(w, req)
res := w.Result()

if res.StatusCode != http.StatusOK {
t.Fatalf("expected status code OK. Got: %d", res.StatusCode)
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}

dirlist, err := storage.ReadDir("")
if err != nil {
t.Fatalf("read dir error: %s", err)
}
if len(dirlist) == 0 {
t.Fatalf("dir is empty")
}
messages, err := storage.ReadFile(dirlist[0])
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessages(messages)
if decodedMessages[0].Date != 1720613813.197045 {
t.Fatalf("unexpected date. Got %f, expected: %f", decodedMessages[0].Date, 1720613813.197045)
}
if decodedMessages[0].Data["log"] != "this is a string" {
t.Fatalf("unexpected log data")
}
}
31 changes: 31 additions & 0 deletions pkg/observability/helpers.go
@@ -0,0 +1,31 @@
package observability

import (
"fmt"
"math"
"net/http"
"strings"
"time"
)

func (o *Observability) returnError(w http.ResponseWriter, err error, statusCode int) {
fmt.Println("========= ERROR =========")
fmt.Printf("Error: %s\n", err)
fmt.Println("=========================")
w.WriteHeader(statusCode)
w.Write([]byte(`{"error": "` + strings.Replace(err.Error(), `"`, `\"`, -1) + `"}`))
}

func FloatToDate(datetime float64) time.Time {
datetimeInt := int64(datetime)
decimals := datetime - float64(datetimeInt)
nsecs := int64(math.Round(decimals * 1_000_000)) // precision to match golang's time.Time
return time.Unix(datetimeInt, nsecs*1000)
}

func DateToFloat(datetime time.Time) float64 {
seconds := float64(datetime.Unix())
nanoseconds := float64(datetime.Nanosecond()) / 1e9
fmt.Printf("nanosec: %f", nanoseconds)
return seconds + nanoseconds
}
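
A quick worked example of the conversion round trip (the import path is assumed from the module paths used elsewhere in this commit):

```go
package main

import (
	"fmt"

	"github.com/in4it/wireguard-server/pkg/observability"
)

func main() {
	// A Fluent Bit style timestamp: seconds since the epoch with sub-second precision.
	ts := 1720613813.197045

	d := observability.FloatToDate(ts)
	fmt.Println(d.UTC().Format(observability.TIMESTAMP_FORMAT)) // e.g. 2024-07-10T12:16:53

	back := observability.DateToFloat(d)
	fmt.Printf("%f\n", back) // 1720613813.197045
}
```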
15 changes: 15 additions & 0 deletions pkg/observability/helpers_test.go
@@ -0,0 +1,15 @@
package observability

import (
"testing"
"time"
)

func TestFloatToDate2Way(t *testing.T) {
now := time.Now()
float := DateToFloat(now)
date := FloatToDate(float)
if date.Format(TIMESTAMP_FORMAT) != now.Format(TIMESTAMP_FORMAT) {
t.Fatalf("got: %s, expected: %s", date.Format(TIMESTAMP_FORMAT), now.Format(TIMESTAMP_FORMAT))
}
}
116 changes: 116 additions & 0 deletions pkg/observability/logs.go
@@ -0,0 +1,116 @@
package observability

import (
"bufio"
"fmt"
"sort"
"strings"
"time"
)

func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, maxLogLines, offset int, search string, displayTags []string, filterTags []KeyValue) (LogEntryResponse, error) {
logEntryResponse := LogEntryResponse{
Enabled: true,
LogEntries: []LogEntry{},
Tags: KeyValueInt{},
}

keys := make(map[KeyValue]int)

logFiles := []string{}

if maxLogLines == 0 {
maxLogLines = 100
}

for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) {
fileList, err := o.Storage.ReadDir(d.Format(DATE_PREFIX))
if err != nil {
logEntryResponse.NextPos = -1
return logEntryResponse, nil // can't read directory, return empty response
}
for _, filename := range fileList {
logFiles = append(logFiles, d.Format(DATE_PREFIX)+"/"+filename)
}
}

fileReaders, err := o.Storage.OpenFilesFromPos(logFiles, pos)
if err != nil {
return logEntryResponse, fmt.Errorf("error while reading files: %s", err)
}
for _, fileReader := range fileReaders {
defer fileReader.Close()
}

for _, logInputData := range fileReaders { // read multiple files
if len(logEntryResponse.LogEntries) >= maxLogLines {
break
}
scanner := bufio.NewScanner(logInputData)
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
advance, token, err = scanMessage(data, atEOF)
pos += int64(advance)
return
})
for scanner.Scan() && len(logEntryResponse.LogEntries) < maxLogLines { // read multiple lines
// decode, store as logentry
logMessage := decodeMessage(scanner.Bytes())
logline, ok := logMessage.Data["log"]
if ok {
timestamp := FloatToDate(logMessage.Date).Add(time.Duration(offset) * time.Minute)
if search == "" || strings.Contains(logline, search) {
tags := []KeyValue{}
for _, tag := range displayTags {
if tagValue, ok := logMessage.Data[tag]; ok {
tags = append(tags, KeyValue{Key: tag, Value: tagValue})
}
}
filterMessage := true
if len(filterTags) == 0 {
filterMessage = false
} else {
for _, filter := range filterTags {
if tagValue, ok := logMessage.Data[filter.Key]; ok {
if tagValue == filter.Value {
filterMessage = false
}
}
}
}
if !filterMessage {
logEntry := LogEntry{
Timestamp: timestamp.Format(TIMESTAMP_FORMAT),
Data: logline,
Tags: tags,
}
logEntryResponse.LogEntries = append(logEntryResponse.LogEntries, logEntry)
for k, v := range logMessage.Data {
if k != "log" {
keys[KeyValue{Key: k, Value: v}] += 1
}
}
}
}
}
}
if err := scanner.Err(); err != nil {
return logEntryResponse, fmt.Errorf("log file read (scanner) error: %s", err)
}
}
if len(logEntryResponse.LogEntries) < maxLogLines {
logEntryResponse.NextPos = -1 // no more records
} else {
logEntryResponse.NextPos = pos
}

for k, v := range keys {
logEntryResponse.Tags = append(logEntryResponse.Tags, KeyValueTotal{
Key: k.Key,
Value: k.Value,
Total: v,
})
}
sort.Sort(logEntryResponse.Tags)

return logEntryResponse, nil
}
96 changes: 96 additions & 0 deletions pkg/observability/logs_test.go
@@ -0,0 +1,96 @@
package observability

import (
"bytes"
"encoding/json"
"io"
"strconv"
"strings"
"testing"
"time"

"github.com/in4it/wireguard-server/pkg/logging"
memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory"
)

func TestGetLogs(t *testing.T) {
logging.Loglevel = logging.LOG_DEBUG
totalMessagesToGenerate := 100
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(storage, 20)
timestamp := DateToFloat(time.Now())
payload := IncomingData{
{
"date": timestamp,
"log": "this is string: ",
},
}

for i := 0; i < totalMessagesToGenerate; i++ {
payload[0]["log"] = "this is string: " + strconv.Itoa(i)
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal error: %s", err)
}
data := io.NopCloser(bytes.NewReader(payloadBytes))
err = o.Ingest(data)
if err != nil {
t.Fatalf("ingest error: %s", err)
}
}

// wait until all data is flushed
o.ActiveBufferWriters.Wait()

// flush remaining data that hasn't been flushed
if n := o.Buffer.Len(); n >= 0 {
err := o.WriteBufferToStorage(int64(n))
if err != nil {
t.Fatalf("write log buffer to storage error (buffer: %d): %s", o.Buffer.Len(), err)
}
}

now := time.Now()
maxLogLines := 100
search := ""

logEntryResponse, err := o.getLogs(now, now, 0, maxLogLines, 0, search, []string{}, []KeyValue{})
if err != nil {
t.Fatalf("get logs error: %s", err)
}
if len(logEntryResponse.LogEntries) != totalMessagesToGenerate {
t.Fatalf("didn't get the same log entries as messaged we generated: got: %d, expected: %d", len(logEntryResponse.LogEntries), totalMessagesToGenerate)
}
if logEntryResponse.LogEntries[0].Timestamp != FloatToDate(timestamp).Format(TIMESTAMP_FORMAT) {
t.Fatalf("unexpected timestamp: %s vs %s", logEntryResponse.LogEntries[0].Timestamp, FloatToDate(timestamp).Format(TIMESTAMP_FORMAT))
}
}

func TestFloatToDate(t *testing.T) {
for i := 0; i < 10; i++ {
now := time.Now()
floatDate := float64(now.Unix()) + float64(now.Nanosecond())/1e9
floatToDate := FloatToDate(floatDate)
if now.Unix() != floatToDate.Unix() {
t.Fatalf("times are not equal. Got: %v, expected: %v", floatToDate, now)
}
/*if now.UnixNano() != floatToDate.UnixNano() {
t.Fatalf("times are not equal. Got: %v, expected: %v", floatToDate, now)
}*/
}
}

func TestKeyValue(t *testing.T) {
logEntryResponse := LogEntryResponse{
Tags: KeyValueInt{
{Key: "k", Value: "v", Total: 4},
},
}
out, err := json.Marshal(logEntryResponse)
if err != nil {
t.Fatalf("error: %s", err)
}
if !strings.Contains(string(out), `"tags":[{"key":"k","value":"v","total":4}]`) {
t.Fatalf("wrong output: %s", out)
}
}
21 changes: 16 additions & 5 deletions pkg/observability/new.go
@@ -1,12 +1,23 @@
package observability

import "net/http"
import (
"net/http"

-func New() *Observability {
-return &Observability{}
-}
"github.com/in4it/wireguard-server/pkg/storage"
)

-type Observability struct {
func New(defaultStorage storage.Iface) *Observability {
o := NewWithoutMonitor(defaultStorage, MAX_BUFFER_SIZE)
go o.monitorBuffer()
return o
}
func NewWithoutMonitor(storage storage.Iface, maxBufferSize int) *Observability {
o := &Observability{
Buffer: &ConcurrentRWBuffer{},
MaxBufferSize: maxBufferSize,
Storage: storage,
}
return o
}

type Iface interface {
1 change: 1 addition & 0 deletions pkg/observability/router.go
@@ -6,6 +6,7 @@ func (o *Observability) GetRouter() *http.ServeMux {
mux := http.NewServeMux()
mux.Handle("/api/observability/", http.HandlerFunc(o.observabilityHandler))
mux.Handle("/api/observability/ingestion/json", http.HandlerFunc(o.ingestionHandler))
mux.Handle("/api/observability/logs", http.HandlerFunc(o.logsHandler))

return mux
}
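
For completeness, a minimal sketch of serving this mux on its own; the address is arbitrary and the in-memory storage backend is only used here to keep the example self-contained (the server wires in its real storage through the REST context further down):

```go
package main

import (
	"log"
	"net/http"

	"github.com/in4it/wireguard-server/pkg/observability"
	memorystorage "github.com/in4it/wireguard-server/pkg/storage/memory"
)

func main() {
	// New starts the background buffer monitor and returns the handler set
	// exposing /api/observability/, .../ingestion/json and .../logs.
	o := observability.New(&memorystorage.MockMemoryStorage{})

	log.Fatal(http.ListenAndServe(":8080", o.GetRouter()))
}
```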
87 changes: 85 additions & 2 deletions pkg/observability/types.go
@@ -1,6 +1,89 @@
package observability

import (
"bytes"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/in4it/wireguard-server/pkg/storage"
)

type IncomingData []map[string]any

type FluentBitMessage struct {
-Date float64 `json:"date"`
-Data map[string]any `json:"data"`
Date float64 `json:"date"`
Data map[string]string `json:"data"`
}

type Observability struct {
Storage storage.Iface
Buffer *ConcurrentRWBuffer
LastFlushed time.Time
FlushOverflow atomic.Bool
FlushOverflowSequence atomic.Uint64
ActiveBufferWriters sync.WaitGroup
WriteLock sync.Mutex
MaxBufferSize int
}

type ConcurrentRWBuffer struct {
buffer bytes.Buffer
prefix []BufferPosAndPrefix
mu sync.Mutex
}

type BufferPosAndPrefix struct {
prefix string
offset int
}

type LogEntryResponse struct {
Enabled bool `json:"enabled"`
LogEntries []LogEntry `json:"logEntries"`
Tags KeyValueInt `json:"tags"`
NextPos int64 `json:"nextPos"`
}

type LogEntry struct {
Timestamp string `json:"timestamp"`
Data string `json:"data"`
Tags []KeyValue `json:"tags"`
}

type KeyValueInt []KeyValueTotal

type KeyValueTotal struct {
Key string
Value string
Total int
}
type KeyValue struct {
Key string `json:"key"`
Value string `json:"value"`
}

func (kv KeyValueInt) MarshalJSON() ([]byte, error) {
res := "["
for _, v := range kv {
res += `{ "key" : "` + v.Key + `", "value": "` + v.Value + `", "total": ` + strconv.Itoa(v.Total) + ` },`
}
res = strings.TrimRight(res, ",")
res += "]"
return []byte(res), nil
}

func (kv KeyValueInt) Len() int {
return len(kv)
}
func (kv KeyValueInt) Less(i, j int) bool {
if kv[i].Key == kv[j].Key {
return kv[i].Value < kv[j].Value
}
return kv[i].Key < kv[j].Key
}
func (kv KeyValueInt) Swap(i, j int) {
kv[i], kv[j] = kv[j], kv[i]
}
4 changes: 2 additions & 2 deletions pkg/rest/context.go
@@ -90,10 +90,10 @@ func newContext(storage storage.Iface, serverType string) (*Context, error) {

if c.Observability == nil {
c.Observability = &Observability{
-Client: observability.New(),
Client: observability.New(storage),
}
} else {
-c.Observability.Client = observability.New()
c.Observability.Client = observability.New(storage)
}

return c, nil
2 changes: 1 addition & 1 deletion pkg/rest/login/auth_test.go
@@ -118,6 +118,6 @@ func TestAuthenticateMFAWithToken(t *testing.T) {
t.Fatalf("authentication error: %s", err)
}
if !loginResp.Authenticated {
t.Fatalf("expected not to be authenticated")
t.Fatalf("expected to be authenticated")
}
}
2 changes: 1 addition & 1 deletion pkg/rest/setup.go
@@ -134,7 +134,7 @@ func (c *Context) contextHandler(w http.ResponseWriter, r *http.Request) {
}
}

-out, err := json.Marshal(ContextSetupResponse{SetupCompleted: c.SetupCompleted, CloudType: c.CloudType})
out, err := json.Marshal(ContextSetupResponse{SetupCompleted: c.SetupCompleted, CloudType: c.CloudType, ServerType: c.ServerType})
if err != nil {
c.returnError(w, err, http.StatusBadRequest)
return
1 change: 1 addition & 0 deletions pkg/rest/types.go
@@ -70,6 +70,7 @@ type ContextRequest struct {
type ContextSetupResponse struct {
SetupCompleted bool `json:"setupCompleted"`
CloudType string `json:"cloudType"`
ServerType string `json:"serverType"`
}

type AuthMethodsResponse struct {
44 changes: 42 additions & 2 deletions pkg/storage/memory/storage.go
@@ -8,6 +8,7 @@ import (
"os"
"path"
"strings"
"sync"
)

type MockReadWriterData []byte
@@ -23,12 +24,15 @@ func (m *MockReadWriterData) Write(p []byte) (nn int, err error) {
type MockMemoryStorage struct {
FileInfoData map[string]*FileInfo
Data map[string]*MockReadWriterData
Mu sync.Mutex
}

func (m *MockMemoryStorage) ConfigPath(filename string) string {
return path.Join("config", filename)
}
func (m *MockMemoryStorage) Rename(oldName, newName string) error {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -41,6 +45,8 @@ func (m *MockMemoryStorage) Rename(oldName, newName string) error {
return nil
}
func (m *MockMemoryStorage) FileExists(name string) bool {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -49,6 +55,8 @@ func (m *MockMemoryStorage) FileExists(name string) bool {
}

func (m *MockMemoryStorage) ReadFile(name string) ([]byte, error) {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -59,13 +67,17 @@ func (m *MockMemoryStorage) ReadFile(name string) ([]byte, error) {
return *val, nil
}
func (m *MockMemoryStorage) WriteFile(name string, data []byte) error {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
m.Data[name] = (*MockReadWriterData)(&data)
return nil
}
func (m *MockMemoryStorage) AppendFile(name string, data []byte) error {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -92,19 +104,25 @@ func (m *MockMemoryStorage) EnsureOwnership(filename, login string) error {
}

func (m *MockMemoryStorage) ReadDir(path string) ([]string, error) {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
res := []string{}
for k := range m.Data {
-if strings.HasPrefix(k, path+"/") {
if path == "" {
res = append(res, strings.TrimSuffix(k, "/"))
} else if strings.HasPrefix(k, path+"/") {
res = append(res, strings.ReplaceAll(k, path+"/", ""))
}
}
return res, nil
}

func (m *MockMemoryStorage) Remove(name string) error {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -117,9 +135,27 @@ func (m *MockMemoryStorage) Remove(name string) error {
}

func (m *MockMemoryStorage) OpenFilesFromPos(names []string, pos int64) ([]io.ReadCloser, error) {
return nil, fmt.Errorf("not implemented")
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
if pos > 0 {
return nil, fmt.Errorf("pos > 0 not implemented")
}
readClosers := []io.ReadCloser{}
for _, name := range names {
val, ok := m.Data[name]
if !ok {
return nil, fmt.Errorf("file does not exist")
}
readClosers = append(readClosers, io.NopCloser(bytes.NewBuffer(*val)))
}
return readClosers, nil
}
func (m *MockMemoryStorage) OpenFile(name string) (io.ReadCloser, error) {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -138,6 +174,8 @@ func (m *MockMemoryStorage) OpenFileForWriting(name string) (io.WriteCloser, err
return m.Data[name], nil
}
func (m *MockMemoryStorage) OpenFileForAppending(name string) (io.WriteCloser, error) {
m.Mu.Lock()
defer m.Mu.Unlock()
if m.Data == nil {
m.Data = make(map[string]*MockReadWriterData)
}
@@ -153,6 +191,8 @@ func (m *MockMemoryStorage) EnsurePermissions(name string, mode fs.FileMode) err
return nil
}
func (m *MockMemoryStorage) FileInfo(name string) (fs.FileInfo, error) {
m.Mu.Lock()
defer m.Mu.Unlock()
val, ok := m.FileInfoData[name]
if !ok {
return FileInfo{}, fmt.Errorf("couldn't get file info for: %s", name)
25 changes: 25 additions & 0 deletions pkg/storage/s3/list.go
@@ -0,0 +1,25 @@
package s3storage

import (
"context"
"fmt"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

func (s *S3Storage) ReadDir(pathname string) ([]string, error) {
objectList, err := s.s3Client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{
Bucket: aws.String(s.bucketname),
Prefix: aws.String(s.prefix + "/" + strings.TrimLeft(pathname, "/")),
})
if err != nil {
return []string{}, fmt.Errorf("list object error: %s", err)
}
res := make([]string, len(objectList.Contents))
for k, object := range objectList.Contents {
res[k] = *object.Key
}
return res, nil
}
23 changes: 23 additions & 0 deletions pkg/storage/s3/new.go
@@ -0,0 +1,23 @@
package s3storage

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

func New(bucketname, prefix string) (*S3Storage, error) {
sdkConfig, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
return nil, fmt.Errorf("config load error: %s", err)
}
s3Client := s3.NewFromConfig(sdkConfig)

return &S3Storage{
bucketname: bucketname,
prefix: prefix,
s3Client: s3Client,
}, nil
}
42 changes: 42 additions & 0 deletions pkg/storage/s3/path.go
@@ -0,0 +1,42 @@
package s3storage

import (
"io/fs"
"strings"
)

func (l *S3Storage) FileExists(filename string) bool {
return false
}

func (l *S3Storage) ConfigPath(filename string) string {
return CONFIG_PATH + "/" + strings.TrimLeft(filename, "/")
}

func (s *S3Storage) GetPath() string {
return s.prefix
}

func (l *S3Storage) EnsurePath(pathname string) error {
return nil
}

func (l *S3Storage) EnsureOwnership(filename, login string) error {
return nil
}

func (l *S3Storage) Remove(name string) error {
return nil
}

func (l *S3Storage) Rename(oldName, newName string) error {
return nil
}

func (l *S3Storage) EnsurePermissions(name string, mode fs.FileMode) error {
return nil
}

func (l *S3Storage) FileInfo(name string) (fs.FileInfo, error) {
return nil, nil
}
17 changes: 17 additions & 0 deletions pkg/storage/s3/read.go
@@ -0,0 +1,17 @@
package s3storage

import (
"io"
)

func (l *S3Storage) ReadFile(name string) ([]byte, error) {
return nil, nil
}

func (l *S3Storage) OpenFilesFromPos(names []string, pos int64) ([]io.ReadCloser, error) {
return nil, nil
}

func (l *S3Storage) OpenFile(name string) (io.ReadCloser, error) {
return nil, nil
}
11 changes: 11 additions & 0 deletions pkg/storage/s3/types.go
@@ -0,0 +1,11 @@
package s3storage

import "github.com/aws/aws-sdk-go-v2/service/s3"

const CONFIG_PATH = "config"

type S3Storage struct {
bucketname string
prefix string
s3Client *s3.Client
}
35 changes: 35 additions & 0 deletions pkg/storage/s3/write.go
@@ -0,0 +1,35 @@
package s3storage

import (
"bytes"
"context"
"fmt"
"io"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

func (s *S3Storage) WriteFile(name string, data []byte) error {
_, err := s.s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
Bucket: aws.String(s.bucketname),
Key: aws.String(name),
Body: bytes.NewReader(data),
})
if err != nil {
return fmt.Errorf("put object error: %s", err)
}
return nil
}

func (s *S3Storage) AppendFile(name string, data []byte) error {
return nil
}

func (s *S3Storage) OpenFileForWriting(name string) (io.WriteCloser, error) {
return nil, nil
}

func (s *S3Storage) OpenFileForAppending(name string) (io.WriteCloser, error) {
return nil, nil
}
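
Note that WriteFile uses the name as the object key directly, without the prefix that ReadDir prepends, and AppendFile is a no-op. S3 has no native append operation, so appending would have to be emulated; a rough read-modify-write sketch under those assumptions — illustration only, not part of this commit, and not safe under concurrent writers:

package s3storage

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// appendFile is a hypothetical read-modify-write emulation of append.
func (s *S3Storage) appendFile(name string, data []byte) error {
	var existing []byte
	out, err := s.s3Client.GetObject(context.TODO(), &s3.GetObjectInput{
		Bucket: aws.String(s.bucketname),
		Key:    aws.String(name),
	})
	if err == nil {
		defer out.Body.Close()
		if existing, err = io.ReadAll(out.Body); err != nil {
			return fmt.Errorf("read existing object error: %w", err)
		}
	} else {
		// a missing key just means there is nothing to append to yet
		var notFound *types.NoSuchKey
		if !errors.As(err, &notFound) {
			return fmt.Errorf("get object error: %w", err)
		}
	}
	_, err = s.s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
		Bucket: aws.String(s.bucketname),
		Key:    aws.String(name),
		Body:   bytes.NewReader(append(existing, data...)),
	})
	if err != nil {
		return fmt.Errorf("put object error: %w", err)
	}
	return nil
}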
4 changes: 2 additions & 2 deletions webapp/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Logs
logs
/logs
*.log
npm-debug.log*
yarn-debug.log*
@@ -129,4 +129,4 @@ dist
.yarn/install-state.gz
.pnp.*

.DS_Store
.DS_Store
37 changes: 35 additions & 2 deletions webapp/src/App.tsx
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ import { Profile } from "./Routes/Profile/Profile";
import { Upgrade } from "./Routes/Upgrade/Upgrade";
import { GetMoreLicenses } from "./Routes/Licenses/GetMoreLicenses";
import { PacketLogs } from "./Routes/PacketLogs/PacketLogs";
import { Logs } from "./Routes/Logs/Logs";

const queryClient = new QueryClient()

@@ -27,7 +28,7 @@ export default function App() {
return <MantineProvider theme={theme} forceColorScheme="light">
<QueryClientProvider client={queryClient}>
<BrowserRouter>
<AppInit>
<AppInit serverType="vpn">
<Auth>
<AppShell
navbar={{
@@ -38,7 +39,7 @@ export default function App() {
padding="md"
>
<AppShell.Navbar>
<NavBar />
<NavBar serverType="vpn" />
</AppShell.Navbar>
<AppShell.Main>
<Routes>
@@ -60,6 +61,38 @@ export default function App() {
</AppShell>
</Auth>
</AppInit>
<AppInit serverType="observability">
<Auth>
<AppShell
navbar={{
width: 300,
breakpoint: 'sm',
collapsed: { mobile: opened },
}}
padding="md"
>
<AppShell.Navbar>
<NavBar serverType="observability" />
</AppShell.Navbar>
<AppShell.Main>
<Routes>
<Route path="/" element={<Home />} />
<Route path="/users" element={<CheckRole role="admin"><Users /></CheckRole>} />
<Route path="/setup" element={<CheckRole role="admin"><Setup /></CheckRole>} />
<Route path="/setup/:page" element={<CheckRole role="admin"><Setup /></CheckRole>} />
<Route path="/auth-setup" element={<CheckRole role="admin"><AuthSetup /></CheckRole>} />
<Route path="/upgrade" element={<CheckRole role="admin"><Upgrade /></CheckRole>} />
<Route path="/licenses" element={<CheckRole role="admin"><GetMoreLicenses /></CheckRole>} />
<Route path="/logs" element={<Logs />} />
<Route path="/logout" element={<Logout />} />
<Route path="/login/:logintype/:id" element={<Navigate to={"/"} />} />
<Route path="/callback/:callbacktype/:id" element={<Navigate to={"/"} />} />
<Route path="/profile" element={<Profile />} />
</Routes>
</AppShell.Main>
</AppShell>
</Auth>
</AppInit>
</BrowserRouter>
</QueryClientProvider>
</MantineProvider>;
14 changes: 10 additions & 4 deletions webapp/src/AppInit/AppInit.tsx
Original file line number Diff line number Diff line change
@@ -6,11 +6,13 @@ import React, { useState } from 'react';
import { SetupBanner } from './SetupBanner';
import { AppSettings } from '../Constants/Constants';

type Props = {
children?: React.ReactNode
};

export const AppInit: React.FC<Props> = ({children}) => {
type Props = {
children?: React.ReactNode
serverType: string
};

export const AppInit: React.FC<Props> = ({children, serverType}) => {
const [setupCompleted, setSetupCompleted] = useState<boolean>(false);
const { isPending, error, data } = useQuery({
queryKey: ['context'],
@@ -22,6 +24,10 @@ import { AppSettings } from '../Constants/Constants';
if (isPending) return ''
if (error) return 'A backend error has occurred: ' + error.message

if (data.serverType !== serverType) {
return ''
}

if(!setupCompleted && data.setupCompleted) {
setSetupCompleted(true)
}
71 changes: 56 additions & 15 deletions webapp/src/NavBar/NavBar.tsx
Original file line number Diff line number Diff line change
@@ -18,25 +18,66 @@ import { NavLink, useLocation } from 'react-router-dom';
import { useAuthContext } from '../Auth/Auth';
import { Version } from './Version';

export function NavBar() {
type Props = {
serverType: string
};


export function NavBar({serverType}: Props) {
const {authInfo} = useAuthContext();
const location = useLocation();
const { pathname } = location;
const [active, setActive] = useState(pathname);

const data = authInfo.role === "admin" ? [
{ link: '/', label: 'Status', icon: TbBellRinging },
{ link: '/connection', label: 'VPN Connections', icon: TbPlugConnected },
{ link: '/users', label: 'Users', icon: TbUser },
{ link: '/setup', label: 'VPN Setup', icon: TbSettings },
{ link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection },
{ link: '/packetlogs', label: 'Logging', icon: FaStream },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
] :
[
{ link: '/connection', label: 'VPN Connections', icon: TbPlugConnected },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
];
const vpnLinks = {
"admin": [
{ link: '/', label: 'Status', icon: TbBellRinging },
{ link: '/connection', label: 'VPN Connections', icon: TbPlugConnected },
{ link: '/users', label: 'Users', icon: TbUser },
{ link: '/setup', label: 'VPN Setup', icon: TbSettings },
{ link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection },
{ link: '/packetlogs', label: 'Logging', icon: FaStream },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
],
"user": [
{ link: '/connection', label: 'VPN Connections', icon: TbPlugConnected },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
]
}
const observabilityLinks = {
"admin": [
{ link: '/', label: 'Status', icon: TbBellRinging },
{ link: '/logs', label: 'Logs', icon: FaStream },
{ link: '/users', label: 'Users', icon: TbUser },
{ link: '/setup', label: 'Setup', icon: TbSettings },
{ link: '/auth-setup', label: 'Authentication & Provisioning', icon: TbCloudDataConnection },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
],
"user": [
{ link: '/logs', label: 'Logs', icon: FaStream },
{ link: 'https://vpn-documentation.in4it.com', label: 'Documentation', icon: TbBook },
]
}

const getData = () => {
if(serverType === "vpn") {
if (authInfo.role === "admin" ) {
return vpnLinks.admin
} else {
return vpnLinks.user
}
}
if(serverType === "observability") {
if (authInfo.role === "admin" ) {
return observabilityLinks.admin
} else {
return observabilityLinks.user
}
}
return []
}

const data = getData()

const links = data.map((item) => (
<NavLink
@@ -58,7 +99,7 @@ export function NavBar() {
<nav className={classes.navbar}>
<div className={classes.navbarMain}>
<Group className={classes.header} justify="space-between">
VPN Server
{serverType === "vpn" ? "VPN Server" : "Observability Server"}
<Code fw={700}><Version /></Code>
</Group>
{links}
210 changes: 210 additions & 0 deletions webapp/src/Routes/Logs/Logs.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
import { Container, Table, Title, Button, Grid, Popover, Group, TextInput, rem, ActionIcon, Checkbox, Highlight} from "@mantine/core";
import { AppSettings } from "../../Constants/Constants";
import { useInfiniteQuery } from "@tanstack/react-query";
import { useAuthContext } from "../../Auth/Auth";
import { useSearchParams } from "react-router-dom";
import { TbArrowRight, TbSearch } from "react-icons/tb";
import { DatePickerInput } from "@mantine/dates";
import { useEffect, useState } from "react";
import React from "react";

type LogsDataResponse = {
enabled: boolean;
logEntries: LogEntry[];
environments: string[];
nextPos: number;
tags: Tags[];
}
type LogEntry = {
data: string;
timestamp: string;
tags: Tag[];
}
type Tags = {
key: string;
value: string;
total: number;
}
type Tag = {
key: string;
value: string;
}

function getDate(date:Date) {
var dd = String(date.getDate()).padStart(2, '0');
var mm = String(date.getMonth() + 1).padStart(2, '0'); //January is 0!
var yyyy = date.getFullYear();
return yyyy + "-" + mm + '-' + dd;
}

export function Logs() {
const {authInfo} = useAuthContext();
const timezoneOffset = new Date().getTimezoneOffset() * -1
const [currentQueryParameters] = useSearchParams();
const dateParam = currentQueryParameters.get("date")
const [tags, setTags] = useState<Tag[]>([])
const [search, setSearch] = useState<string>("")
const [searchParam, setSearchParam] = useState<string>("")
const [columns, setColumns] = useState<string[]>([])
const [logsDate, setLogsDate] = useState<Date | null>(dateParam === null ? new Date() : new Date(dateParam));
const { isPending, fetchNextPage, hasNextPage, error, data } = useInfiniteQuery<LogsDataResponse>({
queryKey: ['logs', logsDate, tags, columns, searchParam],
queryFn: async ({ pageParam }) =>
fetch(AppSettings.url + '/observability/logs?display-tags='+encodeURIComponent(columns.join(","))+'&fromDate='+(logsDate == undefined ? getDate(new Date()) : getDate(logsDate)) + '&endDate='+(logsDate == undefined ? getDate(new Date()) : getDate(logsDate)) + "&pos="+pageParam+"&offset="+timezoneOffset+"&filter-tags="+encodeURIComponent(tags.map(t => t.key + "=" + t.value).join(","))+"&search="+encodeURIComponent(searchParam), {
headers: {
"Content-Type": "application/json",
"Authorization": "Bearer " + authInfo.token
},
}).then((res) => {
return res.json()
}
),
initialPageParam: 0,
getNextPageParam: (lastRequest) => lastRequest.nextPos === -1 ? null : lastRequest.nextPos,
})

const captureEnter = (e: React.KeyboardEvent<HTMLDivElement>) => {
if (e.key === "Enter") {
setSearchParam(search)
}
}

useEffect(() => {
const handleScroll = () => {
const { scrollTop, clientHeight, scrollHeight } =
document.documentElement;
if (scrollTop + clientHeight >= scrollHeight - 20) {
fetchNextPage();
}
};

window.addEventListener("scroll", handleScroll);
return () => {
window.removeEventListener("scroll", handleScroll);
};
}, [fetchNextPage])


if(error) return 'A backend error has occurred: ' + error.message

const rows = isPending ? [] : data.pages.map((group, groupIndex) => (
<React.Fragment key={groupIndex}>
{group.logEntries.map((row, i) => (
<Table.Tr key={i}>
<Table.Td>{row.timestamp}</Table.Td>
{columns.map(function(column){
return <Table.Td key={column}>{row.tags.filter((tag) => tag.key === column).map((tag) => tag.value)}</Table.Td>;
})}
<Table.Td>{searchParam === "" ? row.data : <Highlight color="lime" highlight={searchParam}>{row.data}</Highlight>}</Table.Td>
</Table.Tr>
))}
</React.Fragment>
));
return (
<Container my={40} size="80rem">
<Title ta="center" style={{marginBottom: 20}}>
Logs
</Title>
<Grid>
<Grid.Col span={4}>
<TextInput
placeholder="Search..."
rightSectionWidth={30}
size="xs"
leftSection={<TbSearch style={{ width: rem(18), height: rem(18) }} />}
rightSection={
<ActionIcon size={18} radius="xl" variant="filled" onClick={() => setSearchParam(search)}>
<TbArrowRight style={{ width: rem(14), height: rem(14) }} />
</ActionIcon>
}
onKeyDown={(e) => captureEnter(e)}
onChange={(e) => setSearch(e.currentTarget.value)}
value={search}
/>
</Grid.Col>
<Grid.Col span={4}>
<DatePickerInput
value={logsDate}
onChange={setLogsDate}
size="xs"
/>
</Grid.Col>
<Grid.Col span={2}>


</Grid.Col>
<Grid.Col span={2}>
<Group>
<Popover width={300} position="bottom" withArrow shadow="md">
<Popover.Target>
<Button variant="default" size="xs">Columns</Button>
</Popover.Target>
<Popover.Dropdown>
{data?.pages[0].tags
.filter((element, i) => {
if(i === 0 || element.key !== data?.pages[0].tags[i-1].key) {
return true
} else {
return false
}
})
.map((element) => {
return (
<Checkbox
key={element.key}
label={element.key}
radius="xs"
size="xs"
style={{marginBottom: 3}}
onChange={(event) => event.currentTarget.checked ? setColumns([...columns, element.key]) : setColumns(columns.filter((column) => { return column !== element.key } ))}
checked={columns.some((column) => column === element.key)}
/>
)
})}
</Popover.Dropdown>
</Popover>
<Popover width={300} position="bottom" withArrow shadow="md">
<Popover.Target>
<Button variant="default" size="xs">Filter</Button>
</Popover.Target>
<Popover.Dropdown>
{data?.pages[0].tags.map((element) => {
return (
<Checkbox
key={element.key +"="+element.value}
label={element.key + " = " + element.value.substring(0, 10) + (element.value.length > 10 ? "..." : "") + " (" + element.total + ")"}
radius="xs"
size="xs"
style={{marginBottom: 3}}
onChange={(event) => event.currentTarget.checked ? setTags([...tags, {key: element.key, value: element.value }]) : setTags(tags.filter((tag) => { return tag.key !== element.key || tag.value !== element.value } ))}
checked={tags.some((tag) => tag.key === element.key && tag.value === element.value)}
/>
)
})}
</Popover.Dropdown>
</Popover>
</Group>
</Grid.Col>
</Grid>
<Table>
<Table.Thead>
<Table.Tr key="heading">
<Table.Th>Timestamp</Table.Th>
{columns.map(function(column){
return <Table.Th key={column}>{column}</Table.Th>;
})}
<Table.Th>Log</Table.Th>
</Table.Tr>
</Table.Thead>
<Table.Tbody>
{rows}
</Table.Tbody>
</Table>
<Group justify="center">
{hasNextPage ? <Button onClick={() => fetchNextPage()} variant="default">Loading more...</Button> : null}
</Group>

</Container>

)
}
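
For reference, the query string this page builds can also be exercised outside the browser. A hedged Go sketch that issues the same request shape — base URL, token and filter values are placeholders, and the endpoint itself is served by backend code elsewhere in this commit:

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// placeholder values; in the webapp these come from AppSettings and the auth context
	base := "http://localhost/api"
	token := "REPLACE_ME"

	params := url.Values{}
	params.Set("fromDate", "2024-10-15")
	params.Set("endDate", "2024-10-15")
	params.Set("pos", "0")    // pagination cursor, starts at 0
	params.Set("offset", "0") // timezone offset in minutes
	params.Set("display-tags", "")
	params.Set("filter-tags", "log-group=/aws/ecs/my-service") // hypothetical tag filter
	params.Set("search", "")

	req, err := http.NewRequest("GET", base+"/observability/logs?"+params.Encode(), nil)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}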
