@@ -588,22 +588,22 @@ impl<Exe: Executor> TopicProducer<Exe> {
588588 match self . batch . as_ref ( ) {
589589 None => Err ( ProducerError :: Custom ( "not a batching producer" . to_string ( ) ) . into ( ) ) ,
590590 Some ( batch) => {
591- let messages = {
592- let guard = batch. lock ( ) . await ;
593- guard. get_messages ( ) . await
594- } ;
595- let message_count = messages. len ( ) ;
591+ let mut payload: Vec < u8 > = Vec :: new ( ) ;
592+ let mut receipts = Vec :: new ( ) ;
593+ let message_count;
596594
597- if message_count == 0 {
598- return Ok ( ( ) ) ;
595+ {
596+ let batch = batch. lock ( ) . await ;
597+ let messages = batch. get_messages ( ) . await ;
598+ message_count = messages. len ( ) ;
599+ for ( tx, message) in messages {
600+ receipts. push ( tx) ;
601+ message. serialize ( & mut payload) ;
602+ }
599603 }
600604
601- let mut payload: Vec < u8 > = Vec :: new ( ) ;
602- let mut receipts = Vec :: with_capacity ( message_count) ;
603-
604- for ( tx, message) in messages {
605- receipts. push ( tx) ;
606- message. serialize ( & mut payload) ;
605+ if message_count == 0 {
606+ return Ok ( ( ) ) ;
607607 }
608608
609609 let message = ProducerMessage {
@@ -665,38 +665,29 @@ impl<Exe: Executor> TopicProducer<Exe> {
665665 Ok ( SendFuture ( rx) )
666666 }
667667 Some ( batch) => {
668- // Push into batch while holding the lock, and conditionally drain.
669- let ( maybe_drained, counter) = {
670- let guard = batch. lock ( ) . await ;
671- guard. push_back ( ( tx, message) ) . await ;
672-
673- if guard. is_full ( ) . await {
674- let drained = guard. get_messages ( ) . await ; // await inner storage lock
675- let count = drained. len ( ) as i32 ;
676- ( Some ( drained) , count)
677- } else {
678- ( None , 0 )
679- }
680- } ;
681-
682- if counter == 0 {
683- return Ok ( SendFuture ( rx) ) ;
684- }
685-
686- // Serialize WITHOUT holding the outer lock.
687668 let mut payload: Vec < u8 > = Vec :: new ( ) ;
688669 let mut receipts = Vec :: new ( ) ;
670+ let mut counter = 0i32 ;
689671
690- if let Some ( messages) = maybe_drained {
691- for ( tx, message) in messages {
692- receipts. push ( tx) ;
693- message. serialize ( & mut payload) ;
672+ {
673+ let batch = batch. lock ( ) . await ;
674+ batch. push_back ( ( tx, message) ) . await ;
675+
676+ if batch. is_full ( ) . await {
677+ for ( tx, message) in batch. get_messages ( ) . await {
678+ receipts. push ( tx) ;
679+ message. serialize ( & mut payload) ;
680+ counter += 1 ;
681+ }
694682 }
695683 }
696684
685+ if counter == 0 {
686+ return Ok ( SendFuture ( rx) ) ;
687+ }
688+
697689 trace ! ( "sending a batched message of size {}" , counter) ;
698690
699- // Build & compress synchronously, then spawn the receipt await.
700691 let message = ProducerMessage {
701692 payload,
702693 num_messages_in_batch : Some ( counter) ,
@@ -1272,270 +1263,3 @@ impl<T: SerializeMessage + Sized, Exe: Executor> MessageBuilder<'_, T, Exe> {
12721263 producer. send_raw ( producer_message) . await
12731264 }
12741265}
1275-
1276- #[ cfg( test) ]
1277- mod tests {
1278- use super :: * ;
1279- use futures:: executor:: block_on;
1280-
1281- fn pm_with (
1282- payload : Vec < u8 > ,
1283- props : & [ ( & str , & str ) ] ,
1284- partition_key : Option < & str > ,
1285- ordering_key : Option < & [ u8 ] > ,
1286- event_time : Option < u64 > ,
1287- ) -> ProducerMessage {
1288- let mut properties = HashMap :: new ( ) ;
1289- for ( k, v) in props {
1290- properties. insert ( ( * k) . to_string ( ) , ( * v) . to_string ( ) ) ;
1291- }
1292- ProducerMessage {
1293- payload,
1294- properties,
1295- partition_key : partition_key. map ( |s| s. to_string ( ) ) ,
1296- ordering_key : ordering_key. map ( |b| b. to_vec ( ) ) ,
1297- event_time,
1298- ..Default :: default ( )
1299- }
1300- }
1301-
1302- #[ test]
1303- fn send_future_errors_when_sender_dropped ( ) {
1304- let ( tx, rx) = futures:: channel:: oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1305- // Drop the sender immediately to simulate an unexpected disconnect:
1306- drop ( tx) ;
1307-
1308- let fut = SendFuture ( rx) ;
1309- let err = block_on ( fut) . expect_err ( "expected an error when sender is dropped" ) ;
1310-
1311- // It should be mapped to a ProducerError::Custom inside Error::Producer
1312- match err {
1313- Error :: Producer ( ProducerError :: Custom ( msg) ) => {
1314- assert ! (
1315- msg. contains( "unexpectedly disconnected" ) ,
1316- "unexpected error message: {msg}"
1317- ) ;
1318- }
1319- other => panic ! ( "unexpected error variant: {:?}" , other) ,
1320- }
1321- }
1322-
1323- #[ test]
1324- fn message_converts_into_producer_message ( ) {
1325- let mut props = HashMap :: new ( ) ;
1326- props. insert ( "a" . to_string ( ) , "1" . to_string ( ) ) ;
1327- props. insert ( "b" . to_string ( ) , "2" . to_string ( ) ) ;
1328-
1329- let m = Message {
1330- payload : b"hello" . to_vec ( ) ,
1331- properties : props. clone ( ) ,
1332- partition_key : Some ( "key" . into ( ) ) ,
1333- ordering_key : Some ( vec ! [ 1 , 2 , 3 ] ) ,
1334- replicate_to : vec ! [ "r1" . into( ) , "r2" . into( ) ] ,
1335- event_time : Some ( 42 ) ,
1336- schema_version : Some ( vec ! [ 9 , 9 ] ) ,
1337- deliver_at_time : Some ( 123456789 ) ,
1338- } ;
1339-
1340- let pm: ProducerMessage = m. clone ( ) . into ( ) ;
1341-
1342- assert_eq ! ( pm. payload, m. payload) ;
1343- assert_eq ! ( pm. properties, m. properties) ;
1344- assert_eq ! ( pm. partition_key, m. partition_key) ;
1345- assert_eq ! ( pm. ordering_key, m. ordering_key) ;
1346- assert_eq ! ( pm. replicate_to, m. replicate_to) ;
1347- assert_eq ! ( pm. event_time, m. event_time) ;
1348- assert_eq ! ( pm. schema_version, m. schema_version) ;
1349- assert_eq ! ( pm. deliver_at_time, m. deliver_at_time) ;
1350-
1351- // And defaults that the producer fills later:
1352- assert ! ( pm. num_messages_in_batch. is_none( ) ) ;
1353- assert ! ( pm. compression. is_none( ) ) ;
1354- assert ! ( pm. uncompressed_size. is_none( ) ) ;
1355- }
1356-
1357- #[ test]
1358- fn batch_fills_on_length_threshold_and_drains ( ) {
1359- block_on ( async {
1360- let batch = Batch :: new ( 3 , None ) ;
1361-
1362- // Not full initially
1363- assert ! ( !batch. is_full( ) . await ) ;
1364-
1365- // Push 3 small messages; should become full because length == 3
1366- for i in 0 ..3 {
1367- let ( tx, _rx) = oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1368- let msg = pm_with (
1369- vec ! [ i as u8 ] ,
1370- & [ ( "k" , "v" ) ] ,
1371- Some ( "p" ) ,
1372- Some ( & [ 7 , 8 ] ) ,
1373- Some ( 99 ) ,
1374- ) ;
1375- batch. push_back ( ( tx, msg) ) . await ;
1376- }
1377-
1378- assert ! (
1379- batch. is_full( ) . await ,
1380- "batch should be full at length threshold"
1381- ) ;
1382-
1383- // Drain and validate contents -> Batch creates BatchedMessage with mapped fields
1384- let drained = batch. get_messages ( ) . await ;
1385- assert_eq ! ( drained. len( ) , 3 ) ;
1386-
1387- for ( _tx, bm) in drained {
1388- // properties
1389- let kvs: HashMap < _ , _ > = bm
1390- . metadata
1391- . properties
1392- . iter ( )
1393- . map ( |kv| ( kv. key . clone ( ) , kv. value . clone ( ) ) )
1394- . collect ( ) ;
1395- assert_eq ! ( kvs. get( "k" ) . map( String :: as_str) , Some ( "v" ) ) ;
1396-
1397- // partition key & ordering key
1398- assert_eq ! ( bm. metadata. partition_key. as_deref( ) , Some ( "p" ) ) ;
1399- assert_eq ! ( bm. metadata. ordering_key. as_deref( ) , Some ( & [ 7 , 8 ] [ ..] ) ) ;
1400-
1401- // event time & payload size
1402- assert_eq ! ( bm. metadata. event_time, Some ( 99 ) ) ;
1403- assert_eq ! ( bm. metadata. payload_size as usize , bm. payload. len( ) ) ;
1404- }
1405-
1406- // After draining, no longer full
1407- assert ! ( !batch. is_full( ) . await ) ;
1408- } ) ;
1409- }
1410-
1411- #[ test]
1412- fn batch_fills_on_size_threshold_even_if_length_not_reached ( ) {
1413- block_on ( async {
1414- // Allow up to 10 messages, but set size threshold at 10 bytes
1415- let batch = Batch :: new ( 10 , Some ( 10 ) ) ;
1416-
1417- // Push two messages of 6 bytes -> total 12 >= 10, should be full
1418- for _ in 0 ..2 {
1419- let ( tx, _rx) = oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1420- let msg = pm_with ( vec ! [ 0 ; 6 ] , & [ ] , None , None , None ) ;
1421- batch. push_back ( ( tx, msg) ) . await ;
1422- }
1423-
1424- assert ! (
1425- batch. is_full( ) . await ,
1426- "batch should be full when total payload meets size threshold"
1427- ) ;
1428-
1429- // Drain resets size accounting
1430- let drained = batch. get_messages ( ) . await ;
1431- assert_eq ! ( drained. len( ) , 2 ) ;
1432- assert ! (
1433- !batch. is_full( ) . await ,
1434- "after drain, batch shouldn't be full"
1435- ) ;
1436- } ) ;
1437- }
1438-
1439- #[ test]
1440- fn batch_storage_tracks_and_resets_size ( ) {
1441- // Directly test the inner size accounting behavior
1442- let mut storage = BatchStorage :: new ( 5 ) ;
1443-
1444- // Build two batched messages with payload_size 3 and 4
1445- let mut meta1 = proto:: SingleMessageMetadata :: default ( ) ;
1446- meta1. payload_size = 3 ;
1447- let bm1 = BatchedMessage {
1448- metadata : meta1,
1449- payload : vec ! [ 0 ; 3 ] ,
1450- } ;
1451-
1452- let mut meta2 = proto:: SingleMessageMetadata :: default ( ) ;
1453- meta2. payload_size = 4 ;
1454- let bm2 = BatchedMessage {
1455- metadata : meta2,
1456- payload : vec ! [ 0 ; 4 ] ,
1457- } ;
1458-
1459- let ( tx1, _rx1) = oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1460- let ( tx2, _rx2) = oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1461-
1462- storage. push_back ( tx1, bm1) ;
1463- assert_eq ! ( storage. size, 3 ) ;
1464-
1465- storage. push_back ( tx2, bm2) ;
1466- assert_eq ! ( storage. size, 7 ) ;
1467-
1468- // Draining resets the tracked size
1469- let drained = storage. get_messages ( ) ;
1470- assert_eq ! ( drained. len( ) , 2 ) ;
1471- assert_eq ! ( storage. size, 0 ) ;
1472- assert ! ( storage. storage. is_empty( ) ) ;
1473- }
1474-
1475- // Guards that we treat "exactly at threshold" as full.
1476- #[ test]
1477- fn batch_is_full_when_size_exactly_matches_threshold ( ) {
1478- block_on ( async {
1479- // Allow many messages but fix a small byte threshold.
1480- let batch = Batch :: new ( 10 , Some ( 10 ) ) ;
1481-
1482- // 4 + 6 == 10 exactly.
1483- for & n in & [ 4usize , 6usize ] {
1484- let ( tx, _rx) = oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1485- let msg = ProducerMessage {
1486- payload : vec ! [ 0 ; n] ,
1487- ..Default :: default ( )
1488- } ;
1489- batch. push_back ( ( tx, msg) ) . await ;
1490- }
1491-
1492- assert ! (
1493- batch. is_full( ) . await ,
1494- "batch should be full when total payload equals the size threshold"
1495- ) ;
1496-
1497- // drain & ensure we reset fullness afterwards
1498- let drained = batch. get_messages ( ) . await ;
1499- assert_eq ! ( drained. len( ) , 2 ) ;
1500- assert ! (
1501- !batch. is_full( ) . await ,
1502- "after draining, a fresh batch should not be full"
1503- ) ;
1504- } ) ;
1505- }
1506-
1507- // Ensures empty properties/keys/event_time remain unset in batched metadata,
1508- // while payload_size is tracked correctly.
1509- #[ test]
1510- fn batch_mapping_when_optional_fields_absent ( ) {
1511- block_on ( async {
1512- let batch = Batch :: new ( 5 , None ) ;
1513-
1514- let ( tx, _rx) = oneshot:: channel :: < Result < CommandSendReceipt , Error > > ( ) ;
1515- let msg = ProducerMessage {
1516- // No properties / keys / event_time
1517- payload : vec ! [ 1 , 2 , 3 , 4 , 5 ] , // 5 bytes
1518- ..Default :: default ( )
1519- } ;
1520-
1521- batch. push_back ( ( tx, msg) ) . await ;
1522-
1523- // Not full by length; just verifying mapping here.
1524- assert ! ( !batch. is_full( ) . await ) ;
1525-
1526- let drained = batch. get_messages ( ) . await ;
1527- assert_eq ! ( drained. len( ) , 1 ) ;
1528-
1529- let ( _tx, bm) = drained. into_iter ( ) . next ( ) . unwrap ( ) ;
1530- // properties empty
1531- assert ! ( bm. metadata. properties. is_empty( ) ) ;
1532- // optional fields should be None
1533- assert ! ( bm. metadata. partition_key. is_none( ) ) ;
1534- assert ! ( bm. metadata. ordering_key. is_none( ) ) ;
1535- assert ! ( bm. metadata. event_time. is_none( ) ) ;
1536- // payload_size must match actual payload length
1537- assert_eq ! ( bm. metadata. payload_size as usize , 5 ) ;
1538- assert_eq ! ( bm. payload. len( ) , 5 ) ;
1539- } ) ;
1540- }
1541- }
0 commit comments