Skip to content

Commit

Permalink
Merge pull request #315 from bmeg/feature/BulkDelete
Browse files Browse the repository at this point in the history
Feature/bulk delete
  • Loading branch information
kellrott authored Aug 12, 2024
2 parents b172cf7 + 359a032 commit 99c314b
Show file tree
Hide file tree
Showing 25 changed files with 878 additions and 323 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ proto:
--go_opt paths=source_relative \
--go-grpc_out ./ \
--go-grpc_opt paths=source_relative \
--grpc-gateway_out ./ \
--grpc-gateway_out allow_delete_body=true:./ \
--grpc-gateway_opt logtostderr=true \
--grpc-gateway_opt paths=source_relative \
--grpc-rest-direct_out . \
Expand Down Expand Up @@ -130,7 +130,7 @@ test-authorization:
# ---------------------
start-mongo:
@docker rm -f grip-mongodb-test > /dev/null 2>&1 || echo
docker run -d --name grip-mongodb-test -p 27017:27017 docker.io/mongo:3.6.4 > /dev/null
docker run -d --name grip-mongodb-test -p 27017:27017 mongo:7.0.13-rc0-jammy > /dev/null

start-elastic:
@docker rm -f grip-es-test > /dev/null 2>&1 || echo
Expand Down
1 change: 1 addition & 0 deletions accounts/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var MethodMap = map[string]Operation{
"/gripql.Edit/AddVertex": Write,
"/gripql.Edit/AddEdge": Write,
"/gripql.Edit/BulkAdd": Write,
"/gripql.Edit/BulkDelete": Write,
"/gripql.Edit/AddGraph": Write,
"/gripql.Edit/DeleteGraph": Write,
"/gripql.Edit/DeleteVertex": Write,
Expand Down
5 changes: 5 additions & 0 deletions accounts/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func streamAuthInterceptor(auth Authenticate, access Access) grpc.StreamServerIn
//stream URL formatting, each write request can
//reference a different graph
return handler(srv, &BulkWriteFilter{ss, user, access})
} else if info.FullMethod == "/gripql.Edit/BulkDelete" {
return handler(srv, &BulkWriteFilter{ss, user, access})
} else {
log.Errorf("Unknown input streaming op %#v!!!", info)
return handler(srv, ss)
Expand Down Expand Up @@ -190,6 +192,9 @@ func getUnaryRequestGraph(req interface{}, info *grpc.UnaryServerInfo) (string,
case "/gripql.Edit/SampleSchema":
o := req.(*gripql.GraphID)
return o.Graph, nil
case "/gripql.Edit/BulkDelete":
o := req.(*gripql.DeleteData)
return o.Graph, nil
case "/gripql.Configure/StartPlugin", "/gripql.Configure/ListPlugins", "/gripql.Configure/ListDrivers":
return "*", nil //these operations effect all graphs
}
Expand Down
89 changes: 89 additions & 0 deletions cmd/delete/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package delete

import (
"encoding/json"
"fmt"
"io/ioutil"
"os"

"github.com/bmeg/grip/gripql"
"github.com/bmeg/grip/log"
"github.com/bmeg/grip/util/rpc"
"github.com/spf13/cobra"
)

var host = "localhost:8202"
var file string
var edges []string
var vertices []string
var graph string
var data Data

type Data struct {
Graph string `json:"graph"`
Edges []string `json:"edges"`
Vertices []string `json:"vertices"`
}

// Cmd command line declaration
var Cmd = &cobra.Command{
Use: "delete <graph>",
Short: "Delete data from a graph",
Long: `JSON File Format: {
"graph": 'graph_name',
"edges":['list of edge ids'],
"vertices":['list of vertice ids']
}
comma delimited --edges or --vertices arguments are also supported ex:
--edges="edge1,edge2"`,
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if file == "" && edges == nil && vertices == nil {
return fmt.Errorf("no input file path or --edges or --vertices arg was provided")
}

conn, err := gripql.Connect(rpc.ConfigWithDefaults(host), true)
if err != nil {
return err
}
graph = args[0]

if file != "" {
jsonFile, err := os.Open(file)
if err != nil {
log.Errorf("Failed to open file: %s", err)
}
defer jsonFile.Close()

// Read the JSON file
byteValue, err := ioutil.ReadAll(jsonFile)
if err != nil {
log.Errorf("Failed to read file: %s", err)
}

// Unmarshal the JSON into the Data struct
err = json.Unmarshal(byteValue, &data)
if err != nil {
log.Errorf("Failed to unmarshal JSON: %s", err)
}
} else if edges != nil || vertices != nil {
data.Edges = edges
data.Vertices = vertices
}

log.WithFields(log.Fields{"graph": graph}).Info("deleting data")
log.Info("VALUE OF DATA: ", data.Edges, data.Vertices)
conn.BulkDelete(&gripql.DeleteData{Graph: graph, Vertices: data.Vertices, Edges: data.Edges})

return nil
},
}

