Skip to content

Commit

Permalink
MLPAB-2059 Index donor names and alter queries to work with that (#1219)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawx authored May 7, 2024
1 parent 7957acb commit 720ecc7
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 69 deletions.
19 changes: 16 additions & 3 deletions internal/app/donor_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ func (s *donorStore) GetByKeys(ctx context.Context, keys []dynamo.Keys) ([]actor
var donors []actor.DonorProvidedDetails
err = attributevalue.UnmarshalListOfMaps(items, &donors)

mappedDonors := map[string]actor.DonorProvidedDetails{}
for _, donor := range donors {
mappedDonors[donor.PK.PK()+"|"+donor.SK.SK()] = donor
}

clear(donors)
for i, key := range keys {
donors[i] = mappedDonors[key.PK.PK()+"|"+key.SK.SK()]
}

return donors, err
}

Expand All @@ -268,9 +278,12 @@ func (s *donorStore) Put(ctx context.Context, donor *actor.DonorProvidedDetails)
donor.UpdatedAt = s.now()

if err := s.searchClient.Index(ctx, search.Lpa{
PK: donor.PK.PK(),
SK: donor.SK.SK(),
DonorFullName: donor.Donor.FullName(),
PK: donor.PK.PK(),
SK: donor.SK.SK(),
Donor: search.LpaDonor{
FirstNames: donor.Donor.FirstNames,
LastName: donor.Donor.LastName,
},
}); err != nil {
return fmt.Errorf("donorStore index failed: %w", err)
}
Expand Down
19 changes: 14 additions & 5 deletions internal/app/donor_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,23 @@ func TestDonorStoreLatestWhenDataStoreError(t *testing.T) {
}

func TestDonorStoreGetByKeys(t *testing.T) {
keys := []dynamo.Keys{{}}
donors := []actor.DonorProvidedDetails{{LpaID: "1"}, {LpaID: "2"}}
av0, _ := attributevalue.MarshalMap(donors[0])
keys := []dynamo.Keys{
{PK: dynamo.LpaKey("1"), SK: dynamo.LpaOwnerKey(dynamo.DonorKey("a"))},
{PK: dynamo.LpaKey("2"), SK: dynamo.LpaOwnerKey(dynamo.DonorKey("b"))},
{PK: dynamo.LpaKey("3"), SK: dynamo.LpaOwnerKey(dynamo.DonorKey("c"))},
}
donors := []actor.DonorProvidedDetails{
{PK: dynamo.LpaKey("1"), SK: dynamo.LpaOwnerKey(dynamo.DonorKey("a")), LpaID: "1"},
{PK: dynamo.LpaKey("2"), SK: dynamo.LpaOwnerKey(dynamo.DonorKey("b")), LpaID: "2"},
{PK: dynamo.LpaKey("3"), SK: dynamo.LpaOwnerKey(dynamo.DonorKey("c")), LpaID: "3"},
}
av0, _ := attributevalue.MarshalMap(donors[2])
av1, _ := attributevalue.MarshalMap(donors[1])
av2, _ := attributevalue.MarshalMap(donors[0])

dynamoClient := newMockDynamoClient(t)
dynamoClient.ExpectAllByKeys(ctx, keys,
[]map[string]types.AttributeValue{av0, av1}, nil)
[]map[string]types.AttributeValue{av0, av1, av2}, nil)

donorStore := &donorStore{dynamoClient: dynamoClient}

Expand Down Expand Up @@ -356,7 +365,7 @@ func TestDonorStorePutWhenUIDSet(t *testing.T) {

searchClient := newMockSearchClient(t)
searchClient.EXPECT().
Index(ctx, search.Lpa{PK: dynamo.LpaKey("5").PK(), SK: dynamo.DonorKey("an-id").SK(), DonorFullName: "x y"}).
Index(ctx, search.Lpa{PK: dynamo.LpaKey("5").PK(), SK: dynamo.DonorKey("an-id").SK(), Donor: search.LpaDonor{FirstNames: "x", LastName: "y"}}).
Return(nil)

donorStore := &donorStore{dynamoClient: dynamoClient, searchClient: searchClient, now: testNowFn}
Expand Down
9 changes: 6 additions & 3 deletions internal/app/uid_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ func (s *uidStore) Set(ctx context.Context, lpaID, sessionID, organisationID, ui
}

if err := s.searchClient.Index(ctx, search.Lpa{
PK: dynamo.LpaKey(lpaID).PK(),
SK: sk.SK(),
DonorFullName: donor.Donor.FullName(),
PK: dynamo.LpaKey(lpaID).PK(),
SK: sk.SK(),
Donor: search.LpaDonor{
FirstNames: donor.Donor.FirstNames,
LastName: donor.Donor.LastName,
},
}); err != nil {
return fmt.Errorf("uidStore index failed: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/app/uid_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func TestUidStoreSet(t *testing.T) {
searchClient := newMockSearchClient(t)
searchClient.EXPECT().
Index(ctx, search.Lpa{
PK: dynamo.LpaKey("lpa-id").PK(),
SK: tc.sk.SK(),
DonorFullName: "x y",
PK: dynamo.LpaKey("lpa-id").PK(),
SK: tc.sk.SK(),
Donor: search.LpaDonor{FirstNames: "x", LastName: "y"},
}).
Return(nil)

Expand Down
17 changes: 15 additions & 2 deletions internal/page/fixtures/supporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"fmt"
"log"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -303,11 +304,23 @@ func Supporter(
}

func waitForLPAIndex(searchClient *search.Client, organisationCtx context.Context) {
count := 0

for range time.Tick(time.Second) {
if resp, _ := searchClient.Query(organisationCtx, search.QueryRequest{
resp, err := searchClient.Query(organisationCtx, search.QueryRequest{
Page: 1,
PageSize: 1,
}); resp != nil && len(resp.Keys) > 0 {
})
if err != nil {
log.Println(err)
}

if count > 10 {
return
}
count++

if resp != nil && len(resp.Keys) > 0 {
break
}
}
Expand Down
92 changes: 52 additions & 40 deletions internal/search/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,22 @@ import (
requestsigner "github.com/opensearch-project/opensearch-go/v4/signer/awsv2"
)

const (
indexDefinition = `
{
"settings": {
"index": {
"number_of_shards": 1,
"number_of_replicas": 0
}
var indexDefinition = map[string]any{
"settings": map[string]any{
"index": map[string]any{
"number_of_shards": 1,
"number_of_replicas": 0,
},
"mappings": {
"properties": {
"DonorFullNameText": { "type": "text" },
"DonorFullName": { "type": "keyword", "copy_to": "DonorFullNameText" },
"SK": { "type": "keyword" }
}
}
}`
)
},
"mappings": map[string]any{
"properties": map[string]any{
"PK": map[string]any{"type": "keyword"},
"SK": map[string]any{"type": "keyword"},
"Donor.FirstNames": map[string]any{"type": "keyword"},
"Donor.LastName": map[string]any{"type": "keyword"},
},
},
}

type opensearchapiClient interface {
Search(ctx context.Context, req *opensearchapi.SearchReq) (*opensearchapi.SearchResp, error)
Expand All @@ -50,9 +48,14 @@ type QueryResponse struct {
}

type Lpa struct {
DonorFullName string
PK string
SK string
PK string
SK string
Donor LpaDonor
}

type LpaDonor struct {
FirstNames string
LastName string
}

type QueryRequest struct {
Expand Down Expand Up @@ -88,14 +91,16 @@ func NewClient(cfg aws.Config, endpoint, indexName string, indexingEnabled bool)
}

func (c *Client) CreateIndices(ctx context.Context) error {
_, err := c.indices.Exists(ctx, opensearchapi.IndicesExistsReq{Indices: []string{c.indexName}})
if err == nil {
return nil
body, err := json.Marshal(indexDefinition)
if err != nil {
return err
}

settings := strings.NewReader(indexDefinition)
if _, err := c.indices.Exists(ctx, opensearchapi.IndicesExistsReq{Indices: []string{c.indexName}}); err == nil {
return nil
}

if _, err := c.indices.Create(ctx, opensearchapi.IndicesCreateReq{Index: c.indexName, Body: settings}); err != nil {
if _, err := c.indices.Create(ctx, opensearchapi.IndicesCreateReq{Index: c.indexName, Body: bytes.NewReader(body)}); err != nil {
return fmt.Errorf("search could not create index: %w", err)
}

Expand Down Expand Up @@ -128,11 +133,7 @@ func (c *Client) Query(ctx context.Context, req QueryRequest) (*QueryResponse, e
}

body, err := json.Marshal(map[string]any{
"query": map[string]any{
"match": map[string]any{
"SK": sk,
},
},
"query": baseQuery(sk),
})
if err != nil {
return nil, err
Expand All @@ -144,7 +145,7 @@ func (c *Client) Query(ctx context.Context, req QueryRequest) (*QueryResponse, e
Params: opensearchapi.SearchParams{
From: aws.Int((req.Page - 1) * req.PageSize),
Size: aws.Int(req.PageSize),
Sort: []string{"DonorFullName"},
Sort: []string{"Donor.FirstNames", "Donor.LastName"},
},
})
if err != nil {
Expand Down Expand Up @@ -181,15 +182,7 @@ func (c *Client) CountWithQuery(ctx context.Context, req CountWithQueryReq) (int
"track_total_hits": true,
}

query := map[string]map[string]any{
"bool": {
"must": map[string]any{
"match": map[string]string{
"SK": sk,
},
},
},
}
query := baseQuery(sk)

if req.MustNotExist != "" {
query["bool"]["must_not"] = map[string]any{
Expand Down Expand Up @@ -217,6 +210,25 @@ func (c *Client) CountWithQuery(ctx context.Context, req CountWithQueryReq) (int
return resp.Hits.Total.Value, err
}

func baseQuery(sk string) map[string]map[string]any {
return map[string]map[string]any{
"bool": {
"must": []map[string]any{
{
"match": map[string]string{
"SK": sk,
},
},
{
"prefix": map[string]string{
"PK": dynamo.LpaKey("").PK(),
},
},
},
},
}
}

func getSKFromContext(ctx context.Context) (string, error) {
session, err := page.SessionDataFromContext(ctx)
if err != nil {
Expand Down
25 changes: 13 additions & 12 deletions internal/search/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -53,12 +52,14 @@ func TestNewClient(t *testing.T) {
}

func TestClientCreateIndices(t *testing.T) {
data, _ := json.Marshal(indexDefinition)

indices := newMockIndicesClient(t)
indices.EXPECT().
Exists(ctx, opensearchapi.IndicesExistsReq{Indices: []string{testIndexName}}).
Return(nil, expectedError)
indices.EXPECT().
Create(ctx, opensearchapi.IndicesCreateReq{Index: testIndexName, Body: strings.NewReader(indexDefinition)}).
Create(ctx, opensearchapi.IndicesCreateReq{Index: testIndexName, Body: bytes.NewReader(data)}).
Return(nil, nil)

client := &Client{indices: indices, indexName: testIndexName, indexingEnabled: true}
Expand Down Expand Up @@ -97,18 +98,18 @@ func TestClientIndex(t *testing.T) {
Index(ctx, opensearchapi.IndexReq{
Index: testIndexName,
DocumentID: "LPA--2020",
Body: bytes.NewReader([]byte(`{"DonorFullName":"x y","PK":"LPA#2020","SK":"abc#123"}`)),
Body: bytes.NewReader([]byte(`{"PK":"LPA#2020","SK":"abc#123","Donor":{"FirstNames":"x","LastName":"y"}}`)),
}).
Return(nil, nil)

client := &Client{svc: svc, indexName: testIndexName, indexingEnabled: true}
err := client.Index(ctx, Lpa{DonorFullName: "x y", PK: dynamo.LpaKey("2020").PK(), SK: "abc#123"})
err := client.Index(ctx, Lpa{Donor: LpaDonor{FirstNames: "x", LastName: "y"}, PK: dynamo.LpaKey("2020").PK(), SK: "abc#123"})
assert.Nil(t, err)
}

func TestClientIndexWhenNotEnabled(t *testing.T) {
client := &Client{}
err := client.Index(ctx, Lpa{DonorFullName: "x y", PK: dynamo.LpaKey("2020").PK(), SK: "abc#123"})
err := client.Index(ctx, Lpa{Donor: LpaDonor{FirstNames: "x", LastName: "y"}, PK: dynamo.LpaKey("2020").PK(), SK: "abc#123"})
assert.Nil(t, err)
}

Expand All @@ -119,7 +120,7 @@ func TestClientIndexWhenIndexErrors(t *testing.T) {
Return(nil, expectedError)

client := &Client{svc: svc, indexingEnabled: true}
err := client.Index(ctx, Lpa{DonorFullName: "x y", PK: dynamo.LpaKey("2020").PK(), SK: "abc#123"})
err := client.Index(ctx, Lpa{Donor: LpaDonor{FirstNames: "x", LastName: "y"}, PK: dynamo.LpaKey("2020").PK(), SK: "abc#123"})
assert.Equal(t, expectedError, err)
}

Expand Down Expand Up @@ -171,11 +172,11 @@ func TestClientQuery(t *testing.T) {
svc.EXPECT().
Search(ctx, &opensearchapi.SearchReq{
Indices: []string{testIndexName},
Body: bytes.NewReader([]byte(fmt.Sprintf(`{"query":{"match":{"SK":"%s"}}}`, tc.sk.SK()))),
Body: bytes.NewReader([]byte(fmt.Sprintf(`{"query":{"bool":{"must":[{"match":{"SK":"%s"}},{"prefix":{"PK":"LPA#"}}]}}}`, tc.sk.SK()))),
Params: opensearchapi.SearchParams{
From: aws.Int(tc.from),
Size: aws.Int(10),
Sort: []string{"DonorFullName"},
Sort: []string{"Donor.FirstNames", "Donor.LastName"},
},
}).
Return(resp, nil)
Expand Down Expand Up @@ -240,22 +241,22 @@ func TestClientCountWithQuery(t *testing.T) {
}{
"no query - donor": {
query: CountWithQueryReq{},
body: []byte(`{"query":{"bool":{"must":{"match":{"SK":"DONOR#1"}}}},"size":0,"track_total_hits":true}`),
body: []byte(`{"query":{"bool":{"must":[{"match":{"SK":"DONOR#1"}},{"prefix":{"PK":"LPA#"}}]}},"size":0,"track_total_hits":true}`),
session: &page.SessionData{SessionID: "1"},
},
"no query - organisation": {
query: CountWithQueryReq{},
body: []byte(`{"query":{"bool":{"must":{"match":{"SK":"ORGANISATION#1"}}}},"size":0,"track_total_hits":true}`),
body: []byte(`{"query":{"bool":{"must":[{"match":{"SK":"ORGANISATION#1"}},{"prefix":{"PK":"LPA#"}}]}},"size":0,"track_total_hits":true}`),
session: &page.SessionData{OrganisationID: "1"},
},
"MustNotExist query - donor": {
query: CountWithQueryReq{MustNotExist: "a-field"},
body: []byte(`{"query":{"bool":{"must":{"match":{"SK":"DONOR#1"}},"must_not":{"exists":{"field":"a-field"}}}},"size":0,"track_total_hits":true}`),
body: []byte(`{"query":{"bool":{"must":[{"match":{"SK":"DONOR#1"}},{"prefix":{"PK":"LPA#"}}],"must_not":{"exists":{"field":"a-field"}}}},"size":0,"track_total_hits":true}`),
session: &page.SessionData{SessionID: "1"},
},
"MustNotExist query - organisation": {
query: CountWithQueryReq{MustNotExist: "a-field"},
body: []byte(`{"query":{"bool":{"must":{"match":{"SK":"ORGANISATION#1"}},"must_not":{"exists":{"field":"a-field"}}}},"size":0,"track_total_hits":true}`),
body: []byte(`{"query":{"bool":{"must":[{"match":{"SK":"ORGANISATION#1"}},{"prefix":{"PK":"LPA#"}}],"must_not":{"exists":{"field":"a-field"}}}},"size":0,"track_total_hits":true}`),
session: &page.SessionData{OrganisationID: "1"},
},
}
Expand Down
2 changes: 1 addition & 1 deletion terraform/environment/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -119,5 +119,5 @@ locals {

mock_onelogin_version = "latest"

search_index_name = "lpas_${local.environment_name}"
search_index_name = "lpas_v2_${local.environment_name}"
}

0 comments on commit 720ecc7

Please sign in to comment.