Skip to content

Commit

Permalink
Add flow-db basic unit-test
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Apr 2, 2024
1 parent 7f947b6 commit e0951d8
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 15 deletions.
7 changes: 5 additions & 2 deletions cmd/flow-capture.go → cmd/flow_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func runFlowCaptureOnAddr(port int, filename string) {
log.Fatal(err)
}
// Initialize sqlite DB
db := initFLowDB(filename)
db := initFlowDB(filename)
go func() {
<-utils.ExitChannel()
close(flowPackets)
Expand All @@ -102,7 +102,10 @@ func runFlowCaptureOnAddr(port int, filename string) {
}()
for fp := range flowPackets {
// Write flows to sqlite DB
queryFlowDB(fp.GenericMap.Value, db)
err = queryFlowDB(fp.GenericMap.Value, db)
if err != nil {
log.Error("Error while writing to DB:", err.Error())
}
go manageFlowsDisplay(fp.GenericMap.Value)
// append new line between each record to read file easilly
_, err = f.Write(append(fp.GenericMap.Value, []byte(",\n")...))
Expand Down
25 changes: 12 additions & 13 deletions cmd/flow-db.go → cmd/flow_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package cmd
import (
"database/sql"
"encoding/json"
"fmt"
"os"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
// need to import the sqlite3 driver
_ "github.com/mattn/go-sqlite3"
)

func initFLowDB(filename string) *sql.DB {
func initFlowDB(filename string) *sql.DB {
// SQLite is a file based database.
flowsDB := "./output/flow/" + filename + ".db"

Expand Down Expand Up @@ -38,8 +39,8 @@ func initFLowDB(filename string) *sql.DB {
return db
}

func queryFlowDB(fp []byte, db *sql.DB) {
insertFlowToDB(db, fp)
func queryFlowDB(fp []byte, db *sql.DB) error {
return insertFlowToDB(db, fp)
}

func createFlowsDBTable(db *sql.DB) error {
Expand Down Expand Up @@ -85,14 +86,13 @@ func createFlowsDBTable(db *sql.DB) error {
return nil
}

func insertFlowToDB(db *sql.DB, buf []byte) {
func insertFlowToDB(db *sql.DB, buf []byte) error {
flow := config.GenericMap{}

// Unmarshal the JSON string into the flow object
err := json.Unmarshal(buf, &flow)
if err != nil {
log.Errorf("Error: %s", err)
return
return fmt.Errorf("error: %w", err)
}
// Insert message into database
var flowSQL string
Expand All @@ -113,8 +113,7 @@ func insertFlowToDB(db *sql.DB, buf []byte) {
statement, err := db.Prepare(flowSQL) // Prepare statement.
// This is good to avoid SQL injections
if err != nil {
log.Errorf("Error preparing SQL: %v", err.Error())
return
return fmt.Errorf("error preparing SQL: %v", err.Error())
}

if flow["PktDropLatestDropCause"] != 0 && flow["DnsId"] != 0 {
Expand Down Expand Up @@ -143,9 +142,9 @@ func insertFlowToDB(db *sql.DB, buf []byte) {
flow["TimeFlowRttNs"])
}
if err != nil {
log.Errorf("Error inserting into database: %v", err.Error())
return
return fmt.Errorf("error inserting into database: %v", err.Error())
}
return nil
}

func QueryFlowsDB(query, fileName string) ([]string, error) {
Expand All @@ -163,7 +162,7 @@ func QueryFlowsDB(query, fileName string) ([]string, error) {
func queryDB(db *sql.DB, query string) ([]string, error) {
rows, err := db.Query(query)
if err != nil {
return nil, err
return nil, fmt.Errorf("error querying database: %v", err.Error())
}
defer rows.Close()

Expand All @@ -172,13 +171,13 @@ func queryDB(db *sql.DB, query string) ([]string, error) {
for rows.Next() {
var message string
if err := rows.Scan(&message); err != nil {
return nil, err
return nil, fmt.Errorf("error scanning row: %v", err.Error())
}
result = append(result, message)
}

if err := rows.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("error iterating rows: %v", err.Error())
}

return result, nil
Expand Down
52 changes: 52 additions & 0 deletions cmd/flow_db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cmd

import (
"os"
"testing"
)

func init() {
err := os.MkdirAll("./output/flow", 0700)
if err != nil {
panic(err)
}
}

func TestInitFlowDB(t *testing.T) {
db := initFlowDB("test")
if db == nil {
t.Error("Expected database to initialize successfully")
}
defer os.RemoveAll("./output")
err := createFlowsDBTable(db)
if err != nil {
t.Error("Unexpected error creating flows table")
}

// Test success case
// {"Bytes":32}
bs := []byte{123, 34, 66, 121, 116, 101, 115, 34, 58, 51, 50, 125}
err = insertFlowToDB(db, bs)
if err != nil {
t.Errorf("Unexpected error inserting flow: %v", err)
}

q, err := queryDB(db, "SELECT Bytes FROM flow")
if err != nil {
t.Error("Unexpected error querying flow")
}
if len(q) != 1 {
t.Error("Expected 1 row in flow table")
}

if q[0] != "32" {
t.Error("Unexpected result from query")
}

// Test DB error case
db.Close()
err = insertFlowToDB(db, []byte("1"))
if err == nil {
t.Error("Expected error for closed DB")
}
}
File renamed without changes.
File renamed without changes.

0 comments on commit e0951d8

Please sign in to comment.