Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature/open search job #314

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/bmeg/grip/accounts"
"github.com/bmeg/grip/jobstorage"
"github.com/bmeg/grip/util"
"github.com/bmeg/grip/util/duration"
)
Expand All @@ -18,7 +19,7 @@ type ServerConfig struct {
ReadOnly bool
EnablePlugins bool
PluginDir string
NoJobs bool
JobsDriver *jobstorage.JobsConfig
Accounts accounts.Config
DisableHTTPCache bool
// Should the server periodically build the graph schemas?
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ require (
github.com/minio/minio-go/v7 v7.0.73
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/mongodb/mongo-tools v0.0.0-20240715143021-aa6a140d3f17
github.com/opensearch-project/opensearch-go v1.1.0
github.com/opensearch-project/opensearch-go/v4 v4.1.0
github.com/paulbellamy/ratecounter v0.2.0
github.com/robertkrimen/otto v0.4.0
github.com/segmentio/ksuid v1.0.4
Expand Down
19 changes: 18 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9 h
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230512164433-5d1fd1a340c9/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg=
github.com/aws/aws-sdk-go v1.42.27/go.mod h1:OGr6lGMAKGlG9CVrYnWYDKIyb829c6EVBRjxqjmPepc=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down Expand Up @@ -226,6 +227,8 @@ github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPG
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
Expand Down Expand Up @@ -311,6 +314,10 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/opensearch-project/opensearch-go v1.1.0 h1:eG5sh3843bbU1itPRjA9QXbxcg8LaZ+DjEzQH9aLN3M=
github.com/opensearch-project/opensearch-go v1.1.0/go.mod h1:+6/XHCuTH+fwsMJikZEWsucZ4eZMma3zNSeLrTtVGbo=
github.com/opensearch-project/opensearch-go/v4 v4.1.0 h1:YXNaMpMU0PC7suGyP13EuczkDT3K54QajgDnLKCZAz8=
github.com/opensearch-project/opensearch-go/v4 v4.1.0/go.mod h1:aSTMFGSLEoiG19US6Oo5udvWCjHap3mRcWBNV8rAFak=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
Expand Down Expand Up @@ -393,11 +400,20 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U=
github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/wI2L/jsondiff v0.6.0 h1:zrsH3FbfVa3JO9llxrcDy/XLkYPLgoMX6Mz3T2PP2AI=
github.com/wI2L/jsondiff v0.6.0/go.mod h1:D6aQ5gKgPF9g17j+E9N7aasmU1O+XvfmWm1y8UMmNpw=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
Expand Down Expand Up @@ -460,6 +476,7 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
Expand Down
4 changes: 4 additions & 0 deletions gripper/test-graph/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ Drivers:

Sources:
tableServer: localhost:50051

Server:
JobsDriver:
File: jobs
12 changes: 12 additions & 0 deletions jobstorage/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package jobstorage

// OpenSearchConfig holds the settings needed to reach an OpenSearch server.
type OpenSearchConfig struct {
	// Address is the location of the OpenSearch server.
	Address string
	// Username and Password are the credentials presented to the server.
	Username, Password string
}

// JobsConfig selects and configures the backend used to store jobs.
type JobsConfig struct {
	// File configures file-based job storage.
	// NOTE(review): presumably the base directory handed to the filesystem
	// storage; confirm against the code that consumes this config.
	File string
	// OpenSearch, when set, carries the OpenSearch connection settings.
	OpenSearch *OpenSearchConfig
}
42 changes: 13 additions & 29 deletions jobstorage/storage.go → jobstorage/fs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,7 @@ import (
"github.com/kennygrant/sanitize"
)

type Stream struct {
Pipe gdbi.InPipe
DataType gdbi.DataType
MarkTypes map[string]gdbi.DataType
Query []*gripql.GraphStatement
}

type JobStorage interface {
List(graph string) (chan string, error)
Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error)
Spool(graph string, stream *Stream) (string, error)
Stream(ctx context.Context, graph, id string) (*Stream, error)
Delete(graph, id string) error
Status(graph, id string) (*gripql.JobStatus, error)
}

type Job struct {
type FileJob struct {
Status gripql.JobStatus
DataType gdbi.DataType
MarkTypes map[string]gdbi.DataType
Expand All @@ -45,6 +29,11 @@ func jobKey(graph, job string) string {
return fmt.Sprintf("%s/%s", sanitize.Name(graph), sanitize.Name(job))
}

type FSResults struct {
BaseDir string
jobs *sync.Map
}

func NewFSJobStorage(path string) *FSResults {
out := FSResults{path, &sync.Map{}}
if _, err := os.Stat(path); os.IsNotExist(err) {
Expand All @@ -61,7 +50,7 @@ func NewFSJobStorage(path string) *FSResults {
if err == nil {
sData, err := io.ReadAll(file)
if err == nil {
job := Job{}
job := FileJob{}
err := json.Unmarshal(sData, &job)
if err == nil {
log.Infof("Found job %s %s", graphName, jobName)
Expand All @@ -79,17 +68,12 @@ func NewFSJobStorage(path string) *FSResults {
return &out
}

type FSResults struct {
BaseDir string
jobs *sync.Map
}

func (fs *FSResults) List(graph string) (chan string, error) {
out := make(chan string)
go func() {
defer close(out)
fs.jobs.Range(func(key, value interface{}) bool {
vJob := value.(*Job)
vJob := value.(*FileJob)
if vJob.Status.Graph == graph {
out <- vJob.Status.Id
}
Expand All @@ -105,7 +89,7 @@ func (fs *FSResults) Search(graph string, Query []*gripql.GraphStatement) (chan
go func() {
defer close(out)
fs.jobs.Range(func(key, value interface{}) bool {
vJob := value.(*Job)
vJob := value.(*FileJob)
if vJob.Status.Graph == graph {
if JobMatch(qcs, vJob.StepChecksums) {
out <- &vJob.Status
Expand Down Expand Up @@ -137,7 +121,7 @@ func (fs *FSResults) Spool(graph string, stream *Stream) (string, error) {
}

cs, _ := TraversalChecksum(stream.Query)
job := &Job{
job := &FileJob{
Status: gripql.JobStatus{Query: stream.Query, Id: jobName, Graph: graph, Timestamp: time.Now().Format(time.RFC3339)},
DataType: stream.DataType,
MarkTypes: stream.MarkTypes,
Expand Down Expand Up @@ -174,7 +158,7 @@ func (fs *FSResults) Spool(graph string, stream *Stream) (string, error) {

func (fs *FSResults) Stream(ctx context.Context, graph, id string) (*Stream, error) {
if v, ok := fs.jobs.Load(jobKey(graph, id)); ok {
vJob := v.(*Job)
vJob := v.(*FileJob)
if vJob.Status.State == gripql.JobState_COMPLETE {
resultFile := filepath.Join(fs.BaseDir, sanitize.Name(graph), sanitize.Name(id), "results")
results, err := os.Open(resultFile)
Expand Down Expand Up @@ -215,7 +199,7 @@ func (fs *FSResults) Stream(ctx context.Context, graph, id string) (*Stream, err

func (fs *FSResults) Delete(graph, id string) error {
if v, ok := fs.jobs.Load(jobKey(graph, id)); ok {
vJob := v.(*Job)
vJob := v.(*FileJob)
if vJob.Status.State == gripql.JobState_RUNNING || vJob.Status.State == gripql.JobState_QUEUED {
return fmt.Errorf("Job cancel not yet implemented")
}
Expand All @@ -228,7 +212,7 @@ func (fs *FSResults) Delete(graph, id string) error {

func (fs *FSResults) Status(graph, id string) (*gripql.JobStatus, error) {
if v, ok := fs.jobs.Load(jobKey(graph, id)); ok {
vJob := v.(*Job)
vJob := v.(*FileJob)
a := vJob.Status
return &a, nil
}
Expand Down
24 changes: 24 additions & 0 deletions jobstorage/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package jobstorage

import (
"context"

"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
)

// Stream bundles a pipe of traversal results with the metadata describing
// them, so the results can be handed to or read back from a JobStorage.
type Stream struct {
	// Pipe carries the result elements.
	Pipe gdbi.InPipe
	// DataType is the data type associated with the elements on Pipe.
	DataType gdbi.DataType
	// MarkTypes maps a name to a data type.
	// NOTE(review): presumably the traversal's mark names; confirm in gdbi.
	MarkTypes map[string]gdbi.DataType
	// Query is the list of graph statements that produced the stream.
	Query []*gripql.GraphStatement
}

// JobStorage is the contract implemented by every job storage backend.
// Jobs are addressed by the name of the graph they belong to plus a job id.
type JobStorage interface {
	// List returns a channel of job identifiers for the given graph.
	List(graph string) (chan string, error)
	// Search returns a channel of job statuses for jobs in graph that
	// correspond to Query.
	Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error)
	// Spool stores the contents of stream as a new job in graph and returns
	// the identifier assigned to that job.
	Spool(graph string, stream *Stream) (string, error)
	// Stream returns the stored results of job id in graph.
	Stream(ctx context.Context, graph, id string) (*Stream, error)
	// Delete removes job id from graph.
	Delete(graph, id string) error
	// Status returns the current status of job id in graph.
	Status(graph, id string) (*gripql.JobStatus, error)
}
156 changes: 156 additions & 0 deletions jobstorage/open_search.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package jobstorage

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"time"

"github.com/bmeg/grip/gdbi"
"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
opensearch "github.com/opensearch-project/opensearch-go/v4"
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
)

// OpenSearchStorage is the JobStorage backend that keeps jobs in OpenSearch.
type OpenSearchStorage struct {
	// client is the OpenSearch API client used for every request.
	client *opensearchapi.Client
}

// OS_INDEX_LIST is the name of the OpenSearch index that holds one record
// per job.
var OS_INDEX_LIST = "gripql-job-status"

// OpenSearchJob is the job record written to the OS_INDEX_LIST index.
type OpenSearchJob struct {
	// Index is the combined "<graph>-<job name>" identifier; Spool also uses
	// it as the name of the index that receives the job's result rows.
	Index string
	// Graph is the name of the graph the job belongs to.
	Graph string
	// Status is the job's status record (query, id, graph, timestamp, state
	// and result count).
	Status gripql.JobStatus
	// DataType is copied from the spooled Stream.
	DataType gdbi.DataType
	// MarkTypes is copied from the spooled Stream.
	MarkTypes map[string]gdbi.DataType
	// StepChecksums holds the checksums computed from the job's query by
	// TraversalChecksum.
	StepChecksums []string
}

// NewOpenSearchStorage builds an OpenSearch client for addr, authenticating
// with username and password, and returns a JobStorage backed by it. The job
// list index (OS_INDEX_LIST) is created when the server reports that it does
// not exist yet.
//
// An error is returned when the client cannot be constructed, when the
// existence check fails for any reason other than "not found", or when the
// index cannot be created.
func NewOpenSearchStorage(addr string, username, password string) (JobStorage, error) {
	log.Infof("OpenSearch Job Storage: %s %s", addr, username)
	client, err := opensearchapi.NewClient(opensearchapi.Config{
		Client: opensearch.Config{
			// NOTE(review): TLS certificate verification is disabled here, so
			// the server's identity is never checked. This should become a
			// configuration option before the driver is used outside of
			// development.
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			},
			Addresses: []string{addr},
			Username:  username,
			Password:  password,
		},
	})
	if err != nil {
		return nil, err
	}

	resp, err := client.Indices.Exists(context.Background(), opensearchapi.IndicesExistsReq{Indices: []string{OS_INDEX_LIST}})
	if err != nil {
		// resp may be nil when the request failed before any HTTP response
		// was received, so it must be checked before reading the status code.
		if resp == nil || resp.StatusCode != http.StatusNotFound {
			log.Errorf("Contact error: %s %#v", err, resp)
			return nil, err
		}
		// Create the job list index if it doesn't exist.
		if _, err := client.Indices.Create(context.Background(), opensearchapi.IndicesCreateReq{Index: OS_INDEX_LIST}); err != nil {
			return nil, err
		}
	}
	return &OpenSearchStorage{
		client: client,
	}, nil
}

// List searches the job list index for records whose Graph field matches
// graph and sends the Index value of every hit on the returned channel. The
// channel is closed once all hits have been sent or the search has failed.
//
// The search runs in a background goroutine; a search failure is logged and
// results in an empty channel rather than a returned error. Hits that cannot
// be decoded, or whose Index field is missing or not a string, are skipped.
//
// NOTE(review): graph is interpolated into the query string without
// escaping, so a name containing a double quote changes the query.
func (os *OpenSearchStorage) List(graph string) (chan string, error) {
	cout := make(chan string, 5)
	go func() {
		defer close(cout)
		searchResp, err := os.client.Search(
			context.Background(),
			&opensearchapi.SearchReq{
				Indices: []string{OS_INDEX_LIST},
				Params: opensearchapi.SearchParams{
					Query: fmt.Sprintf(`Graph: "%s"`, graph),
				},
			},
		)
		if err != nil {
			log.Errorf("JobList error: %s", err)
			return
		}
		for _, hit := range searchResp.Hits.Hits {
			doc := map[string]any{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Errorf("JobList error: %s", err)
				continue
			}
			// Checked assertion: a non-string Index must not panic the
			// goroutine.
			if index, ok := doc["Index"].(string); ok {
				cout <- index
			}
		}
	}()
	return cout, nil
}

// Search is not implemented for the OpenSearch backend yet. It reports no
// matching jobs by returning an already-closed, empty channel: receiving
// from a nil channel blocks forever, so a nil channel must not be handed
// back together with a nil error.
func (os *OpenSearchStorage) Search(graph string, Query []*gripql.GraphStatement) (chan *gripql.JobStatus, error) {
	out := make(chan *gripql.JobStatus)
	close(out)
	return out, nil
}

// putJob writes job to the job list index under document id, replacing any
// record already stored under that id. It returns the error reported by the
// index request, if any.
func (os *OpenSearchStorage) putJob(id string, job *OpenSearchJob) error {
	resp, err := os.client.Index(context.Background(), opensearchapi.IndexReq{
		Index:      OS_INDEX_LIST,
		DocumentID: id,
		Body:       opensearchutil.NewJSONReader(job),
	})
	if err != nil {
		// Report failures at error level; formatting a nil error with %s
		// would otherwise print "%!s(<nil>)" on every successful write.
		log.Errorf("Job Index error: %s", err)
		return err
	}
	log.Infof("Job Index resp: %#v", resp)
	return nil
}

// Spool registers a new job for graph and copies the contents of stream into
// OpenSearch in the background.
//
// A job record is first written to the job list index under the id
// "<graph>-<job name>"; if that write fails the error is returned and nothing
// is spooled. The rows of stream.Pipe are then marshalled and indexed, one
// document per row, into an index named after that same id. When the pipe is
// drained the job record is rewritten with the COMPLETE state and the number
// of rows that were stored.
//
// The returned string is the job name (without the graph prefix).
func (os *OpenSearchStorage) Spool(graph string, stream *Stream) (string, error) {
	// Zero-pad the random value: a plain width pads with spaces, which would
	// end up inside the job and index names.
	jobName := fmt.Sprintf("grip-%010d", rand.Int())
	jobID := graph + "-" + jobName

	cs, err := TraversalChecksum(stream.Query)
	if err != nil {
		// Best effort: the job is still stored, only without usable
		// checksums.
		log.Errorf("Job %s checksum error: %s", jobID, err)
	}
	job := &OpenSearchJob{
		Index:         jobID,
		Graph:         graph,
		Status:        gripql.JobStatus{Query: stream.Query, Id: jobName, Graph: graph, Timestamp: time.Now().Format(time.RFC3339)},
		DataType:      stream.DataType,
		MarkTypes:     stream.MarkTypes,
		StepChecksums: cs,
	}
	if err := os.putJob(jobID, job); err != nil {
		return "", err
	}
	tbStream := MarshalStream(stream.Pipe, 4) //TODO: make worker count configurable
	go func() {
		job.Status.State = gripql.JobState_RUNNING
		log.Infof("Starting Job: %#v", job)
		for row := range tbStream {
			_, err := os.client.Index(context.Background(), opensearchapi.IndexReq{
				Index: jobID,
				Body:  bytes.NewReader(row),
			})
			if err != nil {
				// Keep draining the pipe so the producer is not blocked, but
				// do not count a row that was never stored.
				log.Errorf("Job %s result write error: %s", jobID, err)
				continue
			}
			job.Status.Count++
		}
		job.Status.State = gripql.JobState_COMPLETE
		if err := os.putJob(jobID, job); err != nil {
			log.Errorf("Job %s status update error: %s", jobID, err)
		}
	}()
	return jobName, nil
}

// Stream is not implemented for the OpenSearch backend yet. It always
// returns an error so that callers do not receive a nil *Stream together
// with a nil error.
func (os *OpenSearchStorage) Stream(ctx context.Context, graph, id string) (*Stream, error) {
	return nil, fmt.Errorf("opensearch job storage: streaming job %q of graph %q is not implemented", id, graph)
}

// Delete is not implemented for the OpenSearch backend yet. It always
// returns an error instead of reporting success for a job that was not
// removed.
func (os *OpenSearchStorage) Delete(graph, id string) error {
	return fmt.Errorf("opensearch job storage: deleting job %q of graph %q is not implemented", id, graph)
}

// Status is not implemented for the OpenSearch backend yet. It always
// returns an error so that callers do not receive a nil *gripql.JobStatus
// together with a nil error.
func (os *OpenSearchStorage) Status(graph, id string) (*gripql.JobStatus, error) {
	return nil, fmt.Errorf("opensearch job storage: status of job %q of graph %q is not implemented", id, graph)
}
Loading
Loading