From 73a7584de6c8fd4a772c4204668f2be348e4a1f3 Mon Sep 17 00:00:00 2001 From: matthewpeterkort Date: Tue, 17 Sep 2024 11:04:29 -0700 Subject: [PATCH 1/3] Adds support for sqlite --- .github/workflows/tests.yml | 51 ++- cmd/server/main.go | 2 + config/config.go | 11 + go.mod | 1 + go.sum | 3 +- psql/graph.go | 36 +- psql/schema.go | 4 +- psql/util.go | 6 +- server/server.go | 3 + sqlite/graph.go | 831 ++++++++++++++++++++++++++++++++++++ sqlite/graphdb.go | 227 ++++++++++ sqlite/index.go | 24 ++ sqlite/schema.go | 142 ++++++ test/sqlite.yml | 7 + 14 files changed, 1308 insertions(+), 40 deletions(-) create mode 100644 sqlite/graph.go create mode 100644 sqlite/graphdb.go create mode 100644 sqlite/index.go create mode 100644 sqlite/schema.go create mode 100644 test/sqlite.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 91ef26d2..8c042a39 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -43,8 +43,7 @@ jobs: - name: run unit tests run: | go test ./test/... 
-config badger.yml
-
-
+
 badgerTest:
   needs: build
@@ -65,7 +64,7 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/badger.yml &
           sleep 5
           make test-conformance
-
+
   pebbleTest:
     needs: build
@@ -86,7 +85,7 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/pebble.yml &
           sleep 5
           make test-conformance
-
+
   mongoTest:
     needs: build
@@ -108,7 +107,7 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/mongo.yml &
           sleep 5
           make test-conformance
-
+
   mongoCoreTest:
     needs: build
@@ -130,7 +129,7 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/mongo-core-processor.yml &
           sleep 5
           make test-conformance
-
+
   elasticTest:
     needs: build
@@ -153,11 +152,11 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/elastic.yml &
           sleep 5
           make test-conformance
-
-  portgresTest:
+
+  postgresTest:
     needs: build
-    name: Portgres Test
+    name: Postgres Test
     runs-on: ubuntu-latest
     steps:
       - name: Check out code
         uses: actions/checkout@v2
@@ -176,7 +175,28 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/psql.yml &
           sleep 5
           python conformance/run_conformance.py http://localhost:18201 --exclude index aggregations
-
+
+
+  sqliteTest:
+    needs: build
+    name: Sqlite Test
+    runs-on: ubuntu-latest
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v2
+      - name: Python Dependencies for Conformance
+        run: pip install requests numpy
+      - name: Download grip
+        uses: actions/download-artifact@v2
+        with:
+          name: gripBin
+      - name: Sqlite Conformance
+        run: |
+          chmod +x grip
+          ./grip server --rpc-port 18202 --http-port 18201 --config ./test/sqlite.yml &
+          sleep 5
+          python conformance/run_conformance.py http://localhost:18201 --exclude index aggregations
+
   gripperTest:
     needs: build
@@ -224,18 +244,17 @@ jobs:
           ./grip server --rpc-port 18202 --http-port 18201 --config ./test/badger-auth.yml &
           sleep 5
           # simple auth
