Skip to content
This repository has been archived by the owner on May 6, 2024. It is now read-only.

Commit

Permalink
support multiple responses from an HTTP RPC method invocation (#44)
Browse files Browse the repository at this point in the history
Support reading and writing multiple responses from a method invocation.
  • Loading branch information
petar committed Jun 17, 2022
1 parent 0af973b commit d34b6b5
Show file tree
Hide file tree
Showing 10 changed files with 1,357 additions and 1,261 deletions.
1 change: 1 addition & 0 deletions blueprints/base/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
HTTPHandlerFunc = cg.GoTypeRef{PkgPath: "net/http", TypeName: "HandlerFunc"}
HTTPRequest = cg.GoTypeRef{PkgPath: "net/http", TypeName: "Request"}
HTTPResponseWriter = cg.GoTypeRef{PkgPath: "net/http", TypeName: "ResponseWriter"}
HTTPFlusher = cg.GoTypeRef{PkgPath: "net/http", TypeName: "Flusher"}
URL = cg.GoTypeRef{PkgPath: "net/url", TypeName: "URL"}
URLParse = cg.GoRef{PkgPath: "net/url", Name: "Parse"}
URLValues = cg.GoRef{PkgPath: "net/url", Name: "Values"}
Expand Down
15 changes: 8 additions & 7 deletions blueprints/base/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ var (
IPLDLinkType = &cg.GoTypeRef{PkgPath: IPLDDatamodelPkg, TypeName: "Link"}
IPLDNodePrototypeType = &cg.GoTypeRef{PkgPath: IPLDDatamodelPkg, TypeName: "NodePrototype"}

IPLDDeepEqual = &cg.GoRef{PkgPath: IPLDPkg, Name: "DeepEqual"}
IPLDEncode = &cg.GoRef{PkgPath: IPLDPkg, Name: "Encode"}
IPLDEncodeStreaming = &cg.GoRef{PkgPath: IPLDPkg, Name: "EncodeStreaming"}
IPLDDecode = &cg.GoRef{PkgPath: IPLDPkg, Name: "Decode"}
IPLDDecodeStreaming = &cg.GoRef{PkgPath: IPLDPkg, Name: "DecodeStreaming"}
DAGJSONEncode = cg.GoRef{PkgPath: "github.com/ipld/go-ipld-prime/codec/dagjson", Name: "Encode"}
DAGJSONDecode = cg.GoRef{PkgPath: "github.com/ipld/go-ipld-prime/codec/dagjson", Name: "Decode"}
IPLDDeepEqual = &cg.GoRef{PkgPath: IPLDPkg, Name: "DeepEqual"}
IPLDEncode = &cg.GoRef{PkgPath: IPLDPkg, Name: "Encode"}
IPLDEncodeStreaming = &cg.GoRef{PkgPath: IPLDPkg, Name: "EncodeStreaming"}
IPLDDecode = &cg.GoRef{PkgPath: IPLDPkg, Name: "Decode"}
IPLDDecodeStreaming = &cg.GoRef{PkgPath: IPLDPkg, Name: "DecodeStreaming"}
DAGJSONEncode = cg.GoRef{PkgPath: "github.com/ipld/go-ipld-prime/codec/dagjson", Name: "Encode"}
DAGJSONDecode = cg.GoRef{PkgPath: "github.com/ipld/go-ipld-prime/codec/dagjson", Name: "Decode"}
DAGJSONDecodeOptions = cg.GoTypeRef{PkgPath: "github.com/ipld/go-ipld-prime/codec/dagjson", TypeName: "DecodeOptions"}
)

var (
Expand Down
29 changes: 20 additions & 9 deletions blueprints/services/dagjson-over-http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (x GoClientImpl) GoDef() cg.Blueprint {
"IOReadCloser": base.IOReadCloser,
"IPLDEncode": base.IPLDEncode,
"IPLDDecodeStreaming": base.IPLDDecodeStreaming,
"DAGJSONDecodeOptions": base.DAGJSONDecodeOptions,
"Errorf": base.Errorf,
"URLValues": base.URLValues,
"HTTPNewRequestWithContext": base.HTTPNewRequestWithContext,
Expand Down Expand Up @@ -278,26 +279,36 @@ func (c *{{.Type}}) {{.AsyncMethodDecl}} {
func {{.ProcessReturnAsync}}(ctx {{.Context}}, ch chan<- {{.MethodReturnAsync}}, r {{.IOReadCloser}}) {
defer close(ch)
defer r.Close()
opt := {{.DAGJSONDecodeOptions}}{
ParseLinks: true,
ParseBytes: true,
DontParseBeyondEnd: true,
}
for {
var out {{.MethodReturnAsync}}
n, err := {{.IPLDDecodeStreaming}}(r, {{.DAGJSONDecode}})
n, err := {{.IPLDDecodeStreaming}}(r, opt.Decode)
if {{.ErrorsIs}}(err, {{.IOEOF}}) || {{.ErrorsIs}}(err, {{.IOErrUnexpectedEOF}}) {
return
}
if err != nil {
out = {{.MethodReturnAsync}}{Err: {{.ErrProto}}{Cause: err}} // IPLD decode error
} else {
env := &{{.ReturnEnvelope}}{}
if err = env.Parse(n); err != nil {
out = {{.MethodReturnAsync}}{Err: {{.ErrProto}}{Cause: err}} // schema decode error
} else if env.Error != nil {
out = {{.MethodReturnAsync}}{Err: {{.ErrService}}{Cause: {{.ErrorsNew}}(string(env.Error.Code))}} // service-level error
} else if env.{{.MethodName}} != nil {
out = {{.MethodReturnAsync}}{Resp: env.{{.MethodName}}}
var x [1]byte
if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' {
out = {{.MethodReturnAsync}}{Err: {{.ErrProto}}{Cause: {{.Errorf}}("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error
} else {
continue
env := &{{.ReturnEnvelope}}{}
if err = env.Parse(n); err != nil {
out = {{.MethodReturnAsync}}{Err: {{.ErrProto}}{Cause: err}} // schema decode error
} else if env.Error != nil {
out = {{.MethodReturnAsync}}{Err: {{.ErrService}}{Cause: {{.ErrorsNew}}(string(env.Error.Code))}} // service-level error
} else if env.{{.MethodName}} != nil {
out = {{.MethodReturnAsync}}{Resp: env.{{.MethodName}}}
} else {
continue
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions blueprints/services/dagjson-over-http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (x GoServerImpl) GoDef() cg.Blueprint {
"DAGJSONEncode": base.DAGJSONEncode,
"EdelweissString": base.EdelweissString,
"BytesBuffer": base.BytesBuffer,
"HTTPFlusher": base.HTTPFlusher,
}
methodDecls = append(methodDecls, cg.T{
Data: bmDecl,
Expand Down Expand Up @@ -88,6 +89,9 @@ func (x GoServerImpl) GoDef() cg.Blueprint {
}
buf.WriteByte("\n"[0])
writer.Write(buf.Bytes())
if f, ok := writer.({{.HTTPFlusher}}); ok {
f.Flush()
}
}
`,
})
Expand Down
Loading

0 comments on commit d34b6b5

Please sign in to comment.