Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
okankoAMZ committed Jul 28, 2022
1 parent 74bca13 commit b068ea6
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions integration/test/performancetest/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (transmitter *TransmitterAPI) SendItem(packet map[string]interface{}) (stri
err = transmitter.UpdateItem(packet) //try to update the item
//this may be overwritten by other test threads, in that case it will return a specific error
if err !=nil{
return err
return "",err
}
fmt.Println("SendItem Completed")
return sentItem, err
Expand Down Expand Up @@ -274,10 +274,9 @@ Params:
testHash: this is the hash of the last item, used like a version check
*/
func (transmitter * TransmitterAPI) UpdateItem(packet map[string]interface{}) error{
var err error
var ae *types.ConditionalCheckFailedException // this exception represent the atomic check has failed
rand.Seed(time.Now().UnixNano())
randomSleepDuration := time.Duration(rand.Intn(UPDATE_DELAY_THRESHOLD)*time.Second)
randomSleepDuration := time.Duration(rand.Intn(UPDATE_DELAY_THRESHOLD))*time.Second
hash:= packet[HASH].(string)
for attemptCount:=0; attemptCount< MAX_ATTEMPTS; attemptCount++{
fmt.Println("Updating:",hash)
Expand All @@ -288,7 +287,10 @@ func (transmitter * TransmitterAPI) UpdateItem(packet map[string]interface{}) er
commitDate := fmt.Sprintf("%d",int(item[0][COMMIT_DATE].(float64)))
year := fmt.Sprintf("%d",int(item[0][PARTITION_KEY].(float64)))
testHash := item[0][TEST_ID].(string)
targetAttributes := PacketMerger(packet,item[0])
targetAttributes,err := transmitter.PacketMerger(packet,item[0])
if err !=nil{
return err
}
//setup the update expression
expressionAttributeValues := make(map[string]types.AttributeValue)
expression := ""
Expand Down Expand Up @@ -341,14 +343,14 @@ Param: commit hash in terms of string
func (transmitter * TransmitterAPI) UpdateReleaseTag(hash string) error{
var err error
var ae *types.ConditionalCheckFailedException
attributes := map[string]types.AttributeValue{
"IS_RELEASE":&types.AttributeValueMemberBOOL{Value: true},
}
packet := make(map[string]interface{})
packet[HASH] = hash
packet[IS_RELEASE] = true
for attemptCount:=0; attemptCount< MAX_ATTEMPTS; attemptCount++ {//concurrency retry
/*solving parallel updates using an optimistic lock and retry,
this may result in a livelock;
however, random sleeps makes this possiblity nearly impossible*/
err = transmitter.UpdateItem(hash,attributes) //try to update the item
err = transmitter.UpdateItem(packet) //try to update the item
//this may be overwritten by other test threads, in that case it will return a specific error
if errors.As(err,&ae){ //check if our call got overwritten
// item has changed
Expand Down

0 comments on commit b068ea6

Please sign in to comment.