func init() {
flags := Cmd.Flags()
flags.StringVar(&host, "host", host, "grip server url")
flags.StringSliceVar(&edges, "edges", edges, "grip edges list")
flags.StringSliceVar(&vertices, "vertices", vertices, "grip vertices list")
flags.StringVar(&file, "file", file, "file name")
}
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"

"github.com/bmeg/grip/cmd/create"
"github.com/bmeg/grip/cmd/delete"
"github.com/bmeg/grip/cmd/drop"
"github.com/bmeg/grip/cmd/dump"
"github.com/bmeg/grip/cmd/erclient"
Expand Down Expand Up @@ -69,6 +70,8 @@ func init() {
RootCmd.AddCommand(plugin.Cmd)
RootCmd.AddCommand(version.Cmd)
RootCmd.AddCommand(kvload.Cmd)
RootCmd.AddCommand(delete.Cmd)

}

var genBashCompletionCmd = &cobra.Command{
Expand Down
2 changes: 1 addition & 1 deletion conformance/run_conformance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
print("Running Conformance with %s" % gripql.__file__)

args = create_arg_parser()

# returns test modules starting with "ot_"
tests = filter_tests(args, prefix="ot_")

Expand Down
56 changes: 53 additions & 3 deletions conformance/tests/ot_bulk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


def test_bulkload(man):
errors = []

Expand All @@ -22,7 +20,7 @@ def test_bulkload(man):
bulk.addEdge("4", "5", "created", {"weight": 1.0})

err = bulk.execute()
#print(err)

if err.get("errorCount", 0) != 0:
print(err)
errors.append("Bulk insertion error")
Expand Down Expand Up @@ -68,3 +66,55 @@ def test_bulkload_validate(man):
errors.append("Validation error not detected")

return errors


def test_bulk_delete(man):
errors = []
G = man.writeTest()

G.addVertex("vertex1", "Person", {"name": "marko", "age": "29"})
G.addVertex("vertex2", "Person", {"name": "vadas", "age": "27"})
G.addVertex("vertex3", "Software", {"name": "lop", "lang": "java"})
G.addVertex("vertex4", "Person", {"name": "josh", "age": "32"})
G.addVertex("vertex5", "Software", {"name": "ripple", "lang": "java"})
G.addVertex("vertex6", "Person", {"name": "peter", "age": "35"})

G.addEdge("vertex1", "vertex3", "created", {"weight": 0.4}, gid="edge1")
G.addEdge("vertex1", "vertex2", "knows", {"weight": 0.5}, gid="edge2")
G.addEdge("vertex1", "vertex4", "knows", {"weight": 1.0}, gid="edge3")
G.addEdge("vertex4", "vertex3", "created", {"weight": 0.4}, gid="edge4")
G.addEdge("vertex6", "vertex3", "created", {"weight": 0.2}, gid="edge5")
G.addEdge("vertex3", "vertex5", "created", {"weight": 1.0}, gid="edge6")
G.addEdge("vertex6", "vertex5", "created", {"weight": 1.0}, gid="edge7")
G.addEdge("vertex4", "vertex5", "created", {"weight": 0.4}, gid="edge8")
G.addEdge("vertex4", "vertex6", "created", {"weight": 0.4}, gid="edge9")

G.delete(vertices=["vertex1", "vertex2",
"vertex3"],
edges=[])

Ecount = G.query().E().count().execute()[0]["count"]
Vcount = G.query().V().count().execute()[0]["count"]
if Ecount != 3:
errors.append(f"Wrong number of edges {Ecount} != 3")
if Vcount != 3:
errors.append(f"Wrong number of vertices {Vcount} != 3")

G.delete(vertices=[], edges=["edge7"])
Ecount = G.query().E().count().execute()[0]["count"]
Vcount = G.query().V().count().execute()[0]["count"]
if Ecount != 2:
errors.append(f"Wrong number of edges {Ecount} != 2")
if Vcount != 3:
errors.append(f"Wrong number of vertices {Vcount} != 3")


G.delete(vertices=["vertex5", "vertex6"], edges=["edge9"])
Ecount = G.query().E().count().execute()[0]["count"]
Vcount = G.query().V().count().execute()[0]["count"]
if Ecount != 0:
errors.append(f"Wrong number of edges {Ecount} != 0")
if Vcount != 1:
errors.append(f"Wrong number of vertices {Vcount} != 1")

return errors
14 changes: 14 additions & 0 deletions elastic/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ func (es *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error {
return util.StreamBatch(stream, 50, es.graph, es.AddVertex, es.AddEdge)
}

func (es *Graph) BulkDel(Data *gdbi.DeleteData) error {
for _, v := range Data.Edges {
if err := es.DelEdge(v); err != nil {
return err
}
}
for _, v := range Data.Vertices {
if err := es.DelVertex(v); err != nil {
return err
}
}
return nil
}

// DelEdge deletes edge `eid`
func (es *Graph) DelEdge(eid string) error {
ctx := context.Background()
Expand Down
29 changes: 29 additions & 0 deletions etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import json


def extract_ids_from_ndjson(input_file, output_file):
ids = []

# Read the NDJSON file
with open(input_file, 'r') as f:
for line in f:
data = json.loads(line.strip())
ids.append(data['gid'])

with open(input_file_edge, 'r') as f:
for line in f:
data = json.loads(line.strip())
ids.append(data['gid'])


# Write the IDs to the output file in the specified format
with open(output_file, 'w') as f:
f.write('[' + ','.join([f'"{gid}"' for gid in ids]) + ']')

# Specify the input and output file paths
input_file = 'OUT/Observation.vertex.json'
input_file_edge= 'OUT/Observation.in.edge.json'
output_file = 'output.json'

# Extract the IDs and write them to the output file
extract_ids_from_ndjson(input_file, output_file)
4 changes: 4 additions & 0 deletions existing-sql/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (g *Graph) BulkAdd(stream <-chan *gdbi.GraphElement) error {
return errors.New("not implemented")
}

func (g *Graph) BulkDel(data *gdbi.DeleteData) error {
return errors.New("not implemented")
}

// DelVertex is not implemented in the SQL driver
func (g *Graph) DelVertex(key string) error {
return errors.New("not implemented")
Expand Down
7 changes: 7 additions & 0 deletions gdbi/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type GraphElement struct {
Graph string
}

type DeleteData struct {
Graph string
Vertices []string
Edges []string
}

type Aggregate struct {
Name string
Key interface{}
Expand Down Expand Up @@ -157,6 +163,7 @@ type GraphInterface interface {
AddEdge(edge []*Edge) error

BulkAdd(<-chan *GraphElement) error
BulkDel(*DeleteData) error

DelVertex(key string) error
DelEdge(key string) error
Expand Down
4 changes: 4 additions & 0 deletions gripper/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (t *TabularGraph) BulkAdd(stream <-chan *gdbi.GraphElement) error {
return fmt.Errorf("GRIPPER is ReadOnly")
}

func (t *TabularGraph) BulkDel(*gdbi.DeleteData) error {
return fmt.Errorf("GRIPPER is ReadOnly")
}

func (t *TabularGraph) Compiler() gdbi.Compiler {
return core.NewCompiler(t, TabularOptimizer)
}
Expand Down
5 changes: 5 additions & 0 deletions gripql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (client Client) BulkAdd(elemChan chan *GraphElement) error {
return err
}

func (client Client) BulkDelete(delete *DeleteData) error {
_, err := client.EditC.BulkDelete(context.Background(), delete)
return err
}

// GetVertex obtains a vertex from a graph by `id`
func (client Client) GetVertex(graph string, id string) (*Vertex, error) {
v, err := client.QueryC.GetVertex(context.Background(), &ElementID{Graph: graph, Id: id})
Expand Down
7 changes: 7 additions & 0 deletions gripql/gripql.gw.client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions gripql/gripql.pb.dgw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 99c314b

Please sign in to comment.