Skip to content

Commit

Permalink
feat: reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
maeb committed Jan 12, 2024
1 parent 6ebe581 commit c34d0b9
Show file tree
Hide file tree
Showing 16 changed files with 874 additions and 42 deletions.
3 changes: 3 additions & 0 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
var fileApi index.FileAPI
var cdxApi index.CdxAPI
var idApi index.IdAPI
var reportApi index.ReportAPI
var storageRefResolver loader.StorageRefResolver
var filePathResolver loader.FilePathResolver

Expand Down Expand Up @@ -160,6 +161,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
cdxApi = db
fileApi = db
idApi = db
reportApi = db
default:
return fmt.Errorf("unknown index format: %s", indexFormat)
}
Expand Down Expand Up @@ -240,6 +242,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
CdxAPI: cdxApi,
FileAPI: fileApi,
IdAPI: idApi,
ReportAPI: reportApi,
StorageRefResolver: storageRefResolver,
WarcLoader: l,
}, handler, mw, pathPrefix)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ toolchain go1.21.3
require (
github.com/bits-and-blooms/bloom/v3 v3.6.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/google/uuid v1.5.0
github.com/gorilla/handlers v1.5.2
github.com/julienschmidt/httprouter v1.3.0
github.com/nlnwa/gowarc v1.1.2
Expand Down Expand Up @@ -39,7 +40,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
13 changes: 13 additions & 0 deletions index/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,16 @@ type IdResponse interface {
GetValue() string
GetError() error
}

type ReportResponse interface {
GetReport() *schema.Report
GetError() error
}

type ReportAPI interface {
Generate(context.Context, Request) (*schema.Report, error)
ListReports(context.Context, Request, chan<- ReportResponse) error
GetReport(context.Context, string) (*schema.Report, error)
CancelReport(context.Context, string) error
DeleteReport(context.Context, string) error
}
6 changes: 3 additions & 3 deletions internal/badgeridx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (db *DB) closest(ctx context.Context, request index.Request, results chan<-
if err != nil {
cdxResponse = keyvalue.CdxResponse{Error: err}
} else if request.Filter().Eval(cdx) {
cdxResponse = keyvalue.CdxResponse{Cdx: cdx}
cdxResponse = keyvalue.CdxResponse{Value: cdx}
} else {
iter.Next()
continue
Expand Down Expand Up @@ -194,8 +194,8 @@ func (db *DB) search(ctx context.Context, req index.Request, results chan<- inde
}
if filter.Eval(result) {
cdxResponse = &keyvalue.CdxResponse{
Key: key,
Cdx: result,
Key: key,
Value: result,
}
}
return nil
Expand Down
78 changes: 69 additions & 9 deletions internal/keyvalue/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package keyvalue

import (
"bytes"
"strings"
"time"

"github.com/nlnwa/gowarcserver/index"
Expand Down Expand Up @@ -71,11 +72,35 @@ func MarshalFileInfo(fileInfo *schema.FileInfo, prefix string) (key []byte, valu
type CdxKey []byte

var spaceCharacter = []byte{32}
var colon = []byte{58}
var colonCharacter = []byte{58}
var slashCharacter = []byte{47}

func (ck CdxKey) Host() string {
func (ck CdxKey) Path() string {
b := bytes.Split(ck, spaceCharacter)[0]
return string(b)
i := bytes.Index(b, slashCharacter)
if i == -1 {
return ""
}
return string(b[i:])
}

func (ck CdxKey) Domain() string {
b := bytes.Split(ck, spaceCharacter)[0]
b = bytes.Split(b, slashCharacter)[0]
s := string(b)
var sb strings.Builder
sb.Grow(len(s))
t := strings.Split(s, ",")
for i := len(t) - 1; i >= 0; i-- {
if t[i] == "" {
continue
}
sb.WriteString(t[i])
if i > 0 {
sb.WriteByte('.')
}
}
return sb.String()
}

func (ck CdxKey) Time() (time.Time, error) {
Expand All @@ -96,17 +121,17 @@ func (ck CdxKey) SchemeAndUserInfo() string {

func (ck CdxKey) Port() string {
b := bytes.Split(ck, spaceCharacter)[2]
return string(bytes.Split(b, colon)[0])
return string(bytes.Split(b, colonCharacter)[0])
}

func (ck CdxKey) Scheme() string {
b := bytes.Split(ck, spaceCharacter)[2]
return string(bytes.Split(b, colon)[1])
return string(bytes.Split(b, colonCharacter)[1])
}

func (ck CdxKey) UserInfo() string {
b := bytes.Split(ck, spaceCharacter)[2]
return string(bytes.Split(b, colon)[2])
return string(bytes.Split(b, colonCharacter)[2])
}

type IdResponse struct {
Expand All @@ -128,13 +153,13 @@ func (ir IdResponse) GetError() error {
}

type CdxResponse struct {
Key []byte
Cdx *schema.Cdx
Key CdxKey
Value *schema.Cdx
Error error
}

func (cr CdxResponse) GetCdx() *schema.Cdx {
return cr.Cdx
return cr.Value
}

func (cr CdxResponse) GetError() error {
Expand All @@ -153,3 +178,38 @@ func (fir FileInfoResponse) GetFileInfo() *schema.FileInfo {
func (fir FileInfoResponse) GetError() error {
return fir.Error
}

type ReportResponse struct {
Value *schema.Report
Error error
}

func (rr ReportResponse) GetReport() *schema.Report {
return rr.Value
}

func (rr ReportResponse) GetError() error {
return rr.Error
}

type ReportData struct {
NrOfRecords int `json:"nrOfRecords"`
NrOfTargets int `json:"nrOfTargets"`
NrOfTargetCaptures int `json:"nrOfTargetCaptures"`
NrOfDomainsOrHosts int `json:"nrOfDomainsOrHosts"`
NrOfUrls int `json:"nrOfUrls"`
CountByStatusCode map[int32]int `json:"countByStatusCode"`
CountByRecordType map[string]int `json:"countByRecordType"`
CountByContentType map[string]int `json:"countByContentType"`
ContentLength int64 `json:"contentLength"`
PayloadLength int64 `json:"payloadLength"`
RecordLength int64 `json:"recordLength"`
}

func NewRecordData() *ReportData {
return &ReportData{
CountByStatusCode: make(map[int32]int),
CountByRecordType: make(map[string]int),
CountByContentType: make(map[string]int),
}
}
16 changes: 14 additions & 2 deletions internal/tikvidx/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func (db *DB) Search(ctx context.Context, req index.Request, res chan<- index.Cd
return &keyvalue.CdxResponse{Error: err}
} else if req.Filter().Eval(cdx) {
return &keyvalue.CdxResponse{
Key: cdxKey,
Cdx: cdx,
Key: cdxKey,
Value: cdx,
}
}
return nil
Expand Down Expand Up @@ -254,5 +254,17 @@ func (db *DB) Delete(ctx context.Context) error {
firstErr = err
}

reportKey := keyvalue.KeyWithPrefix("", reportPrefix)
err = db.client.DeleteRange(ctx, reportKey, append(reportKey, 0xff))
if err != nil && firstErr == nil {
firstErr = err
}

reportDataKey := keyvalue.KeyWithPrefix("", reportDataPrefix)
err = db.client.DeleteRange(ctx, reportDataKey, append(reportDataKey, 0xff))
if err != nil && firstErr == nil {
firstErr = err
}

return firstErr
}
13 changes: 10 additions & 3 deletions internal/tikvidx/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
)

var (
idPrefix = "i"
filePrefix = "f"
cdxPrefix = "c"
idPrefix = "i"
filePrefix = "f"
cdxPrefix = "c"
reportPrefix = "r_"
reportDataPrefix = "rd"
)

const delimiter = "_"
Expand All @@ -46,6 +48,7 @@ type DB struct {
batch chan index.Record
done chan struct{}
wg sync.WaitGroup
tasks map[string]context.CancelFunc
}

func NewDB(options ...Option) (db *DB, err error) {
Expand All @@ -61,6 +64,7 @@ func NewDB(options ...Option) (db *DB, err error) {
idPrefix = dbName + delimiter + idPrefix + delimiter
filePrefix = dbName + delimiter + filePrefix + delimiter
cdxPrefix = dbName + delimiter + cdxPrefix + delimiter
reportPrefix = dbName + delimiter + reportPrefix + delimiter

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -75,6 +79,7 @@ func NewDB(options ...Option) (db *DB, err error) {
db = &DB{
client: client,
done: done,
tasks: make(map[string]context.CancelFunc),
}

if opts.ReadOnly {
Expand Down Expand Up @@ -105,6 +110,8 @@ func NewDB(options ...Option) (db *DB, err error) {

// Close stops the batch workers and closes the index databases.
func (db *DB) Close() {
// TODO abort running report generation tasks

close(db.done)
db.wg.Wait()
_ = db.client.Close()
Expand Down
4 changes: 0 additions & 4 deletions internal/tikvidx/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type closestIter struct {
valid bool
cmp func(int64, int64) bool
done chan struct{}
limit int
}

func newClosestIter(ctx context.Context, client *rawkv.Client, req index.Request) (iterator, error) {
Expand Down Expand Up @@ -123,7 +122,6 @@ func newClosestIter(ctx context.Context, client *rawkv.Client, req index.Request
forward: forwardChannel,
backward: backwardChannel,
done: done,
limit: req.Limit(),
}

return iter, iter.Next()
Expand Down Expand Up @@ -200,7 +198,6 @@ type iter struct {
valid bool
next <-chan maybeKV
done chan<- struct{}
limit int
}

func newIter(ctx context.Context, key []byte, client *rawkv.Client, req index.Request) (iterator, error) {
Expand Down Expand Up @@ -230,7 +227,6 @@ func newIter(ctx context.Context, key []byte, client *rawkv.Client, req index.Re
is := &iter{
next: result,
done: done,
limit: req.Limit(),
}

return is, is.Next()
Expand Down
Loading

0 comments on commit c34d0b9

Please sign in to comment.