1
+ use crate :: flow_store:: connection:: FlowStore ;
1
2
use async_trait:: async_trait;
2
- use log:: { debug , error} ;
3
- use redis:: { AsyncCommands , RedisError } ;
3
+ use log:: error;
4
+ use redis:: { AsyncCommands , RedisError , RedisResult } ;
4
5
use tucana:: sagittarius:: { Flow , Flows } ;
5
- use crate :: flow_store:: connection:: FlowStore ;
6
6
7
7
#[ derive( Debug ) ]
8
8
pub struct FlowStoreError {
@@ -19,7 +19,7 @@ enum FlowStoreErrorKind {
19
19
20
20
/// Trait representing a service for managing flows in a Redis.
21
21
#[ async_trait]
22
- pub trait FlowStoreService {
22
+ pub trait FlowStoreServiceBase {
23
23
async fn new ( redis_client_arc : FlowStore ) -> Self ;
24
24
async fn insert_flow ( & mut self , flow : Flow ) -> Result < i64 , FlowStoreError > ;
25
25
async fn insert_flows ( & mut self , flows : Flows ) -> Result < i64 , FlowStoreError > ;
@@ -29,15 +29,16 @@ pub trait FlowStoreService {
29
29
}
30
30
31
31
/// Struct representing a service for managing flows in a Redis.
32
- struct FlowServiceBase {
32
+ #[ derive( Clone ) ]
33
+ pub struct FlowStoreService {
33
34
pub ( crate ) redis_client_arc : FlowStore ,
34
35
}
35
36
36
37
/// Implementation of a service for managing flows in a Redis.
37
38
#[ async_trait]
38
- impl FlowStoreService for FlowServiceBase {
39
- async fn new ( redis_client_arc : FlowStore ) -> Self {
40
- Self { redis_client_arc }
39
+ impl FlowStoreServiceBase for FlowStoreService {
40
+ async fn new ( redis_client_arc : FlowStore ) -> FlowStoreService {
41
+ FlowStoreService { redis_client_arc }
41
42
}
42
43
43
44
/// Insert a list of flows into Redis
@@ -56,15 +57,9 @@ impl FlowStoreService for FlowServiceBase {
56
57
}
57
58
} ;
58
59
59
- let parsed_flow = connection
60
- . set :: < String , String , i64 > ( flow. flow_id . to_string ( ) , serialized_flow)
61
- . await ;
60
+ let insert_result: RedisResult < ( ) > = connection. set ( flow. flow_id , serialized_flow) . await ;
62
61
63
- match parsed_flow {
64
- Ok ( modified) => {
65
- debug ! ( "Inserted flow" ) ;
66
- Ok ( modified)
67
- }
62
+ match insert_result {
68
63
Err ( redis_error) => {
69
64
error ! ( "An Error occurred {}" , redis_error) ;
70
65
Err ( FlowStoreError {
@@ -73,6 +68,7 @@ impl FlowStoreService for FlowServiceBase {
73
68
reason : redis_error. to_string ( ) ,
74
69
} )
75
70
}
71
+ _ => Ok ( 1 ) ,
76
72
}
77
73
}
78
74
@@ -91,13 +87,10 @@ impl FlowStoreService for FlowServiceBase {
91
87
/// Deletes a flow
92
88
async fn delete_flow ( & mut self , flow_id : i64 ) -> Result < i64 , RedisError > {
93
89
let mut connection = self . redis_client_arc . lock ( ) . await ;
94
- let deleted_flow = connection. del :: < i64 , i64 > ( flow_id) . await ;
90
+ let deleted_flow: RedisResult < i64 > = connection. del ( flow_id) . await ;
95
91
96
92
match deleted_flow {
97
- Ok ( changed_amount) => {
98
- debug ! ( "{} flows where deleted" , changed_amount) ;
99
- deleted_flow
100
- }
93
+ Ok ( int) => Ok ( int) ,
101
94
Err ( redis_error) => {
102
95
error ! ( "An Error occurred {}" , redis_error) ;
103
96
Err ( redis_error)
@@ -140,3 +133,272 @@ impl FlowStoreService for FlowServiceBase {
140
133
Ok ( int_keys)
141
134
}
142
135
}
136
+
137
+ #[ cfg( test) ]
138
+ mod tests {
139
+ use crate :: flow_store:: connection:: create_flow_store_connection;
140
+ use crate :: flow_store:: connection:: FlowStore ;
141
+ use crate :: flow_store:: service:: FlowStoreService ;
142
+ use crate :: flow_store:: service:: FlowStoreServiceBase ;
143
+ use redis:: AsyncCommands ;
144
+ use serial_test:: serial;
145
+ use testcontainers:: core:: IntoContainerPort ;
146
+ use testcontainers:: core:: WaitFor ;
147
+ use testcontainers:: runners:: AsyncRunner ;
148
+ use testcontainers:: GenericImage ;
149
+ use tucana:: sagittarius:: { Flow , Flows } ;
150
+
151
+ macro_rules! redis_integration_test {
152
+ ( $test_name: ident, $consumer: expr) => {
153
+ #[ tokio:: test]
154
+ #[ serial]
155
+ async fn $test_name( ) {
156
+ let port: u16 = 6379 ;
157
+ let image_name = "redis" ;
158
+ let wait_message = "Ready to accept connections" ;
159
+
160
+ let container = GenericImage :: new( image_name, "latest" )
161
+ . with_exposed_port( port. tcp( ) )
162
+ . with_wait_for( WaitFor :: message_on_stdout( wait_message) )
163
+ . start( )
164
+ . await
165
+ . unwrap( ) ;
166
+
167
+ let host = container. get_host( ) . await . unwrap( ) ;
168
+ let host_port = container. get_host_port_ipv4( port) . await . unwrap( ) ;
169
+ let url = format!( "redis://{host}:{host_port}" ) ;
170
+ println!( "Redis server started correctly on: {}" , url. clone( ) ) ;
171
+
172
+ let connection = create_flow_store_connection( url) . await ;
173
+ let base = FlowStoreService :: new( connection. clone( ) ) . await ;
174
+
175
+ $consumer( connection, base) . await ;
176
+ let _ = container. stop( ) . await ;
177
+ }
178
+ } ;
179
+ }
180
+
181
+ redis_integration_test ! (
182
+ insert_one_flow,
183
+ ( |connection: FlowStore , mut service: FlowStoreService | async move {
184
+ let flow = Flow {
185
+ flow_id: 1 ,
186
+ r#type: "" . to_string( ) ,
187
+ settings: vec![ ] ,
188
+ starting_node: None ,
189
+ } ;
190
+
191
+ match service. insert_flow( flow. clone( ) ) . await {
192
+ Ok ( i) => println!( "{}" , i) ,
193
+ Err ( err) => println!( "{}" , err. reason) ,
194
+ } ;
195
+
196
+ let redis_result: Option <String > = {
197
+ let mut redis_cmd = connection. lock( ) . await ;
198
+ redis_cmd. get( "1" ) . await . unwrap( )
199
+ } ;
200
+
201
+ println!( "{}" , redis_result. clone( ) . unwrap( ) ) ;
202
+
203
+ assert!( redis_result. is_some( ) ) ;
204
+ let redis_flow: Flow = serde_json:: from_str( & * redis_result. unwrap( ) ) . unwrap( ) ;
205
+ assert_eq!( redis_flow, flow) ;
206
+ } )
207
+ ) ;
208
+
209
+ redis_integration_test ! (
210
+ insert_will_overwrite_existing_flow,
211
+ ( |connection: FlowStore , mut service: FlowStoreService | async move {
212
+ let flow = Flow {
213
+ flow_id: 1 ,
214
+ r#type: "" . to_string( ) ,
215
+ settings: vec![ ] ,
216
+ starting_node: None ,
217
+ } ;
218
+
219
+ match service. insert_flow( flow. clone( ) ) . await {
220
+ Ok ( i) => println!( "{}" , i) ,
221
+ Err ( err) => println!( "{}" , err. reason) ,
222
+ } ;
223
+
224
+ let flow_overwrite = Flow {
225
+ flow_id: 1 ,
226
+ r#type: "ABC" . to_string( ) ,
227
+ settings: vec![ ] ,
228
+ starting_node: None ,
229
+ } ;
230
+
231
+ let _ = service. insert_flow( flow_overwrite) . await ;
232
+ let amount = service. get_all_flow_ids( ) . await ;
233
+ assert_eq!( amount. unwrap( ) . len( ) , 1 ) ;
234
+
235
+ let redis_result: Option <String > = {
236
+ let mut redis_cmd = connection. lock( ) . await ;
237
+ redis_cmd. get( "1" ) . await . unwrap( )
238
+ } ;
239
+
240
+ println!( "{}" , redis_result. clone( ) . unwrap( ) ) ;
241
+
242
+ assert!( redis_result. is_some( ) ) ;
243
+ let redis_flow: Flow = serde_json:: from_str( & * redis_result. unwrap( ) ) . unwrap( ) ;
244
+ assert_eq!( redis_flow. r#type, "ABC" . to_string( ) ) ;
245
+ } )
246
+ ) ;
247
+
248
+ redis_integration_test ! (
249
+ insert_many_flows,
250
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
251
+ let flow_one = Flow {
252
+ flow_id: 1 ,
253
+ r#type: "" . to_string( ) ,
254
+ settings: vec![ ] ,
255
+ starting_node: None ,
256
+ } ;
257
+
258
+ let flow_two = Flow {
259
+ flow_id: 2 ,
260
+ r#type: "" . to_string( ) ,
261
+ settings: vec![ ] ,
262
+ starting_node: None ,
263
+ } ;
264
+
265
+ let flow_three = Flow {
266
+ flow_id: 3 ,
267
+ r#type: "" . to_string( ) ,
268
+ settings: vec![ ] ,
269
+ starting_node: None ,
270
+ } ;
271
+
272
+ let flow_vec = vec![ flow_one. clone( ) , flow_two. clone( ) , flow_three. clone( ) ] ;
273
+ let flows = Flows { flows: flow_vec } ;
274
+
275
+ let amount = service. insert_flows( flows) . await . unwrap( ) ;
276
+ assert_eq!( amount, 3 ) ;
277
+ } )
278
+ ) ;
279
+
280
+ redis_integration_test ! (
281
+ delete_one_existing_flow,
282
+ ( |connection: FlowStore , mut service: FlowStoreService | async move {
283
+ let flow = Flow {
284
+ flow_id: 1 ,
285
+ r#type: "" . to_string( ) ,
286
+ settings: vec![ ] ,
287
+ starting_node: None ,
288
+ } ;
289
+
290
+ match service. insert_flow( flow. clone( ) ) . await {
291
+ Ok ( i) => println!( "{}" , i) ,
292
+ Err ( err) => println!( "{}" , err. reason) ,
293
+ } ;
294
+
295
+ let result = service. delete_flow( 1 ) . await ;
296
+
297
+ assert_eq!( result. unwrap( ) , 1 ) ;
298
+
299
+ let redis_result: Option <String > = {
300
+ let mut redis_cmd = connection. lock( ) . await ;
301
+ redis_cmd. get( "1" ) . await . unwrap( )
302
+ } ;
303
+
304
+ assert!( redis_result. is_none( ) ) ;
305
+ } )
306
+ ) ;
307
+
308
+ redis_integration_test ! (
309
+ delete_one_non_existing_flow,
310
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
311
+ let result = service. delete_flow( 1 ) . await ;
312
+ assert_eq!( result. unwrap( ) , 0 ) ;
313
+ } )
314
+ ) ;
315
+
316
+ redis_integration_test ! (
317
+ delete_many_existing_flows,
318
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
319
+ let flow_one = Flow {
320
+ flow_id: 1 ,
321
+ r#type: "" . to_string( ) ,
322
+ settings: vec![ ] ,
323
+ starting_node: None ,
324
+ } ;
325
+
326
+ let flow_two = Flow {
327
+ flow_id: 2 ,
328
+ r#type: "" . to_string( ) ,
329
+ settings: vec![ ] ,
330
+ starting_node: None ,
331
+ } ;
332
+
333
+ let flow_three = Flow {
334
+ flow_id: 3 ,
335
+ r#type: "" . to_string( ) ,
336
+ settings: vec![ ] ,
337
+ starting_node: None ,
338
+ } ;
339
+
340
+ let flow_vec = vec![ flow_one. clone( ) , flow_two. clone( ) , flow_three. clone( ) ] ;
341
+ let flows = Flows { flows: flow_vec } ;
342
+
343
+ let amount = service. insert_flows( flows) . await . unwrap( ) ;
344
+ assert_eq!( amount, 3 ) ;
345
+
346
+ let deleted_amount = service. delete_flows( vec![ 1 , 2 , 3 ] ) . await ;
347
+ assert_eq!( deleted_amount. unwrap( ) , 3 ) ;
348
+ } )
349
+ ) ;
350
+
351
+ redis_integration_test ! (
352
+ delete_many_non_existing_flows,
353
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
354
+ let deleted_amount = service. delete_flows( vec![ 1 , 2 , 3 ] ) . await ;
355
+ assert_eq!( deleted_amount. unwrap( ) , 0 ) ;
356
+ } )
357
+ ) ;
358
+
359
+ redis_integration_test ! (
360
+ get_existing_flow_ids,
361
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
362
+ let flow_one = Flow {
363
+ flow_id: 1 ,
364
+ r#type: "" . to_string( ) ,
365
+ settings: vec![ ] ,
366
+ starting_node: None ,
367
+ } ;
368
+
369
+ let flow_two = Flow {
370
+ flow_id: 2 ,
371
+ r#type: "" . to_string( ) ,
372
+ settings: vec![ ] ,
373
+ starting_node: None ,
374
+ } ;
375
+
376
+ let flow_three = Flow {
377
+ flow_id: 3 ,
378
+ r#type: "" . to_string( ) ,
379
+ settings: vec![ ] ,
380
+ starting_node: None ,
381
+ } ;
382
+
383
+ let flow_vec = vec![ flow_one. clone( ) , flow_two. clone( ) , flow_three. clone( ) ] ;
384
+ let flows = Flows { flows: flow_vec } ;
385
+
386
+ let amount = service. insert_flows( flows) . await . unwrap( ) ;
387
+ assert_eq!( amount, 3 ) ;
388
+
389
+ let mut flow_ids = service. get_all_flow_ids( ) . await . unwrap( ) ;
390
+ flow_ids. sort( ) ;
391
+
392
+ assert_eq!( flow_ids, vec![ 1 , 2 , 3 ] ) ;
393
+ } )
394
+ ) ;
395
+
396
+ redis_integration_test ! (
397
+ get_empty_flow_ids,
398
+ ( |_connection: FlowStore , mut service: FlowStoreService | async move {
399
+ let flow_ids = service. get_all_flow_ids( ) . await ;
400
+ assert_eq!( flow_ids. unwrap( ) , Vec :: <i64 >:: new( ) ) ;
401
+ } )
402
+ ) ;
403
+
404
+ }
0 commit comments