-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
969 lines (891 loc) · 40.1 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
package dynamomq
import (
"context"
"errors"
"sort"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/vvatanabe/dynamomq/internal/clock"
"github.com/vvatanabe/dynamomq/internal/constant"
)
// defaultQueryLimit is the page size (Limit) passed to each DynamoDB Query
// issued against the queueing index.
const defaultQueryLimit = 250

// maxFirstMessagesInQueue caps how many message IDs are collected into the
// "first IDs" lists of the queue statistics.
const maxFirstMessagesInQueue = 100
// Client is the interface for interacting with a DynamoDB-based message queue system.
// It provides methods for the various operations on messages within the queue.
// The interface is generic over T, the type of the message content.
type Client[T any] interface {
// SendMessage sends a message to the DynamoDB-based queue.
SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error)
// ReceiveMessage retrieves and processes a message from a DynamoDB-based queue.
ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error)
// ChangeMessageVisibility changes the visibility of a specific message in a DynamoDB-based queue.
ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error)
// DeleteMessage deletes a specific message from a DynamoDB-based queue.
DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error)
// MoveMessageToDLQ moves a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error)
// RedriveMessage restores a specific message from a DynamoDB-based Dead Letter Queue (DLQ).
RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error)
// GetMessage gets a specific message from a DynamoDB-based queue.
GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error)
// GetQueueStats obtains statistical information about a DynamoDB-based queue.
GetQueueStats(ctx context.Context, params *GetQueueStatsInput) (*GetQueueStatsOutput, error)
// GetDLQStats gets statistical information about a DynamoDB-based Dead Letter Queue (DLQ).
GetDLQStats(ctx context.Context, params *GetDLQStatsInput) (*GetDLQStatsOutput, error)
// ListMessages gets a list of messages from a DynamoDB-based queue.
ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error)
// ReplaceMessage replaces a specific message within a DynamoDB-based queue.
ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error)
}
// ClientOptions holds the configuration consumed by NewFromConfig when it
// builds a DynamoMQ client.
//
// The fields below exist mainly as test seams: they let unit tests stub out
// time and (un)marshalling/expression building without a real DynamoDB:
//
//   - Clock
//   - MarshalMap
//   - UnmarshalMap
//   - UnmarshalListOfMaps
//   - BuildExpression
//
// In normal use leave these at the defaults installed by NewFromConfig.
type ClientOptions struct {
	// DynamoDB is the DynamoDB client used for all database operations.
	// When it is non-nil NewFromConfig uses it as-is and does not construct
	// its own client (so BaseEndpoint and RetryMaxAttempts are not applied).
	DynamoDB *dynamodb.Client
	// TableName is the name of the DynamoDB table backing the queue.
	TableName string
	// QueueingIndexName is the name of the index queried for queueing operations.
	QueueingIndexName string
	// MaximumReceives is the maximum number of times a message is delivered
	// before being moved to the DLQ.
	MaximumReceives int
	// UseFIFO reports whether the queue should behave as a
	// First-In-First-Out (FIFO) queue.
	UseFIFO bool
	// BaseEndpoint is the base endpoint URL for DynamoDB requests; it is only
	// applied when non-empty and when NewFromConfig creates the DynamoDB client.
	BaseEndpoint string
	// RetryMaxAttempts is the maximum number of attempts for DynamoDB
	// operations; it is only applied when NewFromConfig creates the DynamoDB client.
	RetryMaxAttempts int
	// Clock abstracts time so tests can control "now".
	Clock clock.Clock
	// MarshalMap marshals an object into a map of DynamoDB attribute values.
	MarshalMap func(in any) (map[string]types.AttributeValue, error)
	// UnmarshalMap unmarshals a map of DynamoDB attribute values into an object.
	UnmarshalMap func(m map[string]types.AttributeValue, out any) error
	// UnmarshalListOfMaps unmarshals a list of attribute-value maps into objects.
	UnmarshalListOfMaps func(l []map[string]types.AttributeValue, out any) error
	// BuildExpression builds a DynamoDB expression from a builder.
	BuildExpression func(b expression.Builder) (expression.Expression, error)
}
// WithTableName returns an option that sets ClientOptions.TableName, the
// DynamoDB table the client stores messages in and reads them from.
// When the option is not supplied, NewFromConfig uses constant.DefaultTableName.
func WithTableName(tableName string) func(*ClientOptions) {
	return func(opts *ClientOptions) { opts.TableName = tableName }
}
// WithQueueingIndexName returns an option that sets
// ClientOptions.QueueingIndexName, the index the client queries for queue
// operations.
// When the option is not supplied, NewFromConfig uses
// constant.DefaultQueueingIndexName.
func WithQueueingIndexName(queueingIndexName string) func(*ClientOptions) {
	return func(opts *ClientOptions) { opts.QueueingIndexName = queueingIndexName }
}
// WithUseFIFO returns an option that sets ClientOptions.UseFIFO.
// Passing true makes the client treat the queue as a FIFO
// (First-In-First-Out) queue; otherwise it is treated as a standard queue.
// NewFromConfig defaults this to false.
func WithUseFIFO(useFIFO bool) func(*ClientOptions) {
	return func(opts *ClientOptions) { opts.UseFIFO = useFIFO }
}
// WithAWSDynamoDBClient returns an option that sets ClientOptions.DynamoDB to
// a pre-configured DynamoDB client. When a non-nil client is supplied,
// NewFromConfig uses it for all DynamoDB interactions instead of building one.
func WithAWSDynamoDBClient(client *dynamodb.Client) func(*ClientOptions) {
	return func(opts *ClientOptions) { opts.DynamoDB = client }
}
// WithAWSBaseEndpoint returns an option that sets ClientOptions.BaseEndpoint,
// a custom base endpoint for DynamoDB requests (for example a local or a
// different regional endpoint).
// It has no effect when a DynamoDB client is supplied via
// WithAWSDynamoDBClient, because NewFromConfig then never builds its own client.
func WithAWSBaseEndpoint(baseEndpoint string) func(*ClientOptions) {
	return func(opts *ClientOptions) { opts.BaseEndpoint = baseEndpoint }
}
// WithAWSRetryMaxAttempts returns an option that sets
// ClientOptions.RetryMaxAttempts, the maximum number of attempts configured on
// the DynamoDB client that NewFromConfig creates.
// It has no effect when a DynamoDB client is supplied via
// WithAWSDynamoDBClient, because NewFromConfig then never builds its own client.
func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions) {
	return func(opts *ClientOptions) { opts.RetryMaxAttempts = retryMaxAttempts }
}
// NewFromConfig builds a DynamoMQ client for message type T.
//
// It starts from the package defaults (default table and index names, default
// retry attempts, a real clock, the attributevalue marshalling functions and
// expression.Builder.Build), applies every function in optFns in order, and
// copies the resulting options into a ClientImpl.
//
// If no DynamoDB client was supplied through the options, one is created from
// cfg using RetryMaxAttempts and, when non-empty, BaseEndpoint.
//
// The returned error is currently always nil; it is part of the signature so
// that callers handle it uniformly.
func NewFromConfig[T any](cfg aws.Config, optFns ...func(*ClientOptions)) (Client[T], error) {
	// UseFIFO is intentionally left at its zero value (false).
	opts := &ClientOptions{
		TableName:           constant.DefaultTableName,
		QueueingIndexName:   constant.DefaultQueueingIndexName,
		RetryMaxAttempts:    constant.DefaultRetryMaxAttempts,
		Clock:               &clock.RealClock{},
		MarshalMap:          attributevalue.MarshalMap,
		UnmarshalMap:        attributevalue.UnmarshalMap,
		UnmarshalListOfMaps: attributevalue.UnmarshalListOfMaps,
		BuildExpression: func(b expression.Builder) (expression.Expression, error) {
			return b.Build()
		},
	}
	for _, apply := range optFns {
		apply(opts)
	}

	client := &ClientImpl[T]{
		dynamoDB:            opts.DynamoDB,
		tableName:           opts.TableName,
		queueingIndexName:   opts.QueueingIndexName,
		maximumReceives:     opts.MaximumReceives,
		useFIFO:             opts.UseFIFO,
		clock:               opts.Clock,
		marshalMap:          opts.MarshalMap,
		unmarshalMap:        opts.UnmarshalMap,
		unmarshalListOfMaps: opts.UnmarshalListOfMaps,
		buildExpression:     opts.BuildExpression,
	}

	// Only build a DynamoDB client when the caller did not inject one.
	if client.dynamoDB == nil {
		client.dynamoDB = dynamodb.NewFromConfig(cfg, func(ddb *dynamodb.Options) {
			ddb.RetryMaxAttempts = opts.RetryMaxAttempts
			if opts.BaseEndpoint != "" {
				ddb.BaseEndpoint = aws.String(opts.BaseEndpoint)
			}
		})
	}
	return client, nil
}
// ClientImpl is the concrete implementation of the dynamomq.Client interface.
// Note: a zero-value ClientImpl is not usable; always obtain an instance
// through dynamomq.NewFromConfig, which populates every field.
type ClientImpl[T any] struct {
	// dynamoDB is the client every request is issued through.
	dynamoDB *dynamodb.Client
	// tableName and queueingIndexName identify the table and index used in requests.
	tableName         string
	queueingIndexName string
	// maximumReceives is copied from ClientOptions.MaximumReceives.
	maximumReceives int
	// useFIFO switches message selection to FIFO behavior.
	useFIFO bool
	// clock supplies the current time.
	clock clock.Clock
	// The functions below are injected so tests can stub marshalling and
	// expression building.
	marshalMap          func(in any) (map[string]types.AttributeValue, error)
	unmarshalMap        func(m map[string]types.AttributeValue, out any) error
	unmarshalListOfMaps func(l []map[string]types.AttributeValue, out any) error
	buildExpression     func(b expression.Builder) (expression.Expression, error)
}
// SendMessageInput represents the input parameters for sending a message to a DynamoDB-based queue.
// The struct is generic over T, the type of the message content.
type SendMessageInput[T any] struct {
// ID is a unique identifier for the message. SendMessage rejects an ID that already exists in the queue.
ID string
// Data is the content of the message to be sent to the queue. The type T determines the format of the message.
Data T
// DelaySeconds is the delay, in seconds, applied to the message before it is sent to the queue.
// It only takes effect when greater than 0.
DelaySeconds int
}
// SendMessageOutput represents the result of a message sending operation.
// The struct is generic over T, the type of the message content.
type SendMessageOutput[T any] struct {
// SentMessage points to the message that was sent. It is nil when SendMessage returns an error.
SentMessage *Message[T]
}
// SendMessage adds a new message to the DynamoDB-based queue.
//
// A nil params is treated as an empty SendMessageInput. The method first looks
// the ID up with GetMessage; if a message with that ID already exists it
// returns an IDDuplicatedError. Otherwise it creates the message stamped with
// the current clock time, applies the delay when DelaySeconds is greater than
// 0, and stores it.
//
// On any error the returned output is non-nil but has no SentMessage.
func (c *ClientImpl[T]) SendMessage(ctx context.Context, params *SendMessageInput[T]) (*SendMessageOutput[T], error) {
	if params == nil {
		params = &SendMessageInput[T]{}
	}

	existing, err := c.GetMessage(ctx, &GetMessageInput{ID: params.ID})
	switch {
	case err != nil:
		return &SendMessageOutput[T]{}, err
	case existing.Message != nil:
		// The ID is already taken.
		return &SendMessageOutput[T]{}, &IDDuplicatedError{}
	}

	msg := NewMessage(params.ID, params.Data, c.clock.Now())
	if delay := params.DelaySeconds; delay > 0 {
		msg.delayToSentAt(time.Duration(delay) * time.Second)
	}

	if err = c.put(ctx, msg); err != nil {
		return &SendMessageOutput[T]{}, err
	}
	return &SendMessageOutput[T]{SentMessage: msg}, nil
}
// ReceiveMessageInput represents the input parameters for receiving a message from a DynamoDB-based queue.
type ReceiveMessageInput struct {
// QueueType is the type of queue from which the message is to be retrieved, such as STANDARD or DLQ.
// When empty, ReceiveMessage uses QueueTypeStandard.
QueueType QueueType
// VisibilityTimeout is the timeout in seconds during which the message becomes invisible to other receivers.
// When it is 0 or negative, ReceiveMessage uses constant.DefaultVisibilityTimeoutInSeconds.
VisibilityTimeout int
}
// ReceiveMessageOutput represents the result of a message receiving operation.
// The struct is generic over T, the type of the message content.
type ReceiveMessageOutput[T any] struct {
// ReceivedMessage is a pointer to the Message type containing information about the received message.
// The type T determines the format of the message content.
// It is nil when ReceiveMessage returns an error.
ReceivedMessage *Message[T]
}
// ReceiveMessage retrieves a message from the DynamoDB-based queue and marks
// it as being processed.
//
// Defaults: a nil params is treated as an empty input, an empty QueueType
// becomes QueueTypeStandard, and a VisibilityTimeout of 0 or less becomes
// constant.DefaultVisibilityTimeoutInSeconds. The defaults are applied to a
// private copy, so the caller's ReceiveMessageInput is never modified and one
// input value can safely be shared between concurrent calls.
//
// The method selects a candidate by querying the queueing index
// (selectMessage) and then persists the new state with an update that is
// conditional on the message version (processSelectedMessage).
//
// If no message can be selected an EmptyQueueError is returned. On any error
// the returned output is non-nil but has no ReceivedMessage.
func (c *ClientImpl[T]) ReceiveMessage(ctx context.Context, params *ReceiveMessageInput) (*ReceiveMessageOutput[T], error) {
	// Copy instead of writing defaults through the caller's pointer.
	var in ReceiveMessageInput
	if params != nil {
		in = *params
	}
	if in.QueueType == "" {
		in.QueueType = QueueTypeStandard
	}
	if in.VisibilityTimeout <= 0 {
		in.VisibilityTimeout = constant.DefaultVisibilityTimeoutInSeconds
	}

	selected, err := c.selectMessage(ctx, &in)
	if err != nil {
		return &ReceiveMessageOutput[T]{}, err
	}
	updated, err := c.processSelectedMessage(ctx, selected)
	if err != nil {
		return &ReceiveMessageOutput[T]{}, err
	}
	return &ReceiveMessageOutput[T]{
		ReceivedMessage: updated,
	}, nil
}
// selectMessage picks the message that ReceiveMessage should hand out.
// It builds a key condition matching params.QueueType and pages through the
// queueing index via executeQuery.
//
// It returns a BuildingExpressionError when the expression cannot be built,
// any error from executeQuery unchanged, and an EmptyQueueError when the query
// finished without selecting a message.
func (c *ClientImpl[T]) selectMessage(ctx context.Context, params *ReceiveMessageInput) (*Message[T], error) {
	keyCond := expression.Key("queue_type").Equal(expression.Value(params.QueueType))
	expr, err := c.buildExpression(expression.NewBuilder().WithKeyCondition(keyCond))
	if err != nil {
		return nil, BuildingExpressionError{Cause: err}
	}

	candidate, err := c.executeQuery(ctx, params, expr)
	switch {
	case err != nil:
		return nil, err
	case candidate == nil:
		return nil, &EmptyQueueError{}
	}
	return candidate, nil
}
// executeQuery pages through the queueing index in ascending order
// (ScanIndexForward=true), defaultQueryLimit items per page, and hands each
// page to processQueryResult.
//
// Paging stops as soon as a page yields a selected message or DynamoDB reports
// no further pages (nil LastEvaluatedKey). The result is nil, with a nil
// error, when no page produced a message.
// Query failures are passed through handleDynamoDBError.
func (c *ClientImpl[T]) executeQuery(ctx context.Context, params *ReceiveMessageInput, expr expression.Expression) (*Message[T], error) {
	var startKey map[string]types.AttributeValue
	for {
		page, err := c.dynamoDB.Query(ctx, &dynamodb.QueryInput{
			TableName:                 aws.String(c.tableName),
			IndexName:                 aws.String(c.queueingIndexName),
			KeyConditionExpression:    expr.KeyCondition(),
			ExpressionAttributeNames:  expr.Names(),
			ExpressionAttributeValues: expr.Values(),
			ScanIndexForward:          aws.Bool(true),
			Limit:                     aws.Int32(defaultQueryLimit),
			ExclusiveStartKey:         startKey,
		})
		if err != nil {
			return nil, handleDynamoDBError(err)
		}

		found, err := c.processQueryResult(params, page)
		if err != nil {
			return nil, err
		}

		// Continue from where this page ended, unless we are done.
		if startKey = page.LastEvaluatedKey; found != nil || startKey == nil {
			return found, nil
		}
	}
}
// processQueryResult scans one page of query results and returns the first
// item that can be marked as processing with the requested visibility timeout.
//
// Items are unmarshalled one at a time; an unmarshalling failure aborts with
// an UnmarshalingAttributeError. When an item cannot be marked as processing,
// a FIFO client stops immediately with an EmptyQueueError (it does not skip
// ahead to later items), while a non-FIFO client moves on to the next item.
// The result is nil, with a nil error, if no item on the page qualifies.
func (c *ClientImpl[T]) processQueryResult(params *ReceiveMessageInput, queryResult *dynamodb.QueryOutput) (*Message[T], error) {
	for _, attrs := range queryResult.Items {
		var candidate Message[T]
		if err := c.unmarshalMap(attrs, &candidate); err != nil {
			return nil, UnmarshalingAttributeError{Cause: err}
		}
		if candidate.markAsProcessing(c.clock.Now(), secToDur(params.VisibilityTimeout)) == nil {
			return &candidate, nil
		}
		if c.useFIFO {
			return nil, &EmptyQueueError{}
		}
	}
	return nil, nil
}
// processSelectedMessage persists the "processing" state of a selected message.
//
// The update increments version and receive_count by one and writes the
// message's updated_at, received_at and invisible_until_at values. It is
// conditional on the stored version still equalling message.Version, so the
// update only succeeds if the item's version has not changed.
//
// It returns the message as returned by updateDynamoDBItem, or a
// BuildingExpressionError / the update error.
func (c *ClientImpl[T]) processSelectedMessage(ctx context.Context, message *Message[T]) (*Message[T], error) {
	update := expression.Add(expression.Name("version"), expression.Value(1)).
		Add(expression.Name("receive_count"), expression.Value(1)).
		Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
		Set(expression.Name("received_at"), expression.Value(message.ReceivedAt)).
		Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))
	versionUnchanged := expression.Name("version").Equal(expression.Value(message.Version))

	expr, err := c.buildExpression(expression.NewBuilder().WithUpdate(update).WithCondition(versionUnchanged))
	if err != nil {
		return nil, BuildingExpressionError{Cause: err}
	}

	stored, err := c.updateDynamoDBItem(ctx, message.ID, &expr)
	if err != nil {
		return nil, err
	}
	return stored, nil
}
// ChangeMessageVisibilityInput represents the input parameters for changing the visibility timeout of a specific message in a DynamoDB-based queue.
type ChangeMessageVisibilityInput struct {
// ID is the unique identifier of the message for which visibility is to be changed.
ID string
// VisibilityTimeout is the new timeout in seconds during which the message becomes invisible to other receivers.
// After this time elapses, the message will become visible in the queue again.
VisibilityTimeout int
}
// ChangeMessageVisibilityOutput represents the result of the operation to change the visibility of a message.
// The struct is generic over T, the type of the message content.
type ChangeMessageVisibilityOutput[T any] struct {
// ChangedMessage is a pointer to the Message type containing information about the message with changed visibility.
// The type T determines the format of the message content.
// It is nil when ChangeMessageVisibility returns an error.
ChangedMessage *Message[T]
}
// ChangeMessageVisibility changes the visibility timeout of the message with
// the given ID.
//
// A nil params is treated as an empty input. The message is loaded with
// GetMessage; if it does not exist an IDNotFoundError is returned. The new
// visibility is computed from the current clock time and
// params.VisibilityTimeout (seconds) and written with an update that
// increments version and is conditional on the stored version being unchanged.
//
// On any error the returned output is non-nil but has no ChangedMessage.
func (c *ClientImpl[T]) ChangeMessageVisibility(ctx context.Context, params *ChangeMessageVisibilityInput) (*ChangeMessageVisibilityOutput[T], error) {
	if params == nil {
		params = &ChangeMessageVisibilityInput{}
	}

	found, err := c.GetMessage(ctx, &GetMessageInput{ID: params.ID})
	if err != nil {
		return &ChangeMessageVisibilityOutput[T]{}, err
	}
	msg := found.Message
	if msg == nil {
		return &ChangeMessageVisibilityOutput[T]{}, &IDNotFoundError{}
	}

	msg.changeVisibility(c.clock.Now(), secToDur(params.VisibilityTimeout))

	update := expression.Add(expression.Name("version"), expression.Value(1)).
		Set(expression.Name("updated_at"), expression.Value(msg.UpdatedAt)).
		Set(expression.Name("invisible_until_at"), expression.Value(msg.InvisibleUntilAt))
	versionUnchanged := expression.Name("version").Equal(expression.Value(msg.Version))

	expr, err := c.buildExpression(expression.NewBuilder().WithUpdate(update).WithCondition(versionUnchanged))
	if err != nil {
		return &ChangeMessageVisibilityOutput[T]{}, BuildingExpressionError{Cause: err}
	}

	changed, err := c.updateDynamoDBItem(ctx, msg.ID, &expr)
	if err != nil {
		return &ChangeMessageVisibilityOutput[T]{}, err
	}
	return &ChangeMessageVisibilityOutput[T]{ChangedMessage: changed}, nil
}
// DeleteMessageInput represents the input parameters for deleting a specific message from a DynamoDB-based queue.
type DeleteMessageInput struct {
// ID is the unique identifier of the message to be deleted from the queue.
// DeleteMessage returns an IDNotProvidedError when it is empty.
ID string
}
// DeleteMessageOutput represents the result of the delete message operation.
// It has no fields because the delete operation does not return any specific information.
type DeleteMessageOutput struct{}
// DeleteMessage deletes the message with the given ID from the queue table.
//
// The item is removed with a single DeleteItem call keyed on "id"; the message
// is not read first. A nil params or an empty ID yields an IDNotProvidedError.
// DynamoDB failures are passed through handleDynamoDBError.
// The returned output is always non-nil.
func (c *ClientImpl[T]) DeleteMessage(ctx context.Context, params *DeleteMessageInput) (*DeleteMessageOutput, error) {
	result := &DeleteMessageOutput{}
	if params == nil || params.ID == "" {
		return result, &IDNotProvidedError{}
	}

	key := map[string]types.AttributeValue{
		"id": &types.AttributeValueMemberS{Value: params.ID},
	}
	if _, err := c.dynamoDB.DeleteItem(ctx, &dynamodb.DeleteItemInput{
		TableName: &c.tableName,
		Key:       key,
	}); err != nil {
		return result, handleDynamoDBError(err)
	}
	return result, nil
}
// MoveMessageToDLQInput represents the input parameters for moving a specific message from a DynamoDB-based queue to a Dead Letter Queue (DLQ).
type MoveMessageToDLQInput struct {
// ID is the unique identifier of the message to be moved to the DLQ.
ID string
}
// MoveMessageToDLQOutput represents the result of the operation to move a message to the DLQ.
// The struct is generic over T, the type of the message content.
type MoveMessageToDLQOutput[T any] struct {
// MovedMessage is a pointer to the Message type containing information about the moved message.
// The type T determines the format of the message content.
// It is nil when MoveMessageToDLQ returns an error.
MovedMessage *Message[T]
}
// MoveMessageToDLQ moves the message with the given ID to the Dead Letter
// Queue (DLQ).
//
// A nil params is treated as an empty input. The message is loaded with
// GetMessage; if it does not exist an IDNotFoundError is returned. If the
// message cannot be marked as moved (markAsMovedToDLQ returns an error), that
// error is deliberately swallowed: the loaded message is returned with a nil
// error and nothing is written to DynamoDB.
//
// Otherwise the new state (receive_count, queue_type, updated_at, sent_at,
// received_at, invisible_until_at) is written with an update that increments
// version and is conditional on the stored version being unchanged.
//
// On any error the returned output is non-nil but has no MovedMessage.
func (c *ClientImpl[T]) MoveMessageToDLQ(ctx context.Context, params *MoveMessageToDLQInput) (*MoveMessageToDLQOutput[T], error) {
	if params == nil {
		params = &MoveMessageToDLQInput{}
	}

	found, err := c.GetMessage(ctx, &GetMessageInput{ID: params.ID})
	if err != nil {
		return &MoveMessageToDLQOutput[T]{}, err
	}
	msg := found.Message
	if msg == nil {
		return &MoveMessageToDLQOutput[T]{}, &IDNotFoundError{}
	}

	if markedErr := msg.markAsMovedToDLQ(c.clock.Now()); markedErr != nil {
		//lint:ignore nilerr reason
		return &MoveMessageToDLQOutput[T]{MovedMessage: msg}, nil
	}

	update := expression.Add(expression.Name("version"), expression.Value(1)).
		Set(expression.Name("receive_count"), expression.Value(msg.ReceiveCount)).
		Set(expression.Name("queue_type"), expression.Value(msg.QueueType)).
		Set(expression.Name("updated_at"), expression.Value(msg.UpdatedAt)).
		Set(expression.Name("sent_at"), expression.Value(msg.SentAt)).
		Set(expression.Name("received_at"), expression.Value(msg.ReceivedAt)).
		Set(expression.Name("invisible_until_at"), expression.Value(msg.InvisibleUntilAt))
	versionUnchanged := expression.Name("version").Equal(expression.Value(msg.Version))

	expr, err := c.buildExpression(expression.NewBuilder().WithUpdate(update).WithCondition(versionUnchanged))
	if err != nil {
		return &MoveMessageToDLQOutput[T]{}, BuildingExpressionError{Cause: err}
	}

	moved, err := c.updateDynamoDBItem(ctx, params.ID, &expr)
	if err != nil {
		return &MoveMessageToDLQOutput[T]{}, err
	}
	return &MoveMessageToDLQOutput[T]{MovedMessage: moved}, nil
}
// RedriveMessageInput represents the input parameters for restoring a specific message from a DynamoDB-based Dead Letter Queue (DLQ) back to the STANDARD queue.
type RedriveMessageInput struct {
// ID is the unique identifier of the message to be redriven from the DLQ.
ID string
}
// RedriveMessageOutput represents the result of the operation to redrive a message from the DLQ.
// The struct is generic over T, the type of the message content.
type RedriveMessageOutput[T any] struct {
// RedroveMessage is a pointer to the Message type containing information about the redriven message.
// The type T determines the format of the message content.
// RedriveMessage leaves it nil on the error paths that return an output.
RedroveMessage *Message[T]
}
// RedriveMessage restores the message with the given ID from the Dead Letter
// Queue (DLQ).
//
// A nil params is treated as an empty input. The message is loaded with
// GetMessage; if it does not exist an IDNotFoundError is returned, and if it
// cannot be marked as restored the error from markAsRestoredFromDLQ is
// returned. Otherwise the new state (queue_type, updated_at, sent_at,
// invisible_until_at) is written with an update that increments version and
// is conditional on the stored version being unchanged.
//
// On every error path the returned output is non-nil but has no
// RedroveMessage, matching the other methods of the client.
func (c *ClientImpl[T]) RedriveMessage(ctx context.Context, params *RedriveMessageInput) (*RedriveMessageOutput[T], error) {
	if params == nil {
		params = &RedriveMessageInput{}
	}
	retrieved, err := c.GetMessage(ctx, &GetMessageInput{
		ID: params.ID,
	})
	if err != nil {
		return &RedriveMessageOutput[T]{}, err
	}
	if retrieved.Message == nil {
		return &RedriveMessageOutput[T]{}, &IDNotFoundError{}
	}
	message := retrieved.Message
	if err = message.markAsRestoredFromDLQ(c.clock.Now()); err != nil {
		return &RedriveMessageOutput[T]{}, err
	}
	builder := expression.NewBuilder().
		WithUpdate(expression.
			Add(expression.Name("version"), expression.Value(1)).
			Set(expression.Name("queue_type"), expression.Value(message.QueueType)).
			Set(expression.Name("updated_at"), expression.Value(message.UpdatedAt)).
			Set(expression.Name("sent_at"), expression.Value(message.SentAt)).
			Set(expression.Name("invisible_until_at"), expression.Value(message.InvisibleUntilAt))).
		WithCondition(expression.Name("version").Equal(expression.Value(message.Version)))
	expr, err := c.buildExpression(builder)
	if err != nil {
		// Return an empty, non-nil output like every other error path so that
		// callers never receive a nil *RedriveMessageOutput.
		return &RedriveMessageOutput[T]{}, BuildingExpressionError{Cause: err}
	}
	updated, err := c.updateDynamoDBItem(ctx, params.ID, &expr)
	if err != nil {
		return &RedriveMessageOutput[T]{}, err
	}
	return &RedriveMessageOutput[T]{
		RedroveMessage: updated,
	}, nil
}
// GetQueueStatsInput represents the input parameters for obtaining statistical information about a DynamoDB-based queue.
// It has no fields: general queue statistics are requested without any specific parameters.
type GetQueueStatsInput struct{}
// GetQueueStatsOutput represents the output containing statistical information about a DynamoDB-based queue.
type GetQueueStatsOutput struct {
// First100IDsInQueue holds the IDs of the first messages in the queue, at most 100 of them.
First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
// First100IDsInQueueProcessing holds the IDs of the first messages that are currently being processed, at most 100 of them.
First100IDsInQueueProcessing []string `json:"first_100_IDs_in_queue_processing"`
// TotalMessagesInQueue is the total number of messages present in the queue.
TotalMessagesInQueue int `json:"total_messages_in_queue"`
// TotalMessagesInQueueProcessing is the total number of messages that are currently being processed.
TotalMessagesInQueueProcessing int `json:"total_messages_in_queue_processing"`
// TotalMessagesInQueueReady is the number of messages in the queue that are not currently being processed
// (TotalMessagesInQueue minus TotalMessagesInQueueProcessing).
TotalMessagesInQueueReady int `json:"total_messages_in_queue_ready"`
}
// GetQueueStats gets statistical information about the standard queue.
//
// It queries every item whose queue_type is QueueTypeStandard and reports the
// IDs of the first messages in the queue and of the first messages being
// processed (up to 100 each), the total number of messages, the number being
// processed and the number that are ready.
// The input parameter is ignored.
//
// On error the returned output is non-nil but empty.
func (c *ClientImpl[T]) GetQueueStats(ctx context.Context, _ *GetQueueStatsInput) (*GetQueueStatsOutput, error) {
	keyCond := expression.KeyEqual(expression.Key("queue_type"), expression.Value(QueueTypeStandard))
	expr, err := c.buildExpression(expression.NewBuilder().WithKeyCondition(keyCond))
	if err != nil {
		return &GetQueueStatsOutput{}, BuildingExpressionError{Cause: err}
	}

	result, err := c.queryAndCalculateQueueStats(ctx, expr)
	if err != nil {
		return &GetQueueStatsOutput{}, err
	}
	return result, nil
}
// queryAndCalculateQueueStats pages through the queueing index using expr and
// accumulates queue statistics over every returned item.
//
// Pages are read in ascending order, defaultQueryLimit items at a time, until
// DynamoDB reports no further page (nil LastEvaluatedKey). The ready count is
// derived at the end as total minus processing.
// The ID slices start out empty but non-nil.
//
// It returns nil and an error if a query fails (mapped through
// handleDynamoDBError) or an item cannot be unmarshalled.
func (c *ClientImpl[T]) queryAndCalculateQueueStats(ctx context.Context, expr expression.Expression) (*GetQueueStatsOutput, error) {
	result := &GetQueueStatsOutput{
		First100IDsInQueue:           []string{},
		First100IDsInQueueProcessing: []string{},
	}

	var startKey map[string]types.AttributeValue
	for morePages := true; morePages; morePages = startKey != nil {
		page, err := c.dynamoDB.Query(ctx, &dynamodb.QueryInput{
			TableName:                 aws.String(c.tableName),
			IndexName:                 aws.String(c.queueingIndexName),
			KeyConditionExpression:    expr.KeyCondition(),
			ExpressionAttributeNames:  expr.Names(),
			ExpressionAttributeValues: expr.Values(),
			ScanIndexForward:          aws.Bool(true),
			Limit:                     aws.Int32(defaultQueryLimit),
			ExclusiveStartKey:         startKey,
		})
		if err != nil {
			return nil, handleDynamoDBError(err)
		}
		startKey = page.LastEvaluatedKey

		if err = c.processQueryItemsForQueueStats(page.Items, result); err != nil {
			return nil, err
		}
	}

	result.TotalMessagesInQueueReady = result.TotalMessagesInQueue - result.TotalMessagesInQueueProcessing
	return result, nil
}
// processQueryItemsForQueueStats folds one page of query results into stats.
//
// Every item increments TotalMessagesInQueue, is decoded into a Message[T] and is then
// passed to updateQueueStatsFromItem. Decoding stops at the first failure, which is
// returned as UnmarshalingAttributeError; the total has already been incremented for
// the failing item at that point.
func (c *ClientImpl[T]) processQueryItemsForQueueStats(items []map[string]types.AttributeValue, stats *GetQueueStatsOutput) error {
	for i := range items {
		stats.TotalMessagesInQueue++
		var decoded Message[T]
		if err := c.unmarshalMap(items[i], &decoded); err != nil {
			return UnmarshalingAttributeError{Cause: err}
		}
		c.updateQueueStatsFromItem(&decoded, stats)
	}
	return nil
}
// updateQueueStatsFromItem records a single message in stats.
//
// The message ID is added to First100IDsInQueue while that list holds fewer than
// maxFirstMessagesInQueue entries. If the message's status at the client's current
// clock time is StatusProcessing, the processing counter is incremented and the ID is
// added to First100IDsInQueueProcessing under the same cap.
func (c *ClientImpl[T]) updateQueueStatsFromItem(message *Message[T], stats *GetQueueStatsOutput) {
	// The status is evaluated first, before either list is touched.
	processing := message.GetStatus(c.clock.Now()) == StatusProcessing

	if len(stats.First100IDsInQueue) < maxFirstMessagesInQueue {
		stats.First100IDsInQueue = append(stats.First100IDsInQueue, message.ID)
	}
	if !processing {
		return
	}
	stats.TotalMessagesInQueueProcessing++
	if len(stats.First100IDsInQueueProcessing) < maxFirstMessagesInQueue {
		stats.First100IDsInQueueProcessing = append(stats.First100IDsInQueueProcessing, message.ID)
	}
}
// GetDLQStatsInput represents the input parameters for obtaining statistical information about the Dead Letter Queue (DLQ).
// It has no fields: DLQ statistics are requested without any specific parameters, and GetDLQStats ignores the value it is given.
type GetDLQStatsInput struct{}
// GetDLQStatsOutput represents the output containing statistical information about the Dead Letter Queue (DLQ).
type GetDLQStatsOutput struct {
// First100IDsInQueue is a slice holding the IDs of the first messages found in the DLQ, in the order the queueing index returns them.
// It is capped at maxFirstMessagesInQueue entries (100, going by the field name).
First100IDsInQueue []string `json:"first_100_IDs_in_queue"`
// TotalMessagesInDLQ is the total number of messages present in the DLQ.
TotalMessagesInDLQ int `json:"total_messages_in_DLQ"`
}
// GetDLQStats returns statistical information about the Dead Letter Queue (DLQ).
//
// It builds a key condition selecting items whose queue_type equals QueueTypeDLQ and
// delegates the paging and counting to queryAndCalculateDLQStats. The result carries
// the leading message IDs in the DLQ and the total number of DLQ messages.
//
// The input argument is ignored. On failure a non-nil, empty *GetDLQStatsOutput is
// returned together with the error; a failure to build the expression is reported as
// BuildingExpressionError.
func (c *ClientImpl[T]) GetDLQStats(ctx context.Context, _ *GetDLQStatsInput) (*GetDLQStatsOutput, error) {
	deadLetterQueue := expression.KeyEqual(expression.Key("queue_type"), expression.Value(QueueTypeDLQ))
	expr, buildErr := c.buildExpression(expression.NewBuilder().WithKeyCondition(deadLetterQueue))
	if buildErr != nil {
		return &GetDLQStatsOutput{}, BuildingExpressionError{Cause: buildErr}
	}
	stats, queryErr := c.queryAndCalculateDLQStats(ctx, expr)
	if queryErr != nil {
		return &GetDLQStatsOutput{}, queryErr
	}
	return stats, nil
}
// queryAndCalculateDLQStats pages through the queueing index using the key condition
// in expr and accumulates DLQ statistics over every returned item.
//
// Each page is requested in ascending index order with a Limit of defaultQueryLimit
// and folded into the result by processQueryItemsForDLQStats. Paging ends when the
// response carries an empty LastEvaluatedKey.
//
// On any error the returned output is nil; DynamoDB errors are translated by
// handleDynamoDBError.
func (c *ClientImpl[T]) queryAndCalculateDLQStats(ctx context.Context, expr expression.Expression) (*GetDLQStatsOutput, error) {
	// The ID slice is allocated (not nil) so it encodes as [] rather than null in JSON.
	stats := &GetDLQStatsOutput{
		First100IDsInQueue: make([]string, 0),
	}
	var lastEvaluatedKey map[string]types.AttributeValue
	for {
		queryOutput, err := c.dynamoDB.Query(ctx, &dynamodb.QueryInput{
			TableName:                 aws.String(c.tableName),
			IndexName:                 aws.String(c.queueingIndexName),
			KeyConditionExpression:    expr.KeyCondition(),
			ExpressionAttributeNames:  expr.Names(),
			ExpressionAttributeValues: expr.Values(),
			ScanIndexForward:          aws.Bool(true),
			Limit:                     aws.Int32(defaultQueryLimit),
			ExclusiveStartKey:         lastEvaluatedKey,
		})
		if err != nil {
			// nil, like every other error path here: the output is meaningless on failure.
			return nil, handleDynamoDBError(err)
		}
		if err := c.processQueryItemsForDLQStats(queryOutput.Items, stats); err != nil {
			return nil, err
		}
		// An empty LastEvaluatedKey marks the last page. Checking the length rather
		// than comparing with nil also stops on an empty, non-nil map, which would
		// otherwise be sent back as the start key and keep the loop running.
		lastEvaluatedKey = queryOutput.LastEvaluatedKey
		if len(lastEvaluatedKey) == 0 {
			break
		}
	}
	return stats, nil
}
// processQueryItemsForDLQStats folds one page of query results into stats.
//
// Every item increments TotalMessagesInDLQ. Items are decoded into Message[T] only
// while First100IDsInQueue holds fewer than maxFirstMessagesInQueue entries; once the
// list is full the remaining items are counted without being decoded. A decoding
// failure is returned as UnmarshalingAttributeError.
func (c *ClientImpl[T]) processQueryItemsForDLQStats(items []map[string]types.AttributeValue, stats *GetDLQStatsOutput) error {
	for _, raw := range items {
		stats.TotalMessagesInDLQ++
		if len(stats.First100IDsInQueue) >= maxFirstMessagesInQueue {
			// The ID list is full; this item only contributes to the count.
			continue
		}
		var decoded Message[T]
		if err := c.unmarshalMap(raw, &decoded); err != nil {
			return UnmarshalingAttributeError{Cause: err}
		}
		stats.First100IDsInQueue = append(stats.First100IDsInQueue, decoded.ID)
	}
	return nil
}
// GetMessageInput represents the input parameters for retrieving a specific message from a DynamoDB-based queue.
type GetMessageInput struct {
// ID is the unique identifier of the message to be retrieved from the queue.
// GetMessage rejects an empty ID with IDNotProvidedError.
ID string
}
// GetMessageOutput represents the result of the operation to retrieve a message.
// The type parameter T determines the format of the message content.
type GetMessageOutput[T any] struct {
// Message is a pointer to the retrieved message.
// It is nil when GetMessage finds no item for the requested ID (in which case GetMessage returns a nil error).
Message *Message[T]
}
// GetMessage retrieves a single message by its ID.
//
// The item is read from the table with a strongly consistent GetItem on the "id" key
// and decoded into Message[T].
//
// A nil params or an empty ID yields IDNotProvidedError. When no item exists for the
// ID, the returned output has a nil Message and the error is nil. DynamoDB errors are
// translated by handleDynamoDBError, and decoding failures are reported as
// UnmarshalingAttributeError. The returned output pointer is never nil.
func (c *ClientImpl[T]) GetMessage(ctx context.Context, params *GetMessageInput) (*GetMessageOutput[T], error) {
	var id string
	if params != nil {
		id = params.ID
	}
	if id == "" {
		return &GetMessageOutput[T]{}, &IDNotProvidedError{}
	}

	primaryKey := map[string]types.AttributeValue{
		"id": &types.AttributeValueMemberS{Value: id},
	}
	resp, err := c.dynamoDB.GetItem(ctx, &dynamodb.GetItemInput{
		TableName:      aws.String(c.tableName),
		Key:            primaryKey,
		ConsistentRead: aws.Bool(true),
	})
	if err != nil {
		return &GetMessageOutput[T]{}, handleDynamoDBError(err)
	}
	if resp.Item == nil {
		// Not found is not an error: the caller sees an output without a Message.
		return &GetMessageOutput[T]{}, nil
	}

	found := new(Message[T])
	if err := c.unmarshalMap(resp.Item, found); err != nil {
		return &GetMessageOutput[T]{}, UnmarshalingAttributeError{Cause: err}
	}
	return &GetMessageOutput[T]{Message: found}, nil
}
// ListMessagesInput represents the input parameters for listing messages from a DynamoDB-based queue.
type ListMessagesInput struct {
// Size is the maximum number of messages to be listed. ListMessages passes it to DynamoDB as the Scan Limit.
// A value of zero or less is replaced with constant.DefaultMaxListMessages.
Size int32
}
// ListMessagesOutput represents the result of the operation to list messages from the queue.
// The type parameter T determines the format of the message content.
type ListMessagesOutput[T any] struct {
// Messages is a slice of pointers to the listed messages.
// ListMessages returns it sorted in ascending order of UpdatedAt.
Messages []*Message[T]
}
// ListMessages returns a list of messages from the table.
//
// A single Scan call is issued with Limit set to params.Size; if params is nil or Size
// is zero or less, constant.DefaultMaxListMessages is used instead. The returned items
// are decoded into Message[T] values and sorted in ascending order of UpdatedAt.
//
// The caller's params value is never modified.
//
// DynamoDB errors are translated by handleDynamoDBError, and decoding failures are
// reported as UnmarshalingAttributeError. The returned output pointer is never nil.
func (c *ClientImpl[T]) ListMessages(ctx context.Context, params *ListMessagesInput) (*ListMessagesOutput[T], error) {
	// Resolve the effective size into a local so the default is not written back
	// into the caller's input struct.
	var size int32
	if params != nil {
		size = params.Size
	}
	if size <= 0 {
		size = constant.DefaultMaxListMessages
	}

	output, err := c.dynamoDB.Scan(ctx, &dynamodb.ScanInput{
		TableName: aws.String(c.tableName),
		Limit:     aws.Int32(size),
	})
	if err != nil {
		return &ListMessagesOutput[T]{}, handleDynamoDBError(err)
	}

	var messages []*Message[T]
	if err := c.unmarshalListOfMaps(output.Items, &messages); err != nil {
		return &ListMessagesOutput[T]{}, UnmarshalingAttributeError{Cause: err}
	}
	sort.Slice(messages, func(i, j int) bool {
		return messages[i].UpdatedAt < messages[j].UpdatedAt
	})
	return &ListMessagesOutput[T]{Messages: messages}, nil
}
// ReplaceMessageInput represents the input parameters for replacing a specific message in a DynamoDB-based queue.
// The type parameter T determines the format of the message content.
type ReplaceMessageInput[T any] struct {
// Message is a pointer to the message that will replace the stored message with the same ID.
// ReplaceMessage uses Message.ID to look up and delete the existing item before writing this one.
Message *Message[T]
}
// ReplaceMessageOutput represents the result of the operation to replace a message in the queue.
// It has no fields: the replace operation does not return any specific information.
type ReplaceMessageOutput struct {
}
// ReplaceMessage replaces the stored message that has the same ID as params.Message.
//
// It looks the message up with GetMessage, deletes the existing item if one is found,
// and then writes params.Message with put. If no message with that ID exists, the new
// message is written directly. The delete and the write are separate DynamoDB calls.
//
// A nil params, a nil params.Message, or an empty message ID yields
// IDNotProvidedError without touching the table. DynamoDB errors are translated by
// handleDynamoDBError. The returned output pointer is never nil.
func (c *ClientImpl[T]) ReplaceMessage(ctx context.Context, params *ReplaceMessageInput[T]) (*ReplaceMessageOutput, error) {
	// Without a message there is no ID to replace. Guarding here avoids a nil
	// dereference on params.Message and gives the same error that an empty ID
	// produces through GetMessage.
	if params == nil || params.Message == nil {
		return &ReplaceMessageOutput{}, &IDNotProvidedError{}
	}
	message := params.Message

	retrieved, err := c.GetMessage(ctx, &GetMessageInput{ID: message.ID})
	if err != nil {
		return &ReplaceMessageOutput{}, err
	}
	if retrieved.Message != nil {
		_, err = c.dynamoDB.DeleteItem(ctx, &dynamodb.DeleteItemInput{
			TableName: aws.String(c.tableName),
			Key: map[string]types.AttributeValue{
				"id": &types.AttributeValueMemberS{Value: message.ID},
			},
		})
		if err != nil {
			return &ReplaceMessageOutput{}, handleDynamoDBError(err)
		}
	}
	return &ReplaceMessageOutput{}, c.put(ctx, message)
}
// put marshals message into DynamoDB attributes and writes it to the table with PutItem.
//
// A marshaling failure is returned as MarshalingAttributeError; a PutItem failure is
// translated by handleDynamoDBError.
func (c *ClientImpl[T]) put(ctx context.Context, message *Message[T]) error {
	attrs, marshalErr := c.marshalMap(message)
	if marshalErr != nil {
		return MarshalingAttributeError{Cause: marshalErr}
	}
	input := &dynamodb.PutItemInput{
		Item:      attrs,
		TableName: aws.String(c.tableName),
	}
	if _, putErr := c.dynamoDB.PutItem(ctx, input); putErr != nil {
		return handleDynamoDBError(putErr)
	}
	return nil
}
// updateDynamoDBItem runs UpdateItem against the item whose "id" key equals id, using
// the update, condition, names and values carried by expr.
//
// The call asks for ReturnValueAllNew, and the returned attributes are decoded into a
// Message[T] that is handed back to the caller.
//
// DynamoDB errors are translated by handleDynamoDBError, and decoding failures are
// reported as UnmarshalingAttributeError. The returned message is nil on error.
func (c *ClientImpl[T]) updateDynamoDBItem(ctx context.Context,
	id string, expr *expression.Expression) (*Message[T], error) {
	input := &dynamodb.UpdateItemInput{
		Key: map[string]types.AttributeValue{
			"id": &types.AttributeValueMemberS{Value: id},
		},
		TableName:                 aws.String(c.tableName),
		UpdateExpression:          expr.Update(),
		ConditionExpression:       expr.Condition(),
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		ReturnValues:              types.ReturnValueAllNew,
	}
	outcome, err := c.dynamoDB.UpdateItem(ctx, input)
	if err != nil {
		return nil, handleDynamoDBError(err)
	}
	updated := new(Message[T])
	if err := c.unmarshalMap(outcome.Attributes, updated); err != nil {
		return nil, UnmarshalingAttributeError{Cause: err}
	}
	return updated, nil
}
// handleDynamoDBError translates an error returned by the DynamoDB client into one of
// this package's error types.
//
// If err has a *types.ConditionalCheckFailedException in its chain, a
// *ConditionalCheckFailedError wrapping that exception is returned. Any other error
// is wrapped in DynamoDBAPIError. The function does not check err for nil; callers in
// this file only pass non-nil errors.
func handleDynamoDBError(err error) error {
	var condFailed *types.ConditionalCheckFailedException
	if !errors.As(err, &condFailed) {
		return DynamoDBAPIError{Cause: err}
	}
	return &ConditionalCheckFailedError{Cause: condFailed}
}
// secToDur converts a whole number of seconds into a time.Duration.
// Negative values yield a negative duration.
func secToDur(sec int) time.Duration {
	seconds := time.Duration(sec)
	return time.Second * seconds
}