diff --git a/client.go b/client.go index a85c9a28..dcd1303c 100644 --- a/client.go +++ b/client.go @@ -273,7 +273,7 @@ func (c *client) Read(ctx context.Context, fieldsToRead []string, entity DomainO } // map results to entity fields - re.SetFieldValues(entity, results) + re.SetFieldValues(entity, results, columnsToRead) return nil } @@ -390,11 +390,11 @@ func (c *client) Range(ctx context.Context, r *RangeOp) ([]DomainObject, string, return nil, "", errors.Wrap(err, "Range") } - objectArray := objectsFromValueArray(r.sop.object, values, re) + objectArray := objectsFromValueArray(r.sop.object, values, re, nil) return objectArray, token, nil } -func objectsFromValueArray(object DomainObject, values []map[string]FieldValue, re *RegisteredEntity) []DomainObject { +func objectsFromValueArray(object DomainObject, values []map[string]FieldValue, re *RegisteredEntity, columnsToRead []string) []DomainObject { goType := reflect.TypeOf(object).Elem() // get the reflect.Type of the client entity doType := reflect.TypeOf((*DomainObject)(nil)).Elem() slice := reflect.MakeSlice(reflect.SliceOf(doType), 0, len(values)) // make a slice of these @@ -402,7 +402,7 @@ func objectsFromValueArray(object DomainObject, values []map[string]FieldValue, elements.Elem().Set(slice) for _, flist := range values { // for each row returned newObject := reflect.New(goType).Interface() // make a new entity - re.SetFieldValues(newObject.(DomainObject), flist) // fill it in from server values + re.SetFieldValues(newObject.(DomainObject), flist, columnsToRead) // fill it in from server values slice = reflect.Append(slice, reflect.ValueOf(newObject.(DomainObject))) // append to slice } return slice.Interface().([]DomainObject) @@ -434,7 +434,7 @@ func (c *client) ScanEverything(ctx context.Context, sop *ScanOp) ([]DomainObjec if err != nil { return nil, "", err } - objectArray := objectsFromValueArray(sop.object, values, re) + objectArray := objectsFromValueArray(sop.object, values, re, nil) return objectArray, token, nil } diff --git a/client_test.go b/client_test.go index 209d64e9..96d0e165 100644 --- a/client_test.go +++ b/client_test.go @@ -31,8 +31,11 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "fmt" + dosaRenamed "github.com/uber-go/dosa" - "github.com/uber-go/dosa/connectors/devnull" + _ "github.com/uber-go/dosa/connectors/devnull" + _ "github.com/uber-go/dosa/connectors/memory" "github.com/uber-go/dosa/mocks" ) @@ -57,27 +60,88 @@ var ( ctx = context.TODO() scope = "test" namePrefix = "team.service" - nullConnector = &devnull.Connector{} + nullConnector dosaRenamed.Connector ) +func init() { + nullConnector, _ = dosaRenamed.GetConnector("devnull", nil) +} + +// ExampleNewClient initializes a client using the devnull connector, which discards all +// the data you send it and always returns no rows. It's only useful for testing dosa. 
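+// (Example functions without an "Output:" comment, like this one, are compiled but not run by
+// `go test`; ExampleGetConnector below does have one, so its printed output is verified.)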
 func ExampleNewClient() {
 	// initialize registrar
 	reg, err := dosaRenamed.NewRegistrar("test", "myteam.myservice", cte1)
 	if err != nil {
 		// registration will fail if the object is tagged incorrectly
-		panic("dosaRenamed.NewRegister returned an error")
+		fmt.Printf("NewRegistrar error: %s", err)
+		return
 	}
 
 	// use a devnull connector for example purposes
-	conn := &devnull.Connector{}
+	conn, err := dosaRenamed.GetConnector("devnull", nil)
+	if err != nil {
+		fmt.Printf("GetConnector error: %s", err)
+		return
+	}
 
 	// create the client using the registry and connector
 	client := dosaRenamed.NewClient(reg, conn)
 	err = client.Initialize(context.Background())
 	if err != nil {
-		errors.Wrap(err, "client.Initialize returned an error")
+		fmt.Printf("Initialize error: %s", err)
+		return
+	}
+}
+
+// ExampleGetConnector gets an in-memory connector that can be used for testing your code.
+// The in-memory connector always starts off with no rows, so you'll need to add rows to
+// your "database" before reading them back.
+func ExampleGetConnector() {
+	// register your entities so the engine can separate your data based on table names.
+	// Scopes and prefixes are not used by the in-memory connector, and are ignored, but
+	// your list of entities is important. In this case, we only have one, our ClientTestEntity1.
+	reg, err := dosaRenamed.NewRegistrar("test", "myteam.myservice", &ClientTestEntity1{})
+	if err != nil {
+		fmt.Printf("NewRegistrar error: %s", err)
+		return
+	}
+
+	// Find the memory connector. There is no configuration information, so pass nil.
+	// For this to work, the memory package's init function must run first; that happens
+	// because we imported it above with a blank identifier (underscore) just for its side effects.
+	conn, _ := dosaRenamed.GetConnector("memory", nil)
+
+	// now construct a client from the registry and the connector
+	client := dosaRenamed.NewClient(reg, conn)
+
+	// initialize the client; this should always work for the in-memory connector
+	if err = client.Initialize(context.Background()); err != nil {
+		fmt.Printf("Initialize error: %s", err)
+		return
+	}
+
+	// now populate an entity and insert it into the memory store
+	if err := client.CreateIfNotExists(context.Background(), &ClientTestEntity1{
+		ID:    int64(1),
+		Name:  "rkuris",
+		Email: "rkuris@uber.com"}); err != nil {
+		fmt.Printf("CreateIfNotExists error: %s", err)
+		return
+	}
+
+	// create an entity to hold the read result, just populate the key
+	e := ClientTestEntity1{ID: int64(1)}
+	// now read the data from the "database", all columns
+	err = client.Read(context.Background(), dosaRenamed.All(), &e)
+	if err != nil {
+		fmt.Printf("Read error: %s", err)
+		return
 	}
+	// great! 
It worked, so display the information we stored earlier + fmt.Printf("id:%d Name:%q Email:%q\n", e.ID, e.Name, e.Email) + // Output: id:1 Name:"rkuris" Email:"rkuris@uber.com" } func TestNewClient(t *testing.T) { @@ -151,7 +215,7 @@ func TestClient_Read(t *testing.T) { assert.NoError(t, c3.Initialize(ctx)) assert.NoError(t, c3.Read(ctx, fieldsToRead, cte1)) assert.Equal(t, cte1.ID, results["id"]) - assert.Equal(t, cte1.Name, results["name"]) + assert.NotEqual(t, cte1.Name, results["name"]) assert.Equal(t, cte1.Email, results["email"]) } diff --git a/connector.go b/connector.go index e8cf90c1..7b6bdb2e 100644 --- a/connector.go +++ b/connector.go @@ -92,16 +92,19 @@ type SchemaStatus struct { // Connector is the interface that must be implemented for a backend service // It can also be implemented using an RPC such as thrift (dosa-idl) +// When fields are returned from read/range/search/scan methods, it's legal for the connector +// to return more fields than originally requested. The caller of the connector should never mutate +// the returned columns either, in case they are from a cache type Connector interface { // DML operations (CRUD + search) // CreateIfNotExists creates a row, but only if it does not exist. CreateIfNotExists(ctx context.Context, ei *EntityInfo, values map[string]FieldValue) error // Read fetches a row by primary key - // If fieldsToRead is empty or nil, all non-key fields would be fetched. - Read(ctx context.Context, ei *EntityInfo, keys map[string]FieldValue, fieldsToRead []string) (values map[string]FieldValue, err error) + // If minimumFields is empty or nil, all non-key fields would be fetched. + Read(ctx context.Context, ei *EntityInfo, keys map[string]FieldValue, minimumFields []string) (values map[string]FieldValue, err error) // MultiRead fetches several rows by primary key - // If fieldsToRead is empty or nil, all non-key fields would be fetched. - MultiRead(ctx context.Context, ei *EntityInfo, keys []map[string]FieldValue, fieldsToRead []string) (results []*FieldValuesOrError, err error) + // If minimumFields is empty or nil, all non-key fields would be fetched. + MultiRead(ctx context.Context, ei *EntityInfo, keys []map[string]FieldValue, minimumFields []string) (results []*FieldValuesOrError, err error) // Upsert updates some columns of a row, or creates a new one if it doesn't exist yet. Upsert(ctx context.Context, ei *EntityInfo, values map[string]FieldValue) error // MultiUpsert updates some columns of several rows, or creates a new ones if they doesn't exist yet @@ -111,14 +114,14 @@ type Connector interface { // MultiRemove removes multiple rows MultiRemove(ctx context.Context, ei *EntityInfo, multiKeys []map[string]FieldValue) (result []error, err error) // Range does a range scan using a set of conditions. - // If fieldsToRead is empty or nil, all fields (including key fields) would be fetched. - Range(ctx context.Context, ei *EntityInfo, columnConditions map[string][]*Condition, fieldsToRead []string, token string, limit int) ([]map[string]FieldValue, string, error) + // If minimumFields is empty or nil, all fields (including key fields) would be fetched. + Range(ctx context.Context, ei *EntityInfo, columnConditions map[string][]*Condition, minimumFields []string, token string, limit int) ([]map[string]FieldValue, string, error) // Search does a search against a field marked 'searchable' - // If fieldsToRead is empty or nil, all fields (including key fields) would be fetched. 
- Search(ctx context.Context, ei *EntityInfo, fieldPairs FieldNameValuePair, fieldsToRead []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error) + // If minimumFields is empty or nil, all fields (including key fields) would be fetched. + Search(ctx context.Context, ei *EntityInfo, fieldPairs FieldNameValuePair, minimumFields []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error) // Scan reads the whole table, for doing a sequential search or dump/load use cases - // If fieldsToRead is empty or nil, all fields (including key fields) would be fetched. - Scan(ctx context.Context, ei *EntityInfo, fieldsToRead []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error) + // If minimumFields is empty or nil, all fields (including key fields) would be fetched. + Scan(ctx context.Context, ei *EntityInfo, minimumFields []string, token string, limit int) (multiValues []map[string]FieldValue, nextToken string, err error) // DDL operations (schema) // CheckSchema validates that the set of entities you have provided is valid and registered already diff --git a/connectors/base/base.go b/connectors/base/base.go index 1e21f923..8d385a10 100644 --- a/connectors/base/base.go +++ b/connectors/base/base.go @@ -54,19 +54,19 @@ func (c *Connector) CreateIfNotExists(ctx context.Context, ei *dosa.EntityInfo, } // Read calls Next -func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, fieldsToRead []string) (map[string]dosa.FieldValue, error) { +func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) { if c.Next == nil { return nil, ErrNoMoreConnector{} } - return c.Next.Read(ctx, ei, values, fieldsToRead) + return c.Next.Read(ctx, ei, values, minimumFields) } // MultiRead calls Next -func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, fieldsToRead []string) ([]*dosa.FieldValuesOrError, error) { +func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, minimumFields []string) ([]*dosa.FieldValuesOrError, error) { if c.Next == nil { return nil, ErrNoMoreConnector{} } - return c.Next.MultiRead(ctx, ei, values, fieldsToRead) + return c.Next.MultiRead(ctx, ei, values, minimumFields) } // Upsert calls Next @@ -102,27 +102,27 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV } // Range calls Next -func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { if c.Next == nil { return nil, "", ErrNoMoreConnector{} } - return c.Next.Range(ctx, ei, columnConditions, fieldsToRead, token, limit) + return c.Next.Range(ctx, ei, columnConditions, minimumFields, token, limit) } // Search calls Next -func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Search(ctx context.Context, ei 
*dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { if c.Next == nil { return nil, "", ErrNoMoreConnector{} } - return c.Next.Search(ctx, ei, fieldPairs, fieldsToRead, token, limit) + return c.Next.Search(ctx, ei, fieldPairs, minimumFields, token, limit) } // Scan calls Next -func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { if c.Next == nil { return nil, "", ErrNoMoreConnector{} } - return c.Next.Scan(ctx, ei, fieldsToRead, token, limit) + return c.Next.Scan(ctx, ei, minimumFields, token, limit) } // CheckSchema calls Next diff --git a/connectors/base/base_test.go b/connectors/base/base_test.go index bef294a1..4a85e3ab 100644 --- a/connectors/base/base_test.go +++ b/connectors/base/base_test.go @@ -58,21 +58,21 @@ func TestBase_CreateIfNotExists(t *testing.T) { } func TestBase_Read(t *testing.T) { - fieldsToRead := make([]string, 1) - _, err := bc.Read(ctx, testInfo, testValues, fieldsToRead) + minimumFields := make([]string, 1) + _, err := bc.Read(ctx, testInfo, testValues, minimumFields) assert.Error(t, err) - val, err := bcWNext.Read(ctx, testInfo, testValues, fieldsToRead) + val, err := bcWNext.Read(ctx, testInfo, testValues, minimumFields) assert.Nil(t, val) assert.Error(t, err) } func TestBase_MultiRead(t *testing.T) { - fieldsToRead := make([]string, 1) - _, e := bc.MultiRead(ctx, testInfo, testMultiValues, fieldsToRead) + minimumFields := make([]string, 1) + _, e := bc.MultiRead(ctx, testInfo, testMultiValues, minimumFields) assert.Error(t, e) - v, e := bcWNext.MultiRead(ctx, testInfo, testMultiValues, fieldsToRead) + v, e := bcWNext.MultiRead(ctx, testInfo, testMultiValues, minimumFields) assert.NotNil(t, v) assert.Nil(t, e) } @@ -113,31 +113,31 @@ func TestBase_MultiRemove(t *testing.T) { func TestBase_Range(t *testing.T) { conditions := make(map[string][]*dosa.Condition) - fieldsToRead := make([]string, 1) - _, _, err := bc.Range(ctx, testInfo, conditions, fieldsToRead, "", 0) + minimumFields := make([]string, 1) + _, _, err := bc.Range(ctx, testInfo, conditions, minimumFields, "", 0) assert.Error(t, err) - vals, _, err := bcWNext.Range(ctx, testInfo, conditions, fieldsToRead, "", 0) + vals, _, err := bcWNext.Range(ctx, testInfo, conditions, minimumFields, "", 0) assert.Nil(t, vals) assert.Error(t, err) } func TestBase_Search(t *testing.T) { - fieldsToRead := make([]string, 1) - _, _, err := bc.Search(ctx, testInfo, testPairs, fieldsToRead, "", 0) + minimumFields := make([]string, 1) + _, _, err := bc.Search(ctx, testInfo, testPairs, minimumFields, "", 0) assert.Error(t, err) - vals, _, err := bcWNext.Search(ctx, testInfo, testPairs, fieldsToRead, "", 0) + vals, _, err := bcWNext.Search(ctx, testInfo, testPairs, minimumFields, "", 0) assert.Nil(t, vals) assert.Error(t, err) } func TestBase_Scan(t *testing.T) { - fieldsToRead := make([]string, 1) - _, _, err := bc.Scan(ctx, testInfo, fieldsToRead, "", 0) + minimumFields := make([]string, 1) + _, _, err := bc.Scan(ctx, testInfo, minimumFields, "", 0) assert.Error(t, err) - vals, _, err := bcWNext.Scan(ctx, testInfo, fieldsToRead, "", 0) + vals, _, err := bcWNext.Scan(ctx, testInfo, minimumFields, "", 0) assert.Nil(t, vals) assert.Error(t, err) } diff --git 
a/connectors/devnull/devnull.go b/connectors/devnull/devnull.go index 4eedb2d0..cc49567e 100644 --- a/connectors/devnull/devnull.go +++ b/connectors/devnull/devnull.go @@ -35,12 +35,12 @@ func (c *Connector) CreateIfNotExists(ctx context.Context, ei *dosa.EntityInfo, } // Read always returns a not found error -func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, fieldsToRead []string) (map[string]dosa.FieldValue, error) { +func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) { return nil, &dosa.ErrNotFound{} } // MultiRead returns a set of not found errors for each key you specify -func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, fieldsToRead []string) ([]*dosa.FieldValuesOrError, error) { +func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, minimumFields []string) ([]*dosa.FieldValuesOrError, error) { errors := make([]*dosa.FieldValuesOrError, len(values)) for inx := range values { errors[inx] = &dosa.FieldValuesOrError{Error: &dosa.ErrNotFound{}} @@ -78,17 +78,17 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV } // Range is not yet implementedS -func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { return nil, "", &dosa.ErrNotFound{} } // Search is not yet implemented -func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { return nil, "", &dosa.ErrNotFound{} } // Scan is not yet implemented -func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { return nil, "", &dosa.ErrNotFound{} } diff --git a/connectors/devnull/devnull_test.go b/connectors/devnull/devnull_test.go index 7ec218e4..3fd0033d 100644 --- a/connectors/devnull/devnull_test.go +++ b/connectors/devnull/devnull_test.go @@ -52,15 +52,15 @@ func TestDevNull_CreateIfNotExists(t *testing.T) { } func TestDevNull_Read(t *testing.T) { - fieldsToRead := make([]string, 1) - val, err := sut.Read(ctx, testInfo, testValues, fieldsToRead) + minimumFields := make([]string, 1) + val, err := sut.Read(ctx, testInfo, testValues, minimumFields) assert.Nil(t, val) assert.Error(t, err) } func TestDevNull_MultiRead(t *testing.T) { - fieldsToRead := make([]string, 1) - v, e := sut.MultiRead(ctx, testInfo, testMultiValues, fieldsToRead) + minimumFields := make([]string, 1) + v, e := sut.MultiRead(ctx, testInfo, testMultiValues, minimumFields) assert.NotNil(t, v) 
 	assert.Nil(t, e)
 }
 
@@ -89,22 +89,22 @@ func TestDevNull_MultiRemove(t *testing.T) {
 
 func TestDevNull_Range(t *testing.T) {
 	conditions := make(map[string][]*dosa.Condition)
-	fieldsToRead := make([]string, 1)
-	vals, _, err := sut.Range(ctx, testInfo, conditions, fieldsToRead, "", 0)
+	minimumFields := make([]string, 1)
+	vals, _, err := sut.Range(ctx, testInfo, conditions, minimumFields, "", 0)
 	assert.Nil(t, vals)
 	assert.Error(t, err)
 }
 
 func TestDevNull_Search(t *testing.T) {
-	fieldsToRead := make([]string, 1)
-	vals, _, err := sut.Search(ctx, testInfo, testPairs, fieldsToRead, "", 0)
+	minimumFields := make([]string, 1)
+	vals, _, err := sut.Search(ctx, testInfo, testPairs, minimumFields, "", 0)
 	assert.Nil(t, vals)
 	assert.Error(t, err)
 }
 
 func TestDevNull_Scan(t *testing.T) {
-	fieldsToRead := make([]string, 1)
-	vals, _, err := sut.Scan(ctx, testInfo, fieldsToRead, "", 0)
+	minimumFields := make([]string, 1)
+	vals, _, err := sut.Scan(ctx, testInfo, minimumFields, "", 0)
 	assert.Nil(t, vals)
 	assert.Error(t, err)
 }
diff --git a/connectors/memory/memory.go b/connectors/memory/memory.go
new file mode 100644
index 00000000..e321e06c
--- /dev/null
+++ b/connectors/memory/memory.go
@@ -0,0 +1,449 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package memory
+
+import (
+	"bytes"
+	"context"
+	"encoding/gob"
+	"sort"
+	"sync"
+	"time"
+
+	"encoding/binary"
+
+	"github.com/satori/go.uuid"
+	"github.com/uber-go/dosa"
+	"github.com/uber-go/dosa/connectors/base"
+)
+
+// Connector is an in-memory connector.
+// The in-memory connector stores its data like this:
+// map[string]map[string][]map[string]dosa.FieldValue
+//
+// the first 'string' is the table name (entity name)
+// the second 'string' is the partition key, encoded using encoding/gob to guarantee uniqueness
+// within each 'partition' you have a list of rows ([]map[string]dosa.FieldValue)
+// these rows are kept ordered so that reads are lightning fast and searches are quick too
+// the row itself is a map of field name to value (map[string]dosa.FieldValue)
+//
+// A read-write mutex is used to control concurrency: reads can run in parallel, but
+// writes are serialized. There is no attempt to improve the concurrency of the read or
+// write path by adding more granular locks.
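+//
+// As a purely illustrative sketch of that layout (the gob-encoded key is shown symbolically),
+// a single row {"f1": "key1", "c1": int64(1)} written to a table named "t1" would be stored
+// roughly as:
+//
+//	data["t1"][gob("key1")] = []map[string]dosa.FieldValue{
+//		{"f1": dosa.FieldValue("key1"), "c1": dosa.FieldValue(int64(1))},
+//	}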
+type Connector struct { + base.Connector + data map[string]map[string][]map[string]dosa.FieldValue + lock sync.RWMutex +} + +// partitionKeyBuilder extracts the partition key components from the map and encodes them, +// generating a unique string. It uses the encoding/gob method to make a byte array as the +// key, and returns this as a string +func partitionKeyBuilder(ei *dosa.EntityInfo, values map[string]dosa.FieldValue) string { + encodedKey := bytes.Buffer{} + encoder := gob.NewEncoder(&encodedKey) + for _, k := range ei.Def.Key.PartitionKeys { + _ = encoder.Encode(values[k]) + } + return string(encodedKey.Bytes()) +} + +// findInsertionPoint locates the place within a partition where the data belongs. +// It inspects the clustering key values found in the insertMe value and figures out +// where they go in the data slice. It doesn't change anything, but it does let you +// know if it found an exact match or if it's just not there. When it's not there, +// it indicates where it is supposed to get inserted +func findInsertionPoint(ei *dosa.EntityInfo, data []map[string]dosa.FieldValue, insertMe map[string]dosa.FieldValue) (found bool, idx int) { + found = false + idx = sort.Search(len(data), func(offset int) bool { + cmp := compareRows(ei, data[offset], insertMe) + if cmp == 0 { + found = true + } + return cmp >= 0 + }) + return +} + +// compareRows compares two maps of row data based on clustering keys. It handles ascending/descending +// based on the passed-in schema +func compareRows(ei *dosa.EntityInfo, v1 map[string]dosa.FieldValue, v2 map[string]dosa.FieldValue) (cmp int8) { + keys := ei.Def.Key.ClusteringKeys + for _, key := range keys { + d1 := v1[key.Name] + d2 := v2[key.Name] + cmp = compareType(d1, d2) + if key.Descending { + cmp = -cmp + } + if cmp != 0 { + return cmp + } + } + return cmp +} + +// This function returns the time bits from a UUID +// You would have to scale this to nanos to make a +// time.Time but we don't generally need that for +// comparisons. See RFC 4122 for these bit offsets +func timeFromUUID(u uuid.UUID) int64 { + low := int64(binary.BigEndian.Uint32(u[0:4])) + mid := int64(binary.BigEndian.Uint16(u[4:6])) + hi := int64((binary.BigEndian.Uint16(u[6:8]) & 0x0fff)) + return low + (mid << 32) + (hi << 48) +} + +// compareType compares a single DOSA field based on the type. 
This code assumes the types of each +// of the columns are the same, or it will panic +func compareType(d1 dosa.FieldValue, d2 dosa.FieldValue) int8 { + switch d1 := d1.(type) { + case dosa.UUID: + u1 := uuid.FromStringOrNil(string(d1)) + u2 := uuid.FromStringOrNil(string(d2.(dosa.UUID))) + if u1.Version() != u2.Version() { + if u1.Version() < u2.Version() { + return -1 + } + return 1 + } + if u1.Version() == 1 { + // compare time UUIDs + t1 := timeFromUUID(u1) + t2 := timeFromUUID(u2) + if t1 == t2 { + return 0 + } + if t1 < t2 { + return -1 + } + return 1 + } + + // version + if string(d1) == string(d2.(dosa.UUID)) { + return 0 + } + if string(d1) < string(d2.(dosa.UUID)) { + return -1 + } + return 1 + case string: + if d1 == d2.(string) { + return 0 + } + if d1 < d2.(string) { + return -1 + } + return 1 + case int64: + if d1 == d2.(int64) { + return 0 + } + if d1 < d2.(int64) { + return -1 + } + return 1 + case int32: + if d1 == d2.(int32) { + return 0 + } + if d1 < d2.(int32) { + return -1 + } + return 1 + case float64: + if d1 == d2.(float64) { + return 0 + } + if d1 < d2.(float64) { + return -1 + } + return 1 + case []byte: + c := bytes.Compare(d1, d2.([]byte)) + if c == 0 { + return 0 + } + if c < 0 { + return -1 + } + return 1 + case time.Time: + if d1.Equal(d2.(time.Time)) { + return 0 + } + if d1.Before(d2.(time.Time)) { + return -1 + } + return 1 + case bool: + if d1 == d2.(bool) { + return 0 + } + if d1 == false { + return -1 + } + return 1 + } + panic(d1) +} + +// CreateIfNotExists inserts a row if it isn't already there. The basic flow is: +// Find the partition, if it's not there, then create it and insert the row there +// If the partition is there, and there's data in it, and there's no clustering key, then fail +// Otherwise, search the partition for the exact same clustering keys. If there, fail +// if not, then insert it at the right spot (sort.Search does most of the heavy lifting here) +func (c *Connector) CreateIfNotExists(_ context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue) error { + return c.mergedInsert(ei, values, func(into map[string]dosa.FieldValue, from map[string]dosa.FieldValue) error { + return &dosa.ErrAlreadyExists{} + }) +} + +// Read searches for a row. First, it finds the partition, then it searches in the partition for +// the data, and returns it when it finds it. Again, sort.Search does most of the heavy lifting +// within a partition +func (c *Connector) Read(_ context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) { + c.lock.RLock() + defer c.lock.RUnlock() + if c.data[ei.Def.Name] == nil { + return nil, &dosa.ErrNotFound{} + } + entityRef := c.data[ei.Def.Name] + encodedPartitionKey := partitionKeyBuilder(ei, values) + partitionRef := entityRef[encodedPartitionKey] + // no data in this partition? easy out! 
+ if len(partitionRef) == 0 { + return nil, &dosa.ErrNotFound{} + } + + if len(ei.Def.ClusteringKeySet()) == 0 { + return partitionRef[0], nil + } + // clustering key, search for the value in the set + found, inx := findInsertionPoint(ei, partitionRef, values) + if !found { + return nil, &dosa.ErrNotFound{} + } + return partitionRef[inx], nil +} + +// Upsert works a lot like CreateIfNotExists but merges the data when it finds an existing row +func (c *Connector) Upsert(_ context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue) error { + return c.mergedInsert(ei, values, func(into map[string]dosa.FieldValue, from map[string]dosa.FieldValue) error { + for k, v := range from { + into[k] = v + } + return nil + }) +} + +func (c *Connector) mergedInsert(ei *dosa.EntityInfo, + values map[string]dosa.FieldValue, + mergeFunc func(map[string]dosa.FieldValue, map[string]dosa.FieldValue) error) error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.data[ei.Def.Name] == nil { + c.data[ei.Def.Name] = make(map[string][]map[string]dosa.FieldValue) + } + entityRef := c.data[ei.Def.Name] + encodedPartitionKey := partitionKeyBuilder(ei, values) + if entityRef[encodedPartitionKey] == nil { + entityRef[encodedPartitionKey] = make([]map[string]dosa.FieldValue, 0, 1) + } + partitionRef := entityRef[encodedPartitionKey] + // no data in this partition? easy out! + if len(partitionRef) == 0 { + entityRef[encodedPartitionKey] = append(entityRef[encodedPartitionKey], values) + return nil + } + + if len(ei.Def.ClusteringKeySet()) == 0 { + // no clustering key, so the row must already exist, merge it + return mergeFunc(partitionRef[0], values) + } + // there is a clustering key, find the insertion point (binary search would be fastest) + found, offset := findInsertionPoint(ei, partitionRef, values) + if found { + return mergeFunc(partitionRef[offset], values) + } + // perform slice magic to insert value at given offset + l := len(entityRef[encodedPartitionKey]) // get length + entityRef[encodedPartitionKey] = append(entityRef[encodedPartitionKey], entityRef[encodedPartitionKey][l-1]) // copy last element + // scoot over remaining elements + copy(entityRef[encodedPartitionKey][offset+1:], entityRef[encodedPartitionKey][offset:]) + // and plunk value into appropriate location + entityRef[encodedPartitionKey][offset] = values + return nil +} + +// Remove deletes a single row +func (c *Connector) Remove(_ context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue) error { + c.lock.Lock() + defer c.lock.Unlock() + if c.data[ei.Def.Name] == nil { + return nil + } + entityRef := c.data[ei.Def.Name] + encodedPartitionKey := partitionKeyBuilder(ei, values) + if entityRef[encodedPartitionKey] == nil { + return nil + } + partitionRef := entityRef[encodedPartitionKey] + // no data in this partition? easy out! + if len(partitionRef) == 0 { + return nil + } + + // no clustering keys? Simple, delete this + if len(ei.Def.ClusteringKeySet()) == 0 { + entityRef[encodedPartitionKey] = nil + return nil + } + found, offset := findInsertionPoint(ei, partitionRef, values) + if found { + entityRef[encodedPartitionKey] = append(entityRef[encodedPartitionKey][:offset], entityRef[encodedPartitionKey][offset+1:]...) 
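+		// (the re-slice above drops only the row at offset; the remaining rows stay in order)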
+	}
+	return nil
+}
+
+// Range returns a slice of data from the datastore
+func (c *Connector) Range(_ context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	if c.data[ei.Def.Name] == nil {
+		return nil, "", &dosa.ErrNotFound{}
+	}
+	entityRef := c.data[ei.Def.Name]
+
+	// find the equals conditions on each of the partition keys
+	values := make(map[string]dosa.FieldValue)
+	for _, pk := range ei.Def.Key.PartitionKeys {
+		// TODO: assert len(columnConditions[pk]) == 1
+		// TODO: assert columnConditions[pk][0].Op is equals
+		values[pk] = columnConditions[pk][0].Value
+	}
+
+	encodedPartitionKey := partitionKeyBuilder(ei, values)
+	partitionRef := entityRef[encodedPartitionKey]
+	// no data in this partition? easy out!
+	if len(partitionRef) == 0 {
+		return nil, "", &dosa.ErrNotFound{}
+	}
+	// hunt through the partitionRef and return values that match search criteria
+	// TODO: This can be done much faster using a binary search
+	startinx, endinx := 0, len(partitionRef)-1
+	for startinx < len(partitionRef) && !matchesClusteringConditions(ei, columnConditions, partitionRef[startinx]) {
+		startinx++
+	}
+	// TODO: adjust startinx with a passed in token
+	for endinx >= startinx && !matchesClusteringConditions(ei, columnConditions, partitionRef[endinx]) {
+		endinx--
+	}
+	if endinx < startinx {
+		return nil, "", &dosa.ErrNotFound{}
+	}
+	// TODO: enforce limits and return a token when there are more rows
+	return partitionRef[startinx : endinx+1], "", nil
+}
+
+// matchesClusteringConditions checks if a data row matches the conditions in the columnConditions that apply to
+// clustering columns. If a condition does NOT match, it returns false, otherwise true.
+// This function is pretty fast if there are no conditions on the clustering columns
+func matchesClusteringConditions(ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, data map[string]dosa.FieldValue) bool {
+	for _, col := range ei.Def.Key.ClusteringKeys {
+		if conds, ok := columnConditions[col.Name]; ok {
+			// conditions exist on this clustering key
+			for _, cond := range conds {
+				if !passCol(data[col.Name], cond) {
+					return false
+				}
+			}
+		}
+	}
+	return true
+}
+
+// passCol checks if a column passes a specific condition
+func passCol(data dosa.FieldValue, cond *dosa.Condition) bool {
+	cmp := compareType(data, cond.Value)
+	switch cond.Op {
+	case dosa.Eq:
+		return cmp == 0
+	case dosa.Gt:
+		return cmp > 0
+	case dosa.GtOrEq:
+		return cmp >= 0
+	case dosa.Lt:
+		return cmp < 0
+	case dosa.LtOrEq:
+		return cmp <= 0
+	}
+	panic("invalid operator " + cond.Op.String())
+}
+
+// Scan returns all the rows
+func (c *Connector) Scan(_ context.Context, ei *dosa.EntityInfo, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	if c.data[ei.Def.Name] == nil {
+		return nil, "", &dosa.ErrNotFound{}
+	}
+	entityRef := c.data[ei.Def.Name]
+	allTheThings := make([]map[string]dosa.FieldValue, 0)
+	// TODO: stop when we reach the limit, and make a token for continuation
+	for _, vals := range entityRef {
+		allTheThings = append(allTheThings, vals...)
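+		// (vals is one partition's ordered slice of rows; Scan simply concatenates every partition)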
+ } + if len(allTheThings) == 0 { + return nil, "", &dosa.ErrNotFound{} + } + return allTheThings, "", nil +} + +// CheckSchema is just a stub; there is no schema management for the in memory connector +// since creating a new one leaves you with no data! +func (c *Connector) CheckSchema(ctx context.Context, scope, namePrefix string, ed []*dosa.EntityDefinition) (int32, error) { + return 1, nil +} + +// Shutdown deletes all the data +func (c *Connector) Shutdown() error { + c.lock.Lock() + defer c.lock.Unlock() + c.data = nil + return nil +} + +// NewConnector creates a new in-memory connector +func NewConnector() *Connector { + c := Connector{} + c.data = make(map[string]map[string][]map[string]dosa.FieldValue) + return &c +} + +func init() { + dosa.RegisterConnector("memory", func(args map[string]interface{}) (dosa.Connector, error) { + return NewConnector(), nil + }) +} diff --git a/connectors/memory/memory_test.go b/connectors/memory/memory_test.go new file mode 100644 index 00000000..2764d7d9 --- /dev/null +++ b/connectors/memory/memory_test.go @@ -0,0 +1,634 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
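+
+// The tests below exercise the in-memory connector directly: basic create/read/upsert/remove,
+// range queries over clustering keys, time-UUID ordering, and scans of an empty or emptied store.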
+ +package memory + +import ( + "context" + "testing" + "time" + + "sort" + + "github.com/satori/go.uuid" + "github.com/stretchr/testify/assert" + "github.com/uber-go/dosa" +) + +var testSchemaRef = dosa.SchemaRef{ + Scope: "scope1", + NamePrefix: "namePrefix", + EntityName: "eName", + Version: 12345, +} + +var testEi = &dosa.EntityInfo{ + Ref: &testSchemaRef, + Def: &dosa.EntityDefinition{ + Columns: []*dosa.ColumnDefinition{ + {Name: "f1", Type: dosa.String}, + {Name: "c1", Type: dosa.Int64}, + {Name: "c2", Type: dosa.Double}, + {Name: "c3", Type: dosa.String}, + {Name: "c4", Type: dosa.Blob}, + {Name: "c5", Type: dosa.Bool}, + {Name: "c6", Type: dosa.Int32}, + {Name: "c7", Type: dosa.TUUID}, + }, + Key: &dosa.PrimaryKey{ + PartitionKeys: []string{"f1"}, + }, + Name: "t1", + }, +} +var clusteredEi = &dosa.EntityInfo{ + Ref: &testSchemaRef, + Def: &dosa.EntityDefinition{ + Columns: []*dosa.ColumnDefinition{ + {Name: "f1", Type: dosa.String}, + {Name: "c1", Type: dosa.Int64}, + {Name: "c2", Type: dosa.Double}, + {Name: "c3", Type: dosa.String}, + {Name: "c4", Type: dosa.Blob}, + {Name: "c5", Type: dosa.Bool}, + {Name: "c6", Type: dosa.Int32}, + {Name: "c7", Type: dosa.TUUID}, + }, + Key: &dosa.PrimaryKey{ + PartitionKeys: []string{"f1"}, + ClusteringKeys: []*dosa.ClusteringKey{ + {Name: "c1", Descending: false}, + {Name: "c7", Descending: true}, + }, + }, + Name: "t1", + }, +} + +func TestConnector_CreateIfNotExists(t *testing.T) { + sut := NewConnector() + + err := sut.CreateIfNotExists(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + }) + assert.NoError(t, err) + + err = sut.CreateIfNotExists(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + }) + + assert.True(t, dosa.ErrorIsAlreadyExists(err)) +} +func TestConnector_Upsert(t *testing.T) { + sut := NewConnector() + + err := sut.Upsert(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + }) + assert.NoError(t, err) + vals, err := sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}, []string{"c1"}) + assert.NoError(t, err) + assert.Nil(t, vals["c1"]) + + err = sut.Upsert(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + }) + assert.NoError(t, err) + + vals, err = sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}, []string{"c1"}) + assert.NoError(t, err) + assert.Equal(t, dosa.FieldValue(int64(1)), vals["c1"]) +} + +func TestConnector_Read(t *testing.T) { + sut := NewConnector() + + // read with no data + vals, err := sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}, []string{"c1"}) + assert.True(t, dosa.ErrorIsNotFound(err)) + + err = sut.CreateIfNotExists(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c2": dosa.FieldValue(float64(2)), + }) + assert.NoError(t, err) + + vals, err = sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}, []string{"c1"}) + assert.NoError(t, err) + assert.Equal(t, int64(1), vals["c1"]) + + vals, err = sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}, dosa.All()) + assert.NoError(t, err) + assert.Equal(t, int64(1), vals["c1"]) + assert.Equal(t, float64(2), vals["c2"]) + assert.Equal(t, "data", vals["f1"]) + + // read a key that isn't there + vals, 
err = sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("not there")}, dosa.All()) + assert.True(t, dosa.ErrorIsNotFound(err)) + + // now delete the one that is + err = sut.Remove(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}) + assert.NoError(t, err) + + // read the deleted key + vals, err = sut.Read(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}, dosa.All()) + assert.True(t, dosa.ErrorIsNotFound(err)) + + // insert into clustered entity + id := dosa.NewUUID() + err = sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c2": dosa.FieldValue(float64(1.2)), + "c7": dosa.FieldValue(id)}) + assert.NoError(t, err) + + // read that row + vals, err = sut.Read(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(id)}, dosa.All()) + assert.NoError(t, err) + assert.Equal(t, dosa.FieldValue(float64(1.2)), vals["c2"]) + + // and fail a read on a clustered key + vals, err = sut.Read(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(2)), + "c7": dosa.FieldValue(id)}, dosa.All()) + assert.True(t, dosa.ErrorIsNotFound(err)) +} + +func TestConnector_Remove(t *testing.T) { + sut := NewConnector() + + // remove with no data + err := sut.Remove(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data")}) + assert.NoError(t, err) + + // create a single row + err = sut.CreateIfNotExists(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c2": dosa.FieldValue(float64(2)), + }) + assert.NoError(t, err) + + // remove something not there + err = sut.Remove(context.TODO(), testEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("nothere")}) + assert.NoError(t, err) + + // insert into clustered entity + id := dosa.NewUUID() + err = sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c2": dosa.FieldValue(float64(1.2)), + "c7": dosa.FieldValue(id)}) + assert.NoError(t, err) + + // remove something not there, but matches partition + err = sut.Remove(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(dosa.NewUUID())}) + assert.NoError(t, err) + + // and remove the partitioned value + err = sut.Remove(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(id)}) + assert.NoError(t, err) + + // remove it again, now that there's nothing at all there (corner case) + err = sut.Remove(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(id)}) + assert.NoError(t, err) +} + +func TestConnector_Shutdown(t *testing.T) { + sut := NewConnector() + + err := sut.Shutdown() + assert.NoError(t, err) + assert.Nil(t, sut.data) +} + +// test CreateIfNotExists with partitioning +func TestConnector_CreateIfNotExists2(t *testing.T) { + sut := NewConnector() + + testUUIDs := make([]dosa.UUID, 10) + for x := 0; x < 10; x++ { + testUUIDs[x] = dosa.NewUUID() + } + + // first, insert 10 random UUID values into 
same partition key + for x := 0; x < 10; x++ { + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + // attempt to insert them all again + for x := 0; x < 10; x++ { + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.True(t, dosa.ErrorIsAlreadyExists(err)) + } + // now, insert them again, but this time with a different secondary key + for x := 0; x < 10; x++ { + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(2)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + // and with a different primary key + for x := 0; x < 10; x++ { + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("different"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + data, token, err := sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + }, dosa.All(), "", 200) + assert.NoError(t, err) + assert.Empty(t, token) + assert.Len(t, data, 20) +} + +func TestConnector_Upsert2(t *testing.T) { + sut := NewConnector() + + testUUIDs := make([]dosa.UUID, 10) + for x := 0; x < 10; x++ { + testUUIDs[x] = dosa.NewUUID() + } + + // first, insert 10 random UUID values into same partition key + for x := 0; x < 10; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + // attempt to insert them all again + for x := 0; x < 10; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c6": dosa.FieldValue(int32(x)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + // now, insert them again, but this time with a different secondary key + for x := 0; x < 10; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(2)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + // and with a different primary key + for x := 0; x < 10; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("different"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + data, token, err := sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + }, dosa.All(), "", 200) + assert.NoError(t, err) + assert.Empty(t, token) + assert.Len(t, data, 20) + assert.NotNil(t, data[0]["c6"]) +} + +func TestConnector_Range(t *testing.T) { + const idcount = 10 + sut := NewConnector() + + // no data at all (corner case) + data, token, err := sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + }, dosa.All(), "", 200) + assert.True(t, dosa.ErrorIsNotFound(err)) + assert.Empty(t, token) + assert.Empty(t, data) + + // insert 
some data into data/1/uuid with a random set of uuids + // we insert them in a random order + testUUIDs := make([]dosa.UUID, idcount) + for x := 0; x < idcount; x++ { + testUUIDs[x] = dosa.NewUUID() + } + for x := 0; x < idcount; x++ { + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + + // search using a different partition key + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("wrongdata")}}, + }, dosa.All(), "", 200) + assert.True(t, dosa.ErrorIsNotFound(err)) + + sort.Sort(ByUUID(testUUIDs)) + // search using the right partition key, and check that the data was insertion-sorted + // correctly + data, _, _ = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + }, dosa.All(), "", 200) + for idx, row := range data { + assert.Equal(t, testUUIDs[idx], row["c7"]) + } + + // find the midpoint and look for all values greater than that + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + "c7": {{Op: dosa.Gt, Value: dosa.FieldValue(testUUIDs[idcount/2-1])}}, + }, dosa.All(), "", 200) + assert.NoError(t, err) + assert.Len(t, data, idcount/2-1) + + // there's one more for greater than or equal + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + "c7": {{Op: dosa.GtOrEq, Value: dosa.FieldValue(testUUIDs[idcount/2-1])}}, + }, dosa.All(), "", 200) + assert.NoError(t, err) + assert.Len(t, data, idcount/2) + + // find the midpoint and look for all values less than that + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + "c7": {{Op: dosa.Lt, Value: dosa.FieldValue(testUUIDs[idcount/2])}}, + }, dosa.All(), "", 200) + assert.NoError(t, err) + assert.Len(t, data, idcount/2-1) + + // and same for less than or equal + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + "c7": {{Op: dosa.LtOrEq, Value: dosa.FieldValue(testUUIDs[idcount/2])}}, + }, dosa.All(), "", 200) + assert.NoError(t, err) + assert.Len(t, data, idcount/2) + + // look off the end of the left side, so greater than maximum (edge case) + // (uuids are ordered descending so this is non-intuitively backwards) + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + "c7": {{Op: dosa.Gt, Value: dosa.FieldValue(testUUIDs[0])}}, + }, dosa.All(), "", 200) + assert.True(t, dosa.ErrorIsNotFound(err)) + + // look off the end of the left side, so greater than maximum + data, token, err = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, 
+ "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + "c7": {{Op: dosa.Lt, Value: dosa.FieldValue(testUUIDs[idcount-1])}}, + }, dosa.All(), "", 200) + assert.True(t, dosa.ErrorIsNotFound(err)) +} + +func TestConnector_TimeUUIDs(t *testing.T) { + sut := NewConnector() + const idcount = 10 + + // insert a bunch of values with V1 timestamps as clustering keys + for x := 0; x < idcount; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c6": dosa.FieldValue(int32(x)), + "c7": dosa.FieldValue(dosa.UUID(uuid.NewV1().String()))}) + assert.NoError(t, err) + } + + // read them back, they should be in reverse order + data, _, _ := sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + }, dosa.All(), "", 200) + + // check that the order is backwards + for idx, row := range data { + assert.Equal(t, int32(idcount-idx-1), row["c6"]) + } + + // now mix in a few V4 UUIDs + for x := 0; x < idcount; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c6": dosa.FieldValue(int32(idcount + x)), + "c7": dosa.FieldValue(dosa.NewUUID())}) + assert.NoError(t, err) + } + + // the V4's should all be first, since V4 UUIDs sort > V1 UUIDs + data, _, _ = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + }, dosa.All(), "", 200) + for _, row := range data[0:idcount] { + assert.True(t, row["c6"].(int32) >= idcount, row["c6"]) + } + +} + +type ByUUID []dosa.UUID + +func (u ByUUID) Len() int { return len(u) } +func (u ByUUID) Swap(i, j int) { u[i], u[j] = u[j], u[i] } +func (u ByUUID) Less(i, j int) bool { return string(u[i]) > string(u[j]) } + +func BenchmarkConnector_CreateIfNotExists(b *testing.B) { + sut := NewConnector() + for x := 0; x < b.N; x++ { + id := dosa.NewUUID() + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("key"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(id)}) + assert.NoError(b, err) + if x%1000 == 0 { + sut.data = nil + } + } +} + +func BenchmarkConnector_Read(b *testing.B) { + const idcount = 100 + testUUIDs := make([]dosa.UUID, idcount) + for x := 0; x < idcount; x++ { + testUUIDs[x] = dosa.NewUUID() + } + sut := NewConnector() + for x := 0; x < idcount; x++ { + err := sut.CreateIfNotExists(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(b, err) + } + + for x := 0; x < b.N; x++ { + _, err := sut.Read(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data"), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x%idcount])}, dosa.All()) + assert.NoError(b, err) + } +} + +func TestCompareType(t *testing.T) { + tuuid := dosa.NewUUID() + v1uuid := dosa.UUID(uuid.NewV1().String()) + v1newer := dosa.UUID(uuid.NewV1().String()) + tests := []struct { + t1, t2 dosa.FieldValue + result int8 + }{ + {dosa.FieldValue(int32(1)), dosa.FieldValue(int32(1)), 0}, + {dosa.FieldValue(int64(1)), dosa.FieldValue(int64(1)), 0}, + {dosa.FieldValue("test"), dosa.FieldValue("test"), 0}, + 
{dosa.FieldValue(time.Time{}), dosa.FieldValue(time.Time{}), 0}, + {dosa.FieldValue(tuuid), dosa.FieldValue(tuuid), 0}, + {dosa.FieldValue(v1uuid), dosa.FieldValue(v1uuid), 0}, + {dosa.FieldValue(false), dosa.FieldValue(false), 0}, + {dosa.FieldValue([]byte{1}), dosa.FieldValue([]byte{1}), 0}, + {dosa.FieldValue(1.0), dosa.FieldValue(1.0), 0}, + + {dosa.FieldValue(int32(1)), dosa.FieldValue(int32(2)), -1}, + {dosa.FieldValue(int64(1)), dosa.FieldValue(int64(2)), -1}, + {dosa.FieldValue("test"), dosa.FieldValue("test2"), -1}, + {dosa.FieldValue(time.Time{}), dosa.FieldValue(time.Time{}.Add(time.Duration(1))), -1}, + {dosa.FieldValue(v1uuid), dosa.FieldValue(tuuid), -1}, + {dosa.FieldValue(v1uuid), dosa.FieldValue(v1newer), -1}, + {dosa.FieldValue(false), dosa.FieldValue(true), -1}, + {dosa.FieldValue([]byte{1}), dosa.FieldValue([]byte{2}), -1}, + {dosa.FieldValue(0.9), dosa.FieldValue(1.0), -1}, + + {dosa.FieldValue(int32(2)), dosa.FieldValue(int32(1)), 1}, + {dosa.FieldValue(int64(2)), dosa.FieldValue(int64(1)), 1}, + {dosa.FieldValue("test2"), dosa.FieldValue("test"), 1}, + {dosa.FieldValue(time.Time{}.Add(time.Duration(1))), dosa.FieldValue(time.Time{}), 1}, + {dosa.FieldValue(tuuid), dosa.FieldValue(v1uuid), 1}, + {dosa.FieldValue(v1newer), dosa.FieldValue(v1uuid), 1}, + {dosa.FieldValue(true), dosa.FieldValue(false), 1}, + {dosa.FieldValue([]byte{2}), dosa.FieldValue([]byte{1}), 1}, + {dosa.FieldValue(1.1), dosa.FieldValue(1.0), 1}, + } + for _, test := range tests { + assert.Equal(t, test.result, compareType(test.t1, test.t2)) + } + + assert.Panics(t, func() { compareType(t, t) }) +} + +func TestConnector_Scan(t *testing.T) { + sut := NewConnector() + testUUIDs := make([]dosa.UUID, 10) + for x := 0; x < 10; x++ { + testUUIDs[x] = dosa.NewUUID() + } + // scan with nothing there yet + _, token, err := sut.Scan(context.TODO(), clusteredEi, dosa.All(), "", 100) + assert.True(t, dosa.ErrorIsNotFound(err)) + assert.Empty(t, token) + + // first, insert 10 random UUID values into two partition keys + for x := 0; x < 10; x++ { + err := sut.Upsert(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data" + string(x%2)), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + + data, token, err := sut.Scan(context.TODO(), clusteredEi, dosa.All(), "", 100) + assert.NoError(t, err) + assert.Len(t, data, 10) + assert.Empty(t, token) + + // there's an odd edge case when you delete everything, so do that, then call scan + for x := 0; x < 10; x++ { + err := sut.Remove(context.TODO(), clusteredEi, map[string]dosa.FieldValue{ + "f1": dosa.FieldValue("data" + string(x%2)), + "c1": dosa.FieldValue(int64(1)), + "c7": dosa.FieldValue(testUUIDs[x])}) + assert.NoError(t, err) + } + data, token, err = sut.Scan(context.TODO(), clusteredEi, dosa.All(), "", 100) + assert.True(t, dosa.ErrorIsNotFound(err)) + assert.Empty(t, token) +} + +func TestConstruction(t *testing.T) { + c, err := dosa.GetConnector("memory", nil) + assert.NoError(t, err) + assert.IsType(t, NewConnector(), c) + + v, err := c.CheckSchema(context.TODO(), "dummy", "dummy", nil) + assert.Equal(t, int32(1), v) + assert.NoError(t, err) +} + +func TestPanics(t *testing.T) { + assert.Panics(t, func() { + passCol(dosa.FieldValue(int64(1)), &dosa.Condition{Op: 0, Value: dosa.FieldValue(int64(1))}) + }) +} diff --git a/connectors/random/random.go b/connectors/random/random.go index 4050167d..ebdc3881 100644 --- a/connectors/random/random.go +++ b/connectors/random/random.go 
@@ -52,9 +52,9 @@ func randomString(slen int) string { // Data generates some random data. Because our test is blackbox in a different package, // we have to export this -func Data(ei *dosa.EntityInfo, fieldsToRead []string) map[string]dosa.FieldValue { +func Data(ei *dosa.EntityInfo, minimumFields []string) map[string]dosa.FieldValue { var result = map[string]dosa.FieldValue{} - for _, field := range fieldsToRead { + for _, field := range minimumFields { var v dosa.FieldValue cd := ei.Def.FindColumnDefinition(field) switch cd.Type { @@ -95,17 +95,17 @@ func Data(ei *dosa.EntityInfo, fieldsToRead []string) map[string]dosa.FieldValue } // Read always returns random data of the type specified -func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, fieldsToRead []string) (map[string]dosa.FieldValue, error) { +func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, values map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) { - return Data(ei, fieldsToRead), nil + return Data(ei, minimumFields), nil } // MultiRead returns a set of random data for each key you specify -func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, fieldsToRead []string) ([]*dosa.FieldValuesOrError, error) { +func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, values []map[string]dosa.FieldValue, minimumFields []string) ([]*dosa.FieldValuesOrError, error) { vals := make([]*dosa.FieldValuesOrError, len(values)) for inx := range values { vals[inx] = &dosa.FieldValuesOrError{ - Values: Data(ei, fieldsToRead), + Values: Data(ei, minimumFields), } } return vals, nil @@ -141,22 +141,22 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV } // Range returns a random set of data, and a random continuation token -func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { vals := make([]map[string]dosa.FieldValue, limit) for inx := range vals { - vals[inx] = Data(ei, fieldsToRead) + vals[inx] = Data(ei, minimumFields) } return vals, randomString(32), nil } // Search also returns a random set of data, just like Range -func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { - return c.Range(ctx, ei, map[string][]*dosa.Condition{}, fieldsToRead, token, limit) +func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { + return c.Range(ctx, ei, map[string][]*dosa.Condition{}, minimumFields, token, limit) } // Scan also returns a random set of data, like Range and Search -func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { - return c.Range(ctx, ei, map[string][]*dosa.Condition{}, fieldsToRead, token, limit) +func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields 
[]string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { + return c.Range(ctx, ei, map[string][]*dosa.Condition{}, minimumFields, token, limit) } // CheckSchema always returns a slice of int32 values that match its index diff --git a/connectors/random/random_test.go b/connectors/random/random_test.go index 184ef679..d49b028d 100644 --- a/connectors/random/random_test.go +++ b/connectors/random/random_test.go @@ -58,7 +58,7 @@ var ( testPairs = dosa.FieldNameValuePair{} testValues = make(map[string]dosa.FieldValue) testMultiValues = make([]map[string]dosa.FieldValue, 50) - fieldsToRead = []string{"booltype", "int32type", "int64type", "doubletype", "stringtype", "blobtype", "timetype", "uuidtype"} + minimumFields = []string{"booltype", "int32type", "int64type", "doubletype", "stringtype", "blobtype", "timetype", "uuidtype"} ctx = context.Background() ) @@ -67,21 +67,21 @@ func TestRandom_CreateIfNotExists(t *testing.T) { } func TestRandom_Read(t *testing.T) { - val, err := sut.Read(ctx, testInfo, testValues, fieldsToRead) + val, err := sut.Read(ctx, testInfo, testValues, minimumFields) assert.NoError(t, err) assert.NotNil(t, val) - for _, field := range fieldsToRead { + for _, field := range minimumFields { assert.NotNil(t, val[field]) } } func TestRandom_MultiRead(t *testing.T) { - v, e := sut.MultiRead(ctx, testInfo, testMultiValues, fieldsToRead) + v, e := sut.MultiRead(ctx, testInfo, testMultiValues, minimumFields) assert.NotNil(t, v) assert.Nil(t, e) assert.Equal(t, len(testMultiValues), len(v)) for i := range v { - for _, field := range fieldsToRead { + for _, field := range minimumFields { assert.NotNil(t, v[i].Values[field]) } } @@ -111,19 +111,19 @@ func TestRandom_MultiRemove(t *testing.T) { func TestRandom_Range(t *testing.T) { conditions := make(map[string][]*dosa.Condition) - vals, _, err := sut.Range(ctx, testInfo, conditions, fieldsToRead, "", 32) + vals, _, err := sut.Range(ctx, testInfo, conditions, minimumFields, "", 32) assert.NotNil(t, vals) assert.NoError(t, err) } func TestRandom_Search(t *testing.T) { - vals, _, err := sut.Search(ctx, testInfo, testPairs, fieldsToRead, "", 32) + vals, _, err := sut.Search(ctx, testInfo, testPairs, minimumFields, "", 32) assert.NotNil(t, vals) assert.NoError(t, err) } func TestRandom_Scan(t *testing.T) { - vals, _, err := sut.Scan(ctx, testInfo, fieldsToRead, "", 32) + vals, _, err := sut.Scan(ctx, testInfo, minimumFields, "", 32) assert.NotNil(t, vals) assert.NoError(t, err) } @@ -168,6 +168,6 @@ func TestRandom_Shutdown(t *testing.T) { func TestRandom_badTypePanic(t *testing.T) { testInfo.Def.Columns[0].Type = dosa.Invalid assert.Panics(t, func() { - random.Data(testInfo, fieldsToRead) + random.Data(testInfo, minimumFields) }) } diff --git a/connectors/yarpc/helpers.go b/connectors/yarpc/helpers.go index d75ad685..90f3a939 100644 --- a/connectors/yarpc/helpers.go +++ b/connectors/yarpc/helpers.go @@ -23,9 +23,9 @@ package yarpc import ( "time" + "github.com/pkg/errors" "github.com/uber-go/dosa" dosarpc "github.com/uber/dosa-idl/.gen/dosa" - "github.com/pkg/errors" ) // RawValueAsInterface converts a value from the wire to an object implementing the interface @@ -216,15 +216,15 @@ func decodeResults(ei *dosa.EntityInfo, invals dosarpc.FieldValueMap) map[string return result } -func makeRPCFieldsToRead(fieldsToRead []string) map[string]struct{} { - var rpcFieldsToRead map[string]struct{} - if fieldsToRead != nil { - rpcFieldsToRead = map[string]struct{}{} - for _, field := range fieldsToRead { - 
rpcFieldsToRead[field] = struct{}{} +func makeRPCminimumFields(minimumFields []string) map[string]struct{} { + var rpcminimumFields map[string]struct{} + if minimumFields != nil { + rpcminimumFields = map[string]struct{}{} + for _, field := range minimumFields { + rpcminimumFields[field] = struct{}{} } } - return rpcFieldsToRead + return rpcminimumFields } func entityInfoToSchemaRef(ei *dosa.EntityInfo) *dosarpc.SchemaRef { scope := ei.Ref.Scope diff --git a/connectors/yarpc/yarpc.go b/connectors/yarpc/yarpc.go index 85f7cb54..6f8cd434 100644 --- a/connectors/yarpc/yarpc.go +++ b/connectors/yarpc/yarpc.go @@ -190,13 +190,13 @@ func (c *Connector) Upsert(ctx context.Context, ei *dosa.EntityInfo, values map[ } // Read reads a single entity -func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, keys map[string]dosa.FieldValue, fieldsToRead []string) (map[string]dosa.FieldValue, error) { +func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, keys map[string]dosa.FieldValue, minimumFields []string) (map[string]dosa.FieldValue, error) { // Convert the fields from the client's map to a set of fields to read - var rpcFieldsToRead map[string]struct{} - if fieldsToRead != nil { - rpcFieldsToRead = map[string]struct{}{} - for _, field := range fieldsToRead { - rpcFieldsToRead[field] = struct{}{} + var rpcMinimumFields map[string]struct{} + if minimumFields != nil { + rpcMinimumFields = map[string]struct{}{} + for _, field := range minimumFields { + rpcMinimumFields[field] = struct{}{} } } @@ -215,7 +215,7 @@ func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, keys map[stri readRequest := &dosarpc.ReadRequest{ Ref: entityInfoToSchemaRef(ei), KeyValues: rpcFields, - FieldsToRead: rpcFieldsToRead, + FieldsToRead: rpcMinimumFields, } response, err := c.Client.Read(ctx, readRequest) @@ -234,9 +234,9 @@ func (c *Connector) Read(ctx context.Context, ei *dosa.EntityInfo, keys map[stri } // MultiRead reads multiple entities at one time -func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, keys []map[string]dosa.FieldValue, fieldsToRead []string) ([]*dosa.FieldValuesOrError, error) { +func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, keys []map[string]dosa.FieldValue, minimumFields []string) ([]*dosa.FieldValuesOrError, error) { // Convert the fields from the client's map to a set of fields to read - rpcFieldsToRead := makeRPCFieldsToRead(fieldsToRead) + rpcMinimumFields := makeRPCminimumFields(minimumFields) // convert the keys to RPC's Value rpcFields := make([]dosarpc.FieldValueMap, len(keys)) @@ -256,7 +256,7 @@ func (c *Connector) MultiRead(ctx context.Context, ei *dosa.EntityInfo, keys []m request := &dosarpc.MultiReadRequest{ Ref: entityInfoToSchemaRef(ei), KeyValues: rpcFields, - FieldsToRead: rpcFieldsToRead, + FieldsToRead: rpcMinimumFields, } response, err := c.Client.MultiRead(ctx, request) @@ -323,9 +323,9 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiK } // Range does a scan across a range -func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { limit32 := int32(limit) - rpcFieldsToRead := 
makeRPCFieldsToRead(fieldsToRead) + rpcMinimumFields := makeRPCminimumFields(minimumFields) rpcConditions := []*dosarpc.Condition{} for field, conditions := range columnConditions { // Warning: Don't remove this line. @@ -347,7 +347,7 @@ func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnCondit Token: &token, Limit: &limit32, Conditions: rpcConditions, - FieldsToRead: rpcFieldsToRead, + FieldsToRead: rpcMinimumFields, } response, err := c.Client.Range(ctx, &rangeRequest) if err != nil { @@ -361,19 +361,19 @@ func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnCondit } // Search is not yet implemented -func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Search(ctx context.Context, ei *dosa.EntityInfo, fieldPairs dosa.FieldNameValuePair, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { panic("not implemented") } // Scan marshals a scan request into YaRPC -func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, fieldsToRead []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { +func (c *Connector) Scan(ctx context.Context, ei *dosa.EntityInfo, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { limit32 := int32(limit) - rpcFieldsToRead := makeRPCFieldsToRead(fieldsToRead) + rpcMinimumFields := makeRPCminimumFields(minimumFields) scanRequest := dosarpc.ScanRequest{ Ref: entityInfoToSchemaRef(ei), Token: &token, Limit: &limit32, - FieldsToRead: rpcFieldsToRead, + FieldsToRead: rpcMinimumFields, } response, err := c.Client.Scan(ctx, &scanRequest) if err != nil { diff --git a/connectors/yarpc/yarpc_test.go b/connectors/yarpc/yarpc_test.go index c844ea2d..3e6f7a29 100644 --- a/connectors/yarpc/yarpc_test.go +++ b/connectors/yarpc/yarpc_test.go @@ -425,8 +425,8 @@ func TestYaRPCClient_CreateIfNotExists(t *testing.T) { // cover the conversion error case err = sut.CreateIfNotExists(ctx, testEi, map[string]dosa.FieldValue{"c7": dosa.UUID("")}) assert.Error(t, err) - assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field - assert.Contains(t, err.Error(), "too short") // must mention that the uuid is too short + assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field + assert.Contains(t, err.Error(), "too short") // must mention that the uuid is too short assert.NoError(t, sut.Shutdown()) } @@ -476,8 +476,8 @@ func TestYaRPCClient_Upsert(t *testing.T) { // cover the conversion error case err = sut.Upsert(ctx, testEi, map[string]dosa.FieldValue{"c7": dosa.UUID("")}) assert.Error(t, err) - assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field - assert.Contains(t, err.Error(), "too short") // must mention that the uuid is too short + assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field + assert.Contains(t, err.Error(), "too short") // must mention that the uuid is too short // make sure we actually called CreateIfNotExists on the interface ctrl.Finish() @@ -791,8 +791,8 @@ func TestConnector_Remove(t *testing.T) { // cover the conversion error case err = sut.Remove(ctx, testEi, map[string]dosa.FieldValue{"c7": dosa.UUID("321")}) assert.Error(t, err) - assert.Contains(t, err.Error(), "\"c7\"") // must contain name of bad field - assert.Contains(t, err.Error(), "too short") // must 
mention that the uuid is too short
+	assert.Contains(t, err.Error(), "\"c7\"")    // must contain name of bad field
+	assert.Contains(t, err.Error(), "too short") // must mention that the uuid is too short
 
 	// make sure we actually called Read on the interface
 	ctrl.Finish()
diff --git a/examples/testing/doc.go b/examples/testing/doc.go
index ec4445af..3ecddafe 100644
--- a/examples/testing/doc.go
+++ b/examples/testing/doc.go
@@ -91,5 +91,70 @@
 //
 // A complete, runnable example of this can be found in our testing examples package (https://github.com/uber-go/dosa/tree/master/examples/testing).
 //
+// EqRangeOp and EqScanOp
+//
+// In addition to the MockClient, dosa provides two useful gomock matchers. EqRangeOp allows you to verify
+// that an expected call to
+// Range is made with a specific RangeOp. EqScanOp does the same thing, except for
+// the
+// Scan function. For instance, assume we have the following entity:
+//
+//	type MenuItem struct {
+//		dosa.Entity `dosa:"primaryKey=((MenuUUID), MenuItemUUID)"`
+//		MenuUUID     dosa.UUID
+//		MenuItemUUID dosa.UUID
+//		Name         string
+//		Description  string
+//	}
+//
+// Let's also assume we add the following receiver function to our Datastore struct:
+//
+//	func (d *Datastore) GetMenu(ctx context.Context, menuUUID dosa.UUID) ([]*MenuItem, error) {
+//		op := dosa.NewRangeOp(&MenuItem{}).Eq("MenuUUID", menuUUID).Limit(50)
+//		rangeCtx, rangeCancelFn := context.WithTimeout(ctx, 1*time.Second)
+//		defer rangeCancelFn()
+//
+//		objs, _, err := d.client.Range(rangeCtx, op)
+//		if err != nil {
+//			return nil, err
+//		}
+//
+//		menuItems := make([]*MenuItem, len(objs))
+//		for i, obj := range objs {
+//			menuItems[i] = obj.(*MenuItem)
+//		}
+//		return menuItems, nil
+//	}
+//
+// In our tests, we could verify that a particular list of MenuItem entities was queried for by using EqRangeOp like so:
+//
+//
+//	func TestGetMenu(t *testing.T) {
+//		ctrl := gomock.NewController(t)
+//		defer ctrl.Finish()
+//
+//		expectedOp := dosa.NewRangeOp(&examples.MenuItem{}).Eq("MenuUUID", menuUUID).Limit(50)
+//
+//		// mock error from Range call
+//		c1 := mocks.NewMockClient(ctrl)
+//		c1.EXPECT().Initialize(gomock.Any()).Return(nil).Times(1)
+//		c1.EXPECT().Range(gomock.Any(), dosa.EqRangeOp(expectedOp)).Return(nil, "", errors.New("Range Error")).Times(1)
+//		ds1, _ := examples.NewDatastore(c1)
+//
+//		m1, err1 := ds1.GetMenu(ctx, menuUUID)
+//		assert.Error(t, err1)
+//		assert.Nil(t, m1)
+//
+//		// happy path
+//		c2 := mocks.NewMockClient(ctrl)
+//		c2.EXPECT().Initialize(gomock.Any()).Return(nil).Times(1)
+//		c2.EXPECT().Range(gomock.Any(), dosa.EqRangeOp(expectedOp)).Return(objMenu, "", nil).Times(1)
+//		ds2, _ := examples.NewDatastore(c2)
+//
+//		m2, err2 := ds2.GetMenu(ctx, menuUUID)
+//		assert.NoError(t, err2)
+//		assert.Equal(t, menu, m2)
+//	}
+//
 //
 package testingexamples
diff --git a/registry.go b/registry.go
index a130f936..39d5bc09 100644
--- a/registry.go
+++ b/registry.go
@@ -158,15 +158,22 @@ func (e *RegisteredEntity) ColumnNames(fieldNames []string) ([]string, error) {
 
 // SetFieldValues is a helper for populating a DOSA entity with the given
 // fieldName->value map
-func (e *RegisteredEntity) SetFieldValues(entity DomainObject, fieldValues map[string]FieldValue) {
+func (e *RegisteredEntity) SetFieldValues(entity DomainObject, fieldValues map[string]FieldValue, fieldsToRead []string) {
 	r := reflect.ValueOf(entity).Elem()
-	for columnName, fieldValue := range fieldValues {
+	// a nil fieldsToRead means "populate every column the connector returned"
+	if fieldsToRead == nil {
+		for columnName := range fieldValues {
+			fieldsToRead = append(fieldsToRead, columnName)
+		}
+	}
+	for _, columnName := range fieldsToRead {
 		// column name may be different from the entity's field name, so we
 		// have to look it up along the way.
 		fieldName, ok := e.table.ColToField[columnName]
 		if !ok {
 			continue // we ignore fields that we don't know about
 		}
+		fieldValue := fieldValues[columnName]
 		val := r.FieldByName(fieldName)
 		if !val.IsValid() {
 			panic("Field " + fieldName + " is is not a valid field for " + e.table.StructName)
diff --git a/registry_test.go b/registry_test.go
index 3e938be6..a770a021 100644
--- a/registry_test.go
+++ b/registry_test.go
@@ -154,17 +154,17 @@ func TestRegisteredEntity_SetFieldValues(t *testing.T) {
 
 	// invalid entity
 	assert.Panics(t, func() {
-		re.SetFieldValues(&RegistryTestInvalid{PrimaryKey: 1}, validFieldValues)
+		re.SetFieldValues(&RegistryTestInvalid{PrimaryKey: 1}, validFieldValues, []string{"name", "email"})
 	})
 
 	// invalid values are skipped
-	re.SetFieldValues(entity, invalidFieldValues)
+	re.SetFieldValues(entity, invalidFieldValues, []string{"id", "name", "invalid"})
 	assert.Equal(t, entity.ID, invalidFieldValues["id"])
 	assert.Equal(t, entity.Name, invalidFieldValues["name"])
 	assert.Equal(t, entity.Email, "foo@email.com")
 
 	// valid
-	re.SetFieldValues(entity, validFieldValues)
+	re.SetFieldValues(entity, validFieldValues, []string{"id", "name", "email"})
 	assert.Equal(t, entity.ID, validFieldValues["id"])
 	assert.Equal(t, entity.Name, validFieldValues["name"])
 	assert.Equal(t, entity.Email, validFieldValues["email"])
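Note on the new SetFieldValues signature shown above: a nil fieldsToRead keeps the old populate-everything behavior, while a non-nil list copies only the named columns into the entity, even when a connector hands back more columns than were asked for (the "minimumFields" contract in the connector changes). Below is a minimal sketch of that behavior, not part of this change: the User entity, its lowercase column names, and the use of the registrar's Find method to obtain the *RegisteredEntity are illustrative assumptions.

// sketch_setfieldvalues.go: a minimal, self-contained sketch of the new
// SetFieldValues(entity, values, fieldsToRead) behavior. The User entity and
// the registrar Find lookup are assumptions made for illustration only.
package main

import (
	"fmt"

	"github.com/uber-go/dosa"
)

// User is a hypothetical entity used only for this sketch.
type User struct {
	dosa.Entity `dosa:"primaryKey=ID"`
	ID          int64
	Name        string
	Email       string
}

func main() {
	reg, err := dosa.NewRegistrar("test", "myteam.myservice", &User{})
	if err != nil {
		fmt.Printf("NewRegistrar error: %s\n", err)
		return
	}

	// Assumption: Find returns the *RegisteredEntity for a registered type.
	re, err := reg.Find(&User{})
	if err != nil {
		fmt.Printf("Find error: %s\n", err)
		return
	}

	// Pretend a connector returned three columns even though only two were requested.
	values := map[string]dosa.FieldValue{
		"id":    dosa.FieldValue(int64(1)),
		"name":  dosa.FieldValue("alice"),
		"email": dosa.FieldValue("alice@example.com"),
	}

	// Only "id" and "name" are copied; Email keeps its zero value.
	u := User{}
	re.SetFieldValues(&u, values, []string{"id", "name"})
	fmt.Printf("partial: %+v\n", u)

	// A nil fieldsToRead copies every returned column, preserving the old behavior.
	re.SetFieldValues(&u, values, nil)
	fmt.Printf("full: %+v\n", u)
}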