From 6f5861771ef19786a4ee8b93812a110db3355a11 Mon Sep 17 00:00:00 2001 From: Jon Chiappetta Date: Wed, 6 Aug 2025 16:33:18 -0400 Subject: [PATCH 1/2] bulk mode --- src/openvpn/forward.c | 257 ++++++++++++++++++++++++++++++++++------- src/openvpn/forward.h | 6 + src/openvpn/init.c | 62 ++++++++++ src/openvpn/mtu.c | 10 +- src/openvpn/mtu.h | 15 +++ src/openvpn/multi.c | 142 +++++++++++++++++++++-- src/openvpn/multi.h | 7 ++ src/openvpn/multi_io.c | 18 ++- src/openvpn/multi_io.h | 1 + src/openvpn/openvpn.h | 11 ++ src/openvpn/options.c | 10 ++ src/openvpn/options.h | 3 + 12 files changed, 485 insertions(+), 57 deletions(-) diff --git a/src/openvpn/forward.c b/src/openvpn/forward.c index f342958e331..6cd52b14479 100644 --- a/src/openvpn/forward.c +++ b/src/openvpn/forward.c @@ -46,6 +46,9 @@ #include "mstats.h" +#include +#include + counter_type link_read_bytes_global; /* GLOBAL */ counter_type link_write_bytes_global; /* GLOBAL */ @@ -611,6 +614,21 @@ buffer_turnover(const uint8_t *orig_buf, struct buffer *dest_stub, struct buffer } } +uint8_t *buff_prepsize(uint8_t *buff, int *size) +{ + buff[0] = ((*size >> 8) & 0xff); + buff[1] = ((*size >> 0) & 0xff); + buff += 2; + return buff; +} + +uint8_t *buff_postsize(uint8_t *buff, int *size) +{ + *size = ((buff[0] << 8) + (buff[1] << 0)); + buff += 2; + return buff; +} + /* * Compress, fragment, encrypt and HMAC-sign an outgoing packet. * Input: c->c2.buf @@ -888,7 +906,7 @@ socks_postprocess_incoming_link(struct context *c, struct link_socket *sock) { if (sock->socks_proxy && sock->info.proto == PROTO_UDP) { - socks_process_incoming_udp(&c->c2.buf, &c->c2.from); + socks_process_incoming_udp(&c->c2.buf2, &c->c2.from); } } @@ -918,7 +936,7 @@ link_socket_write_post_size_adjust(int *size, int size_delta, struct buffer *buf } /* - * Output: c->c2.buf + * Output: c->c2.buf2 */ void @@ -934,10 +952,10 @@ read_incoming_link(struct context *c, struct link_socket *sock) perf_push(PERF_READ_IN_LINK); - c->c2.buf = c->c2.buffers->read_link_buf; - ASSERT(buf_init(&c->c2.buf, c->c2.frame.buf.headroom)); + c->c2.buf2 = c->c2.buffers->read_link_buf; + ASSERT(buf_init(&c->c2.buf2, c->c2.frame.buf.headroom)); - status = link_socket_read(sock, &c->c2.buf, &c->c2.from); + status = link_socket_read(sock, &c->c2.buf2, &c->c2.from); if (socket_connection_reset(sock, status)) { @@ -993,17 +1011,17 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo struct gc_arena gc = gc_new(); bool decrypt_status = false; - if (c->c2.buf.len > 0) + if (c->c2.buf2.len > 0) { - c->c2.link_read_bytes += c->c2.buf.len; - link_read_bytes_global += c->c2.buf.len; + c->c2.link_read_bytes += c->c2.buf2.len; + link_read_bytes_global += c->c2.buf2.len; #ifdef ENABLE_MEMSTATS if (mmap_stats) { mmap_stats->link_read_bytes = link_read_bytes_global; } #endif - c->c2.original_recv_size = c->c2.buf.len; + c->c2.original_recv_size = c->c2.buf2.len; } else { @@ -1016,21 +1034,22 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo { if (!ask_gremlin(c->options.gremlin)) { - c->c2.buf.len = 0; + c->c2.buf2.len = 0; } - corrupt_gremlin(&c->c2.buf, c->options.gremlin); + corrupt_gremlin(&c->c2.buf2, c->options.gremlin); } #endif /* log incoming packet */ #ifdef LOG_RW - if (c->c2.log_rw && c->c2.buf.len > 0) + if (c->c2.log_rw && c->c2.buf2.len > 0) { fprintf(stderr, "R"); } #endif + msg(D_LINK_RW, "%s READ [%d] from %s: %s", proto2ascii(lsi->proto, lsi->af, true), - BLEN(&c->c2.buf), print_link_socket_actual(&c->c2.from, &gc), 
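/* [illustrative aside, not part of the patch] The bulk framing above
 * stores each packet length as a 2-byte big-endian prefix, which caps a
 * framed packet at 65535 bytes and explains the offset += 2 adjustments
 * in the bulk paths. A self-contained round-trip over the same layout
 * as buff_prepsize()/buff_postsize():
 */
#include <assert.h>
#include <stdint.h>
#include <string.h>

static uint8_t *demo_prepsize(uint8_t *p, int len)
{
    p[0] = (uint8_t)((len >> 8) & 0xff);   /* high byte first */
    p[1] = (uint8_t)(len & 0xff);
    return p + 2;
}

static const uint8_t *demo_postsize(const uint8_t *p, int *len)
{
    *len = (p[0] << 8) | p[1];
    return p + 2;
}

static void demo_roundtrip(void)
{
    uint8_t frame[2 + 5];
    uint8_t *w = demo_prepsize(frame, 5);
    memcpy(w, "hello", 5);
    int n = 0;
    const uint8_t *r = demo_postsize(frame, &n);
    assert(n == 5 && memcmp(r, "hello", 5) == 0);
}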
PROTO_DUMP(&c->c2.buf, &gc)); + BLEN(&c->c2.buf2), print_link_socket_actual(&c->c2.from, &gc), PROTO_DUMP(&c->c2.buf2, &gc)); /* * Good, non-zero length packet received. @@ -1039,18 +1058,18 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo * If any stage fails, it sets buf.len to 0 or -1, * telling downstream stages to ignore the packet. */ - if (c->c2.buf.len > 0) + if (c->c2.buf2.len > 0) { struct crypto_options *co = NULL; const uint8_t *ad_start = NULL; - if (!link_socket_verify_incoming_addr(&c->c2.buf, lsi, &c->c2.from)) + if (!link_socket_verify_incoming_addr(&c->c2.buf2, lsi, &c->c2.from)) { - link_socket_bad_incoming_addr(&c->c2.buf, lsi, &c->c2.from); + link_socket_bad_incoming_addr(&c->c2.buf2, lsi, &c->c2.from); } if (c->c2.tls_multi) { - uint8_t opcode = *BPTR(&c->c2.buf) >> P_OPCODE_SHIFT; + uint8_t opcode = *BPTR(&c->c2.buf2) >> P_OPCODE_SHIFT; /* * If DCO is enabled, the kernel drivers require that the @@ -1064,7 +1083,7 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo { msg(D_LINK_ERRORS, "Data Channel Offload doesn't support DATA_V1 packets. " "Upgrade your server to 2.4.5 or newer."); - c->c2.buf.len = 0; + c->c2.buf2.len = 0; } /* @@ -1077,7 +1096,7 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo * will load crypto_options with the correct encryption key * and return false. */ - if (tls_pre_decrypt(c->c2.tls_multi, &c->c2.from, &c->c2.buf, &co, floated, &ad_start)) + if (tls_pre_decrypt(c->c2.tls_multi, &c->c2.from, &c->c2.buf2, &co, floated, &ad_start)) { interval_action(&c->c2.tmp_int); @@ -1100,12 +1119,12 @@ process_incoming_link_part1(struct context *c, struct link_socket_info *lsi, boo */ if (c->c2.tls_multi && c->c2.tls_multi->multi_state < CAS_CONNECT_DONE) { - c->c2.buf.len = 0; + c->c2.buf2.len = 0; } /* authenticate and decrypt the incoming packet */ decrypt_status = - openvpn_decrypt(&c->c2.buf, c->c2.buffers->decrypt_buf, co, &c->c2.frame, ad_start); + openvpn_decrypt(&c->c2.buf2, c->c2.buffers->decrypt_buf, co, &c->c2.frame, ad_start); if (!decrypt_status /* on the instance context we have only one socket, so just check the first one */ @@ -1130,12 +1149,12 @@ void process_incoming_link_part2(struct context *c, struct link_socket_info *lsi, const uint8_t *orig_buf) { - if (c->c2.buf.len > 0) + if (c->c2.buf2.len > 0) { #ifdef ENABLE_FRAGMENT if (c->c2.fragment) { - fragment_incoming(c->c2.fragment, &c->c2.buf, &c->c2.frame_fragment); + fragment_incoming(c->c2.fragment, &c->c2.buf2, &c->c2.frame_fragment); } #endif @@ -1143,14 +1162,14 @@ process_incoming_link_part2(struct context *c, struct link_socket_info *lsi, /* decompress the incoming packet */ if (c->c2.comp_context) { - (*c->c2.comp_context->alg.decompress)(&c->c2.buf, c->c2.buffers->decompress_buf, + (*c->c2.comp_context->alg.decompress)(&c->c2.buf2, c->c2.buffers->decompress_buf, c->c2.comp_context, &c->c2.frame); } #endif #ifdef PACKET_TRUNCATION_CHECK - /* if (c->c2.buf.len > 1) --c->c2.buf.len; */ - ipv4_packet_size_verify(BPTR(&c->c2.buf), BLEN(&c->c2.buf), TUNNEL_TYPE(c->c1.tuntap), + /* if (c->c2.buf2.len > 1) --c->c2.buf2.len; */ + ipv4_packet_size_verify(BPTR(&c->c2.buf2), BLEN(&c->c2.buf2), TUNNEL_TYPE(c->c1.tuntap), "POST_DECRYPT", &c->c2.n_trunc_post_decrypt); #endif @@ -1163,39 +1182,39 @@ process_incoming_link_part2(struct context *c, struct link_socket_info *lsi, * * Also, update the persisted version of our packet-id. 
*/ - if (!TLS_MODE(c) && c->c2.buf.len > 0) + if (!TLS_MODE(c) && c->c2.buf2.len > 0) { link_socket_set_outgoing_addr(lsi, &c->c2.from, NULL, c->c2.es); } /* reset packet received timer */ - if (c->options.ping_rec_timeout && c->c2.buf.len > 0) + if (c->options.ping_rec_timeout && c->c2.buf2.len > 0) { event_timeout_reset(&c->c2.ping_rec_interval); } /* increment authenticated receive byte count */ - if (c->c2.buf.len > 0) + if (c->c2.buf2.len > 0) { - c->c2.link_read_bytes_auth += c->c2.buf.len; + c->c2.link_read_bytes_auth += c->c2.buf2.len; c->c2.max_recv_size_local = max_int(c->c2.original_recv_size, c->c2.max_recv_size_local); } /* Did we just receive an openvpn ping packet? */ - if (is_ping_msg(&c->c2.buf)) + if (is_ping_msg(&c->c2.buf2)) { dmsg(D_PING, "RECEIVED PING PACKET"); - c->c2.buf.len = 0; /* drop packet */ + c->c2.buf2.len = 0; /* drop packet */ } /* Did we just receive an OCC packet? */ - if (is_occ_msg(&c->c2.buf)) + if (is_occ_msg(&c->c2.buf2)) { process_received_occ_msg(c); } - buffer_turnover(orig_buf, &c->c2.to_tun, &c->c2.buf, &c->c2.buffers->read_link_buf); + buffer_turnover(orig_buf, &c->c2.to_tun, &c->c2.buf2, &c->c2.buffers->read_link_buf); /* to_tun defined + unopened tuntap can cause deadlock */ if (!tuntap_defined(c->c1.tuntap)) @@ -1209,16 +1228,37 @@ process_incoming_link_part2(struct context *c, struct link_socket_info *lsi, } } +void process_incoming_link_part3(struct context *c) +{ + if (BULK_MODE(c)) + { + int leng = BLEN(&c->c2.buf2); + if (leng > 0) + { + c->c2.buffers->send_tun_max.offset = TUN_BAT_OFF; + c->c2.buffers->send_tun_max.len = leng; + bcopy(BPTR(&c->c2.buf2), BPTR(&c->c2.buffers->send_tun_max), leng); + c->c2.to_tun.offset += 2; + c->c2.buf2.offset += 2; + } + else + { + buf_reset(&c->c2.to_tun); + } + } +} + static void process_incoming_link(struct context *c, struct link_socket *sock) { perf_push(PERF_PROC_IN_LINK); struct link_socket_info *lsi = &sock->info; - const uint8_t *orig_buf = c->c2.buf.data; + const uint8_t *orig_buf = c->c2.buf2.data; process_incoming_link_part1(c, lsi, false); process_incoming_link_part2(c, lsi, orig_buf); + process_incoming_link_part3(c); perf_pop(); } @@ -1319,7 +1359,7 @@ process_incoming_dco(struct context *c) */ void -read_incoming_tun(struct context *c) +read_incoming_tun_part2(struct context *c) { /* * Setup for read() call on TUN/TAP device. @@ -1380,6 +1420,52 @@ read_incoming_tun(struct context *c) perf_pop(); } +void read_incoming_tun_part3(struct context *c) +{ + fd_set rfds; + struct timeval timo; + int plen = 0, pidx = -1; + int fdno = c->c1.tuntap->fd; + for (int x = 0; x < TUN_BAT_MIN; ++x) + { + int leng = plen, indx = (pidx + 1); + if (leng < 1) + { + FD_ZERO(&rfds); + FD_SET(fdno, &rfds); + timo.tv_sec = 0; + timo.tv_usec = 0; + select(fdno+1, &rfds, NULL, NULL, &timo); + if (FD_ISSET(fdno, &rfds)) + { + read_incoming_tun_part2(c); + plen = BLEN(&c->c2.buf); + } else { break; } + } + leng = plen; + if (leng > 0) + { + c->c2.buffers->read_tun_bufs[indx].offset = TUN_BAT_OFF; + c->c2.buffers->read_tun_bufs[indx].len = leng; + bcopy(BPTR(&c->c2.buf), BPTR(&c->c2.buffers->read_tun_bufs[indx]), leng); + c->c2.bufs[indx] = c->c2.buffers->read_tun_bufs[indx]; + pidx = indx; + } else { break; } + plen = 0; + } + c->c2.buffers->bulk_indx = 0; + c->c2.buffers->bulk_leng = (pidx + 1); +} + +void read_incoming_tun(struct context *c) +{ + if (!BULK_MODE(c)) { + read_incoming_tun_part2(c); + } else { + read_incoming_tun_part3(c); + } +} + /** * Drops UDP packets which OS decided to route via tun. 
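/* [illustrative aside, assumptions noted] read_incoming_tun_part3()
 * above drains up to TUN_BAT_MIN packets per pass by polling the tun fd
 * with a zero-timeout select(), stopping at the first would-block. The
 * same pattern reduced to a plain fd and fixed-size buffers:
 */
#include <stdint.h>
#include <sys/select.h>
#include <unistd.h>

#define DEMO_BATCH 6      /* stands in for TUN_BAT_MIN */
#define DEMO_MTU   1500   /* stands in for the frame's tun_mtu */

static int demo_drain(int fd, uint8_t pkts[DEMO_BATCH][DEMO_MTU], int lens[DEMO_BATCH])
{
    int count = 0;
    for (int x = 0; x < DEMO_BATCH; ++x)
    {
        fd_set rfds;
        struct timeval timo = { 0, 0 };   /* poll, never block */
        FD_ZERO(&rfds);
        FD_SET(fd, &rfds);
        if (select(fd + 1, &rfds, NULL, NULL, &timo) <= 0 || !FD_ISSET(fd, &rfds))
        {
            break;   /* nothing more queued right now */
        }
        ssize_t n = read(fd, pkts[count], DEMO_MTU);
        if (n <= 0)
        {
            break;
        }
        lens[count++] = (int)n;
    }
    return count;   /* becomes bulk_leng in the patch */
}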
* @@ -1467,7 +1553,7 @@ drop_if_recursive_routing(struct context *c, struct buffer *buf) */ void -process_incoming_tun(struct context *c, struct link_socket *out_sock) +process_incoming_tun_part2(struct context *c, struct link_socket *out_sock) { struct gc_arena gc = gc_new(); @@ -1486,7 +1572,7 @@ process_incoming_tun(struct context *c, struct link_socket *out_sock) #endif /* Show packet content */ - dmsg(D_TUN_RW, "TUN READ [%d]", BLEN(&c->c2.buf)); + dmsg(D_TUN_RW, "TUN READ [%d] [%d]", BLEN(&c->c2.buf), c->c2.frame.buf.payload_size); if (c->c2.buf.len > 0) { @@ -1510,7 +1596,10 @@ process_incoming_tun(struct context *c, struct link_socket *out_sock) } if (c->c2.buf.len > 0) { - encrypt_sign(c, true); + if (!BULK_MODE(c)) + { + encrypt_sign(c, true); + } } else { @@ -1520,6 +1609,57 @@ process_incoming_tun(struct context *c, struct link_socket *out_sock) gc_free(&gc); } +void process_incoming_tun_part3(struct context *c, struct link_socket *out_sock) +{ + if (BULK_DATA(c->c2.buffers)) + { + c->c2.buffers->read_tun_max.offset = TUN_BAT_OFF; + c->c2.buffers->read_tun_max.len = 0; + uint8_t *temp = BPTR(&c->c2.buffers->read_tun_max); + int leng = c->c2.buffers->bulk_leng; + int plen = 0, maxl = 0; + for (int x = 0; x < leng; ++x) + { + c->c2.buf = c->c2.bufs[x]; + process_incoming_tun_part2(c, out_sock); + plen = BLEN(&c->c2.buf); + if (plen > 0) + { + temp = buff_prepsize(temp, &plen); + bcopy(BPTR(&c->c2.buf), temp, plen); + temp += plen; maxl += (plen + 2); + } + c->c2.bufs[x].len = 0; + } + if (maxl > 0) + { + c->c2.buffers->read_tun_max.offset = TUN_BAT_OFF; + c->c2.buffers->read_tun_max.len = maxl; + c->c2.buf = c->c2.buffers->read_tun_max; + encrypt_sign(c, true); + } + else + { + buf_reset(&c->c2.to_link); + } + } + else + { + buf_reset(&c->c2.to_link); + } + c->c2.buffers->bulk_indx = -1; + c->c2.buffers->bulk_leng = -1; +} + +void process_incoming_tun(struct context *c, struct link_socket *out_sock) +{ + if (!BULK_MODE(c)) { + process_incoming_tun_part2(c, out_sock); + } else { + process_incoming_tun_part3(c, out_sock); + } +} + /** * Forges a IPv6 ICMP packet with a no route to host error code from the * IPv6 packet in buf and sends it directly back to the client via the tun @@ -1746,7 +1886,7 @@ process_outgoing_link(struct context *c, struct link_socket *sock) perf_push(PERF_PROC_OUT_LINK); - if (c->c2.to_link.len > 0 && c->c2.to_link.len <= c->c2.frame.buf.payload_size) + if (c->c2.to_link.len > 0 && (c->c2.to_link.len <= c->c2.frame.buf.payload_size || c->c2.frame.bulk_size > 0)) { /* * Setup for call to send/sendto which will send @@ -1791,6 +1931,7 @@ process_outgoing_link(struct context *c, struct link_socket *sock) fprintf(stderr, "W"); } #endif + msg(D_LINK_RW, "%s WRITE [%d] to %s: %s", proto2ascii(sock->info.proto, sock->info.af, true), BLEN(&c->c2.to_link), print_link_socket_actual(c->c2.to_link_addr, &gc), PROTO_DUMP(&c->c2.to_link, &gc)); @@ -1882,7 +2023,7 @@ process_outgoing_link(struct context *c, struct link_socket *sock) */ void -process_outgoing_tun(struct context *c, struct link_socket *in_sock) +process_outgoing_tun_part2(struct context *c, struct link_socket *in_sock) { /* * Set up for write() call to TUN/TAP @@ -1915,7 +2056,8 @@ process_outgoing_tun(struct context *c, struct link_socket *in_sock) fprintf(stderr, "w"); } #endif - dmsg(D_TUN_RW, "TUN WRITE [%d]", BLEN(&c->c2.to_tun)); + + dmsg(D_TUN_RW, "TUN WRITE [%d] [%d]", BLEN(&c->c2.to_tun), c->c2.frame.buf.payload_size); #ifdef PACKET_TRUNCATION_CHECK ipv4_packet_size_verify(BPTR(&c->c2.to_tun), 
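/* [illustrative aside] process_incoming_tun_part3() above coalesces the
 * batch into one [2-byte length | payload] stream so a whole batch of
 * tun reads is encrypted and written as a single TCP record. Assumed
 * standalone form of that aggregation step:
 */
#include <stdint.h>
#include <string.h>

static int demo_aggregate(uint8_t *out, int out_cap,
                          uint8_t (*pkts)[1500], const int *lens, int count)
{
    int used = 0;
    for (int x = 0; x < count; ++x)
    {
        if (lens[x] <= 0 || used + 2 + lens[x] > out_cap)
        {
            break;
        }
        out[used] = (uint8_t)((lens[x] >> 8) & 0xff);
        out[used + 1] = (uint8_t)(lens[x] & 0xff);
        memcpy(out + used + 2, pkts[x], (size_t)lens[x]);
        used += 2 + lens[x];
    }
    return used;   /* read_tun_max.len before the single encrypt_sign() */
}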
BLEN(&c->c2.to_tun), TUNNEL_TYPE(c->c1.tuntap), @@ -1971,6 +2113,37 @@ process_outgoing_tun(struct context *c, struct link_socket *in_sock) perf_pop(); } +void process_outgoing_tun_part3(struct context *c, struct link_socket *in_sock) +{ + int maxl = 0, plen = 0; + int leng = BLEN(&c->c2.buffers->send_tun_max); + uint8_t *temp = BPTR(&c->c2.buffers->send_tun_max); + for (int x = 0; x < TUN_BAT_MIN; ++x) + { + temp = buff_postsize(temp, &plen); + if ((leng > 0) && (plen > 0) && ((maxl + plen) < leng)) + { + c->c2.to_tun = c->c2.buffers->to_tun_max; + c->c2.to_tun.offset = TUN_BAT_OFF; + c->c2.to_tun.len = plen; + bcopy(temp, BPTR(&c->c2.to_tun), plen); + temp += plen; maxl += (plen + 2); + process_outgoing_tun_part2(c, in_sock); + } else { break; } + } + c->c2.buffers->send_tun_max.len = 0; + buf_reset(&c->c2.to_tun); +} + +void process_outgoing_tun(struct context *c, struct link_socket *in_sock) +{ + if (!BULK_MODE(c)) { + process_outgoing_tun_part2(c, in_sock); + } else { + process_outgoing_tun_part3(c, in_sock); + } +} + #if defined(__GNUC__) || defined(__clang__) #pragma GCC diagnostic pop #endif diff --git a/src/openvpn/forward.h b/src/openvpn/forward.h index a575fafe4b7..fc1e513fe52 100644 --- a/src/openvpn/forward.h +++ b/src/openvpn/forward.h @@ -34,6 +34,10 @@ * file */ +#define BULK_MODE(c) (c->c2.frame.bulk_size > 0) +#define BULK_DATA(b) (b && (b->bulk_leng > 0) && (b->bulk_indx < b->bulk_leng)) +#define INST_LENG(a) (a && (a->inst_leng > 0) && (a->inst_indx < a->inst_leng) && (a->pending == NULL)) + #define TUN_OUT(c) (BLEN(&(c)->c2.to_tun) > 0) #define LINK_OUT(c) (BLEN(&(c)->c2.to_link) > 0) #define ANY_OUT(c) (TUN_OUT(c) || LINK_OUT(c)) @@ -196,6 +200,8 @@ bool process_incoming_link_part1(struct context *c, struct link_socket_info *lsi void process_incoming_link_part2(struct context *c, struct link_socket_info *lsi, const uint8_t *orig_buf); +void process_incoming_link_part3(struct context *c); + /** * Transfers \c float_sa data extracted from an incoming DCO * PEER_FLOAT_NTF to \c out_osaddr for later processing. diff --git a/src/openvpn/init.c b/src/openvpn/init.c index f8a0fee61fc..a5a681a1fd7 100644 --- a/src/openvpn/init.c +++ b/src/openvpn/init.c @@ -2976,6 +2976,11 @@ frame_finalize_options(struct context *c, const struct options *o) tailroom += COMP_EXTRA_BUFFER(payload_size); #endif + if (frame->bulk_size > 0) + { + payload_size = BAT_SIZE(TUN_BAT_ONE, frame->tun_mtu, TUN_BAT_OFF); + } + frame->buf.payload_size = payload_size; frame->buf.headroom = headroom; frame->buf.tailroom = tailroom; @@ -3478,6 +3483,10 @@ do_init_frame_tls(struct context *c) if (c->c2.tls_multi) { tls_multi_init_finalize(c->c2.tls_multi, c->options.ce.tls_mtu); + if (c->c2.frame.bulk_size > 0) + { + c->c2.tls_multi->opt.frame.buf.payload_size = c->c2.frame.tun_mtu; + } ASSERT(c->c2.tls_multi->opt.frame.buf.payload_size <= c->c2.frame.buf.payload_size); frame_print(&c->c2.tls_multi->opt.frame, D_MTU_INFO, "Control Channel MTU parms"); @@ -3545,6 +3554,14 @@ do_init_frame(struct context *c) c->c2.frame.extra_tun += c->options.ce.tun_mtu_extra; } + /* + * Adjust bulk size based on the --bulk-mode parameter. + */ + if (c->options.ce.bulk_mode) + { + c->c2.frame.bulk_size = c->options.ce.tun_mtu; + } + /* * Fill in the blanks in the frame parameters structure, * make sure values are rational, etc. 
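/* [illustrative aside] process_outgoing_tun_part3() above is the
 * receive-side mirror: it walks the aggregated buffer, peels off each
 * 2-byte length, and writes the payloads to the tun one by one. A
 * minimal parser over the same layout, with an assumed emit callback:
 */
#include <stdint.h>

typedef void (*demo_emit_fn)(const uint8_t *pkt, int len);

static void demo_deaggregate(const uint8_t *in, int in_len, demo_emit_fn emit)
{
    int off = 0;
    while (off + 2 <= in_len)
    {
        int plen = (in[off] << 8) | in[off + 1];
        off += 2;
        if (plen <= 0 || off + plen > in_len)
        {
            break;   /* empty or truncated record: stop */
        }
        emit(in + off, plen);   /* the patch copies into to_tun here */
        off += plen;
    }
}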
@@ -3685,9 +3702,43 @@ init_context_buffers(const struct frame *frame) size_t buf_size = BUF_SIZE(frame); + if (frame->bulk_size > 0) + { + size_t off_size = (frame->buf.headroom + TUN_BAT_OFF + frame->buf.tailroom); + buf_size = BAT_SIZE(TUN_BAT_MAX, frame->tun_mtu, off_size); + } + + dmsg(M_INFO, "MEM NEW [%ld] [%d+%d+%d]", buf_size, frame->buf.headroom, frame->buf.payload_size, frame->buf.tailroom); + b->read_link_buf = alloc_buf(buf_size); b->read_tun_buf = alloc_buf(buf_size); + if (frame->bulk_size > 0) + { + for (int x = 0; x < TUN_BAT_MAX; ++x) + { + size_t part_size = BUF_SIZE(frame); + b->read_tun_bufs[x] = alloc_buf(part_size); + b->read_tun_bufs[x].offset = TUN_BAT_OFF; + b->read_tun_bufs[x].len = 0; + } + + b->read_tun_max = alloc_buf(buf_size); + b->read_tun_max.offset = TUN_BAT_OFF; + b->read_tun_max.len = 0; + + b->send_tun_max = alloc_buf(buf_size); + b->send_tun_max.offset = TUN_BAT_OFF; + b->send_tun_max.len = 0; + + b->to_tun_max = alloc_buf(buf_size); + b->to_tun_max.offset = TUN_BAT_OFF; + b->to_tun_max.len = 0; + } + + b->bulk_indx = -1; + b->bulk_leng = -1; + b->aux_buf = alloc_buf(buf_size); b->encrypt_buf = alloc_buf(buf_size); @@ -3710,6 +3761,17 @@ free_context_buffers(struct context_buffers *b) free_buf(&b->read_tun_buf); free_buf(&b->aux_buf); + if (b->to_tun_max.data) + { + free_buf(&b->to_tun_max); + free_buf(&b->send_tun_max); + free_buf(&b->read_tun_max); + for (int x = 0; x < TUN_BAT_MAX; ++x) + { + free_buf(&b->read_tun_bufs[x]); + } + } + #ifdef USE_COMP free_buf(&b->compress_buf); free_buf(&b->decompress_buf); diff --git a/src/openvpn/mtu.c b/src/openvpn/mtu.c index 66f81a62980..3de18954f21 100644 --- a/src/openvpn/mtu.c +++ b/src/openvpn/mtu.c @@ -41,9 +41,15 @@ void alloc_buf_sock_tun(struct buffer *buf, const struct frame *frame) { /* allocate buffer for overlapped I/O */ - *buf = alloc_buf(BUF_SIZE(frame)); + int alen = BUF_SIZE(frame); + int blen = frame->buf.payload_size; + if (frame->bulk_size > 0) { + alen = BAT_SIZE(TUN_BAT_MAX, frame->tun_mtu, TUN_BAT_OFF); + blen = BAT_SIZE(TUN_BAT_MAX, frame->tun_mtu, TUN_BAT_NOP); + } + *buf = alloc_buf(alen); ASSERT(buf_init(buf, frame->buf.headroom)); - buf->len = frame->buf.payload_size; + buf->len = blen; ASSERT(buf_safe(buf, 0)); } diff --git a/src/openvpn/mtu.h b/src/openvpn/mtu.h index 3bbff6f6514..aa7aba5d104 100644 --- a/src/openvpn/mtu.h +++ b/src/openvpn/mtu.h @@ -58,6 +58,15 @@ */ #define TUN_MTU_MIN 100 +/* + * Bulk mode static define values. + */ +#define TUN_BAT_MIN 6 +#define TUN_BAT_MAX 9 +#define TUN_BAT_OFF 250 +#define TUN_BAT_NOP 0 +#define TUN_BAT_ONE 1 + /* * Default MTU of network over which tunnel data will pass by TCP/UDP. */ @@ -157,6 +166,11 @@ struct frame * which defaults to 0 for tun and 32 * (\c TAP_MTU_EXTRA_DEFAULT) for tap. * */ + + int bulk_size; /**< Set in the init frame function to + * signal to related functions to + * process bulk mode data transfers. + * */ }; /* Forward declarations, to prevent includes */ @@ -176,6 +190,7 @@ struct options; * larger than the headroom. */ #define BUF_SIZE(f) ((f)->buf.headroom + (f)->buf.payload_size + (f)->buf.tailroom) +#define BAT_SIZE(a, b, c) ((a * b) + c) /* * Function prototypes. 
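/* [review-style aside] BAT_SIZE() above expands its arguments without
 * parentheses, so a call such as BAT_SIZE(n, mtu + 2, off) would expand
 * to ((n * mtu + 2) + off) and misgroup. The current call sites pass
 * plain identifiers, but the conventional parenthesized form is safer:
 */
#define BAT_SIZE_SAFE(a, b, c) (((a) * (b)) + (c))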
diff --git a/src/openvpn/multi.c b/src/openvpn/multi.c index 2863ff173e4..b1e506312fa 100644 --- a/src/openvpn/multi.c +++ b/src/openvpn/multi.c @@ -435,6 +435,10 @@ multi_init(struct context *t) } m->deferred_shutdown_signal.signal_received = 0; + + m->inst_indx = -1; + m->inst_leng = -1; + m->inst_list = calloc(TUN_BAT_MAX, sizeof(struct multi_instance *)); } const char * @@ -721,6 +725,10 @@ multi_uninit(struct multi_context *m) multi_reap_free(m->reaper); mroute_helper_free(m->route_helper); multi_io_free(m->multi_io); + + m->inst_indx = -1; + m->inst_leng = -1; + free(m->inst_list); } } @@ -3144,7 +3152,7 @@ multi_process_float(struct multi_context *m, struct multi_instance *mi, struct l msg(D_MULTI_LOW, "Disallow float to an address taken by another client %s", multi_instance_string(ex_mi, false, &gc)); - mi->context.c2.buf.len = 0; + mi->context.c2.buf2.len = 0; goto done; } @@ -3368,7 +3376,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst if (!instance) { #ifdef MULTI_DEBUG_EVENT_LOOP - printf("TCP/UDP -> TUN [%d]\n", BLEN(&m->top.c2.buf)); + printf("TCP/UDP -> TUN [%d]\n", BLEN(&m->top.c2.buf2)); #endif multi_set_pending(m, multi_get_create_instance_udp(m, &floated, sock)); } @@ -3387,7 +3395,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst if (!instance) { /* transfer packet pointer from top-level context buffer to instance */ - c->c2.buf = m->top.c2.buf; + c->c2.buf2 = m->top.c2.buf2; /* transfer from-addr from top-level context buffer to instance */ if (!floated) @@ -3396,7 +3404,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst } } - if (BLEN(&c->c2.buf) > 0) + if (BLEN(&c->c2.buf2) > 0) { struct link_socket_info *lsi; const uint8_t *orig_buf; @@ -3405,16 +3413,17 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst perf_push(PERF_PROC_IN_LINK); lsi = &sock->info; - orig_buf = c->c2.buf.data; + orig_buf = c->c2.buf2.data; if (process_incoming_link_part1(c, lsi, floated)) { /* nonzero length means that we have a valid, decrypted packed */ - if (floated && c->c2.buf.len > 0) + if (floated && c->c2.buf2.len > 0) { multi_process_float(m, m->pending, sock); } process_incoming_link_part2(c, lsi, orig_buf); + process_incoming_link_part3(c); } perf_pop(); @@ -3543,12 +3552,19 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst return ret; } +int min_max(int a, int b, int c) +{ + if (a > c) { return c; } + if (a < b) { return b; } + return a; +} + /* * Process packets in the TUN/TAP interface -> TCP/UDP socket direction, * i.e. server -> client direction. 
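/* [reader aid] min_max(a, b, c) above is a clamp of a into [b, c]; an
 * equivalent spelling, shown only to make the intent explicit:
 */
static int demo_clamp(int v, int lo, int hi)
{
    return (v > hi) ? hi : (v < lo) ? lo : v;
}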
*/ bool -multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_flags) +multi_process_incoming_tun_part2(struct multi_context *m, const unsigned int mpp_flags) { bool ret = true; @@ -3594,6 +3610,27 @@ multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_flags /* for now, treat multicast as broadcast */ multi_bcast(m, &m->top.c2.buf, NULL, vid); } + else if (m->inst_indx == -9) + { + struct multi_instance *inst = multi_get_instance_by_virtual_addr(m, &dest, dev_type == DEV_TYPE_TUN); + if (inst) + { + int leng = m->inst_leng; + for (int x = 0; x < leng; ++x) + { + if (m->inst_list[x] == inst) + { + m->inst_indx = x; + return true; + } + } + leng = min_max(leng, 0, TUN_BAT_MIN - 1); + m->inst_list[leng] = inst; + m->inst_indx = leng; + m->inst_leng = (leng + 1); + } + return true; + } else { multi_set_pending( @@ -3635,6 +3672,97 @@ multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_flags return ret; } +bool multi_process_post_part2(struct multi_context *m, const unsigned int mpp_flags) +{ + if (!INST_LENG(m)) + { + return false; + } + struct multi_instance *i = m->inst_list[m->inst_indx]; + if (!i) + { + m->inst_indx += 1; + return false; + } + if (!(multi_output_queue_ready(m, i))) + { + return false; + } + multi_set_pending(m, i); + set_prefix(m->pending); + multi_process_post(m, m->pending, mpp_flags); + clear_prefix(); + m->inst_list[m->inst_indx] = NULL; + m->inst_indx += 1; + return true; +} + +bool multi_process_incoming_tun_part3(struct multi_context *m, const unsigned int mpp_flags) +{ + struct context *c, *b = &(m->top); + struct multi_instance *i; + int leng = b->c2.buffers->bulk_leng; + m->inst_indx = -1; + m->inst_leng = -1; + for (int x = 0; x < leng; ++x) + { + m->inst_indx = -9; + m->top.c2.buf = b->c2.bufs[x]; + multi_process_incoming_tun_part2(m, mpp_flags); + if (m->inst_indx > -1) + { + i = m->inst_list[m->inst_indx]; + c = &(i->context); + int y = min_max(c->c2.buffers->bulk_leng, 0, TUN_BAT_MIN - 1); + c->c2.buffers->read_tun_bufs[y].offset = TUN_BAT_OFF; + c->c2.buffers->read_tun_bufs[y].len = BLEN(&b->c2.bufs[x]); + bcopy(BPTR(&b->c2.bufs[x]), BPTR(&c->c2.buffers->read_tun_bufs[y]), BLEN(&b->c2.bufs[x])); + c->c2.bufs[y] = c->c2.buffers->read_tun_bufs[y]; + c->c2.buffers->bulk_indx = 0; + c->c2.buffers->bulk_leng = (y + 1); + } + } + for (int x = 0; x < m->inst_leng; ++x) + { + i = m->inst_list[x]; + c = &(i->context); + c->c2.buf = c->c2.bufs[0]; + process_incoming_tun(c, c->c2.link_sockets[0]); + } + b->c2.buffers->bulk_indx = -1; + b->c2.buffers->bulk_leng = -1; + m->inst_indx = 0; + return multi_process_post_part2(m, mpp_flags); +} + +bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_flags) +{ + if (!(m->top.options.ce.bulk_mode)) { + return multi_process_incoming_tun_part2(m, mpp_flags); + } else { + return multi_process_incoming_tun_part3(m, mpp_flags); + } +} + +bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags) +{ + if (INST_LENG(m)) + { + multi_process_post_part2(m, mpp_flags); + } + else + { + struct context *c = &(m->top); + read_incoming_tun(c); + if (!IS_SIG(c)) + { + multi_process_incoming_tun(m, mpp_flags); + } + return true; + } + return false; +} + /* * Process a possible client-to-client/bcast/mcast message in the * queue. 
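/* [illustrative aside] The inst_indx == -9 sentinel above turns the
 * normal routing path into a dry run that records the destination
 * instance instead of queueing the packet; part3 then deduplicates
 * destinations so a batch spanning N clients yields N per-client bulk
 * passes. The bookkeeping in isolation, with void * standing in for
 * struct multi_instance *:
 */
#define DEMO_MAX 6   /* stands in for TUN_BAT_MIN */

struct demo_fanout
{
    void *list[DEMO_MAX];
    int leng;
};

static int demo_fanout_add(struct demo_fanout *f, void *inst)
{
    for (int x = 0; x < f->leng; ++x)
    {
        if (f->list[x] == inst)
        {
            return x;   /* already queued: reuse the slot */
        }
    }
    int slot = (f->leng < DEMO_MAX) ? f->leng : DEMO_MAX - 1;
    f->list[slot] = inst;
    if (f->leng < DEMO_MAX)
    {
        f->leng++;
    }
    return slot;
}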
diff --git a/src/openvpn/multi.h b/src/openvpn/multi.h index b2b892bbb5d..57fedca7cff 100644 --- a/src/openvpn/multi.h +++ b/src/openvpn/multi.h @@ -218,6 +218,10 @@ struct multi_context #endif struct deferred_signal_schedule_entry deferred_shutdown_signal; + + int inst_indx; + int inst_leng; + struct multi_instance **inst_list; }; /** @@ -359,6 +363,9 @@ bool multi_process_incoming_link(struct multi_context *m, struct multi_instance */ bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_flags); +bool multi_process_post_part2(struct multi_context *m, const unsigned int mpp_flags); + +bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags); void multi_process_drop_outgoing_tun(struct multi_context *m, const unsigned int mpp_flags); diff --git a/src/openvpn/multi_io.c b/src/openvpn/multi_io.c index 0bfbb63f889..f4fe7a8f441 100644 --- a/src/openvpn/multi_io.c +++ b/src/openvpn/multi_io.c @@ -294,12 +294,9 @@ multi_io_dispatch(struct multi_context *m, struct multi_instance *mi, const int switch (action) { + case TA_INST_LENG: case TA_TUN_READ: - read_incoming_tun(&m->top); - if (!IS_SIG(&m->top)) - { - multi_process_incoming_tun(m, mpp_flags); - } + multi_in_tun(m, mpp_flags); break; case TA_SOCKET_READ: @@ -368,6 +365,7 @@ multi_io_post(struct multi_context *m, struct multi_instance *mi, const int acti #define MTP_NONE 0 #define MTP_TUN_OUT (1 << 0) #define MTP_LINK_OUT (1 << 1) +#define MTP_MULTI_LEN (1 << 2) unsigned int flags = MTP_NONE; if (TUN_OUT(c)) @@ -378,9 +376,17 @@ multi_io_post(struct multi_context *m, struct multi_instance *mi, const int acti { flags |= MTP_LINK_OUT; } + if (INST_LENG(m)) + { + flags = MTP_MULTI_LEN; + } switch (flags) { + case MTP_MULTI_LEN: + newaction = TA_INST_LENG; + break; + case MTP_TUN_OUT | MTP_LINK_OUT: case MTP_TUN_OUT: newaction = TA_TUN_WRITE; @@ -572,7 +578,7 @@ multi_io_action(struct multi_context *m, struct multi_instance *mi, int action, * On our first pass, poll will be false because we already know * that input is available, and to call io_wait would be redundant. */ - if (poll && action != TA_SOCKET_READ_RESIDUAL) + if (poll && action != TA_SOCKET_READ_RESIDUAL && action != TA_INST_LENG) { const int orig_action = action; action = multi_io_wait_lite(m, mi, action, &tun_input_pending); diff --git a/src/openvpn/multi_io.h b/src/openvpn/multi_io.h index 4a3c60d236d..daf43b0e0f6 100644 --- a/src/openvpn/multi_io.h +++ b/src/openvpn/multi_io.h @@ -44,6 +44,7 @@ #define TA_INITIAL 8 #define TA_TIMEOUT 9 #define TA_TUN_WRITE_TIMEOUT 10 +#define TA_INST_LENG 11 /* * I/O state and events tracker diff --git a/src/openvpn/openvpn.h b/src/openvpn/openvpn.h index cd99cd409e0..b70194708db 100644 --- a/src/openvpn/openvpn.h +++ b/src/openvpn/openvpn.h @@ -112,6 +112,14 @@ struct context_buffers */ struct buffer read_link_buf; struct buffer read_tun_buf; + + struct buffer read_tun_bufs[TUN_BAT_MAX]; + struct buffer read_tun_max; + struct buffer send_tun_max; + struct buffer to_tun_max; + + int bulk_indx; + int bulk_leng; }; /* @@ -373,9 +381,12 @@ struct context_2 * struct context_buffers. */ struct buffer buf; + struct buffer buf2; struct buffer to_tun; struct buffer to_link; + struct buffer bufs[TUN_BAT_MAX]; + /* should we print R|W|r|w to console on packet transfers? 
*/ bool log_rw; diff --git a/src/openvpn/options.c b/src/openvpn/options.c index f35738d8049..ceccf03e0de 100644 --- a/src/openvpn/options.c +++ b/src/openvpn/options.c @@ -306,6 +306,7 @@ static const char usage_message[] = " 'maybe' -- Use per-route hints\n" " 'yes' -- Always DF (Don't Fragment)\n" "--mtu-test : Empirically measure and report MTU.\n" + "--bulk-mode : Use bulk TUN/TCP reads/writes.\n" #ifdef ENABLE_FRAGMENT "--fragment max : Enable internal datagram fragmentation so that no UDP\n" " datagrams are sent which are larger than max bytes.\n" @@ -3291,6 +3292,11 @@ options_postprocess_mutate_invariant(struct options *options) options->pkcs11_providers[0] = DEFAULT_PKCS11_MODULE; } #endif + + if ((options->ce.proto != PROTO_TCP) && (options->ce.proto != PROTO_TCP_SERVER) && (options->ce.proto != PROTO_TCP_CLIENT)) + { + options->ce.bulk_mode = false; + } } static void @@ -9901,6 +9907,10 @@ add_option(struct options *options, char *p[], bool is_inline, const char *file, goto err; } } + else if (streq(p[0], "bulk-mode")) + { + options->ce.bulk_mode = true; + } else { int i; diff --git a/src/openvpn/options.h b/src/openvpn/options.h index b03306812a0..82cdb606ea5 100644 --- a/src/openvpn/options.h +++ b/src/openvpn/options.h @@ -174,6 +174,9 @@ struct connection_entry /* Allow only client that support resending the wrapped client key */ bool tls_crypt_v2_force_cookie; + + /* Bulk mode allows for multiple tun reads + larger tcp writes */ + bool bulk_mode; }; struct remote_entry From 9ba447b784b330a7f9557235ae1c9e0f8527ed32 Mon Sep 17 00:00:00 2001 From: Jon Chiappetta Date: Tue, 12 Aug 2025 14:44:05 -0400 Subject: [PATCH 2/2] mtio mode --- src/openvpn/forward.c | 26 ++- src/openvpn/forward.h | 2 +- src/openvpn/init.c | 5 + src/openvpn/mtcp.c | 7 +- src/openvpn/mtcp.h | 2 +- src/openvpn/mudp.c | 5 +- src/openvpn/multi.c | 385 ++++++++++++++++++++++++++++++++++++++--- src/openvpn/multi.h | 16 +- src/openvpn/multi_io.c | 11 +- src/openvpn/multi_io.h | 2 +- src/openvpn/openvpn.c | 268 +++++++++++++++++++++++++++- src/openvpn/openvpn.h | 55 ++++++ src/openvpn/options.c | 17 ++ src/openvpn/options.h | 4 + src/openvpn/socket.c | 7 + src/openvpn/socket.h | 2 + src/openvpn/tun.c | 23 ++- src/openvpn/tun.h | 3 + 18 files changed, 778 insertions(+), 62 deletions(-) diff --git a/src/openvpn/forward.c b/src/openvpn/forward.c index 6cd52b14479..2291e0f84b1 100644 --- a/src/openvpn/forward.c +++ b/src/openvpn/forward.c @@ -47,7 +47,9 @@ #include "mstats.h" #include +#include #include +#include counter_type link_read_bytes_global; /* GLOBAL */ counter_type link_write_bytes_global; /* GLOBAL */ @@ -2501,8 +2503,24 @@ io_wait_dowork(struct context *c, const unsigned int flags) dmsg(D_EVENT_WAIT, "I/O WAIT status=0x%04x", c->c2.event_set_status); } +void threaded_fwd_in_tun(struct context *c, struct link_socket *sock, struct thread_pointer *b) +{ + if (b->p->h == b->p->n) + { + ssize_t size; + uint8_t temp[1]; + size = read(c->c1.tuntap->fd, temp, 1); + if (size < 1) { /* no-op */ } + if (!IS_SIG(c)) + { + process_incoming_tun(c, sock); + } + size = write(c->c1.tuntap->fz, temp, 1); + } +} + void -process_io(struct context *c, struct link_socket *sock) +process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b) { const unsigned int status = c->c2.event_set_status; @@ -2536,11 +2554,7 @@ process_io(struct context *c, struct link_socket *sock) /* Incoming data on TUN device */ else if (status & TUN_READ) { - read_incoming_tun(c); - if (!IS_SIG(c)) - { - process_incoming_tun(c, 
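/* [illustrative aside, assumptions noted] With mtio, a worker no longer
 * reads packets from the tun itself: the management thread fills the
 * shared context buffers, then pokes one byte through a socketpair;
 * threaded_fwd_in_tun() above consumes that byte, processes the batch,
 * and acks on a second socketpair. The protocol in isolation:
 */
#include <stdint.h>
#include <sys/socket.h>
#include <unistd.h>

struct demo_chan
{
    int wake[2];   /* mgmt writes wake[1], worker reads wake[0] */
    int ack[2];    /* worker writes ack[1], mgmt reads ack[0]   */
};

static int demo_chan_init(struct demo_chan *ch)
{
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, ch->wake)
        || socketpair(AF_UNIX, SOCK_DGRAM, 0, ch->ack))
    {
        return -1;
    }
    return 0;
}

static void demo_worker_turn(struct demo_chan *ch, void (*process)(void))
{
    uint8_t t;
    if (read(ch->wake[0], &t, 1) == 1)    /* block until mgmt hands off */
    {
        process();                        /* payload is already in shared bufs */
        (void)write(ch->ack[1], &t, 1);   /* mark this slot free again */
    }
}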
sock); - } + threaded_fwd_in_tun(c, sock, b); } else if (status & DCO_READ) { diff --git a/src/openvpn/forward.h b/src/openvpn/forward.h index fc1e513fe52..2d62e09357c 100644 --- a/src/openvpn/forward.h +++ b/src/openvpn/forward.h @@ -81,7 +81,7 @@ void io_wait_dowork(struct context *c, const unsigned int flags); void pre_select(struct context *c); -void process_io(struct context *c, struct link_socket *sock); +void process_io(struct context *c, struct link_socket *sock, struct thread_pointer *b); /**********************************************************************/ diff --git a/src/openvpn/init.c b/src/openvpn/init.c index a5a681a1fd7..75cf9725f31 100644 --- a/src/openvpn/init.c +++ b/src/openvpn/init.c @@ -1920,6 +1920,7 @@ do_open_tun(struct context *c, int *error_flags) } /* do ifconfig */ + c->c1.tuntap->skip_bind = c->skip_bind; if (!ifconfig_noexec_enabled(c) && ifconfig_order(c->c1.tuntap) == IFCONFIG_BEFORE_TUN_OPEN) { /* guess actual tun/tap unit number that will be returned @@ -2013,6 +2014,10 @@ do_open_tun(struct context *c, int *error_flags) add_wfp_block(c); } + if (c->c1.tuntap) + { + c->c1.tuntap->fe = c->c1.tuntap->fd; + } gc_free(&gc); return ret; } diff --git a/src/openvpn/mtcp.c b/src/openvpn/mtcp.c index 83edec6e895..cbddcabe176 100644 --- a/src/openvpn/mtcp.c +++ b/src/openvpn/mtcp.c @@ -51,15 +51,18 @@ struct ta_iow_flags #endif struct multi_instance * -multi_create_instance_tcp(struct multi_context *m, struct link_socket *sock) +multi_create_instance_tcp(struct thread_pointer *a, struct link_socket *sock) { struct gc_arena gc = gc_new(); + struct multi_context *m = a->p->m[a->i-1]; struct multi_instance *mi = NULL; struct hash *hash = m->hash; - mi = multi_create_instance(m, NULL, sock); + mi = multi_create_instance(a, NULL, sock); if (mi) { + m = a->p->p; + hash = m->hash; mi->real.proto = sock->info.proto; struct hash_element *he; const uint32_t hv = hash_value(hash, &mi->real); diff --git a/src/openvpn/mtcp.h b/src/openvpn/mtcp.h index f4b806ccf6c..d4f88101eb7 100644 --- a/src/openvpn/mtcp.h +++ b/src/openvpn/mtcp.h @@ -45,7 +45,7 @@ bool multi_tcp_process_outgoing_link(struct multi_context *m, bool defer, bool multi_tcp_process_outgoing_link_ready(struct multi_context *m, struct multi_instance *mi, const unsigned int mpp_flags); -struct multi_instance *multi_create_instance_tcp(struct multi_context *m, struct link_socket *sock); +struct multi_instance *multi_create_instance_tcp(struct thread_pointer *a, struct link_socket *sock); void multi_tcp_link_out_deferred(struct multi_context *m, struct multi_instance *mi); diff --git a/src/openvpn/mudp.c b/src/openvpn/mudp.c index a373a6a04c9..b8edc5ddba7 100644 --- a/src/openvpn/mudp.c +++ b/src/openvpn/mudp.c @@ -198,6 +198,8 @@ multi_get_create_instance_udp(struct multi_context *m, bool *floated, struct lin struct mroute_addr real = { 0 }; struct multi_instance *mi = NULL; struct hash *hash = m->hash; + struct context_pointer p = { 0 }; + struct thread_pointer a = { 0 }; real.proto = sock->info.proto; m->hmac_reply_ls = sock; @@ -271,7 +273,8 @@ multi_get_create_instance_udp(struct multi_context *m, bool *floated, struct lin * connect-freq but not against connect-freq-initial */ reflect_filter_rate_limit_decrease(m->initial_rate_limiter); - mi = multi_create_instance(m, &real, sock); + p.p = m; a.p = &p; a.i = -1; + mi = multi_create_instance(&a, &real, sock); if (mi) { hash_add_fast(hash, bucket, &mi->real, hv, mi); diff --git a/src/openvpn/multi.c b/src/openvpn/multi.c index b1e506312fa..dfc19391625 100644 --- 
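/* [reader aid, inferred from call sites only] The thread_pointer and
 * context_pointer types used above are defined in openvpn.h, whose hunk
 * is truncated at the end of this patch. Reconstructed shape; the field
 * meanings are guessed from usage, so treat every annotation here as an
 * assumption rather than the patch's documentation:
 *
 *   struct context_pointer {
 *       struct context *c;          primary (thread 1) context
 *       struct multi_context **m;   per-thread server contexts
 *       struct multi_context *p;    context currently owning new accepts
 *       struct multi_address *a;    shared per-WAN-address table
 *       pthread_mutex_t *l;         guards shared tables and the pool
 *       int i, h, n, x, z;          index, threads up, thread count,
 *                                   max clients, run state (-1/0/1)
 *       int s[MAX_THREADS][2];      wake socketpairs (mgmt -> worker)
 *       int r[MAX_THREADS][2];      ack socketpairs (worker -> mgmt)
 *   };
 *
 *   struct thread_pointer {
 *       struct context_pointer *p;  shared state
 *       struct context *c;          this worker's context
 *       int i, n, h;                1-based id, thread count, helper flag
 *   };
 */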
a/src/openvpn/multi.c +++ b/src/openvpn/multi.c @@ -639,7 +639,7 @@ multi_close_instance(struct multi_context *m, struct multi_instance *mi, bool sh schedule_remove_entry(m->schedule, (struct schedule_entry *)mi); - ifconfig_pool_release(m->ifconfig_pool, mi->vaddr_handle, false); + ifconfig_pool_release(m->mtio_info.pool, mi->vaddr_handle, false); if (mi->did_iroutes) { @@ -732,15 +732,78 @@ multi_uninit(struct multi_context *m) } } +struct multi_context *multi_context_switch_conn(struct thread_pointer *a, struct multi_context *m, struct multi_instance *i) +{ + if (a->i <= 0) + { + a->p->p = m; + return m; + } + + struct sockaddr_in *wans_ipvo = (struct sockaddr_in *)&i->context.c2.link_sockets[0]->info.lsa->actual.dest.addr.sa; + const char *uniq = inet_ntoa(wans_ipvo->sin_addr); + + int indx = -1, fidx = 0; + time_t secs = time(NULL); + struct multi_address *addr; + + time_t last = secs; + for (int x = 0; x < a->p->x; ++x) + { + addr = &(a->p->a[x]); + if (addr->last < last) + { + fidx = x; + last = addr->last; + } + if (strcmp(addr->wans, uniq) == 0) + { + indx = x; + break; + } + } + + pthread_mutex_lock(m->mtio_info.lock); + if (indx < 0) + { + if (fidx > -1) { indx = fidx; } + else { indx = 0; } + addr = &(a->p->a[indx]); + addr->indx = 0; + addr->last = 0; + bzero(addr->wans, MAX_STRLENG * sizeof(char)); + strncpy(addr->wans, uniq, MAX_STRLENG-8); + for (int x = 0; x < a->p->x; ++x) + { + if (addr->link[x].indx == (indx + 1)) + { + bzero(&(addr->link[x]), sizeof(struct multi_link)); + } + } + } + pthread_mutex_unlock(m->mtio_info.lock); + + addr = &(a->p->a[indx]); + fidx = addr->indx; + m = a->p->m[addr->indx]; + a->p->p = a->p->m[fidx]; + addr->indx = ((fidx + 1) % a->p->n); + addr->last = secs; + + msg(M_INFO, "TCPv4_SERVER MTIO conn [%s][%p] [%d][%d] {%d}{%d}", uniq, m, indx, fidx, i->mtio_idno, m->mtio_idno); + + return m; +} + /* * Create a client instance object for a newly connected client. */ struct multi_instance * -multi_create_instance(struct multi_context *m, const struct mroute_addr *real, - struct link_socket *sock) +multi_create_instance(struct thread_pointer *a, const struct mroute_addr *real, struct link_socket *sock) { struct gc_arena gc = gc_new(); struct multi_instance *mi; + struct multi_context *m = (a->i > 0) ? 
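/* [illustrative aside] The core of the connection spreading above: each
 * client WAN address owns a table slot whose indx cursor round-robins
 * successive connections across the n per-thread contexts (the LRU slot
 * eviction is elided here):
 */
static int demo_next_thread(int *cursor, int nthreads)
{
    int pick = *cursor;
    *cursor = (pick + 1) % nthreads;   /* addr->indx = ((fidx + 1) % a->p->n) */
    return pick;
}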
a->p->m[a->i-1] : a->p->p; perf_push(PERF_MULTI_CREATE_INSTANCE); @@ -765,6 +828,7 @@ multi_create_instance(struct multi_context *m, const struct mroute_addr *real, { goto err; } + m = multi_context_switch_conn(a, m, mi); mi->context.c2.tls_multi->multi_state = CAS_NOT_CONNECTED; @@ -815,6 +879,9 @@ multi_create_instance(struct multi_context *m, const struct mroute_addr *real, mi->ev_arg.type = EVENT_ARG_MULTI_INSTANCE; mi->ev_arg.u.mi = mi; + mi->mtio_idno = m->mtio_idno; + bzero(&(mi->mtio_addr), sizeof(struct multi_address)); + perf_pop(); gc_free(&gc); return mi; @@ -1067,6 +1134,8 @@ multi_learn_addr(struct multi_context *m, struct multi_instance *mi, const struc struct multi_instance *owner = NULL; struct gc_arena gc = gc_new(); + msg(M_INFO, "TCPv4_SERVER MTIO addr [%s][%s] [%s] {%d}{%d}", print_in_addr_t(mi->context.c2.push_ifconfig_local, IA_EMPTY_IF_UNDEF, &gc), mroute_addr_print(addr, &gc), mi->mtio_addr.wans, m->mtio_idno, mi->mtio_idno); + /* if route currently exists, get the instance which owns it */ he = hash_lookup_fast(m->vhash, bucket, addr, hv); if (he) @@ -1451,7 +1520,7 @@ multi_select_virtual_addr(struct multi_context *m, struct multi_instance *mi) * release dynamic allocation */ if (mi->vaddr_handle >= 0) { - ifconfig_pool_release(m->ifconfig_pool, mi->vaddr_handle, true); + ifconfig_pool_release(m->mtio_info.pool, mi->vaddr_handle, true); mi->vaddr_handle = -1; } @@ -1472,20 +1541,21 @@ multi_select_virtual_addr(struct multi_context *m, struct multi_instance *mi) "MULTI_sva: WARNING: if --ifconfig-push is used for IPv4, automatic IPv6 assignment from --ifconfig-ipv6-pool does not work. Use --ifconfig-ipv6-push for IPv6 then."); } } - else if (m->ifconfig_pool && mi->vaddr_handle < 0) /* otherwise, choose a pool address */ + else if (m->mtio_info.pool && mi->vaddr_handle < 0) /* otherwise, choose a pool address */ { in_addr_t local = 0, remote = 0; struct in6_addr remote_ipv6; const char *cn = NULL; - if (!mi->context.options.duplicate_cn) + if ((!mi->context.options.duplicate_cn) && (!(m->top.options.ce.mtio_mode))) { cn = tls_common_name(mi->context.c2.tls_multi, true); } CLEAR(remote_ipv6); - mi->vaddr_handle = - ifconfig_pool_acquire(m->ifconfig_pool, &local, &remote, &remote_ipv6, cn); + pthread_mutex_lock(m->mtio_info.lock); + mi->vaddr_handle = ifconfig_pool_acquire(m->mtio_info.pool, &local, &remote, &remote_ipv6, cn); + pthread_mutex_unlock(m->mtio_info.lock); if (mi->vaddr_handle >= 0) { const int tunnel_type = TUNNEL_TYPE(mi->context.c1.tuntap); @@ -2343,6 +2413,110 @@ multi_client_generate_tls_keys(struct context *c) return true; } +bool multi_context_switch_addr(struct multi_context *m, struct multi_instance *i) +{ + struct gc_arena g = gc_new(); + in_addr_t locl_addr = i->context.c2.push_ifconfig_local; + time_t secs = time(NULL); + int maxt = m->mtio_info.maxt; + int xidx = 0, leng = 0; + int idxs[MAX_THREADS]; + struct multi_address *addr; + struct multi_link *link; + + const char *lans_addr = print_in_addr_t(locl_addr, IA_EMPTY_IF_UNDEF, &g); + if (lans_addr) { + bzero(i->mtio_addr.lans, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.lans, lans_addr, MAX_STRLENG-8); + } + + struct sockaddr_in *wans_ipvo = (struct sockaddr_in *)&i->context.c2.link_sockets[0]->info.lsa->actual.dest.addr.sa; + const char *wans_addr = inet_ntoa(wans_ipvo->sin_addr); + if (wans_addr) { + bzero(i->mtio_addr.wans, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.wans, wans_addr, MAX_STRLENG-8); + } + + const char *dest_user = tls_username(i->context.c2.tls_multi, 
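/* [illustrative aside] Every worker now draws virtual addresses from
 * thread 1's ifconfig pool (m->mtio_info.pool), so acquire and release
 * must hold the shared mutex. The discipline, with a stand-in pool
 * type:
 */
#include <pthread.h>

struct demo_pool { int next; int cap; };

static int demo_pool_acquire(struct demo_pool *pool, pthread_mutex_t *lock)
{
    pthread_mutex_lock(lock);
    int handle = (pool->next < pool->cap) ? pool->next++ : -1;
    pthread_mutex_unlock(lock);
    return handle;   /* mirrors the locked ifconfig_pool_acquire() above */
}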
false); + if (dest_user) { + bzero(i->mtio_addr.usrs, MAX_STRLENG * sizeof(char)); + strncpy(i->mtio_addr.usrs, dest_user, MAX_STRLENG-8); + } + + bzero(idxs, MAX_THREADS * sizeof(int)); + pthread_mutex_lock(m->mtio_info.lock); + if ((strcmp(lans_addr, "") != 0) && (strcmp(dest_user, "") != 0)) + { + for (int x = 0; x < m->mtio_info.maxc; ++x) + { + addr = &(m->mtio_info.addr[x]); + if (strcmp(addr->wans, i->mtio_addr.wans) == 0) + { + int lidx = 1, fidx = 0, eidx = 0, clen = 0; + time_t last = secs; + for (int y = 0; y < m->mtio_info.maxc; ++y) + { + link = &(addr->link[y]); + if (strcmp(link->usrs, dest_user) == 0) + { + link->last = secs; + if (link->ladr == locl_addr) { eidx = (y + 1); } + if (leng < MAX_THREADS) { idxs[leng] = (y + 1); leng += 1; } + if (clen >= maxt) { bzero(link, sizeof(struct multi_link)); } + if (link->last < last) { last = link->last; lidx = (y + 1); } + clen += 1; + } + else if (strcmp(link->usrs, "") == 0) + { + if (fidx == 0) { fidx = (y + 1); } + } + } + if (eidx != 0) { fidx = 0; } + else + { + if (leng >= maxt) { fidx = idxs[i->mtio_idno]; } + if (fidx == 0) { fidx = lidx; } + } + if (fidx > 0) + { + link = &(addr->link[fidx-1]); + bzero(link->usrs, MAX_STRLENG * sizeof(char)); + strncpy(link->usrs, dest_user, MAX_STRLENG-8); + link->ladr = locl_addr; + link->indx = (x + 1); + link->last = secs; + addr->last = secs; + clen += 1; + } + if (clen >= maxt) + { + xidx = (x + 1); + } + } + } + } + pthread_mutex_unlock(m->mtio_info.lock); + + if ((xidx > 0) && (leng > 0)) + { + addr = &(m->mtio_info.addr[xidx-1]); + for (int z = 0; z < leng; ++z) + { + int y = idxs[z]; + if (y > 0) + { + link = &(addr->link[y-1]); + link->last = secs; + multi_learn_in_addr_t(m, i, link->ladr, -1, true); + } + } + i->mtio_addr.stat += 1; + } + + gc_free(&g); + return true; +} + static void multi_client_connect_late_setup(struct multi_context *m, struct multi_instance *mi, const unsigned int option_types_found) @@ -2390,6 +2564,8 @@ multi_client_connect_late_setup(struct multi_context *m, struct multi_instance * mi->reporting_addr = mi->context.c2.push_ifconfig_local; mi->reporting_addr_ipv6 = mi->context.c2.push_ifconfig_ipv6_local; + multi_context_switch_addr(m, mi); + /* set context-level authentication flag */ mi->context.c2.tls_multi->multi_state = CAS_CONNECT_DONE; @@ -3351,6 +3527,23 @@ multi_process_incoming_dco(struct multi_context *m) } #endif /* if defined(ENABLE_DCO) */ +struct multi_instance *multi_learn_peer_addr(struct multi_context *m, struct mroute_addr *s) +{ + struct multi_instance *i = m->pending, *r = NULL; + struct multi_link *l; + in_addr_t a = ntohl(s->v4.addr); + for (int x = 0; x < m->mtio_info.maxc; ++x) + { + l = &(m->mtio_info.addr[0].link[x]); + if ((strcmp(l->usrs, "") != 0) && (l->ladr == a)) + { + multi_context_switch_addr(m, i); + r = i; + } + } + return r; +} + /* * Process packets in the TCP/UDP socket -> TUN/TAP interface direction, * i.e. client -> server direction. @@ -3439,7 +3632,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst c->c2.to_tun.len = 0; } /* make sure that source address is associated with this client */ - else if (multi_get_instance_by_virtual_addr(m, &src, true) != m->pending) + else if ((multi_get_instance_by_virtual_addr(m, &src, true) != m->pending) && (multi_learn_peer_addr(m, &src) != m->pending)) { /* IPv6 link-local address (fe80::xxx)? 
*/ if ((src.type & MR_ADDR_MASK) == MR_ADDR_IPV6 @@ -3449,7 +3642,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst } else { - msg(D_MULTI_DROPPED, + msg(D_MULTI_ERRORS, "MULTI: bad source address from client [%s], packet dropped", mroute_addr_print(&src, &gc)); } @@ -3529,7 +3722,7 @@ multi_process_incoming_link(struct multi_context *m, struct multi_instance *inst } else { - msg(D_MULTI_DROPPED, + msg(D_MULTI_ERRORS, "MULTI: bad source address from client [%s], packet dropped", mroute_addr_print(&src, &gc)); c->c2.to_tun.len = 0; @@ -3744,7 +3937,7 @@ bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_ } } -bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags) +bool threaded_multi_in_tun(struct multi_context *m, const unsigned int mpp_flags) { if (INST_LENG(m)) { @@ -3753,12 +3946,19 @@ bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags) else { struct context *c = &(m->top); - read_incoming_tun(c); - if (!IS_SIG(c)) + if (*(m->mtio_info.hold) == m->mtio_info.maxt) { - multi_process_incoming_tun(m, mpp_flags); + int size; + uint8_t temp[1]; + size = read(c->c1.tuntap->fd, temp, 1); + if (size < 1) { /* no-op */ } + if (!IS_SIG(c)) + { + multi_process_incoming_tun(m, mpp_flags); + } + size = write(c->c1.tuntap->fz, temp, 1); + return true; } - return true; } return false; } @@ -4305,13 +4505,72 @@ multi_assign_peer_id(struct multi_context *m, struct multi_instance *mi) * * @param multi context structure */ -static void -tunnel_server_loop(struct multi_context *multi) +static void tunnel_server_loop(struct thread_pointer *b) { int status; + struct context_pointer *p = b->p; + + status = 0; + while (status == 0) + { + status = 1; + for (int x = 0; x < p->n; ++x) + { + if (p->m[x] == NULL) + { + status = 0; + } + } + sleep(1); + } + + struct multi_context *multi = p->m[b->i-1]; + struct context *c = &(p->m[b->i-1]->top); + struct context *d = &(p->m[0]->top); + + multi->mtio_idno = b->i; + multi->mtio_info.maxt = b->n; + multi->mtio_info.maxc = p->x; + multi->mtio_info.addr = p->a; + multi->mtio_info.lock = p->l; + multi->mtio_info.indx = &(p->i); + multi->mtio_info.hold = &(p->h); + multi->mtio_info.pool = p->m[0]->ifconfig_pool; + + if (b->i == 1) + { + while (p->h < p->n) + { + if (p->z == -1) { break; } else { sleep(1); } + } + p->z = 1; + } + else + { + b->h += 1; p->h += 1; + while ((p->z != 1) || (!(d->c1.tuntap)) || (d->c1.tuntap->ff <= 1)) + { + if (p->z == -1) { break; } else { sleep(1); } + } + } + + msg(M_INFO, "TCPv4_SERVER MTIO init [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); while (true) { + if (p->z != 1) { break; } + if (c->c1.tuntap && (c->c1.tuntap->fd > 1) && (c->c1.tuntap->ff <= 1)) + { + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->s[b->i-1]); + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->r[b->i-1]); + c->c1.tuntap->ff = c->c1.tuntap->fd; + c->c1.tuntap->fe = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + //c->c1.tuntap->fd = (b->i == 1) ? 
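/* [illustrative aside] multi_learn_peer_addr() above gives the source
 * address check a second chance: a miss in this thread's own routing
 * hash falls back to the shared per-user link table, adopting an
 * address that another thread already bound to the same client.
 * Reduced shape, with assumed field widths:
 */
#include <stdint.h>

struct demo_link { char usrs[64]; uint32_t ladr; };

static int demo_adopt_route(const struct demo_link *links, int n, uint32_t src)
{
    for (int x = 0; x < n; ++x)
    {
        if (links[x].usrs[0] != '\0' && links[x].ladr == src)
        {
            return x;   /* caller re-learns the address into its own table */
        }
    }
    return -1;
}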
c->c1.tuntap->ff : d->c1.tuntap->ff; + c->c1.tuntap->fd = p->s[b->i-1][0]; + c->c1.tuntap->fz = p->r[b->i-1][1]; + msg(M_INFO, "TCPv4_SERVER MTIO fdno [%d][%d][%d][%d] {%d}", c->c1.tuntap->fd, c->c1.tuntap->fe, c->c1.tuntap->ff, c->c1.tuntap->fz, b->i); + } + perf_push(PERF_EVENT_LOOP); /* wait on tun/socket list */ @@ -4326,7 +4585,7 @@ tunnel_server_loop(struct multi_context *multi) if (status > 0) { /* process the I/O which triggered select */ - multi_io_process_io(multi); + multi_io_process_io(b); } else if (status == 0) { @@ -4336,18 +4595,36 @@ tunnel_server_loop(struct multi_context *multi) MULTI_CHECK_SIG(multi); perf_pop(); } + + msg(M_INFO, "TCPv4_SERVER MTIO fins [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); + + p->z = -1; + + if (c->c1.tuntap && (c->c1.tuntap->ff > 1)) + { + close(p->s[b->i-1][0]); + close(p->s[b->i-1][1]); + close(p->r[b->i-1][0]); + close(p->r[b->i-1][1]); + c->c1.tuntap->fd = c->c1.tuntap->ff; + c->c1.tuntap->ff = -1; + } } /* * Top level event loop. */ -void -tunnel_server(struct context *top) +void *tunnel_server(void *args) { - ASSERT(top->options.mode == MODE_SERVER); - + struct thread_pointer *arg = (struct thread_pointer *)args; + struct context_pointer *ptr = arg->p; + struct context *top = (arg->i == 1) ? ptr->c : arg->c; struct multi_context multi; + if (arg->i == 1) { sleep(1); } + + ASSERT(top->options.mode == MODE_SERVER); + top->mode = CM_TOP; top->multi = &multi; context_clear_2(top); @@ -4356,7 +4633,7 @@ tunnel_server(struct context *top) init_instance_handle_signals(top, top->es, CC_HARD_USR1_TO_HUP); if (IS_SIG(top)) { - return; + return NULL; } /* initialize global multi_context object */ @@ -4379,7 +4656,10 @@ tunnel_server(struct context *top) } #endif - tunnel_server_loop(&multi); + bzero(&(multi.mtio_info), sizeof(struct multi_info)); + ptr->m[arg->i-1] = &multi; + + tunnel_server_loop(arg); #ifdef ENABLE_ASYNC_PUSH close(top->c2.inotify_fd); @@ -4395,6 +4675,61 @@ tunnel_server(struct context *top) multi_uninit(&multi); multi_top_free(&multi); close_instance(top); + + return NULL; +} + +void threaded_tunnel_server(struct context *c, struct context *d) +{ + int maxt = (c->options.ce.mtio_mode) ? 
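/* [reader aid, roles inferred from the assignments above and in
 * tunnel_point_to_point(); the names are the patch's]:
 *   ff = the real tun fd; only the management thread moves packets on it
 *   fd = wake socketpair read end; what the worker select()s and read()s
 *   fz = ack socketpair write end; the worker signals batch completion
 *   fe = thread 1's real tun fd, apparently retained as the canonical
 *        device descriptor
 * Teardown restores fd = ff and closes both socketpairs. */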
MAX_THREADS : 1; + int maxc = c->options.max_clients; + struct thread_pointer a[MAX_THREADS]; + struct context_pointer p; + struct multi_address b[maxc]; + struct multi_link l[maxc]; + pthread_mutex_t lock; + pthread_t thrm, thrd[MAX_THREADS]; + + bzero(&p, sizeof(struct context_pointer)); + p.i = 1; p.h = 1; p.n = maxt; p.x = maxc; p.z = 0; + p.c = c; p.a = b; p.l = &(lock); p.p = NULL; + p.m = calloc(p.n, sizeof(struct multi_context *)); + bzero(p.a, maxc * sizeof(struct multi_address)); + bzero(p.l, sizeof(pthread_mutex_t)); + pthread_mutex_init(p.l, NULL); + + bzero(l, maxc * sizeof(struct multi_link)); + for (int x = 0; x < maxc; ++x) + { + b[x].link = l; + } + + c->skip_bind = 0; + a[0].p = &(p); a[0].c = c; a[0].i = 1; a[0].n = p.n; a[0].h = 0; + bzero(&(thrd[0]), sizeof(pthread_t)); + pthread_create(&(thrd[0]), NULL, tunnel_server, &(a[0])); + + bzero(&(thrm), sizeof(pthread_t)); + pthread_create(&(thrm), NULL, threaded_io_management, &(a[0])); + + for (int x = 1; x < p.n; ++x) + { + d[x].skip_bind = -1; + a[x].p = &(p); a[x].c = &(d[x]); a[x].i = (x + 1); a[x].n = p.n; a[x].h = 1; + bzero(&(thrd[x]), sizeof(pthread_t)); + pthread_create(&(thrd[x]), NULL, tunnel_server, &(a[x])); + } + + pthread_join(thrd[0], NULL); + + for (int x = 1; x < p.n; ++x) + { + pthread_join(thrd[x], NULL); + } + + pthread_join(thrm, NULL); + + free(p.m); } /** diff --git a/src/openvpn/multi.h b/src/openvpn/multi.h index 57fedca7cff..1058588edf2 100644 --- a/src/openvpn/multi.h +++ b/src/openvpn/multi.h @@ -147,6 +147,9 @@ struct multi_instance #ifdef ENABLE_ASYNC_PUSH int inotify_watch; /* watch descriptor for acf */ #endif + + int mtio_idno; + struct multi_address mtio_addr; }; @@ -222,8 +225,12 @@ struct multi_context int inst_indx; int inst_leng; struct multi_instance **inst_list; + + int mtio_idno; + struct multi_info mtio_info; }; + /** * Return values used by the client connect call-back functions. */ @@ -259,8 +266,9 @@ struct multi_route * * @param top - Top-level context structure. 
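/* [reader aid] All state handed to the workers above (a[], p, the
 * multi_address/multi_link tables, and the mutex) lives on this stack
 * frame. That is sound only because every pthread_join() runs before
 * the function returns, so no worker thread can outlive the storage. */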
*/ -void tunnel_server(struct context *top); +void threaded_tunnel_server(struct context *c, struct context *d); +bool multi_context_switch_addr(struct multi_context *m, struct multi_instance *i); const char *multi_instance_string(const struct multi_instance *mi, bool null, struct gc_arena *gc); @@ -268,9 +276,7 @@ const char *multi_instance_string(const struct multi_instance *mi, bool null, st * Called by mtcp.c, mudp.c, or other (to be written) protocol drivers */ -struct multi_instance *multi_create_instance(struct multi_context *m, - const struct mroute_addr *real, - struct link_socket *sock); +struct multi_instance *multi_create_instance(struct thread_pointer *a, const struct mroute_addr *real, struct link_socket *sock); void multi_close_instance(struct multi_context *m, struct multi_instance *mi, bool shutdown); @@ -365,7 +371,7 @@ bool multi_process_incoming_tun(struct multi_context *m, const unsigned int mpp_ bool multi_process_post_part2(struct multi_context *m, const unsigned int mpp_flags); -bool multi_in_tun(struct multi_context *m, const unsigned int mpp_flags); +bool threaded_multi_in_tun(struct multi_context *m, const unsigned int mpp_flags); void multi_process_drop_outgoing_tun(struct multi_context *m, const unsigned int mpp_flags); diff --git a/src/openvpn/multi_io.c b/src/openvpn/multi_io.c index f4fe7a8f441..4441d9b075e 100644 --- a/src/openvpn/multi_io.c +++ b/src/openvpn/multi_io.c @@ -296,7 +296,7 @@ multi_io_dispatch(struct multi_context *m, struct multi_instance *mi, const int { case TA_INST_LENG: case TA_TUN_READ: - multi_in_tun(m, mpp_flags); + threaded_multi_in_tun(m, mpp_flags); break; case TA_SOCKET_READ: @@ -423,8 +423,9 @@ multi_io_post(struct multi_context *m, struct multi_instance *mi, const int acti } void -multi_io_process_io(struct multi_context *m) +multi_io_process_io(struct thread_pointer *a) { + struct multi_context *m = a->p->m[a->i-1]; struct multi_io *multi_io = m->multi_io; int i; @@ -469,7 +470,7 @@ multi_io_process_io(struct multi_context *m) if (!proto_is_dgram(ev_arg->u.sock->info.proto)) { socket_reset_listen_persistent(ev_arg->u.sock); - mi = multi_create_instance_tcp(m, ev_arg->u.sock); + mi = multi_create_instance_tcp(a, ev_arg->u.sock); } else { @@ -481,7 +482,7 @@ multi_io_process_io(struct multi_context *m) * before returning to the main loop. 
*/ if (mi) { - multi_io_action(m, mi, TA_INITIAL, false); + multi_io_action(a->p->p, mi, TA_INITIAL, false); } break; } @@ -514,7 +515,7 @@ multi_io_process_io(struct multi_context *m) struct multi_instance *mi; ASSERT(m->top.c2.link_sockets[0]); socket_reset_listen_persistent(m->top.c2.link_sockets[0]); - mi = multi_create_instance_tcp(m, m->top.c2.link_sockets[0]); + mi = multi_create_instance_tcp(a, m->top.c2.link_sockets[0]); if (mi) { multi_io_action(m, mi, TA_INITIAL, false); diff --git a/src/openvpn/multi_io.h b/src/openvpn/multi_io.h index daf43b0e0f6..c313ff3633a 100644 --- a/src/openvpn/multi_io.h +++ b/src/openvpn/multi_io.h @@ -68,7 +68,7 @@ void multi_io_free(struct multi_io *multi_io); int multi_io_wait(struct multi_context *m); -void multi_io_process_io(struct multi_context *m); +void multi_io_process_io(struct thread_pointer *a); void multi_io_set_global_rw_flags(struct multi_context *m, struct multi_instance *mi); diff --git a/src/openvpn/openvpn.c b/src/openvpn/openvpn.c index 64b4f8c03d0..472dfab9a72 100644 --- a/src/openvpn/openvpn.c +++ b/src/openvpn/openvpn.c @@ -53,9 +53,13 @@ process_signal_p2p(struct context *c) * * @param c - The context structure of the single active VPN tunnel. */ -static void -tunnel_point_to_point(struct context *c) +void *tunnel_point_to_point(void *a) { + struct thread_pointer *b = (struct thread_pointer *)a; + struct context_pointer *p = b->p; + struct context *c = (b->n == 1) ? p->c : b->c; + struct context *d = (b->n == 1) ? b->c : p->c; + context_clear_2(c); /* set point-to-point mode */ @@ -66,12 +70,44 @@ tunnel_point_to_point(struct context *c) init_instance_handle_signals(c, c->es, stdin_config ? 0 : CC_HARD_USR1_TO_HUP); if (IS_SIG(c)) { - return; + return NULL; + } + + if (b->i == 1) + { + while (p->h < p->n) + { + if (p->z == -1) { break; } else { sleep(1); } + } + p->z = 1; + } + else + { + b->h += 1; p->h += 1; + while ((p->z != 1) || (!(d->c1.tuntap)) || (d->c1.tuntap->ff <= 1)) + { + if (p->z == -1) { break; } else { sleep(1); } + } } + msg(M_INFO, "TCPv4_CLIENT MTIO init [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i); + /* main event loop */ while (true) { + if (p->z != 1) { break; } + if (c->c1.tuntap && (c->c1.tuntap->fd > 1) && (c->c1.tuntap->ff <= 1)) + { + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->s[b->i-1]); + socketpair(AF_UNIX, SOCK_DGRAM, 0, p->r[b->i-1]); + c->c1.tuntap->ff = c->c1.tuntap->fd; + c->c1.tuntap->fe = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + //c->c1.tuntap->fd = (b->i == 1) ? c->c1.tuntap->ff : d->c1.tuntap->ff; + c->c1.tuntap->fd = p->s[b->i-1][0]; + c->c1.tuntap->fz = p->r[b->i-1][1]; + msg(M_INFO, "TCPv4_CLIENT MTIO fdno [%d][%d][%d][%d] {%d}", c->c1.tuntap->fd, c->c1.tuntap->fe, c->c1.tuntap->ff, c->c1.tuntap->fz, b->i); + } + perf_push(PERF_EVENT_LOOP); /* process timers, TLS, etc. 
 */
@@ -90,18 +126,188 @@ tunnel_point_to_point(struct context *c)
         }
 
         /* process the I/O which triggered select */
-        process_io(c, c->c2.link_sockets[0]);
+        process_io(c, c->c2.link_sockets[0], b);
         P2P_CHECK_SIG();
 
         perf_pop();
     }
 
+    msg(M_INFO, "TCPv4_CLIENT MTIO fins [%d][%d] [%d][%d] {%d}{%d}", b->h, b->n, p->h, p->n, p->z, b->i);
+
+    p->z = -1;
+
+    if (c->c1.tuntap && (c->c1.tuntap->ff > 1))
+    {
+        close(p->s[b->i-1][0]);
+        close(p->s[b->i-1][1]);
+        close(p->r[b->i-1][0]);
+        close(p->r[b->i-1][1]);
+        c->c1.tuntap->fd = c->c1.tuntap->ff; /* restore the real tun fd before teardown */
+        c->c1.tuntap->ff = -1;
+    }
+
     persist_client_stats(c);
 
     uninit_management_callback();
 
     /* tear down tunnel instance (unless --persist-tun) */
     close_instance(c);
+
+    return NULL;
+}
+
+void *threaded_io_management(void *args)
+{
+    struct thread_pointer *a = (struct thread_pointer *)args;
+    struct context_pointer *p = a->p;
+    struct context *c, *d;
+    int maxt = p->n, maxf = 0;
+    int fdno = 0, flag = 0, indx = 0, size = 0;
+    ssize_t leng = 0;
+    uint8_t buff[MAX_THREADS*2]; /* [0..n-1]: per-thread busy flags; tail bytes are scratch */
+    fd_set rfds;
+    struct timeval timo;
+
+    while (true)
+    {
+        if ((p->z == -1) || (size > 1)) { break; }
+        d = (p->m && p->m[0]) ? &(p->m[0]->top) : a[0].c;
+        size = d->c2.frame.buf.payload_size;
+        sleep(1);
+    }
+
+    msg(M_INFO, "%s MTIO mgmt [%d]", (p->m) ? "TCPv4_SERVER" : "TCPv4_CLIENT", size);
+
+    bzero(buff, maxt * sizeof(uint8_t));
+    while (true)
+    {
+        if (p->z == -1) { break; }
+        if ((p->z == 1) && (p->h == p->n))
+        {
+            indx = -1; maxf = 0;
+            FD_ZERO(&rfds);
+            for (int x = 0; x < maxt; ++x)
+            {
+                if (buff[x] != 1) { indx = x; break; } /* pick the first idle worker */
+                FD_SET(p->r[x][0], &rfds);
+                if (p->r[x][0] > maxf) { maxf = p->r[x][0]; }
+            }
+            if (indx < 0)
+            {
+                select(maxf+1, &rfds, NULL, NULL, NULL); /* all workers busy: wait for a ready token */
+                for (int x = 0; x < maxt; ++x)
+                {
+                    if (FD_ISSET(p->r[x][0], &rfds))
+                    {
+                        leng = read(p->r[x][0], &(buff[maxt+1]), 1); /* consume token; buff[maxt+1] is scratch */
+                        buff[x] = 0;
+                        indx = x;
+                    }
+                }
+            }
+            c = (p->m) ? &(p->m[indx]->top) : a[indx].c;
+            d = (p->m) ? &(p->m[0]->top) : a[0].c;
+            if (d->c1.tuntap && (d->c1.tuntap->ff > 1) && c->c2.buffers)
+            {
+                flag = 0;
+                fdno = d->c1.tuntap->ff; /* always read from the primary thread's real tun fd */
+                FD_ZERO(&rfds); FD_SET(fdno, &rfds);
+                timo.tv_sec = 1; timo.tv_usec = 750000;
+                if (BULK_MODE(c))
+                {
+                    for (int x = 0; x < TUN_BAT_MIN; ++x)
+                    {
+                        select(fdno+1, &rfds, NULL, NULL, &timo);
+                        if ((p->z == 1) && FD_ISSET(fdno, &rfds))
+                        {
+                            c->c2.buffers->read_tun_bufs[x].offset = TUN_BAT_OFF;
+                            leng = read(fdno, BPTR(&c->c2.buffers->read_tun_bufs[x]), size);
+                            c->c2.buffers->read_tun_bufs[x].len = (int)leng;
+                            c->c2.bufs[x] = c->c2.buffers->read_tun_bufs[x];
+                            c->c2.buf = c->c2.bufs[0];
+                            c->c2.buffers->bulk_indx = 0;
+                            c->c2.buffers->bulk_leng = (x + 1);
+                            flag = 1;
+                        }
+                        else
+                        {
+                            break;
+                        }
+                        FD_ZERO(&rfds); FD_SET(fdno, &rfds);
+                        timo.tv_sec = 0; timo.tv_usec = 0; /* drain without blocking after the first packet */
+                    }
+                }
+                else
+                {
+                    select(fdno+1, &rfds, NULL, NULL, &timo);
+                    if ((p->z == 1) && FD_ISSET(fdno, &rfds))
+                    {
+                        leng = read(fdno, BPTR(&c->c2.buffers->read_tun_buf), size);
+                        c->c2.buffers->read_tun_buf.len = (int)leng;
+                        c->c2.buf = c->c2.buffers->read_tun_buf;
+                        flag = 1;
+                    }
+                }
+                if (flag > 0)
+                {
+                    leng = write(p->s[indx][1], buff, 1); /* wake worker indx: a packet is staged */
+                    buff[indx] = 1;                       /* mark busy until its ready token arrives */
+                }
+            }
+            else
+            {
+                sleep(1);
+            }
+        }
+        else
+        {
+            sleep(1);
+        }
+    }
+
+    p->z = -1;
+
+    return NULL;
+}
+
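The hand-off above costs two one-byte token writes per packet: the management thread wakes an idle worker through its s[] socketpair once a packet is staged in that worker's buffers, and the worker reports its buffer free again through its r[] socketpair. A standalone sketch of the same round trip, for reference only (the file name and main() are illustrative, not part of the patch):

    /* token_demo.c -- minimal sketch of the one-byte token protocol used
     * by threaded_io_management(); illustration only, not the patch's API. */
    #include <stdint.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <unistd.h>

    int main(void)
    {
        int s[2], r[2]; /* s: mgmt -> worker wakeup, r: worker -> mgmt ready */
        uint8_t tok = 0;

        if (socketpair(AF_UNIX, SOCK_DGRAM, 0, s) || socketpair(AF_UNIX, SOCK_DGRAM, 0, r))
        {
            return 1;
        }

        write(s[1], &tok, 1); /* mgmt: packet staged, wake the worker        */
        read(s[0], &tok, 1);  /* worker: event loop sees its "tun" readable  */
        write(r[1], &tok, 1); /* worker: done, buffer is free again          */
        read(r[0], &tok, 1);  /* mgmt: clear the busy flag for this worker   */

        printf("one packet handed off\n");
        return 0;
    }
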
+void threaded_tunnel_point_to_point(struct context *c, struct context *d)
+{
+    int maxt = (c->options.ce.mtio_mode) ? MAX_THREADS : 1;
+    struct context_pointer p;
+    struct thread_pointer a[MAX_THREADS];
+    pthread_t thrm, thrd[MAX_THREADS];
+    pthread_mutex_t lock;
+
+    bzero(&p, sizeof(struct context_pointer));
+    p.c = c; p.i = 1; p.n = maxt; p.h = 1; p.z = 0;
+    p.l = &(lock);
+    bzero(p.l, sizeof(pthread_mutex_t));
+    pthread_mutex_init(p.l, NULL);
+
+    c->skip_bind = 0;
+    a[0].p = &(p); a[0].c = c; a[0].i = 1; a[0].n = p.n; a[0].h = 0;
+    bzero(&(thrd[0]), sizeof(pthread_t));
+    pthread_create(&(thrd[0]), NULL, tunnel_point_to_point, &(a[0]));
+
+    bzero(&(thrm), sizeof(pthread_t));
+    pthread_create(&(thrm), NULL, threaded_io_management, &(a[0]));
+
+    for (int x = 1; x < p.n; ++x)
+    {
+        d[x].skip_bind = -1; /* secondary contexts bind their link sockets to loopback */
+        a[x].p = &(p); a[x].c = &(d[x]); a[x].i = (x + 1); a[x].n = p.n; a[x].h = 1;
+        bzero(&(thrd[x]), sizeof(pthread_t));
+        pthread_create(&(thrd[x]), NULL, tunnel_point_to_point, &(a[x]));
+    }
+
+    pthread_join(thrd[0], NULL);
+
+    for (int x = 1; x < p.n; ++x)
+    {
+        pthread_join(thrd[x], NULL);
+    }
+
+    pthread_join(thrm, NULL);
 }
 
 #undef PROCESS_SIGNAL_P2P
@@ -158,6 +364,9 @@ static int
 openvpn_main(int argc, char *argv[])
 {
     struct context c;
+    struct context d[MAX_THREADS];
+    char devs[MAX_THREADS][MAX_STRLENG];
+    char fils[MAX_THREADS][MAX_STRLENG];
 
 #if PEDANTIC
     fprintf(stderr, "Sorry, I was built with --enable-pedantic and I am incapable of doing any real work!\n");
@@ -301,17 +510,44 @@ openvpn_main(int argc, char *argv[])
             /* finish context init */
             context_init_1(&c);
 
+            if (c.options.ce.mtio_mode)
+            {
+                for (int x = 0; x < MAX_THREADS; ++x)
+                {
+                    struct context *b = &(d[x]);
+
+                    bcopy(&c, b, sizeof(struct context));
+                    context_init_1(b);
+
+                    if (c.options.dev)
+                    {
+                        bzero(devs[x], MAX_STRLENG * sizeof(char));
+                        snprintf(devs[x], MAX_STRLENG-8, "%st%02d", c.options.dev, x);
+                        b->options.dev = devs[x]; /* per-thread device name, e.g. tun0t00 */
+                    }
+
+                    if (c.options.status_file)
+                    {
+                        bzero(fils[x], MAX_STRLENG * sizeof(char));
+                        snprintf(fils[x], MAX_STRLENG-8, "%st%02d", c.options.status_file, x);
+                        b->options.status_file = fils[x];
+                    }
+
+                    msg(M_INFO, "INFO MTIO init [%d] [%s][%s]", x, b->options.dev, b->options.status_file);
+                }
+            }
+
             do
             {
                 /* run tunnel depending on mode */
                 switch (c.options.mode)
                 {
                     case MODE_POINT_TO_POINT:
-                        tunnel_point_to_point(&c);
+                        threaded_tunnel_point_to_point(&c, d);
                        break;
 
                     case MODE_SERVER:
-                        tunnel_server(&c);
+                        threaded_tunnel_server(&c, d);
                        break;
 
                     default:
@@ -329,12 +565,32 @@ openvpn_main(int argc, char *argv[])
 
                 /* pass restart status to management subsystem */
                 signal_restart_status(c.sig);
+
+                if (c.options.ce.mtio_mode)
+                {
+                    for (int x = 0; x < MAX_THREADS; ++x)
+                    {
+                        d[x].first_time = false;
+                        signal_restart_status(d[x].sig);
+                    }
+                }
             } while (signal_reset(c.sig, SIGUSR1) == SIGUSR1);
 
             env_set_destroy(c.es);
             uninit_options(&c.options);
             gc_reset(&c.gc);
             uninit_early(&c);
+
+            /*if (c.options.ce.mtio_mode)
+            {
+                for (int x = 0; x < MAX_THREADS; ++x)
+                {
+                    env_set_destroy(d[x].es);
+                    uninit_options(&d[x].options);
+                    gc_reset(&d[x].gc);
+                    uninit_early(&d[x]);
+                }
+            }*/
         } while (signal_reset(c.sig, SIGHUP) == SIGHUP);
     }
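threaded_tunnel_point_to_point() relies on the startup handshake inside tunnel_point_to_point(): each secondary worker bumps p->h, thread 1 waits until p->h reaches p->n and then flips p->z to 1, and any thread can flip p->z to -1 to stop the others. A compressed, standalone illustration of that lifecycle (the struct and function names here are hypothetical; note that the increment is unsynchronized in the patch even though p->l supplies a mutex, so a locked version is shown as the safer variant):

    /* handshake_sketch.c -- illustration of the p->h / p->z startup gate. */
    #include <pthread.h>
    #include <unistd.h>

    struct shared { int h, n, z; pthread_mutex_t l; }; /* z: 0=init, 1=run, -1=stop */

    static void worker_startup(struct shared *p, int is_primary)
    {
        if (is_primary)
        {
            while (p->h < p->n && p->z != -1) { sleep(1); } /* wait for all workers */
            p->z = 1;                                       /* open the gate */
        }
        else
        {
            pthread_mutex_lock(&p->l);   /* the patch increments p->h unlocked */
            p->h += 1;                   /* announce this worker as initialized */
            pthread_mutex_unlock(&p->l);
            while (p->z != 1 && p->z != -1) { sleep(1); }
        }
    }
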
diff --git a/src/openvpn/openvpn.h b/src/openvpn/openvpn.h
index b70194708db..43991836b51 100644
--- a/src/openvpn/openvpn.h
+++ b/src/openvpn/openvpn.h
@@ -46,6 +46,9 @@
 #include "manage.h"
 #include "dns.h"
 
+#define MAX_THREADS 4
+#define MAX_STRLENG 64
+
 /*
  * Our global key schedules, packaged thusly
  * to facilitate key persistence.
@@ -521,6 +524,8 @@ struct context
     bool did_we_daemonize;      /**< Whether demonization has already
                                  *   taken place. */
+    int skip_bind;              /* -1: secondary thread context, bind link socket to loopback only */
+
     struct context_persist persist;
                                 /**< Persistent %context. */
 
     struct context_0 *c0;       /**< Level 0 %context. */
@@ -528,6 +533,56 @@ struct context
     struct context_2 c2;        /**< Level 2 %context. */
 };
 
+
+struct context_pointer
+{
+    int i, h, n, x, z;          /* i: index, h: threads initialized, n: thread count, z: run state (0=init, 1=run, -1=stop) */
+    int s[MAX_THREADS][2];      /* per-thread wakeup socketpairs: mgmt thread signals "packet staged" */
+    int r[MAX_THREADS][2];      /* per-thread ready socketpairs: worker signals "buffer free" */
+    struct context *c;          /* primary tunnel context */
+    struct multi_context **m;   /* per-thread server contexts (server mode only) */
+    struct multi_context *p;    /* top-level server context */
+    struct multi_address *a;    /* shared client address table */
+    pthread_mutex_t *l;         /* shared lock */
+};
+
+struct thread_pointer
+{
+    int i, n, h;                /* 1-based thread index, thread count, init flag */
+    struct context *c;          /* this thread's context */
+    struct context_pointer *p;  /* state shared by all threads */
+};
+
+struct multi_link
+{
+    int indx;
+    char usrs[MAX_STRLENG];
+    in_addr_t ladr;
+    time_t last;
+};
+
+struct multi_address
+{
+    int indx, stat;
+    char lans[MAX_STRLENG];
+    char wans[MAX_STRLENG];
+    char usrs[MAX_STRLENG];
+    time_t last;
+    struct multi_link *link;
+};
+
+struct multi_info
+{
+    int maxt, maxc;
+    int *indx, *hold;
+    struct ifconfig_pool *pool;
+    pthread_mutex_t *lock;
+    struct multi_address *addr;
+};
+
+void *threaded_io_management(void *args);
+
+
 /*
  * Check for a signal when inside an event loop
  */
diff --git a/src/openvpn/options.c b/src/openvpn/options.c
index ceccf03e0de..3a50ac6521d 100644
--- a/src/openvpn/options.c
+++ b/src/openvpn/options.c
@@ -307,6 +307,7 @@ static const char usage_message[] =
     "          'yes'    -- Always DF (Don't Fragment)\n"
     "--mtu-test : Empirically measure and report MTU.\n"
     "--bulk-mode : Use bulk TUN/TCP reads/writes.\n"
+    "--mtio-mode : Use multi-threaded TUN/TCP I/O.\n"
 #ifdef ENABLE_FRAGMENT
     "--fragment max : Enable internal datagram fragmentation so that no UDP\n"
     "                 datagrams are sent which are larger than max bytes.\n"
@@ -3297,6 +3298,18 @@ options_postprocess_mutate_invariant(struct options *options)
     {
         options->ce.bulk_mode = false;
     }
+
+    if (options->ce.mtio_mode)
+    {
+        if (options->ce.proto == PROTO_TCP_CLIENT)
+        {
+            options->ce.mtio_flag = 1;
+        }
+        if (options->ce.proto == PROTO_TCP_SERVER)
+        {
+            options->ce.mtio_flag = 2;
+        }
+    }
 }
 
 static void
@@ -9911,6 +9924,10 @@ add_option(struct options *options, char *p[], bool is_inline, const char *file,
     {
         options->ce.bulk_mode = true;
     }
+    else if (streq(p[0], "mtio-mode"))
+    {
+        options->ce.mtio_mode = true;
+    }
     else
     {
         int i;
diff --git a/src/openvpn/options.h b/src/openvpn/options.h
index 82cdb606ea5..5167623116f 100644
--- a/src/openvpn/options.h
+++ b/src/openvpn/options.h
@@ -177,6 +177,10 @@ struct connection_entry
 
     /* Bulk mode allows for multiple tun reads + larger tcp writes */
     bool bulk_mode;
+
+    /* Multi-threaded I/O mode operates on a primary tun interface + multiple tcp connections */
+    bool mtio_mode;
+    int mtio_flag;              /* 1 = TCP client, 2 = TCP server */
 };
 
 struct remote_entry
diff --git a/src/openvpn/socket.c b/src/openvpn/socket.c
index 5fcf82037ac..1c10ae61784 100644
--- a/src/openvpn/socket.c
+++ b/src/openvpn/socket.c
@@ -710,6 +710,7 @@ create_socket(struct link_socket *sock, struct addrinfo *addr)
     /* set socket to --mark packets with given value */
     socket_set_mark(sock->sd, sock->mark);
 
+    if (sock->skip_bind != -1) {
 #if defined(TARGET_LINUX)
     if (sock->bind_dev)
     {
@@ -724,6 +725,11 @@ create_socket(struct link_socket *sock, struct addrinfo *addr)
 #endif
 
     bind_local(sock, addr->ai_family);
+    } else {
+        struct sockaddr_in locl = { 0 };
+        locl.sin_family = AF_INET; locl.sin_addr.s_addr = inet_addr("127.0.0.1");
+        bind(sock->sd, (struct sockaddr *)&locl, sizeof(locl)); /* loopback, ephemeral port */
+    }
 }
 
 #ifdef TARGET_ANDROID
@@ -1769,6 +1775,7 @@ link_socket_init_phase2(struct context *c, struct link_socket *sock)
                 addr_family_name(sock->info.lsa->bind_local->ai_family));
             sock->info.af = sock->info.lsa->bind_local->ai_family;
         }
+        sock->skip_bind = c->skip_bind;
         create_socket(sock, sock->info.lsa->bind_local);
     }
 }
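When skip_bind is -1, create_socket() deliberately binds the extra per-thread TCP sockets to 127.0.0.1 with an ephemeral port, so the secondary connections never contend for the configured --local address. The equivalent standalone call sequence, with error checking the patch omits (the helper name is illustrative):

    /* Sketch: bind a socket to loopback on an ephemeral port, as
     * create_socket() does above when skip_bind == -1. */
    #include <arpa/inet.h>
    #include <string.h>
    #include <sys/socket.h>

    static int bind_loopback_ephemeral(int sd)
    {
        struct sockaddr_in locl;
        memset(&locl, 0, sizeof(locl));
        locl.sin_family = AF_INET;
        locl.sin_addr.s_addr = inet_addr("127.0.0.1");
        locl.sin_port = 0;             /* 0 lets the kernel pick a free port */
        return bind(sd, (struct sockaddr *)&locl, sizeof(locl));
    }
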
diff --git a/src/openvpn/socket.h b/src/openvpn/socket.h
index e45981fa3b1..f97df9f219c 100644
--- a/src/openvpn/socket.h
+++ b/src/openvpn/socket.h
@@ -232,6 +232,8 @@ struct link_socket
 #ifdef ENABLE_DEBUG
     int gremlin; /* --gremlin bits */
 #endif
+
+    int skip_bind;              /* copied from the context: -1 means bind to loopback only */
 };
 
 /*
diff --git a/src/openvpn/tun.c b/src/openvpn/tun.c
index e35f889323a..9587af14f18 100644
--- a/src/openvpn/tun.c
+++ b/src/openvpn/tun.c
@@ -1245,6 +1245,11 @@ do_ifconfig_ipv4(struct tuntap *tt, const char *ifname, int tun_mtu, const struc
     bool tun_p2p = is_tun_p2p(tt);
 #endif
 
+    if (tt->skip_bind == -1)
+    {
+        tt->local = htonl(inet_addr("127.1.1.1")); /* dummy address for secondary thread devices */
+    }
+
 #if !defined(TARGET_LINUX)
     const char *ifconfig_local = NULL;
     const char *ifconfig_remote_netmask = NULL;
@@ -1754,7 +1759,7 @@ write_tun_header(struct tuntap *tt, uint8_t *buf, int len)
     }
     else
     {
-        return write(tt->fd, buf, len);
+        return write(tt->fe, buf, len);
     }
 }
 
@@ -2043,7 +2048,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
 int
 write_tun(struct tuntap *tt, uint8_t *buf, int len)
 {
-    return write(tt->fd, buf, len);
+    return write(tt->fe, buf, len);
 }
 
 int
@@ -2262,7 +2267,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
 int
 write_tun(struct tuntap *tt, uint8_t *buf, int len)
 {
-    return write(tt->fd, buf, len);
+    return write(tt->fe, buf, len);
 }
 
 int
@@ -2842,7 +2847,7 @@ write_tun(struct tuntap *tt, uint8_t *buf, int len)
     }
     else
     {
-        return write(tt->fd, buf, len);
+        return write(tt->fe, buf, len);
     }
 }
 
@@ -2988,7 +2993,7 @@ write_tun(struct tuntap *tt, uint8_t *buf, int len)
     }
     else
     {
-        return write(tt->fd, buf, len);
+        return write(tt->fe, buf, len);
     }
 }
 
@@ -3087,7 +3092,7 @@ write_tun(struct tuntap *tt, uint8_t *buf, int len)
     }
     else
     {
-        return write(tt->fd, buf, len);
+        return write(tt->fe, buf, len);
     }
 }
 
@@ -3342,7 +3347,7 @@ write_tun(struct tuntap *tt, uint8_t *buf, int len)
     }
     else
     {
-        return write(tt->fd, buf, len);
+        return write(tt->fe, buf, len);
     }
 }
 
@@ -3488,7 +3493,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
 int
 write_tun(struct tuntap *tt, uint8_t *buf, int len)
 {
-    return write(tt->fd, buf, len);
+    return write(tt->fe, buf, len);
 }
 
 int
@@ -6736,7 +6741,7 @@ close_tun(struct tuntap *tt, openvpn_net_ctx_t *ctx)
 int
 write_tun(struct tuntap *tt, uint8_t *buf, int len)
 {
-    return write(tt->fd, buf, len);
+    return write(tt->fe, buf, len);
 }
 
 int
diff --git a/src/openvpn/tun.h b/src/openvpn/tun.h
index 6562c22f87c..4354a11762f 100644
--- a/src/openvpn/tun.h
+++ b/src/openvpn/tun.h
@@ -249,6 +249,9 @@ struct tuntap
 
     dco_context_t dco;
     afunix_context_t afunix;
+
+    int fe, ff, fz;             /* fe: fd used by write_tun() (primary tun), ff: saved real tun fd, fz: ready-signal fd to the I/O thread */
+    int skip_bind;              /* -1 on secondary thread devices */
 };
 
 static inline bool
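Taken together, the tuntap changes split one descriptor into three roles: tt->fd becomes the worker's end of its wakeup socketpair (what the event loop polls), tt->ff preserves the real tun descriptor, and tt->fe is the descriptor write_tun() actually writes to, so every worker delivers decrypted packets to the primary interface. A condensed view of the assignments made in tunnel_point_to_point() (the helper and struct below are illustrative, assuming primary_ff is thread 1's real tun fd and s/r are this thread's socketpairs):

    /* Sketch: per-thread tun fd rewiring, mirroring tunnel_point_to_point(). */
    struct tun_fds { int fd, fe, ff, fz; };

    static void rewire_tun_fds(struct tun_fds *tt, int thread_index,
                               int primary_ff, int s[2], int r[2])
    {
        tt->ff = tt->fd;                 /* save the real tun fd            */
        tt->fe = (thread_index == 1)
                 ? tt->ff                /* thread 1 writes its own tun     */
                 : primary_ff;           /* others write the primary tun    */
        tt->fd = s[0];                   /* event loop now polls the wakeup
                                          * socketpair instead of the tun   */
        tt->fz = r[1];                   /* ready-signal fd back to the
                                          * management thread               */
    }
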