diff --git a/cmd/serve/serve.go b/cmd/serve/serve.go index 1c3699b..fe061e8 100644 --- a/cmd/serve/serve.go +++ b/cmd/serve/serve.go @@ -118,6 +118,7 @@ func serveCmd(_ *cobra.Command, _ []string) error { var fileApi index.FileAPI var cdxApi index.CdxAPI var idApi index.IdAPI + var reportApi index.ReportAPI var storageRefResolver loader.StorageRefResolver var filePathResolver loader.FilePathResolver @@ -160,6 +161,7 @@ func serveCmd(_ *cobra.Command, _ []string) error { cdxApi = db fileApi = db idApi = db + reportApi = db default: return fmt.Errorf("unknown index format: %s", indexFormat) } @@ -240,6 +242,7 @@ func serveCmd(_ *cobra.Command, _ []string) error { CdxAPI: cdxApi, FileAPI: fileApi, IdAPI: idApi, + ReportAPI: reportApi, StorageRefResolver: storageRefResolver, WarcLoader: l, }, handler, mw, pathPrefix) diff --git a/go.mod b/go.mod index 8a053ee..a67dc7d 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.21.3 require ( github.com/bits-and-blooms/bloom/v3 v3.6.0 github.com/dgraph-io/badger/v4 v4.2.0 + github.com/google/uuid v1.5.0 github.com/gorilla/handlers v1.5.2 github.com/julienschmidt/httprouter v1.3.0 github.com/nlnwa/gowarc v1.1.2 @@ -39,7 +40,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect - github.com/google/uuid v1.5.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/index/api.go b/index/api.go index 84e7cf5..b205e47 100644 --- a/index/api.go +++ b/index/api.go @@ -106,3 +106,16 @@ type IdResponse interface { GetValue() string GetError() error } + +type ReportResponse interface { + GetReport() *schema.Report + GetError() error +} + +type ReportAPI interface { + Generate(context.Context, Request) (*schema.Report, error) + ListReports(context.Context, Request, chan<- ReportResponse) error + GetReport(context.Context, string) (*schema.Report, error) + CancelReport(context.Context, string) error + DeleteReport(context.Context, string) error +} diff --git a/internal/badgeridx/api.go b/internal/badgeridx/api.go index a0713c2..f763a94 100644 --- a/internal/badgeridx/api.go +++ b/internal/badgeridx/api.go @@ -115,7 +115,7 @@ func (db *DB) closest(ctx context.Context, request index.Request, results chan<- if err != nil { cdxResponse = keyvalue.CdxResponse{Error: err} } else if request.Filter().Eval(cdx) { - cdxResponse = keyvalue.CdxResponse{Cdx: cdx} + cdxResponse = keyvalue.CdxResponse{Value: cdx} } else { iter.Next() continue @@ -194,8 +194,8 @@ func (db *DB) search(ctx context.Context, req index.Request, results chan<- inde } if filter.Eval(result) { cdxResponse = &keyvalue.CdxResponse{ - Key: key, - Cdx: result, + Key: key, + Value: result, } } return nil diff --git a/internal/keyvalue/db.go b/internal/keyvalue/db.go index 75697c1..b006586 100644 --- a/internal/keyvalue/db.go +++ b/internal/keyvalue/db.go @@ -2,6 +2,7 @@ package keyvalue import ( "bytes" + "strings" "time" "github.com/nlnwa/gowarcserver/index" @@ -71,11 +72,35 @@ func MarshalFileInfo(fileInfo *schema.FileInfo, prefix string) (key []byte, valu type CdxKey []byte var spaceCharacter = []byte{32} -var colon = []byte{58} +var colonCharacter = []byte{58} +var slashCharacter = []byte{47} -func (ck CdxKey) Host() string { +func (ck CdxKey) Path() string { b := bytes.Split(ck, spaceCharacter)[0] - return string(b) + i := bytes.Index(b, slashCharacter) + if i == -1 { + return "" + } + return string(b[i:]) +} + +func (ck CdxKey) Domain() string { + b := bytes.Split(ck, spaceCharacter)[0] + b = bytes.Split(b, slashCharacter)[0] + s := string(b) + var sb strings.Builder + sb.Grow(len(s)) + t := strings.Split(s, ",") + for i := len(t) - 1; i >= 0; i-- { + if t[i] == "" { + continue + } + sb.WriteString(t[i]) + if i > 0 { + sb.WriteByte('.') + } + } + return sb.String() } func (ck CdxKey) Time() (time.Time, error) { @@ -96,17 +121,17 @@ func (ck CdxKey) SchemeAndUserInfo() string { func (ck CdxKey) Port() string { b := bytes.Split(ck, spaceCharacter)[2] - return string(bytes.Split(b, colon)[0]) + return string(bytes.Split(b, colonCharacter)[0]) } func (ck CdxKey) Scheme() string { b := bytes.Split(ck, spaceCharacter)[2] - return string(bytes.Split(b, colon)[1]) + return string(bytes.Split(b, colonCharacter)[1]) } func (ck CdxKey) UserInfo() string { b := bytes.Split(ck, spaceCharacter)[2] - return string(bytes.Split(b, colon)[2]) + return string(bytes.Split(b, colonCharacter)[2]) } type IdResponse struct { @@ -128,13 +153,13 @@ func (ir IdResponse) GetError() error { } type CdxResponse struct { - Key []byte - Cdx *schema.Cdx + Key CdxKey + Value *schema.Cdx Error error } func (cr CdxResponse) GetCdx() *schema.Cdx { - return cr.Cdx + return cr.Value } func (cr CdxResponse) GetError() error { @@ -153,3 +178,38 @@ func (fir FileInfoResponse) GetFileInfo() *schema.FileInfo { func (fir FileInfoResponse) GetError() error { return fir.Error } + +type ReportResponse struct { + Value *schema.Report + Error error +} + +func (rr ReportResponse) GetReport() *schema.Report { + return rr.Value +} + +func (rr ReportResponse) GetError() error { + return rr.Error +} + +type ReportData struct { + NrOfRecords int `json:"nrOfRecords"` + NrOfTargets int `json:"nrOfTargets"` + NrOfTargetCaptures int `json:"nrOfTargetCaptures"` + NrOfDomainsOrHosts int `json:"nrOfDomainsOrHosts"` + NrOfUrls int `json:"nrOfUrls"` + CountByStatusCode map[int32]int `json:"countByStatusCode"` + CountByRecordType map[string]int `json:"countByRecordType"` + CountByContentType map[string]int `json:"countByContentType"` + ContentLength int64 `json:"contentLength"` + PayloadLength int64 `json:"payloadLength"` + RecordLength int64 `json:"recordLength"` +} + +func NewRecordData() *ReportData { + return &ReportData{ + CountByStatusCode: make(map[int32]int), + CountByRecordType: make(map[string]int), + CountByContentType: make(map[string]int), + } +} diff --git a/internal/tikvidx/api.go b/internal/tikvidx/api.go index e41d0a6..12b7890 100644 --- a/internal/tikvidx/api.go +++ b/internal/tikvidx/api.go @@ -77,8 +77,8 @@ func (db *DB) Search(ctx context.Context, req index.Request, res chan<- index.Cd return &keyvalue.CdxResponse{Error: err} } else if req.Filter().Eval(cdx) { return &keyvalue.CdxResponse{ - Key: cdxKey, - Cdx: cdx, + Key: cdxKey, + Value: cdx, } } return nil @@ -254,5 +254,17 @@ func (db *DB) Delete(ctx context.Context) error { firstErr = err } + reportKey := keyvalue.KeyWithPrefix("", reportPrefix) + err = db.client.DeleteRange(ctx, reportKey, append(reportKey, 0xff)) + if err != nil && firstErr == nil { + firstErr = err + } + + reportDataKey := keyvalue.KeyWithPrefix("", reportDataPrefix) + err = db.client.DeleteRange(ctx, reportDataKey, append(reportDataKey, 0xff)) + if err != nil && firstErr == nil { + firstErr = err + } + return firstErr } diff --git a/internal/tikvidx/db.go b/internal/tikvidx/db.go index da9084a..8cb053c 100644 --- a/internal/tikvidx/db.go +++ b/internal/tikvidx/db.go @@ -34,9 +34,11 @@ import ( ) var ( - idPrefix = "i" - filePrefix = "f" - cdxPrefix = "c" + idPrefix = "i" + filePrefix = "f" + cdxPrefix = "c" + reportPrefix = "r_" + reportDataPrefix = "rd" ) const delimiter = "_" @@ -46,6 +48,7 @@ type DB struct { batch chan index.Record done chan struct{} wg sync.WaitGroup + tasks map[string]context.CancelFunc } func NewDB(options ...Option) (db *DB, err error) { @@ -61,6 +64,7 @@ func NewDB(options ...Option) (db *DB, err error) { idPrefix = dbName + delimiter + idPrefix + delimiter filePrefix = dbName + delimiter + filePrefix + delimiter cdxPrefix = dbName + delimiter + cdxPrefix + delimiter + reportPrefix = dbName + delimiter + reportPrefix + delimiter ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -75,6 +79,7 @@ func NewDB(options ...Option) (db *DB, err error) { db = &DB{ client: client, done: done, + tasks: make(map[string]context.CancelFunc), } if opts.ReadOnly { @@ -105,6 +110,8 @@ func NewDB(options ...Option) (db *DB, err error) { // Close stops the batch workers and closes the index databases. func (db *DB) Close() { + // TODO abort running report generation tasks + close(db.done) db.wg.Wait() _ = db.client.Close() diff --git a/internal/tikvidx/iter.go b/internal/tikvidx/iter.go index 3be5fd3..336edb9 100644 --- a/internal/tikvidx/iter.go +++ b/internal/tikvidx/iter.go @@ -75,7 +75,6 @@ type closestIter struct { valid bool cmp func(int64, int64) bool done chan struct{} - limit int } func newClosestIter(ctx context.Context, client *rawkv.Client, req index.Request) (iterator, error) { @@ -123,7 +122,6 @@ func newClosestIter(ctx context.Context, client *rawkv.Client, req index.Request forward: forwardChannel, backward: backwardChannel, done: done, - limit: req.Limit(), } return iter, iter.Next() @@ -200,7 +198,6 @@ type iter struct { valid bool next <-chan maybeKV done chan<- struct{} - limit int } func newIter(ctx context.Context, key []byte, client *rawkv.Client, req index.Request) (iterator, error) { @@ -230,7 +227,6 @@ func newIter(ctx context.Context, key []byte, client *rawkv.Client, req index.Re is := &iter{ next: result, done: done, - limit: req.Limit(), } return is, is.Next() diff --git a/internal/tikvidx/report.go b/internal/tikvidx/report.go new file mode 100644 index 0000000..5837a7e --- /dev/null +++ b/internal/tikvidx/report.go @@ -0,0 +1,266 @@ +package tikvidx + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/nlnwa/gowarcserver/index" + "github.com/nlnwa/gowarcserver/internal/keyvalue" + "github.com/nlnwa/gowarcserver/schema" + "github.com/rs/zerolog/log" + "github.com/tikv/client-go/v2/rawkv" + "golang.org/x/net/publicsuffix" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func (db *DB) saveReport(ctx context.Context, key []byte, report *schema.Report, reportData *keyvalue.ReportData) error { + report.Duration = durationpb.New(report.EndTime.AsTime().Sub(report.StartTime.AsTime())) + + var dataMap map[string]interface{} + mapstructure.Decode(reportData, &dataMap) + + data, err := structpb.NewStruct(dataMap) + if err != nil { + return err + } + report.Data = data + + value, err := proto.Marshal(report) + if err != nil { + return err + } + return db.client.Put(ctx, key, value) +} + +func (db *DB) Generate(ctx context.Context, request index.Request) (*schema.Report, error) { + // Start searching for records to get search errors early + // and specifically before we start generating the report + results := make(chan index.CdxResponse) + err := db.Search(ctx, request, results) + if err != nil { + return nil, err + } + + taskId, err := uuid.NewV7() + if err != nil { + return nil, err + } + reportId := taskId.String() + reportKey := keyvalue.KeyWithPrefix(reportId, reportPrefix) + + report := &schema.Report{ + Id: reportId, + StartTime: timestamppb.New(time.Now()), + Status: schema.Report_PENDING, + } + reportData := keyvalue.NewRecordData() + + err = db.saveReport(ctx, reportKey, report, reportData) + if err != nil { + return nil, err + } + + go func() { + var err error + ctx, cancel := context.WithCancel(context.Background()) + + // Add cancel function to db.tasks to be able to cancel the report generation + db.tasks[taskId.String()] = cancel + + // Defer cancel function to remove it from db.tasks when this function returns + defer func() { + cancel() + delete(db.tasks, taskId.String()) + + if err != nil { + report.Status = schema.Report_FAILED + report.Error = err.Error() + } else { + report.Status = schema.Report_COMPLETED + } + report.EndTime = timestamppb.New(time.Now()) + + err = db.saveReport(ctx, reportKey, report, reportData) + if err != nil { + log.Error().Err(err).Msg("failed to save report") + } + }() + + // We are now running + report.Status = schema.Report_RUNNING + + var kv keyvalue.CdxResponse + var key keyvalue.CdxKey + var value *schema.Cdx + var target, prevTarget string + var domainName, prevDomainName string + var uri, prevUri string + // var ts int64 + // var prevTs int64 + + for result := range results { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + err = result.GetError() + if err != nil { + return + } + + reportData.NrOfRecords++ + + if reportData.NrOfRecords%rawkv.MaxRawKVScanLimit == 0 { + report.Duration = durationpb.New(time.Since(report.StartTime.AsTime())) + err = db.saveReport(ctx, reportKey, report, reportData) + if err != nil { + return + } + } + + kv = result.(keyvalue.CdxResponse) + key = kv.Key + value = kv.Value + + prevUri = uri + uri = value.Uri + + prevDomainName = domainName + domainName = key.Domain() + + // prevTs = ts + // ts = key.Unix() + + prevTarget = target + target, err = publicsuffix.EffectiveTLDPlusOne(domainName) + if err != nil { + err = fmt.Errorf("failed to get effective TLD+1 for %s: %w", domainName, err) + return + } + + // 1. Number of targets + if prevTarget != target { + reportData.NrOfTargets++ + // TODO if we wan't to get top 1000 targets we need to store it in the database + } + + // 2. Number of target captures + // TODO + reportData.NrOfTargetCaptures++ + + // 3. Number of URLs + if prevUri != uri { + // TODO use first part of key and check time interval + reportData.NrOfUrls++ + } + // 4. Distribution of status codes + reportData.CountByStatusCode[value.Hsc]++ + + // 5. Number of domains or hosts + if prevDomainName != domainName { + reportData.NrOfDomainsOrHosts++ + } + + reportData.CountByRecordType[value.Srt]++ + reportData.CountByContentType[value.Mct]++ + reportData.ContentLength += value.Cle + reportData.PayloadLength += value.Ple + reportData.RecordLength += value.Rle + } + }() + + return report, nil +} + +func (db *DB) CancelReport(ctx context.Context, id string) error { + cancel, ok := db.tasks[id] + if !ok { + return fmt.Errorf("no report with id '%s'", id) + } + cancel() + return nil +} + +func (db *DB) DeleteReport(ctx context.Context, id string) error { + report, err := db.GetReport(ctx, id) + if err != nil { + return err + } + if report == nil { + return fmt.Errorf("no report with id '%s'", id) + } + if report.Status == schema.Report_RUNNING { + return fmt.Errorf("report with id '%s' is running", id) + } + return db.client.Delete(ctx, keyvalue.KeyWithPrefix(id, reportPrefix)) +} + +func (db *DB) GetReport(ctx context.Context, id string) (*schema.Report, error) { + value, err := db.client.Get(ctx, keyvalue.KeyWithPrefix(id, reportPrefix)) + if err != nil { + return nil, err + } + if value == nil { + return nil, nil + } + report := new(schema.Report) + err = proto.Unmarshal(value, report) + if err != nil { + return nil, err + } + return report, nil +} + +func (db *DB) ListReports(ctx context.Context, req index.Request, res chan<- index.ReportResponse) error { + key := keyvalue.KeyWithPrefix("", reportPrefix) + it, err := newIter(ctx, key, db.client, req) + if err != nil { + return err + } + if it == nil { + close(res) + return nil + } + go func() { + defer close(res) + defer it.Close() + + count := 0 + + for it.Valid() { + var response keyvalue.ReportResponse + report := new(schema.Report) + err := proto.Unmarshal(it.Value(), report) + if err != nil { + response.Error = err + } else { + response.Value = report + } + select { + case <-ctx.Done(): + return + case res <- response: + if response.Error == nil { + count++ + } + } + if req.Limit() > 0 && count >= req.Limit() { + return + } + if err = it.Next(); err != nil { + res <- keyvalue.ReportResponse{Error: err} + return + } + } + }() + + return nil +} diff --git a/schema/report.pb.go b/schema/report.pb.go new file mode 100644 index 0000000..e83359c --- /dev/null +++ b/schema/report.pb.go @@ -0,0 +1,311 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v4.23.3 +// source: report.proto + +package schema + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + durationpb "google.golang.org/protobuf/types/known/durationpb" + structpb "google.golang.org/protobuf/types/known/structpb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Report_Status int32 + +const ( + Report_UNKNOWN Report_Status = 0 + Report_PENDING Report_Status = 1 + Report_RUNNING Report_Status = 2 + Report_COMPLETED Report_Status = 3 + Report_FAILED Report_Status = 4 +) + +// Enum value maps for Report_Status. +var ( + Report_Status_name = map[int32]string{ + 0: "UNKNOWN", + 1: "PENDING", + 2: "RUNNING", + 3: "COMPLETED", + 4: "FAILED", + } + Report_Status_value = map[string]int32{ + "UNKNOWN": 0, + "PENDING": 1, + "RUNNING": 2, + "COMPLETED": 3, + "FAILED": 4, + } +) + +func (x Report_Status) Enum() *Report_Status { + p := new(Report_Status) + *p = x + return p +} + +func (x Report_Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Report_Status) Descriptor() protoreflect.EnumDescriptor { + return file_report_proto_enumTypes[0].Descriptor() +} + +func (Report_Status) Type() protoreflect.EnumType { + return &file_report_proto_enumTypes[0] +} + +func (x Report_Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Report_Status.Descriptor instead. +func (Report_Status) EnumDescriptor() ([]byte, []int) { + return file_report_proto_rawDescGZIP(), []int{0, 0} +} + +type Report struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + StartTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` + Duration *durationpb.Duration `protobuf:"bytes,4,opt,name=duration,proto3" json:"duration,omitempty"` + EndTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"` + Error string `protobuf:"bytes,6,opt,name=error,proto3" json:"error,omitempty"` + Query *structpb.Struct `protobuf:"bytes,7,opt,name=query,proto3" json:"query,omitempty"` + Status Report_Status `protobuf:"varint,8,opt,name=status,proto3,enum=gowarcserver.schema.Report_Status" json:"status,omitempty"` + Data *structpb.Struct `protobuf:"bytes,9,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Report) Reset() { + *x = Report{} + if protoimpl.UnsafeEnabled { + mi := &file_report_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Report) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Report) ProtoMessage() {} + +func (x *Report) ProtoReflect() protoreflect.Message { + mi := &file_report_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Report.ProtoReflect.Descriptor instead. +func (*Report) Descriptor() ([]byte, []int) { + return file_report_proto_rawDescGZIP(), []int{0} +} + +func (x *Report) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Report) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Report) GetStartTime() *timestamppb.Timestamp { + if x != nil { + return x.StartTime + } + return nil +} + +func (x *Report) GetDuration() *durationpb.Duration { + if x != nil { + return x.Duration + } + return nil +} + +func (x *Report) GetEndTime() *timestamppb.Timestamp { + if x != nil { + return x.EndTime + } + return nil +} + +func (x *Report) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *Report) GetQuery() *structpb.Struct { + if x != nil { + return x.Query + } + return nil +} + +func (x *Report) GetStatus() Report_Status { + if x != nil { + return x.Status + } + return Report_UNKNOWN +} + +func (x *Report) GetData() *structpb.Struct { + if x != nil { + return x.Data + } + return nil +} + +var File_report_proto protoreflect.FileDescriptor + +var file_report_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x13, + 0x67, 0x6f, 0x77, 0x61, 0x72, 0x63, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x73, 0x63, 0x68, + 0x65, 0x6d, 0x61, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x22, 0xcf, 0x03, 0x0a, 0x06, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x35, 0x0a, 0x08, + 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x64, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x12, 0x35, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, + 0x70, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x12, 0x2d, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x12, + 0x3a, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x22, 0x2e, 0x67, 0x6f, 0x77, 0x61, 0x72, 0x63, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2e, 0x73, + 0x63, 0x68, 0x65, 0x6d, 0x61, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2b, 0x0a, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, + 0x63, 0x74, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, + 0x0b, 0x0a, 0x07, 0x50, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, + 0x52, 0x55, 0x4e, 0x4e, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x4f, 0x4d, + 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, 0x49, 0x4c, + 0x45, 0x44, 0x10, 0x04, 0x42, 0x26, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6e, 0x6c, 0x6e, 0x77, 0x61, 0x2f, 0x67, 0x6f, 0x77, 0x61, 0x72, 0x63, 0x73, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_report_proto_rawDescOnce sync.Once + file_report_proto_rawDescData = file_report_proto_rawDesc +) + +func file_report_proto_rawDescGZIP() []byte { + file_report_proto_rawDescOnce.Do(func() { + file_report_proto_rawDescData = protoimpl.X.CompressGZIP(file_report_proto_rawDescData) + }) + return file_report_proto_rawDescData +} + +var file_report_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_report_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_report_proto_goTypes = []interface{}{ + (Report_Status)(0), // 0: gowarcserver.schema.Report.Status + (*Report)(nil), // 1: gowarcserver.schema.Report + (*timestamppb.Timestamp)(nil), // 2: google.protobuf.Timestamp + (*durationpb.Duration)(nil), // 3: google.protobuf.Duration + (*structpb.Struct)(nil), // 4: google.protobuf.Struct +} +var file_report_proto_depIdxs = []int32{ + 2, // 0: gowarcserver.schema.Report.start_time:type_name -> google.protobuf.Timestamp + 3, // 1: gowarcserver.schema.Report.duration:type_name -> google.protobuf.Duration + 2, // 2: gowarcserver.schema.Report.end_time:type_name -> google.protobuf.Timestamp + 4, // 3: gowarcserver.schema.Report.query:type_name -> google.protobuf.Struct + 0, // 4: gowarcserver.schema.Report.status:type_name -> gowarcserver.schema.Report.Status + 4, // 5: gowarcserver.schema.Report.data:type_name -> google.protobuf.Struct + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name +} + +func init() { file_report_proto_init() } +func file_report_proto_init() { + if File_report_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_report_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Report); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_report_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_report_proto_goTypes, + DependencyIndexes: file_report_proto_depIdxs, + EnumInfos: file_report_proto_enumTypes, + MessageInfos: file_report_proto_msgTypes, + }.Build() + File_report_proto = out.File + file_report_proto_rawDesc = nil + file_report_proto_goTypes = nil + file_report_proto_depIdxs = nil +} diff --git a/schema/report.proto b/schema/report.proto new file mode 100644 index 0000000..5c23133 --- /dev/null +++ b/schema/report.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package gowarcserver.schema; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "google/protobuf/struct.proto"; + +option go_package = "github.com/nlnwa/gowarcserver/schema"; + +message Report { + enum Status { + UNKNOWN = 0; + PENDING = 1; + RUNNING = 2; + COMPLETED = 3; + FAILED = 4; + } + + string id = 1; + string name = 2; + google.protobuf.Timestamp start_time = 3; + google.protobuf.Duration duration = 4; + google.protobuf.Timestamp end_time = 5; + string error = 6; + google.protobuf.Struct query = 7; + Report.Status status = 8; + google.protobuf.Struct data = 9; +} \ No newline at end of file diff --git a/server/api/api.go b/server/api/api.go index 246c531..6203449 100644 --- a/server/api/api.go +++ b/server/api/api.go @@ -18,7 +18,7 @@ package api import ( "fmt" - "net/http" + "net/url" "regexp" "strconv" "strings" @@ -26,7 +26,7 @@ import ( "github.com/nlnwa/gowarcserver/index" "github.com/nlnwa/gowarcserver/surt" "github.com/nlnwa/gowarcserver/timestamp" - "github.com/nlnwa/whatwg-url/url" + whatwgUrl "github.com/nlnwa/whatwg-url/url" ) const ( @@ -67,7 +67,7 @@ var outputs = []string{OutputCdxj, OutputJson} // CoreAPI implements a subset of https://pywb.readthedocs.io/en/latest/manual/cdxserver_api.html. type CoreAPI struct { Collection string - Url *url.Url + Url *whatwgUrl.Url DateRange *DateRange MatchType string Limit int @@ -78,11 +78,11 @@ type CoreAPI struct { Fields []string } -func (capi *CoreAPI) Uri() *url.Url { +func (capi *CoreAPI) Uri() *whatwgUrl.Url { return capi.Url } -func ClosestAPI(closest string, u *url.Url) SearchRequest { +func ClosestAPI(closest string, u *whatwgUrl.Url) SearchRequest { return SearchRequest{ CoreAPI: &CoreAPI{ Url: u, @@ -190,10 +190,8 @@ func contains(s []string, e string) bool { var schemeRegExp = regexp.MustCompile(`^[a-z][a-z0-9+\-.]+(:.*)`) // Parse parses the request r into a *CoreAPI. -func Parse(r *http.Request) (*CoreAPI, error) { +func Parse(query url.Values) (*CoreAPI, error) { var err error - query := r.URL.Query() - coreApi := new(CoreAPI) // currently the "cdx" does not accept collection as a query or param @@ -212,7 +210,7 @@ func Parse(r *http.Request) (*CoreAPI, error) { if !schemeRegExp.MatchString(urlStr) { urlStr = "http://" + urlStr } - u, err := url.Parse(urlStr) + u, err := whatwgUrl.Parse(urlStr) if err != nil { return nil, err } diff --git a/server/api/api_test.go b/server/api/api_test.go index fa030f2..b4d957e 100644 --- a/server/api/api_test.go +++ b/server/api/api_test.go @@ -2,7 +2,6 @@ package api import ( "errors" - "net/http" "net/url" "reflect" "testing" @@ -119,11 +118,8 @@ func TestParse(t *testing.T) { if test.query.closest != "" { query.Set("closest", test.query.closest) } - reqUrl.RawQuery = query.Encode() - testRequest := &http.Request{URL: reqUrl} - - got, err := Parse(testRequest) + got, err := Parse(query) if err != nil { if test.err == nil { t.Errorf("unexpected error: %s", err) diff --git a/server/coreserver/handler.go b/server/coreserver/handler.go index 4b67a3d..1d3d566 100644 --- a/server/coreserver/handler.go +++ b/server/coreserver/handler.go @@ -42,12 +42,13 @@ type Handler struct { CdxAPI index.CdxAPI FileAPI index.FileAPI IdAPI index.IdAPI + ReportAPI index.ReportAPI StorageRefResolver loader.StorageRefResolver WarcLoader loader.WarcLoader } func (h Handler) search(w http.ResponseWriter, r *http.Request) { - coreAPI, err := api.Parse(r) + coreAPI, err := api.Parse(r.URL.Query()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -97,7 +98,7 @@ type storageRef struct { } func (h Handler) listIds(w http.ResponseWriter, r *http.Request) { - coreAPI, err := api.Parse(r) + coreAPI, err := api.Parse(r.URL.Query()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -166,7 +167,7 @@ func (h Handler) getStorageRefByURN(w http.ResponseWriter, r *http.Request) { } func (h Handler) listFiles(w http.ResponseWriter, r *http.Request) { - coreAPI, err := api.Parse(r) + coreAPI, err := api.Parse(r.URL.Query()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -223,10 +224,16 @@ func (h Handler) getFileInfoByFilename(w http.ResponseWriter, r *http.Request) { log.Error().Err(err).Msgf("Failed to get file info: %s", filename) return } - _, err = fmt.Fprintln(w, protojson.Format(fileInfo)) + b, err := protojson.Marshal(fileInfo) if err != nil { - log.Warn().Err(err).Msgf("Failed to write file info: %s", protojson.Format(fileInfo)) + log.Warn().Err(err).Msgf("Failed to marshal file info: %s", fileInfo) + return + } + _, err = io.Copy(w, bytes.NewReader(b)) + if err != nil { + log.Warn().Err(err).Msgf("Failed to write report") } + _, _ = w.Write(lf) } func (h Handler) loadRecordByUrn(w http.ResponseWriter, r *http.Request) { @@ -273,3 +280,125 @@ func parseStorageRef(ref string) (filename string, offset int64, err error) { } return } + +func (h Handler) generateReport(w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + coreAPI, err := api.Parse(r.Form) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + res, err := h.ReportAPI.Generate(ctx, api.Request(coreAPI)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Error().Err(err).Msg("Failed to generate report") + return + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") + // w.Header().Set("Location", /report/) + w.WriteHeader(http.StatusAccepted) + err = json.NewEncoder(w).Encode(res) + if err != nil { + log.Err(err).Msg("Failed to encode generate report response") + } +} + +func (h Handler) getReport(w http.ResponseWriter, r *http.Request) { + params := httprouter.ParamsFromContext(r.Context()) + id := params.ByName("id") + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + report, err := h.ReportAPI.GetReport(ctx, id) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Error().Err(err).Msgf("Failed to get report: %s", id) + return + } + if report == nil { + http.NotFound(w, r) + return + } + + b, err := protojson.Marshal(report) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Error().Err(err).Msgf("Failed to marshal report: %v", report) + } + _, err = io.Copy(w, bytes.NewReader(b)) + if err != nil { + log.Warn().Err(err).Msgf("Failed to write report") + return + } + _, _ = w.Write(lf) +} + +func (h Handler) listReports(w http.ResponseWriter, r *http.Request) { + coreAPI, err := api.Parse(r.URL.Query()) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + responses := make(chan index.ReportResponse) + + if err := h.ReportAPI.ListReports(ctx, api.Request(coreAPI), responses); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Error().Err(err).Msg("Failed to list reports") + return + } + + start := time.Now() + count := 0 + defer func() { + log.Debug().Msgf("Found %d items in %s", count, time.Since(start)) + }() + + for res := range responses { + if res.GetError() != nil { + log.Warn().Err(res.GetError()).Msg("failed report result") + continue + } + v, err := protojson.Marshal(res.GetReport()) + if err != nil { + log.Warn().Err(err).Msg("failed to marshal report") + continue + } + _, err = io.Copy(w, bytes.NewReader(v)) + if err != nil { + log.Warn().Err(err).Msg("failed to write report") + return + } + _, _ = w.Write(lf) + count++ + } + _, _ = w.Write(lf) +} + +func (h Handler) deleteReport(w http.ResponseWriter, r *http.Request) { + params := httprouter.ParamsFromContext(r.Context()) + id := params.ByName("id") + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + err := h.ReportAPI.DeleteReport(ctx, id) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + log.Error().Err(err).Msgf("Failed to delete report: %s", id) + return + } +} diff --git a/server/coreserver/routes.go b/server/coreserver/routes.go index 4444451..a847b23 100644 --- a/server/coreserver/routes.go +++ b/server/coreserver/routes.go @@ -30,4 +30,16 @@ func Register(h Handler, r *httprouter.Router, mw func(http.Handler) http.Handle r.Handler("GET", pathPrefix+"/cdx", mw(http.HandlerFunc(h.search))) r.Handler("GET", pathPrefix+"/search", mw(http.HandlerFunc(h.search))) r.Handler("GET", pathPrefix+"/record/:urn", mw(http.HandlerFunc(h.loadRecordByUrn))) + + // Create report + r.Handler("POST", pathPrefix+"/report", mw(http.HandlerFunc(h.generateReport))) + + // Delete report + r.Handler("DELETE", pathPrefix+"/report/:id", mw(http.HandlerFunc(h.deleteReport))) + + // Get report + r.Handler("GET", pathPrefix+"/report/:id", mw(http.HandlerFunc(h.getReport))) + + // List reports + r.Handler("GET", pathPrefix+"/report", mw(http.HandlerFunc(h.listReports))) } diff --git a/server/warcserver/handler.go b/server/warcserver/handler.go index 471cf83..c08c03e 100644 --- a/server/warcserver/handler.go +++ b/server/warcserver/handler.go @@ -28,7 +28,7 @@ type Handler struct { } func (h Handler) index(w http.ResponseWriter, r *http.Request) { - coreAPI, err := api.Parse(r) + coreAPI, err := api.Parse(r.URL.Query()) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return