feat: reporting
Add a reporting API: new ReportAPI and ReportGenerator interfaces, a report index in the badger backend, and wiring of the report API into the serve command for both the badger and tikv index formats. Also rework shutdown: serve now uses signal.NotifyContext, WorkQueue gets separate Close and Wait methods, and the auto and Kafka indexers take ownership of closing their queue.
maeb committed Jan 14, 2024
1 parent fcf3a08 commit ed7d2e7
Showing 30 changed files with 1,764 additions and 462 deletions.
23 changes: 11 additions & 12 deletions cmd/serve/serve.go
@@ -118,6 +118,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
var fileApi index.FileAPI
var cdxApi index.CdxAPI
var idApi index.IdAPI
var reportApi index.ReportAPI
var storageRefResolver loader.StorageRefResolver
var filePathResolver loader.FilePathResolver

@@ -142,6 +143,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
cdxApi = db
fileApi = db
idApi = db
reportApi = db
case "tikv":
db, err := tikvidx.NewDB(
tikvidx.WithPDAddress(viper.GetStringSlice("tikv-pd-addr")),
@@ -160,12 +162,13 @@ func serveCmd(_ *cobra.Command, _ []string) error {
cdxApi = db
fileApi = db
idApi = db
reportApi = db
default:
return fmt.Errorf("unknown index format: %s", indexFormat)
}

ctx, cancelIndexer := context.WithCancel(context.Background())
defer cancelIndexer()
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

indexSource := viper.GetString("index-source")
if indexSource != "" {
@@ -176,7 +179,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
queue := index.NewWorkQueue(indexer,
viper.GetInt("index-workers"),
)
defer queue.Close()
defer queue.Wait()

var runner index.Runner
switch indexSource {
@@ -240,6 +243,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
CdxAPI: cdxApi,
FileAPI: fileApi,
IdAPI: idApi,
ReportAPI: reportApi,
StorageRefResolver: storageRefResolver,
WarcLoader: l,
}, handler, mw, pathPrefix)
@@ -251,12 +255,7 @@ func serveCmd(_ *cobra.Command, _ []string) error {
}

go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
log.Info().Msgf("Received %s signal, shutting down...", sig)

cancelIndexer()
<-ctx.Done()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@@ -270,8 +269,8 @@ func serveCmd(_ *cobra.Command, _ []string) error {
log.Info().Msgf("Starting server at :%v", port)

err := httpServer.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
return nil
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return err
}
return err
return nil
}
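The serve command now derives its lifetime from signal.NotifyContext instead of a hand-rolled signal channel, and the same context is reused to trigger a timed HTTP shutdown. Below is a minimal, stdlib-only sketch of that pattern; the address is illustrative, the real command listens on the configured port.

```go
package main

import (
	"context"
	"errors"
	"log"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// ctx is cancelled automatically on SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	httpServer := &http.Server{Addr: ":8080"}

	// Shut the server down gracefully once the signal context is cancelled.
	go func() {
		<-ctx.Done()
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		_ = httpServer.Shutdown(shutdownCtx)
	}()

	log.Printf("Starting server at %s", httpServer.Addr)
	if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		log.Fatal(err)
	}
}
```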
2 changes: 1 addition & 1 deletion go.mod
@@ -7,6 +7,7 @@ toolchain go1.21.3
require (
github.com/bits-and-blooms/bloom/v3 v3.6.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/google/uuid v1.5.0
github.com/gorilla/handlers v1.5.2
github.com/julienschmidt/httprouter v1.3.0
github.com/nlnwa/gowarc v1.1.2
@@ -39,7 +40,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
22 changes: 21 additions & 1 deletion index/api.go
@@ -67,7 +67,7 @@ const (
)

type Request interface {
Uri() *url.Url
Url() *url.Url
Ssurt() string
Sort() Sort
DateRange() DateRange
@@ -106,3 +106,23 @@ type IdResponse interface {
GetValue() string
GetError() error
}

type ReportResponse interface {
GetReport() *schema.Report
GetError() error
}

type ReportAPI interface {
CreateReport(context.Context, Request) (*schema.Report, error)
ListReports(context.Context, Request, chan<- ReportResponse) error
GetReport(context.Context, string) (*schema.Report, error)
CancelReport(context.Context, string) error
DeleteReport(context.Context, string) error
}

type ReportGenerator interface {
CdxAPI
AddTask(string, context.CancelFunc)
DeleteTask(string)
SaveReport(context.Context, *schema.Report) error
}
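ReportAPI gives reports the lifecycle of an asynchronous job (create, list, get, cancel, delete), with ListReports streaming results over a channel like the existing CDX APIs. A hypothetical consumer could look like the sketch below; the import path, the channel buffer size, and which side closes the channel are assumptions, since the HTTP handlers that drive this interface live elsewhere in the commit.

```go
package example

import (
	"context"
	"fmt"

	// Import path assumed; adjust to the repository's actual module path.
	"github.com/nlnwa/gowarcserver/index"
)

// listReports is a hypothetical helper that streams reports from a ReportAPI.
func listReports(ctx context.Context, api index.ReportAPI, req index.Request) error {
	responses := make(chan index.ReportResponse, 10)

	// ListReports writes into the channel; run it in its own goroutine and
	// close the channel when it returns so the range loop below terminates.
	go func() {
		defer close(responses)
		if err := api.ListReports(ctx, req, responses); err != nil {
			fmt.Println("failed to list reports:", err)
		}
	}()

	for response := range responses {
		if err := response.GetError(); err != nil {
			return err
		}
		fmt.Printf("%+v\n", response.GetReport())
	}
	return nil
}
```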
48 changes: 34 additions & 14 deletions index/autoindex.go
@@ -53,38 +53,52 @@ func WithExcludeDirs(res ...*regexp.Regexp) AutoIndexOption {

type Queue interface {
Add(path string)
Close()
}

type AutoIndexer struct {
Queue
opts *AutoIndexOptions
queue Queue
opts *AutoIndexOptions
done <-chan struct{}
}

func NewAutoIndexer(s Queue, options ...AutoIndexOption) AutoIndexer {
func NewAutoIndexer(queue Queue, options ...AutoIndexOption) AutoIndexer {
opts := new(AutoIndexOptions)
for _, apply := range options {
apply(opts)
}

return AutoIndexer{
Queue: s,
queue: queue,
opts: opts,
}
}

func (a AutoIndexer) Run(ctx context.Context) error {
a.done = ctx.Done()
defer a.queue.Close()

for _, path := range a.opts.Paths {
err := a.index(ctx.Done(), path)
err := a.index(path)
if err != nil {
log.Warn().Msgf(`Error indexing "%s": %v`, path, err)
}
}
return nil
}

func (a AutoIndexer) index(done <-chan struct{}, path string) error {
func (a AutoIndexer) enqueue(path string) {
select {
case <-a.done:
return
default:
a.queue.Add(path)
}
}

func (a AutoIndexer) index(path string) error {
select {
case <-done:
case <-a.done:
return nil
default:
}
@@ -93,16 +107,22 @@ func (a AutoIndexer) index(done <-chan struct{}, path string) error {
return fmt.Errorf("failed to get file info: %w", err)
}
if info.IsDir() {
if err := a.walk(done, path, 0); err != nil {
if err := a.walk(path, 0); err != nil {
return err
}
} else {
a.Add(path)
return nil
}
a.enqueue(path)

return nil
}

func (a AutoIndexer) walk(done <-chan struct{}, dir string, currentDepth int) error {
func (a AutoIndexer) walk(dir string, currentDepth int) error {
select {
case <-a.done:
return nil
default:
}
if a.opts.isExcluded(dir) {
return nil
}
@@ -112,15 +132,15 @@ func (a AutoIndexer) walk(done <-chan struct{}, dir string, currentDepth int) er
}
for _, entry := range entries {
select {
case <-done:
case <-a.done:
return nil
default:
}
path := filepath.Join(dir, entry.Name())
if !entry.IsDir() {
a.Queue.Add(path)
a.enqueue(path)
} else if currentDepth < a.opts.MaxDepth {
err = a.walk(done, path, currentDepth+1)
err = a.walk(path, currentDepth+1)
if err != nil {
return err
}
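The auto indexer now owns the queue, closing it when Run returns, and checks the context's done channel before and during the walk. A standalone usage sketch with a toy Queue follows; only WithExcludeDirs is visible in this diff, the option that sets the paths to walk is not shown, and the import path is assumed.

```go
package main

import (
	"context"
	"fmt"
	"regexp"

	// Import path assumed; adjust to the repository's actual module path.
	"github.com/nlnwa/gowarcserver/index"
)

// printQueue is a toy Queue used only for illustration; the serve command
// wires in an *index.WorkQueue instead.
type printQueue struct{}

func (printQueue) Add(path string) { fmt.Println("queued:", path) }
func (printQueue) Close()          { fmt.Println("queue closed") }

func main() {
	indexer := index.NewAutoIndexer(printQueue{},
		index.WithExcludeDirs(regexp.MustCompile(`\.tmp$`)),
	)

	// Run walks the configured paths, enqueues every file it finds, and
	// closes the queue when it returns or when the context is cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := indexer.Run(ctx); err != nil {
		fmt.Println("autoindexer:", err)
	}
}
```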
9 changes: 5 additions & 4 deletions index/kafka.go
@@ -65,10 +65,11 @@ func WithMaxWait(maxWait time.Duration) KafkaIndexOption {

type KafkaIndexer struct {
kafka.ReaderConfig
Queue
queue Queue
}

func (k KafkaIndexer) Run(ctx context.Context) (err error) {
defer k.queue.Close()
defer func() {
r := recover()
switch v := r.(type) {
@@ -88,17 +89,17 @@ func (k KafkaIndexer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}
k.Add(string(msg.Value))
k.queue.Add(string(msg.Value))
}
}

func NewKafkaIndexer(q Queue, options ...KafkaIndexOption) KafkaIndexer {
func NewKafkaIndexer(queue Queue, options ...KafkaIndexOption) KafkaIndexer {
readerConfig := new(kafka.ReaderConfig)
for _, apply := range options {
apply(readerConfig)
}
return KafkaIndexer{
ReaderConfig: *readerConfig,
Queue: q,
queue: queue,
}
}
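The Kafka indexer likewise owns the queue now and closes it when the consumer loop exits. A hedged wiring sketch; only WithMaxWait appears in this diff, so brokers, topic and group id would come from options not shown here, and the import path is assumed.

```go
package example

import (
	"context"
	"fmt"
	"time"

	// Import path assumed; adjust to the repository's actual module path.
	"github.com/nlnwa/gowarcserver/index"
)

// runKafkaIndexer shows how the Kafka indexer might be wired to a queue.
func runKafkaIndexer(ctx context.Context, queue index.Queue) {
	kafkaIndexer := index.NewKafkaIndexer(queue,
		index.WithMaxWait(10*time.Second),
	)

	// Run blocks until the context is cancelled or reading fails, and closes
	// the queue on the way out so downstream workers can drain and exit.
	if err := kafkaIndexer.Run(ctx); err != nil {
		fmt.Println("kafka indexer stopped:", err)
	}
}
```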
8 changes: 4 additions & 4 deletions index/worker.go
@@ -45,14 +45,14 @@ func NewWorkQueue(execute Worker, concurrency int) *WorkQueue {
return iw
}

func (iw *WorkQueue) Wait() {
iw.wg.Wait()
}

func (iw *WorkQueue) Close() {
// close the queue
close(iw.queue)
// and wait for it to drain
iw.wg.Wait()
}

// Add job to work queue
func (iw *WorkQueue) Add(job string) {
iw.queue <- job
}
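WorkQueue now separates Wait, which blocks until the workers have exited, from Close, which stops accepting jobs and drains the queue: the serve command defers Wait while the indexers, which own the producing side, defer Close. The self-contained analog below illustrates that split; it is not the repository's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a generic analog of WorkQueue, shown only to illustrate the
// Close/Wait split.
type miniQueue struct {
	jobs chan string
	wg   sync.WaitGroup
}

func newMiniQueue(work func(string), workers int) *miniQueue {
	q := &miniQueue{jobs: make(chan string, 16)}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for job := range q.jobs {
				work(job)
			}
		}()
	}
	return q
}

// Add enqueues a job.
func (q *miniQueue) Add(job string) { q.jobs <- job }

// Close stops accepting jobs and waits for the workers to drain the queue.
func (q *miniQueue) Close() {
	close(q.jobs)
	q.wg.Wait()
}

// Wait blocks until the workers have exited; something else must call Close.
func (q *miniQueue) Wait() { q.wg.Wait() }

func main() {
	q := newMiniQueue(func(job string) { fmt.Println("did", job) }, 2)
	for i := 0; i < 5; i++ {
		q.Add(fmt.Sprintf("job-%d", i))
	}
	q.Close() // the producer is done: close and drain
}
```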
2 changes: 1 addition & 1 deletion index/worker_test.go
@@ -48,8 +48,8 @@ func TestWorkQueue(t *testing.T) {
for i := 0; i < jobs; i++ {
queue.Add(strconv.Itoa(i))
}

queue.Close()
queue.Wait()

queueLength := len(queue.queue)
if queueLength != 0 {
8 changes: 4 additions & 4 deletions internal/badgeridx/api.go
@@ -115,7 +115,7 @@ func (db *DB) closest(ctx context.Context, request index.Request, results chan<-
if err != nil {
cdxResponse = keyvalue.CdxResponse{Error: err}
} else if request.Filter().Eval(cdx) {
cdxResponse = keyvalue.CdxResponse{Cdx: cdx}
cdxResponse = keyvalue.CdxResponse{Value: cdx}
} else {
iter.Next()
continue
@@ -194,8 +194,8 @@ func (db *DB) search(ctx context.Context, req index.Request, results chan<- inde
}
if filter.Eval(result) {
cdxResponse = &keyvalue.CdxResponse{
Key: key,
Cdx: result,
Key: key,
Value: result,
}
}
return nil
@@ -215,7 +215,7 @@ func (db *DB) search(ctx context.Context, req index.Request, results chan<- inde
case <-ctx.Done():
results <- keyvalue.CdxResponse{Error: ctx.Err()}
return nil
case results <- cdxResponse:
case results <- *cdxResponse:
if cdxResponse.GetError() == nil {
count++
}
25 changes: 20 additions & 5 deletions internal/badgeridx/db.go
@@ -46,11 +46,16 @@ type DB struct {
// CdxIndex maps cdx key to cdx record
CdxIndex *badger.DB

// ReportIndex maps report id to report
ReportIndex *badger.DB

batch chan index.Record

done chan struct{}

wg sync.WaitGroup

tasks map[string]context.CancelFunc
}

func NewDB(options ...Option) (db *DB, err error) {
@@ -66,6 +71,7 @@ func NewDB(options ...Option) (db *DB, err error) {
var idIndex *badger.DB
var fileIndex *badger.DB
var cdxIndex *badger.DB
var reportIndex *badger.DB

batch := make(chan index.Record, opts.BatchMaxSize)
done := make(chan struct{})
@@ -79,13 +85,18 @@ func NewDB(options ...Option) (db *DB, err error) {
if cdxIndex, err = newBadgerDB(path.Join(opts.Path, opts.Database, "cdx-index"), opts.Compression, opts.ReadOnly, opts.Silent); err != nil {
return
}
if reportIndex, err = newBadgerDB(path.Join(opts.Path, opts.Database, "report-index"), opts.Compression, opts.ReadOnly, opts.Silent); err != nil {
return
}

db = &DB{
IdIndex: idIndex,
FileIndex: fileIndex,
CdxIndex: cdxIndex,
batch: batch,
done: done,
IdIndex: idIndex,
FileIndex: fileIndex,
CdxIndex: cdxIndex,
ReportIndex: reportIndex,
batch: batch,
done: done,
tasks: make(map[string]context.CancelFunc),
}

// We don't need to run batch and gc workers when operating in read-only mode.
@@ -151,11 +162,15 @@ func (db *DB) runValueLogGC(discardRatio float64) {

// Close stops the gc and batch workers and closes the index databases.
func (db *DB) Close() {
for _, cancel := range db.tasks {
cancel()
}
close(db.done)
db.wg.Wait()
_ = db.IdIndex.Close()
_ = db.FileIndex.Close()
_ = db.CdxIndex.Close()
_ = db.ReportIndex.Close()
}

// addFile checks if file is indexed or has not changed since indexing, and adds file to file index.
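DB now keeps the cancel functions of running report tasks and cancels them in Close before closing the indexes. The AddTask and DeleteTask methods named by ReportGenerator are not part of this hunk; the standalone analog below illustrates the pattern, and the mutex is an assumption about how concurrent access would be guarded.

```go
package main

import (
	"context"
	"sync"
)

// taskRegistry is a self-contained analog of the tasks map added to DB:
// report generation registers its cancel function so Close (or an explicit
// CancelReport) can stop it.
type taskRegistry struct {
	mu    sync.Mutex
	tasks map[string]context.CancelFunc
}

func newTaskRegistry() *taskRegistry {
	return &taskRegistry{tasks: make(map[string]context.CancelFunc)}
}

func (r *taskRegistry) AddTask(id string, cancel context.CancelFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.tasks[id] = cancel
}

func (r *taskRegistry) DeleteTask(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.tasks, id)
}

// cancelAll mirrors what DB.Close now does before closing the index databases.
func (r *taskRegistry) cancelAll() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, cancel := range r.tasks {
		cancel()
	}
}

func main() {
	r := newTaskRegistry()
	ctx, cancel := context.WithCancel(context.Background())
	r.AddTask("report-1", cancel)
	r.cancelAll()
	<-ctx.Done() // the registered task's context is now cancelled
}
```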