-          # run tests without credentials, should fail
+          # run tests without credentials, should fail
if make test-conformance then echo "ERROR: Conformance tests ran without credentials." ; exit 1 else - echo "Got expected auth error" - fi + echo "Got expected auth error" + fi # run specialized role based tests make test-authorization ARGS="--grip_config_file_path test/badger-auth.yml" - - - + + #gridsTest: # needs: build diff --git a/cmd/server/main.go b/cmd/server/main.go index 3d7d1f2c..1da07614 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -93,6 +93,8 @@ var Cmd = &cobra.Command{ dconf.AddMongoDefault() } else if driver == "grids" { dconf.AddGridsDefault() + } else if driver == "sqlite" { + dconf.AddSqliteDefault() } } if pluginDir != "" { diff --git a/config/config.go b/config/config.go index 08f68eca..b5d6f119 100644 --- a/config/config.go +++ b/config/config.go @@ -15,6 +15,7 @@ import ( "github.com/bmeg/grip/log" "github.com/bmeg/grip/mongo" "github.com/bmeg/grip/psql" + "github.com/bmeg/grip/sqlite" "github.com/bmeg/grip/util" "github.com/bmeg/grip/util/duration" "github.com/bmeg/grip/util/rpc" @@ -35,6 +36,7 @@ type DriverConfig struct { MongoDB *mongo.Config PSQL *psql.Config ExistingSQL *esql.Config + Sqlite *sqlite.Config Gripper *gripper.Config } @@ -101,6 +103,12 @@ func (conf *Config) AddMongoDefault() { conf.Default = "mongo" } +func (conf *Config) AddSqliteDefault() { + c := sqlite.Config{DBName: "grip-sqlite.db"} + conf.Drivers["sqlite"] = DriverConfig{Sqlite: &c} + conf.Default = "sqlite" +} + func (conf *Config) AddGridsDefault() { n := "grip-grids.db" conf.Drivers["grids"] = DriverConfig{Grids: &n} @@ -130,6 +138,9 @@ func TestifyConfig(c *Config) { d.Elasticsearch.DBName = "gripdb-" + rand d.Elasticsearch.Synchronous = true } + if d.Sqlite != nil { + d.Sqlite.DBName = "gripdb-" + rand + } c.Drivers[c.Default] = d } diff --git a/go.mod b/go.mod index 976e8187..2fa8bf45 100644 --- a/go.mod +++ b/go.mod @@ -32,6 +32,7 @@ require ( github.com/lib/pq v1.10.9 github.com/logrusorgru/aurora v2.0.3+incompatible 
github.com/machinebox/graphql v0.2.2 + github.com/mattn/go-sqlite3 v1.14.23 github.com/minio/minio-go/v7 v7.0.73 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/mongodb/mongo-tools v0.0.0-20240715143021-aa6a140d3f17 diff --git a/go.sum b/go.sum index a90307be..1915da2b 100644 --- a/go.sum +++ b/go.sum @@ -276,8 +276,9 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mattn/go-sqlite3 v1.14.23 h1:gbShiuAP1W5j9UOksQ06aiiqPMxYecovVGwmTxWtuw0= +github.com/mattn/go-sqlite3 v1.14.23/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo= diff --git a/psql/graph.go b/psql/graph.go index 73c2ce7e..5c841355 100644 --- a/psql/graph.go +++ b/psql/graph.go @@ -191,13 +191,13 @@ func (g *Graph) GetVertex(gid string, load bool) *gdbi.Vertex { if load { q = fmt.Sprintf(`SELECT * FROM %s WHERE gid='%s'`, g.v, gid) } - vrow := &row{} + vrow := &Row{} err := g.db.QueryRowx(q).StructScan(vrow) if err != nil { log.WithFields(log.Fields{"error": err, "query": q}).Error("GetVertex: StructScan") return nil } - vertex, err := convertVertexRow(vrow, load) + vertex, err := ConvertVertexRow(vrow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertex: convertVertexRow") return nil @@ -211,13 +211,13 @@ func (g *Graph) GetEdge(gid 
string, load bool) *gdbi.Edge { if load { q = fmt.Sprintf(`SELECT * FROM %s WHERE gid='%s'`, g.e, gid) } - erow := &row{} + erow := &Row{} err := g.db.QueryRowx(q).StructScan(erow) if err != nil { log.WithFields(log.Fields{"error": err, "query": q}).Error("GetEdge: StructScan") return nil } - edge, err := convertEdgeRow(erow, load) + edge, err := ConvertEdgeRow(erow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdge: convertEdgeRow") return nil @@ -241,12 +241,12 @@ func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Verte } defer rows.Close() for rows.Next() { - vrow := &row{} + vrow := &Row{} if err := rows.StructScan(vrow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexList: StructScan") continue } - v, err := convertVertexRow(vrow, load) + v, err := ConvertVertexRow(vrow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexList: convertVertexRow") continue @@ -303,12 +303,12 @@ func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { } defer rows.Close() for rows.Next() { - erow := &row{} + erow := &Row{} if err := rows.StructScan(erow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: StructScan") continue } - e, err := convertEdgeRow(erow, load) + e, err := ConvertEdgeRow(erow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: convertEdgeRow") continue @@ -352,12 +352,12 @@ func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementL } chunk := map[string]*gdbi.Vertex{} for rows.Next() { - vrow := &row{} + vrow := &Row{} if err := rows.StructScan(vrow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: StructScan") continue } - v, err := convertVertexRow(vrow, load) + v, err := ConvertVertexRow(vrow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: convertVertexRow") continue @@ -451,12 
+451,12 @@ func (g *Graph) GetOutChannel(ctx context.Context, reqChan chan gdbi.ElementLook return } for rows.Next() { - vrow := &row{} + vrow := &Row{} if err := rows.StructScan(vrow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetOutChannel: StructScan") continue } - v, err := convertVertexRow(vrow, load) + v, err := ConvertVertexRow(vrow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetOutChannel: convertVertexRow") continue @@ -560,12 +560,12 @@ func (g *Graph) GetInChannel(ctx context.Context, reqChan chan gdbi.ElementLooku return } for rows.Next() { - vrow := &row{} + vrow := &Row{} if err := rows.StructScan(vrow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetInChannel: StructScan") continue } - v, err := convertVertexRow(vrow, load) + v, err := ConvertVertexRow(vrow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetInChannel: convertVertexRow") continue @@ -657,12 +657,12 @@ func (g *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.Element return } for rows.Next() { - erow := &row{} + erow := &Row{} if err := rows.StructScan(erow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetOutEdgeChannel: StructScan") continue } - e, err := convertEdgeRow(erow, load) + e, err := ConvertEdgeRow(erow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetOutEdgeChannel: convertEdgeRow") continue @@ -754,12 +754,12 @@ func (g *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementL return } for rows.Next() { - erow := &row{} + erow := &Row{} if err := rows.StructScan(erow); err != nil { log.WithFields(log.Fields{"error": err}).Error("GetInEdgeChannel: StructScan") continue } - e, err := convertEdgeRow(erow, load) + e, err := ConvertEdgeRow(erow, load) if err != nil { log.WithFields(log.Fields{"error": err}).Error("GetInEdgeChannel: convertEdgeRow") continue diff --git a/psql/schema.go b/psql/schema.go index 
5b892ef9..7f5f5f4d 100644 --- a/psql/schema.go +++ b/psql/schema.go @@ -46,12 +46,12 @@ func (db *GraphDB) BuildSchema(ctx context.Context, graphID string, sampleN uint defer rows.Close() schema := make(map[string]interface{}) for rows.Next() { - vrow := &row{} + vrow := &Row{} if err := rows.StructScan(vrow); err != nil { log.WithFields(log.Fields{"error": err}).Error("BuildSchema: StructScan") continue } - v, err := convertVertexRow(vrow, true) + v, err := ConvertVertexRow(vrow, true) if err != nil { log.WithFields(log.Fields{"error": err}).Error("BuildSchema: convertVertexRow") continue diff --git a/psql/util.go b/psql/util.go index 38c3d2c0..45e92dec 100644 --- a/psql/util.go +++ b/psql/util.go @@ -8,7 +8,7 @@ import ( "github.com/bmeg/grip/gdbi" ) -type row struct { +type Row struct { Gid string Label string From string @@ -16,7 +16,7 @@ type row struct { Data []byte } -func convertVertexRow(row *row, load bool) (*gdbi.Vertex, error) { +func ConvertVertexRow(row *Row, load bool) (*gdbi.Vertex, error) { props := make(map[string]interface{}) if load { err := json.Unmarshal(row.Data, &props) @@ -33,7 +33,7 @@ func convertVertexRow(row *row, load bool) (*gdbi.Vertex, error) { return v, nil } -func convertEdgeRow(row *row, load bool) (*gdbi.Edge, error) { +func ConvertEdgeRow(row *Row, load bool) (*gdbi.Edge, error) { props := make(map[string]interface{}) if load { err := json.Unmarshal(row.Data, &props) diff --git a/server/server.go b/server/server.go index 2ffb33f2..d1d915a1 100644 --- a/server/server.go +++ b/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/bmeg/grip/gripql" "github.com/bmeg/grip/jobstorage" "github.com/bmeg/grip/log" + "github.com/bmeg/grip/sqlite" "github.com/felixge/httpsnoop" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" @@ -136,6 +137,8 @@ func StartDriver(d config.DriverConfig, sources map[string]gripper.GRIPSourceCli return psql.NewGraphDB(*d.PSQL) } else if 
d.ExistingSQL != nil { return esql.NewGraphDB(*d.ExistingSQL) + } else if d.Sqlite != nil { + return sqlite.NewGraphDB(*d.Sqlite) } else if d.Gripper != nil { return gripper.NewGDBFromConfig(d.Gripper.Graph, d.Gripper.Mapping, sources) } diff --git a/sqlite/graph.go b/sqlite/graph.go new file mode 100644 index 00000000..7281f25e --- /dev/null +++ b/sqlite/graph.go @@ -0,0 +1,831 @@ +package sqlite + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/bmeg/grip/engine/core" + "github.com/bmeg/grip/gdbi" + "github.com/bmeg/grip/log" + "github.com/bmeg/grip/psql" + "github.com/bmeg/grip/timestamp" + "github.com/bmeg/grip/util" + "github.com/jmoiron/sqlx" + + _ "github.com/mattn/go-sqlite3" +) + +const batchSize int = 100 + +type Graph struct { + db *sqlx.DB + ts *timestamp.Timestamp + v string + e string + graph string +} + +// GetTimestamp gets the timestamp of last update +func (g *Graph) GetTimestamp() string { + return g.ts.Get(g.graph) +} + +// AddVertex adds a vertex to the database +func (g *Graph) AddVertex(vertices []*gdbi.Vertex) error { + txn, err := g.db.Begin() + if err != nil { + return fmt.Errorf("AddVertex: Begin Txn: %v", err) + } + + s := fmt.Sprintf( + `INSERT INTO %s (gid, label, data) VALUES ($1, $2, $3) + ON CONFLICT (gid) DO UPDATE SET + gid = excluded.gid, + label = excluded.label, + data = excluded.data;`, + g.v, + ) + stmt, err := txn.Prepare(s) + if err != nil { + return fmt.Errorf("AddVertex: Prepare Stmt: %v", err) + } + + for _, v := range vertices { + js, err := json.Marshal(v.Data) + if err != nil { + return fmt.Errorf("AddVertex: Stmt.Exec: %v", err) + } + _, err = stmt.Exec(v.ID, v.Label, js) + if err != nil { + return fmt.Errorf("AddVertex: Stmt.Exec: %v", err) + } + } + + err = stmt.Close() + if err != nil { + return fmt.Errorf("AddVertex: Stmt.Close: %v", err) + } + + err = txn.Commit() + if err != nil { + return fmt.Errorf("AddVertex: Txn.Commit: %v", err) + } + + return nil +} + +func (g *Graph) 
AddEdge(edges []*gdbi.Edge) error { + txn, err := g.db.Begin() + if err != nil { + return fmt.Errorf("AddEdge: Begin Txn: %v", err) + } + + s := fmt.Sprintf( + `INSERT INTO %s (gid, label, "from", "to", data) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (gid) DO UPDATE SET + gid = excluded.gid, + label = excluded.label, + "from" = excluded."from", + "to" = excluded."to", + data = excluded.data;`, + g.e, + ) + stmt, err := txn.Prepare(s) + if err != nil { + return fmt.Errorf("AddEdge: Prepare Stmt: %v", err) + } + + for _, e := range edges { + js, err := json.Marshal(e.Data) + if err != nil { + return fmt.Errorf("AddEdge: Stmt.Exec: %v", err) + } + _, err = stmt.Exec(e.ID, e.Label, e.From, e.To, js) + if err != nil { + return fmt.Errorf("AddEdge: Stmt.Exec: %v", err) + } + } + + err = stmt.Close() + if err != nil { + return fmt.Errorf("AddEdge: Stmt.Close: %v", err) + } + + err = txn.Commit() + if err != nil { + return fmt.Errorf("AddEdge: Txn.Commit: %v", err) + } + + return nil +} + +func (g *Graph) GetVertex(gid string, load bool) *gdbi.Vertex { + q := fmt.Sprintf(`SELECT gid, label FROM %s WHERE gid='%s'`, g.v, gid) + if load { + q = fmt.Sprintf(`SELECT * FROM %s WHERE gid='%s'`, g.v, gid) + } + vrow := &psql.Row{} + err := g.db.QueryRowx(q).StructScan(vrow) + if err != nil { + log.WithFields(log.Fields{"error": err, "query": q}).Error("GetVertex: StructScan") + return nil + } + vertex, err := psql.ConvertVertexRow(vrow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertex: ConvertVertexRow") + return nil + } + return vertex +} + +func (g *Graph) GetEdge(gid string, load bool) *gdbi.Edge { + q := fmt.Sprintf(`SELECT gid, label, "from", "to" FROM %s WHERE gid='%s'`, g.e, gid) + if load { + q = fmt.Sprintf(`SELECT * FROM %s WHERE gid='%s'`, g.e, gid) + } + erow := &psql.Row{} + err := g.db.QueryRowx(q).StructScan(erow) + if err != nil { + log.WithFields(log.Fields{"error": err, "query": q}).Error("GetEdge: StructScan") + return nil + } 
+	edge, err := psql.ConvertEdgeRow(erow, load)
+	if err != nil {
+		log.WithFields(log.Fields{"error": err}).Error("GetEdge: convertEdgeRow")
+		return nil
+	}
+	return edge
+}
+
+func (g *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error {
+	return util.StreamBatch(stream, 50, g.graph, g.AddVertex, g.AddEdge)
+}
+
+func (g *Graph) BulkDel(Data *gdbi.DeleteData) error {
+	for _, v := range Data.Edges {
+		if err := g.DelEdge(v); err != nil {
+			return err
+		}
+	}
+	for _, v := range Data.Vertices {
+		if err := g.DelVertex(v); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Compiler returns a query compiler that uses the graph
+func (g *Graph) Compiler() gdbi.Compiler {
+	return core.NewCompiler(g, core.IndexStartOptimize) //TODO: probably a better optimizer for vertex label search
+}
+
+// DelEdge removes the edge with the given gid from the graph
+func (g *Graph) DelEdge(key string) error {
+	stmt := fmt.Sprintf("DELETE FROM %s WHERE gid='%s'", g.e, key)
+	_, err := g.db.Exec(stmt)
+	if err != nil {
+		return fmt.Errorf("deleting edge: %v", err)
+	}
+	return nil
+}
+
+// DelVertex removes the vertex with the given gid and all of its incident edges
+func (g *Graph) DelVertex(key string) error {
+	stmt := fmt.Sprintf("DELETE FROM %s WHERE gid='%s'", g.v, key)
+	_, err := g.db.Exec(stmt)
+	if err != nil {
+		return fmt.Errorf("deleting vertex: %v", err)
+	}
+
+	stmt = fmt.Sprintf(`DELETE FROM %s WHERE "from"='%s'`, g.e, key)
+	_, err = g.db.Exec(stmt)
+	if err != nil {
+		return fmt.Errorf("deleting outgoing edges for %s: %v", key, err)
+	}
+
+	stmt = fmt.Sprintf(`DELETE FROM %s WHERE "to"='%s'`, g.e, key)
+	_, err = g.db.Exec(stmt)
+	if err != nil {
+		return fmt.Errorf("deleting incoming edges for %s: %v", key, err)
+	}
+
+	return nil
+}
+
+// GetVertexList produces a channel of all vertices in the graph
+func (g *Graph) GetVertexList(ctx context.Context, load bool) <-chan *gdbi.Vertex {
+	o := make(chan *gdbi.Vertex, 100)
+	go func() {
+		defer close(o)
+		q := fmt.Sprintf("SELECT gid, label FROM %s",
g.v) + if load { + q = fmt.Sprintf(`SELECT * FROM %s`, g.v) + } + rows, err := g.db.QueryxContext(ctx, q) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexList: QueryxContext") + return + } + defer rows.Close() + for rows.Next() { + vrow := &psql.Row{} + if err := rows.StructScan(vrow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexList: StructScan") + continue + } + v, err := psql.ConvertVertexRow(vrow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexList: convertVertexRow") + continue + } + o <- v + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexList: iterating") + } + }() + return o +} + +// VertexLabelScan produces a channel of all vertex ids where the vertex label matches `label` +func (g *Graph) VertexLabelScan(ctx context.Context, label string) chan string { + o := make(chan string, 100) + go func() { + defer close(o) + q := fmt.Sprintf("SELECT gid FROM %s WHERE label='%s'", g.v, label) + rows, err := g.db.QueryxContext(ctx, q) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("VertexLabelScan: QueryxContext") + return + } + defer rows.Close() + for rows.Next() { + var gid string + if err := rows.Scan(&gid); err != nil { + log.WithFields(log.Fields{"error": err}).Error("VertexLabelScan: Scan") + continue + } + o <- gid + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("VertexLabelScan: iterating") + } + }() + return o +} + +// GetEdgeList produces a channel of all edges in the graph +func (g *Graph) GetEdgeList(ctx context.Context, load bool) <-chan *gdbi.Edge { + o := make(chan *gdbi.Edge, 100) + go func() { + defer close(o) + q := fmt.Sprintf(`SELECT gid, label, "from", "to" FROM %s`, g.e) + if load { + q = fmt.Sprintf(`SELECT * FROM %s`, g.e) + } + rows, err := g.db.QueryxContext(ctx, q) + if err != nil { + log.WithFields(log.Fields{"error": 
err}).Error("GetEdgeList: QueryxContext") + return + } + defer rows.Close() + for rows.Next() { + erow := &psql.Row{} + if err := rows.StructScan(erow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: StructScan") + continue + } + e, err := psql.ConvertEdgeRow(erow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: convertEdgeRow") + continue + } + o <- e + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetEdgeList: iterating") + } + }() + return o +} + +// GetVertexChannel is passed a channel of vertex ids and it produces a channel of vertices +func (g *Graph) GetVertexChannel(ctx context.Context, reqChan chan gdbi.ElementLookup, load bool) chan gdbi.ElementLookup { + batches := gdbi.LookupBatcher(reqChan, batchSize, time.Microsecond) + + o := make(chan gdbi.ElementLookup, 100) + go func() { + defer close(o) + for batch := range batches { + idBatch := make([]string, 0, len(batch)) + signals := []gdbi.ElementLookup{} + for i := range batch { + if batch[i].IsSignal() { + signals = append(signals, batch[i]) + } else { + idBatch = append(idBatch, fmt.Sprintf("'%s'", batch[i].ID)) + } + } + if len(idBatch) > 0 { + ids := strings.Join(idBatch, ", ") + q := fmt.Sprintf("SELECT gid, label FROM %s WHERE gid IN (%s)", g.v, ids) + if load { + q = fmt.Sprintf("SELECT * FROM %s WHERE gid IN (%s)", g.v, ids) + } + rows, err := g.db.Queryx(q) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: Queryx") + return + } + chunk := map[string]*gdbi.Vertex{} + for rows.Next() { + vrow := &psql.Row{} + if err := rows.StructScan(vrow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: StructScan") + continue + } + v, err := psql.ConvertVertexRow(vrow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: convertVertexRow") + continue + } + chunk[v.ID] = v + } + if err := 
rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetVertexChannel: iterating") + } + for _, id := range batch { + if x, ok := chunk[id.ID]; ok { + id.Vertex = x + o <- id + } + } + rows.Close() + } + for i := range signals { + o <- signals[i] + } + } + }() + return o +} + +// GetOutChannel is passed a channel of vertex ids and finds the connected vertices via outgoing edges +func (g *Graph) GetOutChannel(ctx context.Context, reqChan chan gdbi.ElementLookup, load bool, emitNull bool, edgeLabels []string) chan gdbi.ElementLookup { + batches := gdbi.LookupBatcher(reqChan, batchSize, time.Microsecond) + + o := make(chan gdbi.ElementLookup, 100) + go func() { + defer close(o) + for batch := range batches { + idBatch := make([]string, 0, len(batch)) + batchMap := make(map[string][]gdbi.ElementLookup, len(batch)) + batchMapReturnCount := make(map[string]int) + signals := []gdbi.ElementLookup{} + for i := range batch { + if batch[i].IsSignal() { + signals = append(signals, batch[i]) + } else { + idBatch = append(idBatch, fmt.Sprintf("'%s'", batch[i].ID)) + batchMap[batch[i].ID] = append(batchMap[batch[i].ID], batch[i]) + batchMapReturnCount[batch[i].ID] = 0 + } + } + if len(idBatch) > 0 { + ids := strings.Join(idBatch, ", ") + q := fmt.Sprintf( + `SELECT %s.gid, %s.label, %s."from" FROM %s INNER JOIN %s ON %s."to"=%s.gid WHERE %s."from" IN (%s)`, + // SELECT + g.v, g.v, g.e, + // FROM + g.v, + // INNER JOIN + g.e, + // ON + g.e, g.v, + // WHERE + g.e, + // IN + ids, + ) + if load { + q = fmt.Sprintf( + `SELECT %s.*, %s."from" FROM %s INNER JOIN %s ON %s."to"=%s.gid WHERE %s."from" IN (%s)`, + // SELECT + g.v, g.e, + // FROM + g.v, + // INNER JOIN + g.e, + // ON + g.e, g.v, + // WHERE + g.e, + // IN + ids, + ) + } + if len(edgeLabels) > 0 { + labels := make([]string, len(edgeLabels)) + for i := range edgeLabels { + labels[i] = fmt.Sprintf("'%s'", edgeLabels[i]) + } + q = fmt.Sprintf("%s AND %s.label IN (%s)", q, g.e, strings.Join(labels, ", ")) + } 
+ rows, err := g.db.Queryx(q) + if err != nil { + log.WithFields(log.Fields{"error": err, "query": q}).Error("GetOutChannel: Queryx") + return + } + for rows.Next() { + vrow := &psql.Row{} + if err := rows.StructScan(vrow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetOutChannel: StructScan") + continue + } + v, err := psql.ConvertVertexRow(vrow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetOutChannel: convertVertexRow") + continue + } + r := batchMap[vrow.From] + batchMapReturnCount[vrow.From]++ + for _, ri := range r { + ri.Vertex = v + o <- ri + } + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetOutChannel: iterating") + } + rows.Close() + if emitNull { + for id, count := range batchMapReturnCount { + if count == 0 { + r := batchMap[id] + for _, ri := range r { + ri.Vertex = nil + o <- ri + } + } + } + } + } + for i := range signals { + o <- signals[i] + } + } + }() + return o +} + +// GetInChannel is passed a channel of vertex ids and finds the connected vertices via incoming edges +func (g *Graph) GetInChannel(ctx context.Context, reqChan chan gdbi.ElementLookup, load bool, emitNull bool, edgeLabels []string) chan gdbi.ElementLookup { + batches := gdbi.LookupBatcher(reqChan, batchSize, time.Microsecond) + + o := make(chan gdbi.ElementLookup, 100) + go func() { + defer close(o) + for batch := range batches { + idBatch := make([]string, 0, len(batch)) + batchMap := make(map[string][]gdbi.ElementLookup, len(batch)) + batchMapReturnCount := make(map[string]int) + signals := []gdbi.ElementLookup{} + for i := range batch { + if batch[i].IsSignal() { + signals = append(signals, batch[i]) + } else { + idBatch = append(idBatch, fmt.Sprintf("'%s'", batch[i].ID)) + batchMap[batch[i].ID] = append(batchMap[batch[i].ID], batch[i]) + batchMapReturnCount[batch[i].ID] = 0 + } + } + if len(idBatch) > 0 { + ids := strings.Join(idBatch, ", ") + q := fmt.Sprintf( + `SELECT %s.gid, 
%s.label, %s."to" FROM %s INNER JOIN %s ON %s."from"=%s.gid WHERE %s."to" IN (%s)`, + // SELECT + g.v, g.v, g.e, + // FROM + g.v, + // INNER JOIN + g.e, + // ON + g.e, g.v, + // WHERE + g.e, + // IN + ids, + ) + if load { + q = fmt.Sprintf( + `SELECT %s.*, %s."to" FROM %s INNER JOIN %s ON %s."from"=%s.gid WHERE %s."to" IN (%s)`, + // SELECT + g.v, g.e, + // FROM + g.v, + // INNER JOIN + g.e, + // ON + g.e, g.v, + // WHERE + g.e, + // IN + ids, + ) + } + if len(edgeLabels) > 0 { + labels := make([]string, len(edgeLabels)) + for i := range edgeLabels { + labels[i] = fmt.Sprintf("'%s'", edgeLabels[i]) + } + q = fmt.Sprintf("%s AND %s.label IN (%s)", q, g.e, strings.Join(labels, ", ")) + } + rows, err := g.db.Queryx(q) + if err != nil { + log.WithFields(log.Fields{"error": err, "query": q}).Error("GetInChannel: Queryx") + return + } + for rows.Next() { + vrow := &psql.Row{} + if err := rows.StructScan(vrow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetInChannel: StructScan") + continue + } + v, err := psql.ConvertVertexRow(vrow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetInChannel: convertVertexRow") + continue + } + r := batchMap[vrow.To] + batchMapReturnCount[vrow.To]++ + for _, ri := range r { + ri.Vertex = v + o <- ri + } + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetInChannel: iterating") + } + rows.Close() + if emitNull { + for id, count := range batchMapReturnCount { + if count == 0 { + r := batchMap[id] + for _, ri := range r { + ri.Vertex = nil + o <- ri + } + } + } + } + } + for i := range signals { + o <- signals[i] + } + } + }() + return o +} + +// GetOutEdgeChannel is passed a channel of vertex ids and finds the outgoing edges +func (g *Graph) GetOutEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementLookup, load bool, emitNull bool, edgeLabels []string) chan gdbi.ElementLookup { + batches := gdbi.LookupBatcher(reqChan, batchSize, time.Microsecond) 
+ + o := make(chan gdbi.ElementLookup, 100) + go func() { + defer close(o) + for batch := range batches { + idBatch := make([]string, 0, len(batch)) + batchMap := make(map[string][]gdbi.ElementLookup, len(batch)) + batchMapReturnCount := make(map[string]int) + signals := []gdbi.ElementLookup{} + for i := range batch { + if batch[i].IsSignal() { + signals = append(signals, batch[i]) + } else { + idBatch = append(idBatch, fmt.Sprintf("'%s'", batch[i].ID)) + batchMap[batch[i].ID] = append(batchMap[batch[i].ID], batch[i]) + batchMapReturnCount[batch[i].ID] = 0 + } + } + if len(idBatch) > 0 { + ids := strings.Join(idBatch, ", ") + q := fmt.Sprintf( + `SELECT gid, label, "from", "to" FROM %s WHERE %s."from" IN (%s)`, + // FROM + g.e, + // WHERE + g.e, + // IN + ids, + ) + if load { + q = fmt.Sprintf( + `SELECT * FROM %s WHERE %s."from" IN (%s)`, + // FROM + g.e, + // WHERE + g.e, + // IN + ids, + ) + } + if len(edgeLabels) > 0 { + labels := make([]string, len(edgeLabels)) + for i := range edgeLabels { + labels[i] = fmt.Sprintf("'%s'", edgeLabels[i]) + } + q = fmt.Sprintf("%s AND %s.label IN (%s)", q, g.e, strings.Join(labels, ", ")) + } + rows, err := g.db.Queryx(q) + if err != nil { + log.WithFields(log.Fields{"error": err, "query": q}).Error("GetOutEdgeChannel: Queryx") + return + } + for rows.Next() { + erow := &psql.Row{} + if err := rows.StructScan(erow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetOutEdgeChannel: StructScan") + continue + } + e, err := psql.ConvertEdgeRow(erow, load) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetOutEdgeChannel: convertEdgeRow") + continue + } + r := batchMap[erow.From] + batchMapReturnCount[erow.From]++ + for _, ri := range r { + ri.Edge = e + o <- ri + } + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetOutEdgeChannel: iterating") + } + rows.Close() + if emitNull { + for id, count := range batchMapReturnCount { + if count == 0 { + r := 
batchMap[id] + for _, ri := range r { + ri.Edge = nil + o <- ri + } + } + } + } + } + for i := range signals { + o <- signals[i] + } + } + }() + return o +} + +// GetInEdgeChannel is passed a channel of vertex ids and finds the incoming edges +func (g *Graph) GetInEdgeChannel(ctx context.Context, reqChan chan gdbi.ElementLookup, load bool, emitNull bool, edgeLabels []string) chan gdbi.ElementLookup { + batches := gdbi.LookupBatcher(reqChan, batchSize, time.Microsecond) + + o := make(chan gdbi.ElementLookup, 100) + go func() { + defer close(o) + for batch := range batches { + idBatch := make([]string, 0, len(batch)) + batchMap := make(map[string][]gdbi.ElementLookup, len(batch)) + batchMapReturnCount := make(map[string]int) + signals := []gdbi.ElementLookup{} + for i := range batch { + if batch[i].IsSignal() { + signals = append(signals, batch[i]) + } else { + idBatch = append(idBatch, fmt.Sprintf("'%s'", batch[i].ID)) + batchMap[batch[i].ID] = append(batchMap[batch[i].ID], batch[i]) + batchMapReturnCount[batch[i].ID] = 0 + } + } + if len(idBatch) > 0 { + ids := strings.Join(idBatch, ", ") + q := fmt.Sprintf( + `SELECT gid, label, "from", "to" FROM %s WHERE %s."to" IN (%s)`, + // FROM + g.e, + // WHERE + g.e, + // IN + ids, + ) + if load { + q = fmt.Sprintf( + `SELECT * FROM %s WHERE %s."to" IN (%s)`, + // FROM + g.e, + // WHERE + g.e, + // IN + ids, + ) + } + if len(edgeLabels) > 0 { + labels := make([]string, len(edgeLabels)) + for i := range edgeLabels { + labels[i] = fmt.Sprintf("'%s'", edgeLabels[i]) + } + q = fmt.Sprintf("%s AND %s.label IN (%s)", q, g.e, strings.Join(labels, ", ")) + } + rows, err := g.db.Queryx(q) + if err != nil { + log.WithFields(log.Fields{"error": err, "query": q}).Error("GetInEdgeChannel: Queryx") + return + } + for rows.Next() { + erow := &psql.Row{} + if err := rows.StructScan(erow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetInEdgeChannel: StructScan") + continue + } + e, err := psql.ConvertEdgeRow(erow, load) 
+ if err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetInEdgeChannel: convertEdgeRow") + continue + } + r := batchMap[erow.To] + batchMapReturnCount[erow.To]++ + for _, ri := range r { + ri.Edge = e + o <- ri + } + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("GetInEdgeChannel: iterating") + } + rows.Close() + if emitNull { + for id, count := range batchMapReturnCount { + if count == 0 { + r := batchMap[id] + for _, ri := range r { + ri.Edge = nil + o <- ri + } + } + } + } + } + for i := range signals { + o <- signals[i] + } + } + }() + return o +} + +// ListVertexLabels returns a list of vertex types in the graph +func (g *Graph) ListVertexLabels() ([]string, error) { + q := fmt.Sprintf("SELECT DISTINCT label FROM %s", g.v) + rows, err := g.db.Queryx(q) + if err != nil { + return nil, err + } + labels := []string{} + defer rows.Close() + for rows.Next() { + var l string + if err := rows.Scan(&l); err != nil { + return nil, err + } + labels = append(labels, l) + } + if err := rows.Err(); err != nil { + return nil, err + } + return labels, nil +} + +// ListEdgeLabels returns a list of edge types in the graph +func (g *Graph) ListEdgeLabels() ([]string, error) { + q := fmt.Sprintf("SELECT DISTINCT label FROM %s", g.e) + rows, err := g.db.Queryx(q) + if err != nil { + return nil, err + } + labels := []string{} + defer rows.Close() + for rows.Next() { + var l string + if err := rows.Scan(&l); err != nil { + return nil, err + } + labels = append(labels, l) + } + if err := rows.Err(); err != nil { + return nil, err + } + return labels, nil +} diff --git a/sqlite/graphdb.go b/sqlite/graphdb.go new file mode 100644 index 00000000..8344c117 --- /dev/null +++ b/sqlite/graphdb.go @@ -0,0 +1,227 @@ +package sqlite + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/bmeg/grip/gdbi" + "github.com/bmeg/grip/gripql" + "github.com/bmeg/grip/log" + "github.com/bmeg/grip/timestamp" + 
"github.com/jmoiron/sqlx" +) + +// Config describes the configuration for the sql driver. +// See https://godoc.org/github.com/lib/pq#hdr-Connection_String_Parameters for details. +type Config struct { + DBName string +} + +// GraphDB manages graphs in the database +type GraphDB struct { + db *sqlx.DB + ts *timestamp.Timestamp +} + +// Create dir/file structure from path string +func createFileWithDirs(path string) error { + var dir string + if strings.Contains(path, "/") { + dir = filepath.Dir(path) + } + if dir != "." && dir != "" { + err := os.MkdirAll(dir, 0755) + if err != nil { + return fmt.Errorf("failed to create directories: %w", err) + } + } + file, err := os.Create(path) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + return nil +} + +// NewGraphDB creates a new GraphDB graph database interface +func NewGraphDB(conf Config) (gdbi.GraphDB, error) { + log.Info("Starting Sqlite3 Driver") + err := createFileWithDirs(conf.DBName) + if err != nil { + return nil, err + } + db, err := sqlx.Connect("sqlite3", conf.DBName) + if err != nil { + return nil, fmt.Errorf("connecting to database: %v", err) + } + db.SetMaxIdleConns(10) + + stmt := "CREATE TABLE IF NOT EXISTS graphs (graph_name varchar PRIMARY KEY, sanitized_graph_name varchar NOT NULL, vertex_table varchar NOT NULL, edge_table varchar NOT NULL)" + _, err = db.Exec(stmt) + if err != nil { + return nil, fmt.Errorf("creating graphs table: %v", err) + } + + ts := timestamp.NewTimestamp() + gdb := &GraphDB{db, &ts} + for _, i := range gdb.ListGraphs() { + gdb.ts.Touch(i) + } + return gdb, nil +} + +// Close the connection +func (db *GraphDB) Close() error { + return db.db.Close() +} + +// AddGraph creates a new graph named `graph` +func (db *GraphDB) AddGraph(graph string) error { + err := gripql.ValidateGraphName(graph) + if err != nil { + return err + } + + sanitizedName := strings.Replace(graph, "-", "_", -1) + vertexTable := fmt.Sprintf("%s_vertices", 
sanitizedName) + edgeTable := fmt.Sprintf("%s_edges", sanitizedName) + + stmt := fmt.Sprintf("INSERT INTO graphs (graph_name, sanitized_graph_name, vertex_table, edge_table) VALUES ('%s', '%s', '%s', '%s') ON CONFLICT DO NOTHING", graph, sanitizedName, vertexTable, edgeTable) + _, err = db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("inserting row into graphs table: %v", err) + } + + stmt = fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (gid varchar PRIMARY KEY, label varchar NOT NULL, data jsonb)", vertexTable) + _, err = db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("creating vertex table: %v", err) + } + + toIndex := []string{"label"} + for _, f := range toIndex { + err := db.createIndex(vertexTable, f) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("AddGraph: creating index") + } + } + + stmt = fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (gid varchar PRIMARY KEY, label varchar NOT NULL, "from" varchar NOT NULL, "to" varchar NOT NULL, data jsonb)`, edgeTable) + _, err = db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("creating edge table: %v", err) + } + + toIndex = []string{"label", "from", "to"} + for _, f := range toIndex { + err := db.createIndex(edgeTable, f) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("AddGraph: creating index") + } + } + return nil +} + +func (db *GraphDB) createIndex(table, field string) error { + stmt := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s_%s ON %s ("%s")`, table, field, table, field) + _, err := db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("creating index for table %s on field %s: %v", table, field, err) + } + return nil +} + +type graphInfo struct { + GraphName string + SanitizedGraphName string + VertexTable string + EdgeTable string +} + +func (db *GraphDB) getGraphInfo(graph string) (*graphInfo, error) { + q := fmt.Sprintf("SELECT * FROM graphs where graph_name='%s'", graph) + info := make(map[string]interface{}) + err := db.db.QueryRowx(q).MapScan(info) 
+ if err != nil { + return nil, fmt.Errorf("querying graphs table: %v", err) + } + return &graphInfo{ + GraphName: info["graph_name"].(string), + SanitizedGraphName: info["sanitized_graph_name"].(string), + VertexTable: info["vertex_table"].(string), + EdgeTable: info["edge_table"].(string), + }, nil +} + +// DeleteGraph deletes an existing graph named `graph` +func (db *GraphDB) DeleteGraph(graph string) error { + info, err := db.getGraphInfo(graph) + if err != nil { + return fmt.Errorf("DeleteGraph: %v", err) + } + + stmt := fmt.Sprintf("DROP TABLE IF EXISTS %s_vertices", info.VertexTable) + _, err = db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("DeleteGraph: dropping vertex table: %v", err) + } + + stmt = fmt.Sprintf("DROP TABLE IF EXISTS %s_edges", info.EdgeTable) + _, err = db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("DeleteGraph: dropping edge table: %v", err) + } + + stmt = fmt.Sprintf("DELETE FROM graphs where graph_name='%s'", graph) + _, err = db.db.Exec(stmt) + if err != nil { + return fmt.Errorf("DeleteGraph: deleting row from graphs table: %v", err) + } + + return nil +} + +// ListGraphs lists the graphs managed by this driver +func (db *GraphDB) ListGraphs() []string { + //fmt.Printf("PSQL listing graphs\n") + out := []string{} + rows, err := db.db.Queryx("SELECT graph_name FROM graphs") + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("ListGraphs: Queryx") + return out + } + defer rows.Close() + var table string + for rows.Next() { + if err := rows.Scan(&table); err != nil { + log.WithFields(log.Fields{"error": err}).Error("ListGraphs: Scan") + return out + } + //fmt.Printf("Found %s\n", table) + out = append(out, table) + //out = append(out, strings.SplitN(table, "_", 2)[0]) + } + if err := rows.Err(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("ListGraphs: iterating") + return out + } + //fmt.Printf("Graphs: %s\n", out) + return out +} + +// Graph obtains the gdbi.DBI for a particular graph 
+func (db *GraphDB) Graph(graph string) (gdbi.GraphInterface, error) { + info, err := db.getGraphInfo(graph) + if err != nil { + return nil, fmt.Errorf("graph '%s' was not found: %v", graph, err) + } + return &Graph{ + db: db.db, + v: info.VertexTable, + e: info.EdgeTable, + ts: db.ts, + graph: graph, + }, nil +} diff --git a/sqlite/index.go b/sqlite/index.go new file mode 100644 index 00000000..2862fd58 --- /dev/null +++ b/sqlite/index.go @@ -0,0 +1,24 @@ +package sqlite + +import ( + "errors" + + "github.com/bmeg/grip/gripql" +) + +// AddVertexIndex add index to vertices +func (g *Graph) AddVertexIndex(label string, field string) error { + return errors.New("not implemented") +} + +// DeleteVertexIndex delete index from vertices +func (g *Graph) DeleteVertexIndex(label string, field string) error { + return errors.New("not implemented") +} + +// GetVertexIndexList lists indices +func (g *Graph) GetVertexIndexList() <-chan *gripql.IndexID { + o := make(chan *gripql.IndexID) + defer close(o) + return o +} diff --git a/sqlite/schema.go b/sqlite/schema.go new file mode 100644 index 00000000..345473e1 --- /dev/null +++ b/sqlite/schema.go @@ -0,0 +1,142 @@ +package sqlite + +import ( + "context" + "fmt" + + "github.com/bmeg/grip/gripql" + "github.com/bmeg/grip/log" + "github.com/bmeg/grip/psql" + "github.com/bmeg/grip/util" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/types/known/structpb" +) + +// BuildSchema returns the schema of a specific graph in the database +func (db *GraphDB) BuildSchema(ctx context.Context, graphID string, sampleN uint32, random bool) (*gripql.Graph, error) { + + var g errgroup.Group + + gi, err := db.Graph(graphID) + if err != nil { + return nil, err + } + + graph := gi.(*Graph) + + vSchemaChan := make(chan *gripql.Vertex) + eSchemaChan := make(chan *gripql.Edge) + + vLabels, err := graph.ListVertexLabels() + if err != nil { + return nil, err + } + + for _, label := range vLabels { + label := label + if label == "" { + 
continue + } + g.Go(func() error { + q := fmt.Sprintf("SELECT * FROM %s WHERE label='%s'", graph.v, label) + rows, err := graph.db.QueryxContext(ctx, q) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("BuildSchema: QueryxContext") + return err + } + defer rows.Close() + schema := make(map[string]interface{}) + for rows.Next() { + vrow := &psql.Row{} + if err := rows.StructScan(vrow); err != nil { + log.WithFields(log.Fields{"error": err}).Error("BuildSchema: StructScan") + continue + } + v, err := psql.ConvertVertexRow(vrow, true) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("BuildSchema: convertVertexRow") + continue + } + util.MergeMaps(schema, v.Data) + } + + sSchema, _ := structpb.NewStruct(schema) + vSchema := &gripql.Vertex{Gid: label, Label: "Vertex", Data: sSchema} + vSchemaChan <- vSchema + + return nil + }) + } + + eLabels, err := graph.ListEdgeLabels() + if err != nil { + return nil, err + } + + for _, label := range eLabels { + label := label + if label == "" { + continue + } + + g.Go(func() error { + q := fmt.Sprintf( + `SELECT a.label, b.label, c.label, b.data FROM %s as a INNER JOIN %s as b ON b."to"=a.gid INNER JOIN %s as c on b."from" = c.gid WHERE b.label = '%s' limit %d`, + graph.v, graph.e, graph.v, + label, sampleN, + ) + //fmt.Printf("Query: %s\n", q) + rows, err := graph.db.QueryxContext(ctx, q) + if err != nil { + log.WithFields(log.Fields{"error": err}).Error("BuildSchema: QueryxContext") + return err + } + defer rows.Close() + //schema := make(map[string]interface{}) + for rows.Next() { + if row, err := rows.SliceScan(); err != nil { + log.WithFields(log.Fields{"error": err}).Error("BuildSchema: SliceScan") + continue + } else { + eSchema := &gripql.Edge{ + Gid: fmt.Sprintf("(%s)--%s->(%s)", row[0], row[1], row[2]), + Label: label, + From: row[0].(string), + To: row[2].(string), + } + eSchemaChan <- eSchema + //fmt.Printf("Found: %s\n", row) + } + } + return nil + }) + + } + + wg := errgroup.Group{} 
+ + vSchema := []*gripql.Vertex{} + eSchema := []*gripql.Edge{} + + wg.Go(func() error { + for s := range vSchemaChan { + vSchema = append(vSchema, s) + } + return nil + }) + wg.Go(func() error { + for s := range eSchemaChan { + eSchema = append(eSchema, s) + } + return nil + }) + + g.Wait() + close(vSchemaChan) + close(eSchemaChan) + + wg.Wait() + + schema := &gripql.Graph{Graph: graphID, Vertices: vSchema, Edges: eSchema} + return schema, nil +} diff --git a/test/sqlite.yml b/test/sqlite.yml new file mode 100644 index 00000000..d8039c70 --- /dev/null +++ b/test/sqlite.yml @@ -0,0 +1,7 @@ +Default: sqlite + +Drivers: + sqlite: + Sqlite: + DBName: tester/sqliteDB + From 6ce3de0bdb97d732b16e33b75c30610f70dee37e Mon Sep 17 00:00:00 2001 From: matthewpeterkort Date: Tue, 17 Sep 2024 11:11:47 -0700 Subject: [PATCH 2/3] update test workflow --- .github/workflows/tests.yml | 54 ++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8c042a39..945b31c4 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -13,18 +13,18 @@ jobs: runs-on: ubuntu-latest steps: - name: Set up Go 1.x - uses: actions/setup-go@v2 + uses: actions/setup-go@v5 with: - go-version: ^1.18 + go-version: ^1.22.6 - name: Check out code into the Go module directory - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Build run: go build -v ./ - name: Store grip - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: gripBin path: grip @@ -35,11 +35,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Set up Go 1.x - uses: actions/setup-go@v2 + uses: actions/setup-go@v5 with: - go-version: ^1.18 + go-version: ^1.22.6 - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: run unit tests run: | go test ./test/... 
-config badger.yml @@ -51,11 +51,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Badger Test @@ -72,11 +72,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Pebble Test @@ -93,11 +93,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Mongo Conformance @@ -115,11 +115,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Mongo Conformance @@ -137,11 +137,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Elastic Conformance @@ -160,11 +160,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: 
Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Postgres Conformance @@ -183,11 +183,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Postgres Conformance @@ -204,13 +204,13 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Update pip run: pip install --upgrade pip - name: Python Dependencies for Conformance run: pip install -U requests numpy grpcio-tools protobuf - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Gripper Conformance @@ -230,11 +230,11 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Python Dependencies for Conformance run: pip install requests numpy PyYAML - name: Download grip - uses: actions/download-artifact@v2 + uses: actions/download-artifact@v4 with: name: gripBin - name: Auth Test @@ -262,11 +262,11 @@ jobs: # runs-on: ubuntu-latest # steps: # - name: Check out code - # uses: actions/checkout@v2 + # uses: actions/checkout@v4 # - name: Python Dependencies for Conformance # run: pip install requests numpy # - name: Download grip - # uses: actions/download-artifact@v2 + # uses: actions/download-artifact@v4 # with: # name: gripBin # - name: GRIDs unit tests From 529684b9d505b7dc251003c0523581c8a681c5c1 Mon Sep 17 00:00:00 2001 From: matthewpeterkort Date: Tue, 17 Sep 2024 11:20:46 -0700 Subject: [PATCH 3/3] fix action test names --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml 
index 945b31c4..2f15e4fd 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -122,7 +122,7 @@ jobs: uses: actions/download-artifact@v4 with: name: gripBin - - name: Mongo Conformance + - name: Mongo Core Conformance run: | chmod +x grip make start-mongo @@ -190,7 +190,7 @@ jobs: uses: actions/download-artifact@v4 with: name: gripBin - - name: Postgres Conformance + - name: Sqlite Conformance run: | chmod +x grip ./grip server --rpc-port 18202 --http-port 18201 --config ./test/sqlite.yml &