Skip to content

Commit

Permalink
Http (#126)
Browse files Browse the repository at this point in the history
* Remove errHandler in faver of http.Handler
* See: https://www.maragu.dk/blog/structuring-and-testing-http-handlers-in-go/
* Use the json lib for decoding
* Keysbuilder: No key generation on init
  • Loading branch information
ostcar authored Oct 30, 2020
1 parent 231a0a0 commit b53d65b
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 474 deletions.
12 changes: 8 additions & 4 deletions cmd/autoupdate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,18 @@ func run() error {
}

// Create tls http2 server.
handler := autoupdateHttp.New(service, authService)
mux := http.NewServeMux()
autoupdateHttp.Complex(mux, authService, service, service)
autoupdateHttp.Simple(mux, authService, service)
autoupdateHttp.Health(mux)

listenAddr := env["AUTOUPDATE_HOST"] + ":" + env["AUTOUPDATE_PORT"]
ln, err := buildHTTPListener(env, listenAddr, handler)
ln, err := buildHTTPListener(env, listenAddr)
if err != nil {
return fmt.Errorf("creating http listener: %w", err)
}
defer ln.Close()
srv := &http.Server{Addr: listenAddr, Handler: handler}
srv := &http.Server{Addr: listenAddr, Handler: mux}

// Shutdown logic in separate goroutine.
shutdownDone := make(chan struct{})
Expand All @@ -137,7 +141,7 @@ func run() error {
return nil
}

func buildHTTPListener(env map[string]string, addr string, handler http.Handler) (net.Listener, error) {
func buildHTTPListener(env map[string]string, addr string) (net.Listener, error) {
cert, err := getCert(env)
if err != nil {
return nil, fmt.Errorf("getting http certs: %w", err)
Expand Down
30 changes: 28 additions & 2 deletions internal/autoupdate/autoupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"time"

"github.com/ostcar/topic"
Expand Down Expand Up @@ -55,12 +56,11 @@ func New(datastore Datastore, restricter Restricter, closed <-chan struct{}) *Au
// returns a Connection object, that can be used to receive the data.
//
// There is no need to "close" the Connection object.
func (a *Autoupdate) Connect(userID int, kb KeysBuilder, tid uint64) *Connection {
func (a *Autoupdate) Connect(userID int, kb KeysBuilder) *Connection {
return &Connection{
autoupdate: a,
uid: userID,
kb: kb,
tid: tid,
}
}

Expand All @@ -69,6 +69,32 @@ func (a *Autoupdate) LastID() uint64 {
return a.topic.LastID()
}

// Live writes data in json-format to the given writer until it closes. It
// flushes after each message.
func (a *Autoupdate) Live(ctx context.Context, userID int, w io.Writer, kb KeysBuilder) error {
conn := a.Connect(userID, kb)
encoder := json.NewEncoder(w)

for {
// connection.Next() blocks, until there is new data or the client context
// or the server is closed.
data, err := conn.Next(ctx)
if err != nil {
return err
}

if err := encoder.Encode(data); err != nil {
return err
}

w.(flusher).Flush()
}
}

type flusher interface {
Flush()
}

// pruneTopic removes old data from the topic. Blocks until the service is
// closed.
func (a *Autoupdate) pruneTopic(closed <-chan struct{}) {
Expand Down
47 changes: 47 additions & 0 deletions internal/autoupdate/autoupdate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package autoupdate_test

import (
"context"
"testing"

"github.com/openslides/openslides-autoupdate-service/internal/autoupdate"
"github.com/openslides/openslides-autoupdate-service/internal/test"
)

func TestLive(t *testing.T) {
datastore := new(test.MockDatastore)
closed := make(chan struct{})
defer close(closed)
s := autoupdate.New(datastore, new(test.MockRestricter), closed)
kb := test.KeysBuilder{K: []string{"foo", "bar"}}

ctx, cancel := context.WithCancel(context.Background())
cancel()

w := new(mockWriter)
s.Live(ctx, 1, w, kb)

if len(w.lines) != 1 {
t.Fatalf("Got %d lines, expected 1", len(w.lines))
}

expect := `{"bar":"Hello World","foo":"Hello World"}` + "\n"
if got := w.lines[0]; got != expect {
t.Errorf("Got %s, expected %s", got, expect)
}
}

type mockWriter struct {
lines []string
buf []byte
}

func (w *mockWriter) Write(p []byte) (int, error) {
w.buf = append(w.buf, p...)
return len(p), nil
}

func (w *mockWriter) Flush() {
w.lines = append(w.lines, string(w.buf))
w.buf = nil
}
4 changes: 4 additions & 0 deletions internal/autoupdate/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (c *Connection) Next(ctx context.Context) (map[string]json.RawMessage, erro
c.tid = c.autoupdate.topic.LastID()
}

if err := c.kb.Update(ctx); err != nil {
return nil, fmt.Errorf("create keys for keysbuilder: %w", err)
}

data, err := c.autoupdate.RestrictedData(ctx, c.uid, c.kb.Keys()...)
if err != nil {
return nil, fmt.Errorf("get first time restricted data: %w", err)
Expand Down
24 changes: 12 additions & 12 deletions internal/autoupdate/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func TestConnectionEmptyData(t *testing.T) {

s := autoupdate.New(datastore, new(test.MockRestricter), closed)

kb := mockKeysBuilder{keys: test.Str(doesExistKey, doesNotExistKey)}
kb := test.KeysBuilder{K: test.Str(doesExistKey, doesNotExistKey)}

t.Run("First responce", func(t *testing.T) {
c := s.Connect(1, kb, 0)
c := s.Connect(1, kb)

data, err := c.Next(context.Background())

Expand Down Expand Up @@ -138,7 +138,7 @@ func TestConnectionEmptyData(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
c := s.Connect(1, kb, 0)
c := s.Connect(1, kb)
if _, err := c.Next(context.Background()); err != nil {
t.Errorf("c.Next() returned an error: %v", err)
}
Expand All @@ -161,7 +161,7 @@ func TestConnectionEmptyData(t *testing.T) {
}

t.Run("exit->not exist-> not exist", func(t *testing.T) {
c := s.Connect(1, kb, 0)
c := s.Connect(1, kb)
if _, err := c.Next(context.Background()); err != nil {
t.Errorf("c.Next() returned an error: %v", err)
}
Expand Down Expand Up @@ -190,8 +190,8 @@ func TestConnectionFilterData(t *testing.T) {
closed := make(chan struct{})
defer close(closed)
s := autoupdate.New(datastore, new(test.MockRestricter), closed)
kb := mockKeysBuilder{keys: test.Str("user/1/name")}
c := s.Connect(1, kb, 0)
kb := test.KeysBuilder{K: test.Str("user/1/name")}
c := s.Connect(1, kb)
if _, err := c.Next(context.Background()); err != nil {
t.Errorf("c.Next() returned an error: %v", err)
}
Expand All @@ -215,8 +215,8 @@ func TestConntectionFilterOnlyOneKey(t *testing.T) {
closed := make(chan struct{})
close(closed)
s := autoupdate.New(datastore, new(test.MockRestricter), closed)
kb := mockKeysBuilder{keys: test.Str("user/1/name")}
c := s.Connect(1, kb, 0)
kb := test.KeysBuilder{K: test.Str("user/1/name")}
c := s.Connect(1, kb)
if _, err := c.Next(context.Background()); err != nil {
t.Errorf("c.Next() returned an error: %v", err)
}
Expand Down Expand Up @@ -250,10 +250,10 @@ func BenchmarkFilterChanging(b *testing.B) {
for i := 0; i < keyCount; i++ {
keys = append(keys, fmt.Sprintf("user/%d/name", i))
}
kb := mockKeysBuilder{keys: keys}
kb := test.KeysBuilder{K: keys}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := s.Connect(1, kb, 0)
c := s.Connect(1, kb)

b.ResetTimer()

Expand All @@ -277,10 +277,10 @@ func BenchmarkFilterNotChanging(b *testing.B) {
for i := 0; i < keyCount; i++ {
keys = append(keys, fmt.Sprintf("user/%d/name", i))
}
kb := mockKeysBuilder{keys: keys}
kb := test.KeysBuilder{K: keys}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := s.Connect(1, kb, 0)
c := s.Connect(1, kb)

b.ResetTimer()

Expand Down
4 changes: 2 additions & 2 deletions internal/autoupdate/feature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ func TestFeatures(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
b, err := keysbuilder.FromJSON(context.Background(), strings.NewReader(tt.request), s, 1)
b, err := keysbuilder.FromJSON(strings.NewReader(tt.request), s, 1)
if err != nil {
t.Fatalf("FromJSON() returned an unexpected error: %v", err)
}
c := s.Connect(1, b, 0)
c := s.Connect(1, b)
data, err := c.Next(context.Background())
if err != nil {
t.Fatalf("Can not get data: %v", err)
Expand Down
18 changes: 2 additions & 16 deletions internal/autoupdate/mock_test.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,15 @@
package autoupdate_test

import (
"context"

"github.com/openslides/openslides-autoupdate-service/internal/autoupdate"
"github.com/openslides/openslides-autoupdate-service/internal/test"
)

type mockKeysBuilder struct {
keys []string
}

func (m mockKeysBuilder) Update(context.Context) error {
return nil
}

func (m mockKeysBuilder) Keys() []string {
return m.keys
}

func getConnection(closed <-chan struct{}) (*autoupdate.Connection, *test.MockDatastore) {
datastore := new(test.MockDatastore)
s := autoupdate.New(datastore, new(test.MockRestricter), closed)
kb := mockKeysBuilder{keys: test.Str("user/1/name")}
c := s.Connect(1, kb, 0)
kb := test.KeysBuilder{K: test.Str("user/1/name")}
c := s.Connect(1, kb)

return c, datastore
}
10 changes: 0 additions & 10 deletions internal/http/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,6 @@ package http

import "fmt"

// noStatusCodeError helps the errorHandler do decide, if an status code can be
// set.
type noStatusCodeError struct {
wrapped error
}

func (e noStatusCodeError) Error() string {
return e.wrapped.Error()
}

type invalidRequestError struct {
err error
}
Expand Down
Loading

0 comments on commit b53d65b

Please sign in to comment.