Skip to content

Commit

Permalink
Add error handling for invalid json
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewpeterkort committed Nov 22, 2024
1 parent 18460fc commit 89cf150
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 37 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ ENV PATH="/go/bin:${PATH}"
ADD ./ /go/src/github.com/bmeg/grip-graphql
WORKDIR /go/src/github.com/bmeg/grip-graphql

RUN go install github.com/bmeg/[email protected]

RUN go install github.com/bmeg/[email protected]
RUN go build --buildmode=plugin ./graphql_gen3
RUN go build --buildmode=plugin ./gen3_writer
RUN go build --buildmode=plugin ./grip-graphql-endpoint
Expand Down
16 changes: 16 additions & 0 deletions gen3_writer/fixtures/compbio-examples-fhir/invalid_json.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
dsfdsf
{"reso
urceType":"Observation","id":"de328d36-2db2-43cd-b0fe-92f0e8302726","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"2069-3","display":"Chloride"}],"text":"Chloride"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/fb61addc-09d0-4546-a934-b5d5ce16d5e7"},"effectiveDateTime":"2012-12-15T14:40:53-05:00","issued":"2012-12-15T14:40:53.188-05:00","valueQuantity":{"value":102.2,"unit":"mmol/L","system":"http://unitsofmeasure.org","code":"mmol/L"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
{"resourceType":"Observation","id":"e257467d-a850-4ed8-ab1a-eddd6c49f111","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/StructureDefinition/vitalsigns","http://hl7.org/fhir/StructureDefinition/bp"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"vital-signs","display":"vital-signs"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"85354-9","display":"Blood Pressure"}],"text":"Blood Pressure"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/b9200600-e594-46cb-8a00-7fdb47792665"},"effectiveDateTime":"2013-12-21T14:40:53-05:00","issued":"2013-12-21T14:40:53.188-05:00","component":[{"code":{"coding":[{"system":"http://loinc.org","code":"8462-4","display":"Diastolic Blood Pressure"}],"text":"Diastolic Blood Pressure"},"valueQuantity":{"value":83,"unit":"mm[Hg]","system":"http://unitsofmeasure.org","code":"mm[Hg]"}},{"code":{"coding":[{"system":"http://loinc.org","code":"8480-6","display":"Systolic Blood Pressure"}],"text":"Systolic Blood Pressure"},"valueQuantity":{"value":153,"unit":"mm[Hg]","system":"http://unitsofmeasure.org","code":"mm[Hg]"}}],"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
{"resourceType":"Observation","id":"7bbaa006-c5bb-4ee4-b8ce-3cae329dfbd2","meta":{"versionI
d":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"2339-0","display":"Glucose"}],"text":"Glucose"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/2163ebd2-7cca-4de3-b2ec-ec8b22cc5d34"},"effectiveDateTime":"2016-01-02T14:40:53-05:00","issued":"2016-01-02T14:40:53.188-05:00","valueQuantity":{"value":67.69,"unit":"mg/dL","system":"http://unitsofmeasure.org","code":"mg/dL"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
{"resourceType":"Observation","id":"e124ee34-83ff-4b68-9d8a-5b3a52ffe496","meta":{"versionId":"1","lastUpdated":"2020:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"49765-1","display":"Calcium"}],"text":"Calcium"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/bd62656a-5e43-46a5-9584-d2402786f7f5"},"effectiveDateTime":"2019-01-19T14:40:53-05:00","issued":"2019-01-19T14:40:53.188-05:00","valueQuantity":{"value":8.59,"unit":"mg/dL","system":"http://unitsofmeasure.org","code":"mg/dL"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
["value1", "value2"]
{"resourceType":"Observation","id":"e28354e9-1126-4636-bba1-e2ed2a6ec05b","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"20565-8","display":"Carbon Dioxide"}],"text":"Carbon Dioxide"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/fb61addc-09d0-4546-a934-b5d5ce16d5e7"},"effectiveDateTime":"2012-12-15T14:40:53-05:00","issued":"2012-12-15T14:40:53.188-05:00","valueQuantity":{"value":28.2,"unit":"mmol/L","system":"http://unitsofmeasure.org","code":"mmol/L"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
"just a string"
{"resourceType":"Observation","id":"47ad1be4-44fd-44e3-94f7-e0389838c70f","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/StructureDefinition/vitalsigns","http://hl7.org/fhir/StructureDefinition/heartrate"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"vital-signs","display":"vital-signs"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"8867-4","display":"Heart rate"}],"text":"Heart rate"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/b9200600-e594-46cb-8a00-7fdb47792665"},"effectiveDateTime":"2013-12-21T14:40:53-05:00","issued":"2013-12-21T14:40:53.188-05:00","valueQuantity":{"value":78,"unit":"/min","system":"http://unitsofmeasure.org","code":"/min"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
234234
#a mix of invalid json, invalid lines, and valid lines
{"resourceType":"Observation","id":"e28354e9-1126-4636-bba1-e2ed2a6ec05b","meta":{"versionId":"1","lastUpdated":"2023-01-26T14:21:56.658+00:00","source":"#DmW9sueQ4yuQdyA9","profile":["http://hl7.org/fhir/us/core/StructureDefinition/us-core-observation-lab"]},"status":"final","category":[{"coding":[{"system":"http://terminology.hl7.org/CodeSystem/observation-category","code":"laboratory","display":"laboratory"}]}],"code":{"coding":[{"system":"http://loinc.org","code":"20565-8","display":"Carbon Dioxide"}],"text":"Carbon Dioxide"},"subject":{"reference":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"},"encounter":{"reference":"Encounter/fb61addc-09d0-4546-a934-b5d5ce16d5e7"},"effectiveDateTime":"2012-12-15T14:40:53-05:00","issued":"2012-12-15T14:40:53.188-05:00","valueQuantity":{"value":28.2,"unit":"mmol/L","system":"http://unitsofmeasure.org","code":"mmol/L"},"links":[{"rel":"subject_Patient","href":"Patient/45c11dad-2b38-4c8e-822e-7abff8a1ee1d"}]}
{ 123: "value1", true: "value2" }
{"key": { "key": { "key": { "key": { "key": { "key": "value" } } } } }}
64 changes: 64 additions & 0 deletions gen3_writer/gen3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func Test_Json_Load_Validation_Errors(t *testing.T) {
t.Error("Error: ", errors)
return
}
t.Log(data)

if data["message"] == nil || len(data["message"].([]any)) != 2 {
t.Error("Expected return message of length 2")
Expand All @@ -326,6 +327,69 @@ func Test_Json_Load_Validation_Errors(t *testing.T) {

}

func Test_Json_Load_Invalid_Json(t *testing.T) {
file, err := os.Open("fixtures/compbio-examples-fhir/invalid_json.ndjson")
if err != nil {
t.Error(err)
return
}
defer file.Close()

body := &bytes.Buffer{}
writer := multipart.NewWriter(body)

part, _ := writer.CreateFormFile("file", filepath.Base("fixtures/compbio-examples-fhir/ValidationErrors.ndjson"))
io.Copy(part, file)
writer.Close()

req := &Request{
url: "http://localhost:8201/graphql/JSONTEST/bulk-load-raw/ohsu-test",
method: "POST",
headers: map[string]any{"Authorization": createToken(false, true, true)},
}

request, err := http.NewRequest(req.method, req.url, body)
if err != nil {
t.Error("Error creating request:", err)
return
}
for key, val := range req.headers {
request.Header.Set(key, val.(string))
}
request.Header.Set("Content-Type", writer.FormDataContentType())

client := &http.Client{}
resp, err := client.Do(request)
if err != nil {
t.Error("Error sending request:", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
t.Logf("server responded with status: %d", resp.StatusCode)
}

buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
t.Error("Error reading response:", err)
return
}

var data map[string]interface{}
errors := json.Unmarshal([]byte(buf.String()), &data)
if errors != nil {
t.Error("Error: ", errors)
return
}
t.Log(data)

if data["status"].(float64) != 206 {
t.Error()
}

}

func Test_Load_Malformed_Token(t *testing.T) {
/* Server returns a 400 given an unparsable token */
err, responses := bulkLoad("http://localhost:8201/graphql/TEST/bulk-load/ohsu-test",
Expand Down
34 changes: 16 additions & 18 deletions gen3_writer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func StartMultipartForm(c *gin.Context, writer gin.ResponseWriter, request *http
}
return nil, conn, buf
}

func (gh *Handler) AddJsonSchema(c *gin.Context) {
/* This function assumes that Json schema of json format will be submitted and must be converted into grip scheam format */
writer, request, graph := getFields(c)
Expand Down Expand Up @@ -719,6 +720,7 @@ func (gh *Handler) BulkStreamRaw(c *gin.Context) {
host := "localhost:8202"
var err error
var res *gripql.BulkJsonEditResult
var warnings []string

err = request.ParseMultipartForm(1024 * 1024 * 1024) // 10 GB limit
if err != nil {
Expand All @@ -736,11 +738,13 @@ func (gh *Handler) BulkStreamRaw(c *gin.Context) {
conn, err := gripql.Connect(rpc.ConfigWithDefaults(host), true)
wait := make(chan bool)

VertChan, err := streamJsonFromReader(reader, graph, project_id, 5)
if err != nil {
RegError(c, writer, graph, GetInternalServerErr(err))
return
}
VertChan, warnChan := streamJsonFromReader(reader, graph, project_id, 5)

go func() {
for warning := range warnChan {
warnings = append(warnings, warning)
}
}()

go func() {
err, res = conn.BulkAddRaw(VertChan)
Expand All @@ -751,21 +755,15 @@ func (gh *Handler) BulkStreamRaw(c *gin.Context) {
}()
<-wait

if len(res.Errors) == 1 {
RegError(c, writer, graph, &middleware.ServerError{StatusCode: 500, Message: fmt.Sprintf("[500] bulk-load-raw %s", res.Errors[0])})
return
} else if res.InsertCount == 0 && res.Errors == nil {
// This implies that no file was uploaded so EOF triggered immediately and exited
RegError(c, writer, graph, &middleware.ServerError{StatusCode: 400, Message: "[400] bulk-load-raw file of length 0 provided"})
return
} else if len(res.Errors) > 1 {
log.WithFields(log.Fields{
"graph": graph,
"status": 206,
}).Info(res.Errors)
nonLoadedEdges := append(res.Errors, warnings...)
if len(nonLoadedEdges) > 0 {
if res.InsertCount == 0 {
RegError(c, writer, graph, &middleware.ServerError{StatusCode: 500, Message: fmt.Sprintln("[500] bulk-load-raw", nonLoadedEdges)})
return
}
c.AbortWithStatusJSON(206, gin.H{
"status": 206,
"message": res.Errors,
"message": nonLoadedEdges,
"data": nil,
})
return
Expand Down
37 changes: 20 additions & 17 deletions gen3_writer/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"bufio"
"encoding/json"
"fmt"
"io"
"sync"

Expand Down Expand Up @@ -71,10 +73,7 @@ func StreamEdgesFromReader(reader io.Reader, workers int) (chan *gripql.Edge, er
if workers > 99 {
workers = 99
}
lineChan, err := processReader(reader, workers)
if err != nil {
return nil, err
}
lineChan := processReader(reader, workers)

edgeChan := make(chan *gripql.Edge, workers)
var wg sync.WaitGroup
Expand Down Expand Up @@ -112,10 +111,7 @@ func StreamVerticesFromReader(reader io.Reader, workers int) (chan *gripql.Verte
if workers > 99 {
workers = 99
}
lineChan, err := processReader(reader, workers)
if err != nil {
return nil, err
}
lineChan := processReader(reader, workers)

vertChan := make(chan *gripql.Vertex, workers)
var wg sync.WaitGroup
Expand Down Expand Up @@ -145,12 +141,12 @@ func StreamVerticesFromReader(reader io.Reader, workers int) (chan *gripql.Verte

return vertChan, nil
}
func streamJsonFromReader(reader io.Reader, graph string, project_id string, workers int) (chan *gripql.RawJson, error) {
lineChan, err := processReader(reader, workers)
if err != nil {
return nil, err
}

func streamJsonFromReader(reader io.Reader, graph string, project_id string, workers int) (chan *gripql.RawJson, chan string) {
lineChan := processReader(reader, workers)

vertChan := make(chan *gripql.RawJson, workers)
warnings := make(chan string, workers)
var wg sync.WaitGroup
jum := protojson.UnmarshalOptions{DiscardUnknown: true}

Expand All @@ -159,14 +155,20 @@ func streamJsonFromReader(reader io.Reader, graph string, project_id string, wor
go func() {
defer wg.Done()
for line := range lineChan {
if !json.Valid([]byte(line)) {
log.WithFields(log.Fields{"line": line}).Errorf("Skipping invalid JSON line")
warnings <- fmt.Sprintf("Invalid Json: %s", line)
continue
}
rawData := &gripql.RawJson{
Data: &structpb.Struct{},
Graph: graph,
ProjectId: project_id,
}
err := jum.Unmarshal([]byte(line), rawData.Data)
if err != nil {
log.WithFields(log.Fields{"error": err}).Errorf("Unmarshaling vertex: %s", line)
log.WithFields(log.Fields{"error": err}).Errorf("Unmarshaling into rawData.Data: %s", line)
warnings <- fmt.Sprintf("Error: %v when unmarshaling: %s", err, line)
continue
}
vertChan <- rawData
Expand All @@ -177,12 +179,13 @@ func streamJsonFromReader(reader io.Reader, graph string, project_id string, wor
go func() {
wg.Wait()
close(vertChan)
close(warnings)
}()

return vertChan, nil
return vertChan, warnings
}

func processReader(reader io.Reader, chansize int) (<-chan string, error) {
func processReader(reader io.Reader, chansize int) <-chan string {
scanner := bufio.NewScanner(reader)
buf := make([]byte, 0, 64*1024)
maxCapacity := 16 * 1024 * 1024
Expand All @@ -200,5 +203,5 @@ func processReader(reader io.Reader, chansize int) (<-chan string, error) {
close(lineChan)
}()

return lineChan, nil
return lineChan
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/bmeg/grip-graphql
go 1.22.5

require (
github.com/bmeg/grip v0.0.0-20241119230816-a7f6fdd48051
github.com/bmeg/grip v0.0.0-20241122195322-515a682d56f7
github.com/dop251/goja v0.0.0-20240707163329-b1681fb2a2f5
github.com/gin-gonic/gin v1.8.1
github.com/golang-jwt/jwt/v5 v5.2.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/bmeg/grip v0.0.0-20241119224722-821431f4728c h1:uRXx01IeIDav7jcMJaelO
github.com/bmeg/grip v0.0.0-20241119224722-821431f4728c/go.mod h1:fic/942cjKIBWnZ9HKzoknVhQHWOHYkbfCoTTQLT+2w=
github.com/bmeg/grip v0.0.0-20241119230816-a7f6fdd48051 h1:r5AE3WgJqdxA7DJ2U7d2KMB1kaePYz45XoBez9P+BiI=
github.com/bmeg/grip v0.0.0-20241119230816-a7f6fdd48051/go.mod h1:fic/942cjKIBWnZ9HKzoknVhQHWOHYkbfCoTTQLT+2w=
github.com/bmeg/grip v0.0.0-20241122195322-515a682d56f7 h1:/y6m1fI9tZaaCy/DAgk+uZmQHdlaykx1G8rCJXQswiA=
github.com/bmeg/grip v0.0.0-20241122195322-515a682d56f7/go.mod h1:2YAjHdHHKDeUHtOui5g/xRFk6uosktkzLPFPY7n/4d8=
github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad h1:ICgBexeLB7iv/IQz4rsP+MimOXFZUwWSPojEypuOaQ8=
github.com/bmeg/jsonpath v0.0.0-20210207014051-cca5355553ad/go.mod h1:ft96Irkp72C7ZrUWRenG7LrF0NKMxXdRvsypo5Njhm4=
github.com/bmeg/jsonschema/v5 v5.3.4-0.20241111204732-55db82022a92 h1:Myx/j+WxfEg+P3nDaizR1hBpjKSLgvr4ydzgp1/1pAU=
Expand Down

0 comments on commit 89cf150

Please sign in to comment.