@@ -1149,3 +1149,147 @@ pub fn test_socket<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
1149
1149
1150
1150
Ok ( ( ) )
1151
1151
}
1152
+
1153
+ pub fn test_udp_recvmsg_multishot < S : squeue:: EntryMarker , C : cqueue:: EntryMarker > (
1154
+ ring : & mut IoUring < S , C > ,
1155
+ test : & Test ,
1156
+ ) -> anyhow:: Result < ( ) > {
1157
+ // Multishot recvmsg was introduced in 6.0, like `SendZc`.
1158
+ // We cannot probe for the former, so we check for the latter as a proxy instead.
1159
+ require ! (
1160
+ test;
1161
+ test. probe. is_supported( opcode:: RecvMsg :: CODE ) ;
1162
+ test. probe. is_supported( opcode:: ProvideBuffers :: CODE ) ;
1163
+ test. probe. is_supported( opcode:: SendZc :: CODE ) ;
1164
+ ) ;
1165
+
1166
+ println ! ( "test udp_recvmsg_multishot" ) ;
1167
+
1168
+ let ( socket_slot, socket_addr) = {
1169
+ // `:0` means "pick up a random available port number", which should
1170
+ // help avoiding test flakes if a static port is already in use.
1171
+ let server_sock = std:: net:: UdpSocket :: bind ( "127.0.0.1:0" ) ?;
1172
+ ring. submitter ( )
1173
+ . register_files ( & [ server_sock. as_raw_fd ( ) ] ) ?;
1174
+ let addr = server_sock. local_addr ( ) . unwrap ( ) ;
1175
+ ( io_uring:: types:: Fixed ( 0 ) , addr)
1176
+ } ;
1177
+
1178
+ // Provide 2 buffers in buffer group `33`, at index 0 and 1.
1179
+ // Each one is 512 bytes large.
1180
+ const BUF_GROUP : u16 = 33 ;
1181
+ const SIZE : usize = 512 ;
1182
+ let mut buffers = [ [ 0u8 ; SIZE ] ; 2 ] ;
1183
+ for ( index, buf) in buffers. iter_mut ( ) . enumerate ( ) {
1184
+ let provide_bufs_e = io_uring:: opcode:: ProvideBuffers :: new (
1185
+ buf. as_mut_ptr ( ) ,
1186
+ SIZE as i32 ,
1187
+ 1 ,
1188
+ BUF_GROUP ,
1189
+ index as u16 ,
1190
+ )
1191
+ . build ( )
1192
+ . user_data ( 11 )
1193
+ . into ( ) ;
1194
+ unsafe { ring. submission ( ) . push ( & provide_bufs_e) ? } ;
1195
+ ring. submitter ( ) . submit_and_wait ( 1 ) ?;
1196
+ let cqes: Vec < io_uring:: cqueue:: Entry > = ring. completion ( ) . map ( Into :: into) . collect ( ) ;
1197
+ assert_eq ! ( cqes. len( ) , 1 ) ;
1198
+ assert_eq ! ( cqes[ 0 ] . user_data( ) , 11 ) ;
1199
+ assert_eq ! ( cqes[ 0 ] . result( ) , 0 ) ;
1200
+ assert_eq ! ( cqes[ 0 ] . flags( ) , 0 ) ;
1201
+ }
1202
+
1203
+ // This structure is actually only used for input arguments to the kernel
1204
+ // (and only name length and control length are actually relevant).
1205
+ let mut msghdr: libc:: msghdr = unsafe { std:: mem:: zeroed ( ) } ;
1206
+ msghdr. msg_namelen = 32 ;
1207
+ msghdr. msg_controllen = 0 ;
1208
+
1209
+ // TODO(lucab): make this more ergonomic to use.
1210
+ const IORING_RECV_MULTISHOT : u16 = 2 ;
1211
+
1212
+ let recvmsg_e = io_uring:: opcode:: RecvMsg :: new ( socket_slot, & mut msghdr as * mut _ )
1213
+ . ioprio ( IORING_RECV_MULTISHOT )
1214
+ . buf_group ( BUF_GROUP )
1215
+ . build ( )
1216
+ . flags ( io_uring:: squeue:: Flags :: BUFFER_SELECT )
1217
+ . user_data ( 77 )
1218
+ . into ( ) ;
1219
+ unsafe { ring. submission ( ) . push ( & recvmsg_e) ? } ;
1220
+ ring. submitter ( ) . submit ( ) . unwrap ( ) ;
1221
+
1222
+ let client_socket: socket2:: Socket = std:: net:: UdpSocket :: bind ( "127.0.0.1:0" ) . unwrap ( ) . into ( ) ;
1223
+ let client_addr = client_socket
1224
+ . local_addr ( )
1225
+ . unwrap ( )
1226
+ . as_socket_ipv4 ( )
1227
+ . unwrap ( ) ;
1228
+ client_socket
1229
+ . send_to ( "testfoo" . as_bytes ( ) , & socket_addr. into ( ) )
1230
+ . unwrap ( ) ;
1231
+ client_socket
1232
+ . send_to ( "testbarbar" . as_bytes ( ) , & socket_addr. into ( ) )
1233
+ . unwrap ( ) ;
1234
+
1235
+ // Check the completion events for the two UDP messages, plus a trailing
1236
+ // CQE signaling that we ran out of buffers.
1237
+ ring. submitter ( ) . submit_and_wait ( 3 ) . unwrap ( ) ;
1238
+ let cqes: Vec < io_uring:: cqueue:: Entry > = ring. completion ( ) . map ( Into :: into) . collect ( ) ;
1239
+ assert_eq ! ( cqes. len( ) , 3 ) ;
1240
+ assert_eq ! ( cqes[ 0 ] . user_data( ) , 77 ) ;
1241
+ assert ! ( cqes[ 0 ] . result( ) > 0 ) ;
1242
+ assert ! ( io_uring:: cqueue:: more( cqes[ 0 ] . flags( ) ) ) ;
1243
+ assert_eq ! ( io_uring:: cqueue:: buffer_select( cqes[ 0 ] . flags( ) ) , Some ( 0 ) ) ;
1244
+ assert ! ( cqes[ 0 ] . flags( ) != 0 ) ;
1245
+ assert_eq ! ( cqes[ 1 ] . user_data( ) , 77 ) ;
1246
+ assert ! ( cqes[ 1 ] . result( ) > 0 ) ;
1247
+ assert ! ( io_uring:: cqueue:: more( cqes[ 1 ] . flags( ) ) ) ;
1248
+ assert_eq ! ( io_uring:: cqueue:: buffer_select( cqes[ 1 ] . flags( ) ) , Some ( 1 ) ) ;
1249
+ assert ! ( cqes[ 1 ] . flags( ) != 0 ) ;
1250
+ assert_eq ! ( cqes[ 2 ] . user_data( ) , 77 ) ;
1251
+ assert_eq ! ( cqes[ 2 ] . result( ) , -libc:: ENOBUFS ) ;
1252
+ assert ! ( !io_uring:: cqueue:: more( cqes[ 2 ] . flags( ) ) ) ;
1253
+ assert_eq ! ( io_uring:: cqueue:: buffer_select( cqes[ 2 ] . flags( ) ) , None ) ;
1254
+ assert_eq ! ( cqes[ 2 ] . flags( ) , 0 ) ;
1255
+
1256
+ let msg0 = types:: RecvMsgOut :: parse ( buffers[ 0 ] . as_slice ( ) , & msghdr) . unwrap ( ) ;
1257
+ assert ! ( !msg0. is_payload_truncated( ) ) ;
1258
+ assert_eq ! ( msg0. payload_data( ) , b"testfoo" . as_slice( ) ) ;
1259
+ assert ! ( !msg0. is_control_data_truncated( ) ) ;
1260
+ assert_eq ! ( msg0. control_data( ) , & [ ] ) ;
1261
+ assert ! ( !msg0. is_name_data_truncated( ) ) ;
1262
+ let ( _, addr) = unsafe {
1263
+ socket2:: SockAddr :: init ( |storage, len| {
1264
+ * len = msg0. name_data ( ) . len ( ) as u32 ;
1265
+ std:: ptr:: copy_nonoverlapping ( msg0. name_data ( ) . as_ptr ( ) as _ , storage, 1 ) ;
1266
+ Ok ( ( ) )
1267
+ } )
1268
+ }
1269
+ . unwrap ( ) ;
1270
+ let addr = addr. as_socket_ipv4 ( ) . unwrap ( ) ;
1271
+ assert_eq ! ( addr. ip( ) , client_addr. ip( ) ) ;
1272
+ assert_eq ! ( addr. port( ) , client_addr. port( ) ) ;
1273
+
1274
+ let msg1 = types:: RecvMsgOut :: parse ( buffers[ 1 ] . as_slice ( ) , & msghdr) . unwrap ( ) ;
1275
+ assert ! ( !msg1. is_payload_truncated( ) ) ;
1276
+ assert_eq ! ( msg1. payload_data( ) , b"testbarbar" . as_slice( ) ) ;
1277
+ assert ! ( !msg1. is_control_data_truncated( ) ) ;
1278
+ assert_eq ! ( msg1. control_data( ) , & [ ] ) ;
1279
+ assert ! ( !msg1. is_name_data_truncated( ) ) ;
1280
+ let ( _, addr) = unsafe {
1281
+ socket2:: SockAddr :: init ( |storage, len| {
1282
+ * len = msg1. name_data ( ) . len ( ) as u32 ;
1283
+ std:: ptr:: copy_nonoverlapping ( msg1. name_data ( ) . as_ptr ( ) as _ , storage, 1 ) ;
1284
+ Ok ( ( ) )
1285
+ } )
1286
+ }
1287
+ . unwrap ( ) ;
1288
+ let addr = addr. as_socket_ipv4 ( ) . unwrap ( ) ;
1289
+ assert_eq ! ( addr. ip( ) , client_addr. ip( ) ) ;
1290
+ assert_eq ! ( addr. port( ) , client_addr. port( ) ) ;
1291
+
1292
+ ring. submitter ( ) . unregister_files ( ) . unwrap ( ) ;
1293
+
1294
+ Ok ( ( ) )
1295
+ }
0 commit comments