diff --git a/conf/dpvs.bond.conf.sample b/conf/dpvs.bond.conf.sample
index 0281a6a5a..f68376c41 100644
--- a/conf/dpvs.bond.conf.sample
+++ b/conf/dpvs.bond.conf.sample
@@ -371,4 +371,5 @@ ipvs_defs {
 ! sa_pool config
 sa_pool {
     pool_hash_size 16
+    pool_mode      laddr_lcore_mapping
 }
diff --git a/conf/dpvs.conf.items b/conf/dpvs.conf.items
index 36cbda4dd..3ec6805a3 100644
--- a/conf/dpvs.conf.items
+++ b/conf/dpvs.conf.items
@@ -245,4 +245,5 @@ ipvs_defs {
 sa_pool {
     pool_hash_size 16 <16, 1-128>
+    pool_mode      laddr_lcore_mapping
 }
diff --git a/conf/dpvs.conf.sample b/conf/dpvs.conf.sample
index 80f525b38..ac7dda987 100644
--- a/conf/dpvs.conf.sample
+++ b/conf/dpvs.conf.sample
@@ -324,4 +324,21 @@ ipvs_defs {
 ! sa_pool config
 sa_pool {
     pool_hash_size 16
+    pool_mode      laddr_lcore_mapping
+}
+
+session_sync {
+    sync_session_enable
+    sync_session_elapse 2    ! seconds after a connection is established before it is synced
+    sync_buff_delay     2
+    laddr_ifname        dpdk0
+    sync_id             10
+
+    socket {
+        mcast_addr   224.0.1.100
+        mcast_port   8088
+        mcast_ttl    20
+        mtu          1500
+        unicast_port 8089
+    }
 }
diff --git a/conf/dpvs.conf.single-bond.sample b/conf/dpvs.conf.single-bond.sample
index 0d67e4fcb..3940c7c26 100644
--- a/conf/dpvs.conf.single-bond.sample
+++ b/conf/dpvs.conf.single-bond.sample
@@ -276,4 +276,5 @@ ipvs_defs {
 ! sa_pool config
 sa_pool {
     pool_hash_size 16
+    pool_mode      laddr_lcore_mapping
 }
diff --git a/conf/dpvs.conf.single-nic.sample b/conf/dpvs.conf.single-nic.sample
index beba1a114..f0eb444ba 100644
--- a/conf/dpvs.conf.single-nic.sample
+++ b/conf/dpvs.conf.single-nic.sample
@@ -249,4 +249,5 @@ ipvs_defs {
 ! sa_pool config
 sa_pool {
     pool_hash_size 16
+    pool_mode      laddr_lcore_mapping
 }
diff --git a/doc/pics/synchronization-principle.png b/doc/pics/synchronization-principle.png
new file mode 100644
index 000000000..7e5f4fad0
Binary files /dev/null and b/doc/pics/synchronization-principle.png differ
diff --git a/doc/tutorial.md b/doc/tutorial.md
index 6e42499a3..889af485f 100644
--- a/doc/tutorial.md
+++ b/doc/tutorial.md
@@ -21,6 +21,7 @@ DPVS Tutorial
   - [KNI for virtual device](#vdev-kni)
 * [UDP Option of Address (UOA)](#uoa)
 * [Launch DPVS in Virtual Machine (Ubuntu)](#Ubuntu16.04)
+* [Session synchronization](#session-sync)
 
 > To compile and launch DPVS, pls check *README.md* for this project.
 
@@ -1120,3 +1121,69 @@ worker_defs {
 }
 ```
 
+
+# Session synchronization
+
+Session synchronization reduces the number of connections broken when a DPVS node in the cluster fails.
+There are two synchronization modes: full and incremental.
+* Incremental synchronization is used for new sessions.
+* Full synchronization is used for existing sessions.
+
+The basic synchronization principle looks like this:
+![synchronization-principle.png](pics/synchronization-principle.png)
+
+When a new DPVS node is added to the cluster, run the following command on it to select a node to sync from and to pull the existing sessions:
+
+```bash
+$ ipvsadm --conn-sync
+```
+
+If you would like to use session synchronization, add the following lines to the worker and `session_sync` configs of `dpvs.conf`:
+
+```
+! worker config (lcores)
+worker_defs {
+    worker cpu0 {
+        type   master
+        cpu_id 0
+    }
+
+    worker cpu1 {
+        type   slave
+        cpu_id 1
+        port   dpdk0 {
+            rx_queue_ids 0
+            tx_queue_ids 0
+            ! isol_rx_cpu_ids  9
+            ! isol_rxq_ring_sz 1048576
+        }
+    }
+    .......
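+
+    ! Session synchronization requires two dedicated workers in addition to the
+    ! forwarding slaves (cpu10/cpu11 below): a "sync-tx" worker that drains the
+    ! sync tx rings and sends session messages over multicast/unicast, and a
+    ! "sync-rx" worker that receives sync messages and dispatches them to the
+    ! forwarding lcores. Both run busy loops, so give each one its own core.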
+
+    worker cpu10 {
+        type   sync-tx
+        cpu_id 10
+    }
+
+    worker cpu11 {
+        type   sync-rx
+        cpu_id 11
+    }
+}
+
+session_sync {
+    sync_session_enable
+    sync_session_elapse 2    ! seconds after a connection is established before it is synced
+    sync_buff_delay     2
+    laddr_ifname        dpdk0
+    sync_id             8
+
+    socket {
+        mcast_addr   224.0.1.100
+        mcast_port   8088
+        mcast_ttl    20
+        mtu          1500
+        unicast_port 8089
+    }
+}
+```
diff --git a/include/conf/conn.h b/include/conf/conn.h
index 86da70b66..0e5fa59dc 100644
--- a/include/conf/conn.h
+++ b/include/conf/conn.h
@@ -62,6 +62,7 @@ struct ip_vs_conn_entry {
     uint16_t in_af;
     uint16_t out_af;
     uint16_t proto;
+    uint16_t syncid;
     union inet_addr caddr;
     union inet_addr vaddr;
     union inet_addr laddr;
diff --git a/include/ipvs/conn.h b/include/ipvs/conn.h
index 356c3084e..e44108a84 100644
--- a/include/ipvs/conn.h
+++ b/include/ipvs/conn.h
@@ -36,6 +36,7 @@ enum {
 };
 
 enum {
+    DPVS_CONN_F_SYNCED = 0x0020,
     DPVS_CONN_F_HASHED = 0x0040,
     DPVS_CONN_F_REDIRECT_HASHED = 0x0080,
     DPVS_CONN_F_INACTIVE = 0x0100,
@@ -95,6 +96,12 @@ struct dp_vs_conn {
     rte_atomic32_t refcnt;
     struct dpvs_timer timer;
     struct timeval timeout;
+
+    struct dpvs_timer conn_sync_timer;
+    struct timeval conn_sync_timeout;
+    uint16_t syncid;
+    queueid_t qid;      /* used in session synchronization */
+    lcoreid_t lcore;
 
     struct dp_vs_dest *dest;    /* real server */
     void *prot_data;            /* protocol specific data */
@@ -180,6 +187,11 @@ dp_vs_conn_new(struct rte_mbuf *mbuf,
               uint32_t flags);
 int dp_vs_conn_del(struct dp_vs_conn *conn);
 
+struct dp_vs_conn *dp_vs_conn_copy_from_sync(void *sync_conn,
+                                             struct dp_vs_dest *dest);
+
+int dp_vs_conn_lcore_tx(lcoreid_t cid);
+
 struct dp_vs_conn *
 dp_vs_conn_get(int af, uint16_t proto,
                const union inet_addr *saddr,
diff --git a/include/ipvs/dest.h b/include/ipvs/dest.h
index bbcf8eb8f..43afc46b9 100644
--- a/include/ipvs/dest.h
+++ b/include/ipvs/dest.h
@@ -181,6 +181,10 @@ int dp_vs_new_dest(struct dp_vs_service *svc, struct dp_vs_dest_conf *udest,
 struct dp_vs_dest *dp_vs_lookup_dest(int af, struct dp_vs_service *svc,
                                      const union inet_addr *daddr, uint16_t dport);
 
+struct dp_vs_dest *dp_vs_find_dest(int af, const union inet_addr *daddr,
+                                   uint16_t dport, const union inet_addr *vaddr,
+                                   uint16_t vport, uint16_t protocol);
+
 struct dp_vs_dest *dp_vs_trash_get_dest(struct dp_vs_service *svc,
                                         const union inet_addr *daddr, uint16_t dport);
diff --git a/include/ipvs/service.h b/include/ipvs/service.h
index c8d997260..27bc2bf25 100644
--- a/include/ipvs/service.h
+++ b/include/ipvs/service.h
@@ -44,6 +44,12 @@
 rte_rwlock_t __dp_vs_svc_lock;
 
+struct laddr_list_pre_lcore {
+    struct list_head laddr_list;    /* local address (LIP) pool */
+    struct list_head *laddr_curr;
+    uint32_t num_laddrs;
+};
+
 /* virtual service */
 struct dp_vs_service {
     struct list_head s_list;    /* node for normal service table */
@@ -88,6 +94,9 @@ struct dp_vs_service {
     rte_rwlock_t laddr_lock;
     uint32_t num_laddrs;
+    struct laddr_list_pre_lcore pre_list[RTE_MAX_LCORE];
+#define this_pre_list pre_list[rte_lcore_id()]
+
     /* ... flags, timer ...
*/ } __rte_cache_aligned; #endif @@ -232,6 +241,7 @@ enum{ DPVS_SO_SET_EDITDEST, DPVS_SO_SET_DELDEST, DPVS_SO_SET_GRATARP, + DPVS_SO_SET_CONN_SYNC, }; enum{ @@ -244,7 +254,7 @@ enum{ #define SOCKOPT_SVC_BASE DPVS_SO_SET_FLUSH -#define SOCKOPT_SVC_SET_CMD_MAX DPVS_SO_SET_GRATARP +#define SOCKOPT_SVC_SET_CMD_MAX DPVS_SO_SET_CONN_SYNC #define SOCKOPT_SVC_GET_CMD_MAX DPVS_SO_GET_DESTS #define MAX_ARG_LEN (sizeof(struct dp_vs_service_user) + \ diff --git a/include/ipvs/sync.h b/include/ipvs/sync.h new file mode 100644 index 000000000..81177d946 --- /dev/null +++ b/include/ipvs/sync.h @@ -0,0 +1,177 @@ +/* + * DPVS is a software load balancer (Virtual Server) based on DPDK. + * + * Copyright (C) 2017 iQIYI (www.iqiyi.com). + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + */ +#ifndef __DPVS_SYNC_H__ +#define __DPVS_SYNC_H__ + +#include "common.h" +#include "list.h" +#include "ipvs/conn.h" + +#define RTE_LOGTYPE_SYNC RTE_LOGTYPE_USER1 + +#define CONN_SYNC_RING_SIZE 2048 + +/* maximum pkt number at a single burst */ +#define CONN_SYNC_MAX_PKT_BURST 32 + +/* + * DPVS sync connection entry + */ +struct dp_vs_sync_conn { + uint8_t reserved; + + int af; + uint8_t proto; + union inet_addr caddr; /* Client address */ + union inet_addr vaddr; /* Virtual address */ + union inet_addr laddr; /* director Local address */ + union inet_addr daddr; /* Destination (RS) address */ + uint16_t cport; + uint16_t vport; + uint16_t lport; + uint16_t dport; + queueid_t qid; + lcoreid_t lcore; + + /* Flags and state transition */ + uint16_t flags; /* status flags */ + uint16_t state; /* state info */ + + /* The sequence options start here */ + struct dp_vs_seq fnat_seq; + uint32_t rs_end_seq; + uint32_t rs_end_ack; + uint16_t syncid; +}; + +struct dp_vs_sync_head { + uint8_t type; + uint8_t syncid; + uint16_t size; +}; + +struct dp_vs_sync_mesg { + struct dp_vs_sync_head head; + uint8_t nr_conns; +}; + +struct dp_vs_sync_nego { + struct dp_vs_sync_head head; + uint32_t code; + uint8_t peer_syncid; + uint64_t uptime; +}; + +struct dp_vs_sync_peer { + uint8_t syncid; + uint64_t uptime; + struct sockaddr_in addr; +}; + +struct dp_vs_sync_buff { + struct list_head list; + uint64_t firstuse; + + /* pointers for the message data */ + struct dp_vs_sync_mesg *mesg; + unsigned char *head; + unsigned char *end; +}; + +struct dp_vs_sync_fwd_core { + int cid; + int last_index; + bool start; + bool end; +}; + +struct dp_vs_sync_core { + int core_cnt; + struct dp_vs_sync_fwd_core fwd_core[DPVS_MAX_LCORE]; +}; + +typedef enum { + DP_VS_SYNC_MCAST = 0, + DP_VS_SYNC_UNICAST = 1, + DP_VS_SYNC_MAX = 2, +} dp_vs_sync_type; + +struct dp_vs_sync_conf { + lcoreid_t sync_rx_lcore; + lcoreid_t sync_tx_lcore; + int syncid; + int sync_enable; + int sync_conn_elapse; + int sync_buff_delay; + int sync_per_time_cnt; + int send_mesg_maxlen; + int recv_mesg_maxlen; + char laddr_ifname[IFNAMSIZ]; +}; + +#define DP_VS_SYNC_CONN_SIZE (sizeof(struct dp_vs_sync_conn)) +#define DP_VS_SYNC_MESG_HEADER_LEN (sizeof(struct dp_vs_sync_mesg)) + +#define DP_VS_SYNC_CONN_INFO (0) 
+#define DP_VS_SYNC_NEGO_INFO (1)
+
+#define DP_VS_SYNC_INFO_PROBE_CODE (0)
+#define DP_VS_SYNC_INFO_REPLY_CODE (1)
+#define DP_VS_SYNC_INFO_FETCH_CODE (2)
+#define DP_VS_SYNC_INFO_DONE_CODE  (3)
+
+#define DP_VS_SYNC_DELAY_SECONDS     (2)
+#define DP_VS_SYNC_CONN_CNT_PER_TIME (128)
+
+#define MAX(x, y) ((x) > (y) ? (x) : (y))
+
+extern struct dp_vs_sync_core g_dp_vs_sync_fwd_core;
+#define DP_VS_SYNC_FULL_IS_START(cid) \
+    (g_dp_vs_sync_fwd_core.fwd_core[cid].start == true)
+
+#define DP_VS_SYNC_FULL_IS_END(cid) \
+    (g_dp_vs_sync_fwd_core.fwd_core[cid].end == true)
+
+#define DP_VS_SYNC_FULL_SET_LAST_INDEX(cid, index) \
+    (g_dp_vs_sync_fwd_core.fwd_core[cid].last_index = index)
+
+#define DP_VS_SYNC_FULL_GET_LAST_INDEX(cid) \
+    (g_dp_vs_sync_fwd_core.fwd_core[cid].last_index)
+
+extern struct dp_vs_sync_conf g_dp_vs_sync_conf;
+#define DP_VS_SYNC_FULL_CNT_PER_TIME \
+    g_dp_vs_sync_conf.sync_per_time_cnt
+
+void dp_vs_sync_conn_enqueue(struct dp_vs_conn *cp, dp_vs_sync_type type);
+int dp_vs_sync_conn_handler(struct dp_vs_conn *conn, int new_state);
+int dp_vs_sync_lcore_process_rx_msg(lcoreid_t cid);
+int dp_vs_sync_set_rx_core(lcoreid_t cid);
+int dp_vs_sync_set_tx_core(lcoreid_t cid);
+void dp_vs_sync_run_loop(lcoreid_t cid);
+int dp_vs_sync_init(void);
+int dp_vs_sync_term(void);
+
+int dp_vs_sync_recv_nego(const char *buf, int len,
+                         struct sockaddr_in *remote_addr);
+int dp_vs_sync_full_end(lcoreid_t cid);
+int dp_vs_sync_full_start(lcoreid_t cid);
+int dp_vs_sync_conn_start(void);
+char *dp_vs_sync_laddr_ifname(void);
+void install_session_sync_keywords(void);
+
+#endif /* __DPVS_SYNC_H__ */
diff --git a/include/ipvs/sync_msg.h b/include/ipvs/sync_msg.h
new file mode 100644
index 000000000..63bcab611
--- /dev/null
+++ b/include/ipvs/sync_msg.h
@@ -0,0 +1,40 @@
+#ifndef __DPVS_SEND_MSG__
+#define __DPVS_SEND_MSG__
+
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include "common.h"
+#include "dpdk.h"
+
+#define RTE_LOGTYPE_MCAST   RTE_LOGTYPE_USER1
+#define RTE_LOGTYPE_UNICAST RTE_LOGTYPE_USER1
+
+int create_mcast_receive_sock(void);
+int create_mcast_send_sock(void);
+int add_mcast_group(int sockfd);
+int drop_mcast_group(int sockfd);
+int send_mcast_msg(int sockfd, char *buffer, int len);
+int receive_mcast_msg(int sockfd, char *buffer, const size_t buflen,
+                      struct sockaddr_in *remote_addr);
+
+int create_receive_unicast_sock(void);
+
+int create_send_unicast_sock(void);
+
+int send_unicast_msg(int sockfd, char *buffer, int len,
+                     struct sockaddr_in *remote_addr);
+
+int receive_unicast_msg(int sockfd, char *buffer, const size_t buflen,
+                        struct sockaddr_in *remote_addr);
+int get_sock_mtu(void);
+void install_session_sync_sock_keywords(void);
+
+#endif /* __DPVS_SEND_MSG__ */
diff --git a/include/netif.h b/include/netif.h
index ca3ef1d82..fa880bf36 100644
--- a/include/netif.h
+++ b/include/netif.h
@@ -366,5 +366,6 @@ static inline char *eth_addr_dump(const struct ether_addr *ea,
 portid_t netif_port_count(void);
 void lcore_process_packets(struct netif_queue_conf *qconf, struct rte_mbuf **mbufs,
                            lcoreid_t cid, uint16_t count, bool pkts_from_ring);
+lcoreid_t get_lcoreid(queueid_t qid);
 
 #endif /* __DPVS_NETIF_H__ */
diff --git a/include/sa_pool.h b/include/sa_pool.h
index 7a136aebb..e2968c209 100644
--- a/include/sa_pool.h
+++ b/include/sa_pool.h
@@ -42,12 +42,20 @@
 #ifndef __DPVS_SA_POOL__
 #define __DPVS_SA_POOL__
 
+enum {
+    LADDR_LCORE_MAPPING_POOL_MODE,
+    LPORT_LCORE_MAPPING_POOL_MODE,
+};
+
 struct sa_pool_stats {
     uint32_t used_cnt;
     uint32_t
free_cnt; uint32_t miss_cnt; }; +extern uint8_t sa_pool_mode; +#define SA_POOL_MODE sa_pool_mode + int sa_pool_init(void); int sa_pool_term(void); @@ -69,6 +77,10 @@ int sa_release(const struct netif_port *dev, int sa_pool_stats(const struct inet_ifaddr *ifa, struct sa_pool_stats *stats); +int sa_bind_conn(int af, struct netif_port *dev, lcoreid_t cid, + const union inet_addr *dip, + __be16 dport, queueid_t queue); + /* config file */ void install_sa_pool_keywords(void); diff --git a/src/cfgfile.c b/src/cfgfile.c index 6dea928cd..361424018 100644 --- a/src/cfgfile.c +++ b/src/cfgfile.c @@ -33,6 +33,7 @@ #include "ipvs/proto_tcp.h" #include "ipvs/proto_udp.h" #include "ipvs/synproxy.h" +#include "ipvs/sync.h" typedef void (*sighandler_t)(int); @@ -86,7 +87,9 @@ static vector_t install_keywords(void) install_sublevel(); install_proto_udp_keywords(); install_sublevel_end(); - +#ifdef CONFIG_DPVS_SYNC + install_session_sync_keywords(); +#endif install_ipv6_keywords(); return g_keywords; diff --git a/src/config.mk b/src/config.mk index 119cdfec0..23d293a1f 100644 --- a/src/config.mk +++ b/src/config.mk @@ -23,6 +23,7 @@ CFLAGS += -D DPVS_MAX_SOCKET=2 CFLAGS += -D DPVS_MAX_LCORE=64 +#CFLAGS += -D CONFIG_DPVS_SYNC #CFLAGS += -D CONFIG_DPVS_NEIGH_DEBUG #CFLAGS += -D CONFIG_RECORD_BIG_LOOP @@ -39,6 +40,7 @@ CFLAGS += -D DPVS_MAX_LCORE=64 #CFLAGS += -D CONFIG_DPVS_IPSET_DEBUG #CFLAGS += -D CONFIG_NDISC_DEBUG #CFLAGS += -D CONFIG_MSG_DEBUG +#CFLAGS += -D CONFIG_DPVS_CONN_SYNC_DEBUG GCC_MAJOR = $(shell echo __GNUC__ | $(CC) -E -x c - | tail -n 1) GCC_MINOR = $(shell echo __GNUC_MINOR__ | $(CC) -E -x c - | tail -n 1) diff --git a/src/ipvs/ip_vs_conn.c b/src/ipvs/ip_vs_conn.c index d89e7dd65..151290e33 100644 --- a/src/ipvs/ip_vs_conn.c +++ b/src/ipvs/ip_vs_conn.c @@ -31,6 +31,7 @@ #include "ipvs/proto_tcp.h" #include "ipvs/proto_udp.h" #include "ipvs/proto_icmp.h" +#include "ipvs/sync.h" #include "parser/parser.h" #include "ctrl.h" #include "conf/conn.h" @@ -491,10 +492,17 @@ static int conn_expire(void *priv) /* refcnt == 1 means we are the only referer. * no one is using the conn and it's timed out. 
*/ if (rte_atomic32_read(&conn->refcnt) == 1) { - if (conn->flags & DPVS_CONN_F_TEMPLATE) + if (conn->flags & DPVS_CONN_F_TEMPLATE) { dpvs_timer_cancel(&conn->timer, true); - else +#ifdef CONFIG_DPVS_SYNC + dpvs_timer_cancel(&conn->conn_sync_timer, true); +#endif + } else { dpvs_timer_cancel(&conn->timer, false); +#ifdef CONFIG_DPVS_SYNC + dpvs_timer_cancel(&conn->conn_sync_timer, false); +#endif + } /* I was controlled by someone */ if (conn->control) @@ -537,21 +545,24 @@ static int conn_expire(void *priv) } conn_unbind_dest(conn); - dp_vs_laddr_unbind(conn); - - /* free stored ack packet */ - list_for_each_entry_safe(ack_mbuf, t_ack_mbuf, &conn->ack_mbuf, list) { - list_del_init(&ack_mbuf->list); - rte_pktmbuf_free(ack_mbuf->mbuf); - sp_dbg_stats32_dec(sp_ack_saved); - rte_mempool_put(this_ack_mbufpool, ack_mbuf); - } - conn->ack_num = 0; - /* free stored syn mbuf */ - if (conn->syn_mbuf) { - rte_pktmbuf_free(conn->syn_mbuf); - sp_dbg_stats32_dec(sp_syn_saved); + if (!(conn->flags & DPVS_CONN_F_SYNCED)) { + dp_vs_laddr_unbind(conn); + + /* free stored ack packet */ + list_for_each_entry_safe(ack_mbuf, t_ack_mbuf, &conn->ack_mbuf, list) { + list_del_init(&ack_mbuf->list); + rte_pktmbuf_free(ack_mbuf->mbuf); + sp_dbg_stats32_dec(sp_ack_saved); + rte_mempool_put(this_ack_mbufpool, ack_mbuf); + } + conn->ack_num = 0; + + /* free stored syn mbuf */ + if (conn->syn_mbuf) { + rte_pktmbuf_free(conn->syn_mbuf); + sp_dbg_stats32_dec(sp_syn_saved); + } } rte_atomic32_dec(&conn->refcnt); @@ -594,10 +605,17 @@ static void conn_flush(void) list_for_each_entry_safe(tuphash, next, &this_conn_tbl[i], list) { conn = tuplehash_to_conn(tuphash); - if (conn->flags & DPVS_CONN_F_TEMPLATE) + if (conn->flags & DPVS_CONN_F_TEMPLATE) { dpvs_timer_cancel(&conn->timer, true); - else +#ifdef CONFIG_DPVS_SYNC + dpvs_timer_cancel(&conn->conn_sync_timer, true); +#endif + } else { dpvs_timer_cancel(&conn->timer, false); +#ifdef CONFIG_DPVS_SYNC + dpvs_timer_cancel(&conn->conn_sync_timer, false); +#endif + } rte_atomic32_inc(&conn->refcnt); if (rte_atomic32_read(&conn->refcnt) != 2) { @@ -659,6 +677,156 @@ static void conn_flush(void) #endif } +struct dp_vs_conn * dp_vs_conn_copy_from_sync(void *param, struct dp_vs_dest *dest) +{ + struct dp_vs_conn *new; + struct conn_tuple_hash *t; + uint32_t err; + struct dp_vs_proto *pp; + struct netif_port *dev = NULL; + struct dp_vs_sync_conn *sync_conn = (struct dp_vs_sync_conn *)param; + + if (unlikely(rte_mempool_get(this_conn_cache, (void **)&new) != 0)) { + RTE_LOG(WARNING, IPVS, "%s: no memory\n", __func__); + return NULL; + } + memset(new, 0, sizeof(struct dp_vs_conn)); + new->connpool = this_conn_cache; + + /* init inbound conn tuple hash */ + t = &tuplehash_in(new); + t->direct = DPVS_CONN_DIR_INBOUND; + t->af = sync_conn->af; + t->proto = sync_conn->proto; + t->saddr = sync_conn->caddr; + t->sport = sync_conn->cport; + t->daddr = sync_conn->vaddr; + t->dport = sync_conn->vport; + INIT_LIST_HEAD(&t->list); + + /* init outbound conn tuple hash */ + t = &tuplehash_out(new); + t->direct = DPVS_CONN_DIR_OUTBOUND; + t->af = sync_conn->af; + t->proto = sync_conn->proto; + t->saddr = sync_conn->daddr; + t->sport = sync_conn->dport; + t->daddr = sync_conn->laddr; + t->dport = sync_conn->lport; + INIT_LIST_HEAD(&t->list); + + /* init connection */ + new->af = sync_conn->af; + new->proto = sync_conn->proto; + new->caddr = sync_conn->caddr; + new->cport = sync_conn->cport; + new->vaddr = sync_conn->vaddr; + new->vport = sync_conn->vport; + new->laddr = sync_conn->laddr; + new->lport 
= sync_conn->lport;
+    new->daddr = sync_conn->daddr;
+    new->dport = sync_conn->dport;
+
+    /* L2 fast xmit */
+    new->in_dev  = NULL;
+    new->out_dev = NULL;
+
+    /* control member */
+    new->control = NULL;
+    rte_atomic32_clear(&new->n_control);
+
+    /* caller will use it right after created,
+     * just like dp_vs_conn_get(). */
+    rte_atomic32_set(&new->refcnt, 1);
+
+    rte_memcpy(&new->fnat_seq, &sync_conn->fnat_seq, sizeof(struct dp_vs_seq));
+    new->rs_end_seq = sync_conn->rs_end_seq;
+    new->rs_end_ack = sync_conn->rs_end_ack;
+
+    new->syncid = sync_conn->syncid;
+    new->flags  = sync_conn->flags | DPVS_CONN_F_SYNCED;
+    new->state  = sync_conn->state;
+    new->qid    = sync_conn->qid;
+    new->lcore  = rte_lcore_id();
+
+    dev = netif_port_get_by_name(dp_vs_sync_laddr_ifname());
+    if (!dev) {
+        RTE_LOG(ERR, IPVS, "%s: dpdk device not found\n", __func__);
+        goto errout;    /* release the conn back to the mempool and return NULL */
+    }
+
+    sa_bind_conn(new->af, dev, new->lcore, &new->laddr, 0, new->qid);
+
+    /* bind destination and corresponding transmitter */
+    err = conn_bind_dest(new, dest);
+    if (err != EDPVS_OK) {
+        RTE_LOG(WARNING, IPVS, "%s: fail to bind dest: %s\n", __func__,
+                dpvs_strerror(err));
+        goto errout;
+    }
+
+    if ((err = dp_vs_conn_hash(new)) != EDPVS_OK)
+        goto errout;
+
+    /* timer */
+    pp = dp_vs_proto_lookup(sync_conn->proto);
+    if (pp && pp->timeout_table)
+        new->timeout.tv_sec = pp->timeout_table[sync_conn->state];
+    else
+        new->timeout.tv_sec = 60;
+
+    new->timeout.tv_usec = 0;
+
+    this_conn_count++;
+
+    /* schedule conn timer */
+    dpvs_time_rand_delay(&new->timeout, 1000000);
+    if (new->flags & DPVS_CONN_F_TEMPLATE) {
+        dpvs_timer_sched(&new->timer, &new->timeout, conn_expire, new, true);
+    } else {
+        dpvs_timer_sched(&new->timer, &new->timeout, conn_expire, new, false);
+    }
+
+#ifdef CONFIG_DPVS_IPVS_DEBUG
+    conn_dump("sync conn: ", new);
+#endif
+    return new;
+
+errout:
+    rte_mempool_put(this_conn_cache, new);
+    return NULL;
+}
+
+int dp_vs_conn_lcore_tx(lcoreid_t cid)
+{
+    int i = 0;
+    int cnt = 0;
+    struct conn_tuple_hash *tuphash;
+    struct dp_vs_conn *conn;
+
+    if (!DP_VS_SYNC_FULL_IS_START(cid)) {
+        return EDPVS_OK;
+    }
+
+    i = DP_VS_SYNC_FULL_GET_LAST_INDEX(cid);
+    for (; i < DPVS_CONN_TBL_SIZE && cnt <= DP_VS_SYNC_FULL_CNT_PER_TIME; i++, cnt++) {
+        list_for_each_entry(tuphash, &this_conn_tbl[i], list) {
+            if (tuphash->direct != DPVS_CONN_DIR_INBOUND)
+                continue;
+            conn = tuplehash_to_conn(tuphash);
+            if (DPVS_TCP_S_ESTABLISHED == conn->state)
+                dp_vs_sync_conn_enqueue(conn, DP_VS_SYNC_UNICAST);
+        }
+    }
+
+    DP_VS_SYNC_FULL_SET_LAST_INDEX(cid, i);
+
+    if (i >= DPVS_CONN_TBL_SIZE)
+        dp_vs_sync_full_end(cid);
+
+    return EDPVS_OK;
+}
+
 struct dp_vs_conn *dp_vs_conn_new(struct rte_mbuf *mbuf,
                                   const struct dp_vs_iphdr *iph,
                                   struct dp_vs_conn_param *param,
@@ -762,6 +930,10 @@ struct dp_vs_conn *dp_vs_conn_new(struct rte_mbuf *mbuf,
     rte_atomic32_set(&new->refcnt, 1);
     new->flags = flags;
     new->state = 0;
+#ifdef CONFIG_DPVS_SYNC
+    new->lcore = rte_lcore_id();
+    new->qid = mbuf->hash.usr;
+#endif
 #ifdef CONFIG_DPVS_IPVS_STATS_DEBUG
     new->ctime = rte_rdtsc();
 #endif
@@ -1182,6 +1354,9 @@ static inline void sockopt_fill_conn_entry(const struct dp_vs_conn *conn,
     entry->lport = conn->lport;
     entry->dport = conn->dport;
     entry->timeout = conn->timeout.tv_sec;
+    if (conn->flags & DPVS_CONN_F_SYNCED) {
+        entry->syncid = conn->syncid;
+    }
 }
 
 static int sockopt_conn_get_specified(const struct ip_vs_conn_req *conn_req,
diff --git a/src/ipvs/ip_vs_core.c b/src/ipvs/ip_vs_core.c
index 4b0547acd..7ba0eae2a 100644
--- a/src/ipvs/ip_vs_core.c
+++ b/src/ipvs/ip_vs_core.c
@@ -35,6 +35,7 @@
 #include "ipvs/synproxy.h"
 #include "ipvs/blklst.h"
 #include "ipvs/proto_udp.h"
+#include "ipvs/sync.h"
 #include "route6.h"
 #include "ipvs/redirect.h"
 
@@ -1198,15 +1199,23 @@ int dp_vs_init(void)
         goto err_stats;
     }
 
+    err = dp_vs_sync_init();
+    if (err != EDPVS_OK) {
+        RTE_LOG(ERR, IPVS, "fail to init session sync: %s\n", dpvs_strerror(err));
+        goto err_hooks;
+    }
+
     err = inet_register_hooks(dp_vs_ops, NELEMS(dp_vs_ops));
     if (err != EDPVS_OK) {
         RTE_LOG(ERR, IPVS, "fail to register hooks: %s\n", dpvs_strerror(err));
-        goto err_hooks;
+        goto err_sync;
     }
 
     RTE_LOG(DEBUG, IPVS, "ipvs inialized.\n");
     return EDPVS_OK;
 
+err_sync:
+    dp_vs_sync_term();
 err_hooks:
     dp_vs_stats_term();
 err_stats:
diff --git a/src/ipvs/ip_vs_dest.c b/src/ipvs/ip_vs_dest.c
index 92697a7d9..afae4199f 100644
--- a/src/ipvs/ip_vs_dest.c
+++ b/src/ipvs/ip_vs_dest.c
@@ -47,6 +47,22 @@ struct dp_vs_dest *dp_vs_lookup_dest(int af,
     return NULL;
 }
 
+struct dp_vs_dest *dp_vs_find_dest(int af, const union inet_addr *daddr,
+                                   uint16_t dport, const union inet_addr *vaddr,
+                                   uint16_t vport, uint16_t protocol)
+{
+    struct dp_vs_dest *dest;
+    struct dp_vs_service *svc;
+
+    svc = dp_vs_service_lookup(af, protocol, vaddr, vport, 0, NULL, NULL, NULL);
+    if (!svc)
+        return NULL;
+    dest = dp_vs_lookup_dest(af, svc, daddr, dport);
+    if (dest)
+        rte_atomic32_inc(&dest->refcnt);
+    dp_vs_service_put(svc);
+    return dest;
+}
+
 /*
  * Lookup dest by {svc,addr,port} in the destination trash.
  * The destination trash is used to hold the destinations that are removed
diff --git a/src/ipvs/ip_vs_laddr.c b/src/ipvs/ip_vs_laddr.c
index 836d49f09..4d67579d1 100644
--- a/src/ipvs/ip_vs_laddr.c
+++ b/src/ipvs/ip_vs_laddr.c
@@ -108,6 +108,7 @@ struct dp_vs_laddr {
 };
 
 static uint32_t dp_vs_laddr_max_trails = 16;
+static uint64_t lcore_mask;
 
 static inline int __laddr_step(struct dp_vs_service *svc)
 {
@@ -124,7 +125,7 @@ static inline int __laddr_step(struct dp_vs_service *svc)
     return 1;
 }
 
-static inline struct dp_vs_laddr *__get_laddr(struct dp_vs_service *svc)
+static inline struct dp_vs_laddr *__get_laddr_port_mode(struct dp_vs_service *svc)
 {
     int step;
     struct dp_vs_laddr *laddr = NULL;
@@ -153,6 +154,45 @@
     return laddr;
 }
 
+static inline struct dp_vs_laddr *__get_laddr_addr_mode(struct dp_vs_service *svc)
+{
+    struct dp_vs_laddr *laddr = NULL;
+
+    /* if the list was never initialized, list_empty() returns true! */
+    assert(svc->this_pre_list.laddr_list.next);
+
+    if (list_empty(&svc->this_pre_list.laddr_list)) {
+        return NULL;
+    }
+
+    /* In LADDR_LCORE_MAPPING_POOL_MODE every laddr is bound to a dedicated
+     * lcore, so the laddr_list iteration step is decoupled from the internal
+     * rr/wrr scheduling of real servers, and we don't need a random
+     * laddr_list step any more.
+ **/ + if (unlikely(!svc->this_pre_list.laddr_curr)) + svc->this_pre_list.laddr_curr = svc->this_pre_list.laddr_list.next; + else + svc->this_pre_list.laddr_curr = svc->this_pre_list.laddr_curr->next; + + if (svc->this_pre_list.laddr_curr == &svc->this_pre_list.laddr_list) + svc->this_pre_list.laddr_curr = svc->this_pre_list.laddr_list.next; + + laddr = list_entry(svc->this_pre_list.laddr_curr, struct dp_vs_laddr, list); + rte_atomic32_inc(&laddr->refcnt); + + return laddr; +} + +static inline struct dp_vs_laddr *__get_laddr(struct dp_vs_service *svc) +{ + if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) { + return __get_laddr_port_mode(svc); + } else { + return __get_laddr_addr_mode(svc); + } +} + static inline void put_laddr(struct dp_vs_laddr *laddr) { /* use lock if other field need by changed */ @@ -164,8 +204,10 @@ int dp_vs_laddr_bind(struct dp_vs_conn *conn, struct dp_vs_service *svc) { struct dp_vs_laddr *laddr = NULL; int i; + int num_laddrs = 0; uint16_t sport = 0; struct sockaddr_storage dsin, ssin; + struct inet_ifaddr *ifa; if (!conn || !conn->dest || !svc) return EDPVS_INVAL; @@ -182,7 +224,11 @@ int dp_vs_laddr_bind(struct dp_vs_conn *conn, struct dp_vs_service *svc) * 2. we uses svc->num_laddrs; */ rte_rwlock_write_lock(&svc->laddr_lock); - for (i = 0; i < dp_vs_laddr_max_trails && i < svc->num_laddrs; i++) { + if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) + num_laddrs = svc->num_laddrs; + else + num_laddrs = svc->this_pre_list.num_laddrs; + for (i = 0; i < dp_vs_laddr_max_trails && i < num_laddrs; i++) { /* select a local IP from service */ laddr = __get_laddr(svc); if (!laddr) { @@ -191,6 +237,21 @@ int dp_vs_laddr_bind(struct dp_vs_conn *conn, struct dp_vs_service *svc) return EDPVS_RESOURCE; } + if (SA_POOL_MODE == LADDR_LCORE_MAPPING_POOL_MODE) { + ifa = inet_addr_ifa_get(conn->af, laddr->iface, &laddr->addr); + assert(ifa); + if (!ifa->this_sa_pool) { +#ifdef CONFIG_DPVS_IPVS_DEBUG + char buf[64]; + if (inet_ntop(conn->af, &laddr->addr, buf, sizeof(buf)) == NULL) + snprintf(buf, sizeof(buf), "::"); + RTE_LOG(DEBUG, IPVS, "%s: %s is not assigned on [%d], " + "try next laddr.\n",__func__, buf, rte_lcore_id()); +#endif + continue; + } + } + memset(&dsin, 0, sizeof(struct sockaddr_storage)); memset(&ssin, 0, sizeof(struct sockaddr_storage)); @@ -299,11 +360,71 @@ int dp_vs_laddr_unbind(struct dp_vs_conn *conn) return EDPVS_OK; } +static int __dp_vs_laddr_add_port_mode(struct dp_vs_service *svc, + int af, struct dp_vs_laddr *new) +{ + struct dp_vs_laddr *curr; + + rte_rwlock_write_lock(&svc->laddr_lock); + list_for_each_entry(curr, &svc->laddr_list, list) { + if (af == curr->af && inet_addr_equal(af, &curr->addr, &new->addr)) { + rte_rwlock_write_unlock(&svc->laddr_lock); + //rte_free(new); + return EDPVS_EXIST; + } + } + + list_add_tail(&new->list, &svc->laddr_list); + svc->num_laddrs++; + + rte_rwlock_write_unlock(&svc->laddr_lock); + return EDPVS_OK; +} + +static int __dp_vs_laddr_add_addr_mode(struct dp_vs_service *svc, + int af, struct dp_vs_laddr *new) +{ + struct dp_vs_laddr *curr; + struct inet_ifaddr *ifa; + int cid = 0; + + rte_rwlock_write_lock(&svc->laddr_lock); + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + list_for_each_entry(curr, &svc->pre_list[cid].laddr_list, list) { + if (af == curr->af && inet_addr_equal(af, &curr->addr, &new->addr)) { + rte_rwlock_write_unlock(&svc->laddr_lock); + //rte_free(new); + return EDPVS_EXIST; + } + } + } + + ifa = inet_addr_ifa_get(af, new->iface, &new->addr); + if (!ifa) { + 
rte_rwlock_write_unlock(&svc->laddr_lock); + return EDPVS_NOTEXIST; + } + + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + /* skip master and unused cores */ + if (cid > 64 || !(lcore_mask & (1L << cid))) + continue; + if (ifa->sa_pools[cid]) { + list_add_tail(&new->list, &svc->pre_list[cid].laddr_list); + svc->pre_list[cid].num_laddrs++; + } + } + + rte_rwlock_write_unlock(&svc->laddr_lock); + return EDPVS_OK; +} + int dp_vs_laddr_add(struct dp_vs_service *svc, int af, const union inet_addr *addr, const char *ifname) { - struct dp_vs_laddr *new, *curr; + struct dp_vs_laddr *new; + int err = 0; if (!svc || !addr) return EDPVS_INVAL; @@ -325,23 +446,19 @@ int dp_vs_laddr_add(struct dp_vs_service *svc, return EDPVS_NOTEXIST; } - rte_rwlock_write_lock(&svc->laddr_lock); - list_for_each_entry(curr, &svc->laddr_list, list) { - if (af == curr->af && inet_addr_equal(af, &curr->addr, &new->addr)) { - rte_rwlock_write_unlock(&svc->laddr_lock); - rte_free(new); - return EDPVS_EXIST; - } + if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) { + err = __dp_vs_laddr_add_port_mode(svc, af, new); + } else { + err = __dp_vs_laddr_add_addr_mode(svc, af, new); } - list_add_tail(&new->list, &svc->laddr_list); - svc->num_laddrs++; - rte_rwlock_write_unlock(&svc->laddr_lock); - - return EDPVS_OK; + if (err != EDPVS_OK) + rte_free(new); + return err; } -int dp_vs_laddr_del(struct dp_vs_service *svc, int af, const union inet_addr *addr) +static int __dp_vs_laddr_del_port_mode(struct dp_vs_service *svc, int af, + const union inet_addr *addr) { struct dp_vs_laddr *laddr, *next; int err = EDPVS_NOTEXIST; @@ -378,8 +495,65 @@ int dp_vs_laddr_del(struct dp_vs_service *svc, int af, const union inet_addr *ad return err; } -/* if success, it depend on caller to free @addrs by rte_free() */ -static int dp_vs_laddr_getall(struct dp_vs_service *svc, +static int __dp_vs_laddr_del_addr_mode(struct dp_vs_service *svc, int af, + const union inet_addr *addr) +{ + struct dp_vs_laddr *laddr, *next; + int cid = 0; + int err = EDPVS_NOTEXIST; + + if (!svc || !addr) + return EDPVS_INVAL; + + rte_rwlock_write_lock(&svc->laddr_lock); + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + /* skip master and unused cores */ + if (cid > 64 || !(lcore_mask & (1L << cid))) + continue; + list_for_each_entry_safe(laddr, next, &svc->pre_list[cid].laddr_list, list) { + if (!((af == laddr->af) && inet_addr_equal(af, &laddr->addr, addr))) + continue; + + /* found */ + if (rte_atomic32_read(&laddr->refcnt) == 0) { + /* update svc->curr_laddr */ + if (svc->pre_list[cid].laddr_curr == &laddr->list) + svc->pre_list[cid].laddr_curr = laddr->list.next; + list_del(&laddr->list); + rte_free(laddr); + svc->pre_list[cid].num_laddrs--; + err = EDPVS_OK; + } else { + /* XXX: move to trash list and implement an garbage collector, + * or just try del again ? 
*/ + err = EDPVS_BUSY; + } + break; + } + } + + rte_rwlock_write_unlock(&svc->laddr_lock); + + if (err == EDPVS_BUSY) + RTE_LOG(DEBUG, IPVS, "%s: laddr is in use.\n", __func__); + + return err; +} + +int dp_vs_laddr_del(struct dp_vs_service *svc, int af, const union inet_addr *addr) +{ + int err = 0; + + if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) { + err = __dp_vs_laddr_del_port_mode(svc,af, addr); + } else { + err = __dp_vs_laddr_del_addr_mode(svc, af, addr); + } + + return err; +} + +static int __dp_vs_laddr_getall_port_mode(struct dp_vs_service *svc, struct dp_vs_laddr_entry **addrs, size_t *naddr) { struct dp_vs_laddr *laddr; @@ -416,7 +590,69 @@ static int dp_vs_laddr_getall(struct dp_vs_service *svc, return EDPVS_OK; } -int dp_vs_laddr_flush(struct dp_vs_service *svc) +static int __dp_vs_laddr_getall_addr_mode(struct dp_vs_service *svc, + struct dp_vs_laddr_entry **addrs, size_t *naddr) +{ + struct dp_vs_laddr *laddr; + int i = 0; + int cid = 0; + int num_laddrs = 0; + + if (!svc || !addrs || !naddr) + return EDPVS_INVAL; + + rte_rwlock_write_lock(&svc->laddr_lock); + + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + num_laddrs += svc->pre_list[cid].num_laddrs; + } + + if (num_laddrs > 0) { + *naddr = num_laddrs; + *addrs = rte_malloc_socket(0, sizeof(struct dp_vs_laddr_entry) * num_laddrs, + RTE_CACHE_LINE_SIZE, rte_socket_id()); + if (!(*addrs)) { + rte_rwlock_write_unlock(&svc->laddr_lock); + return EDPVS_NOMEM; + } + + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + /* skip master and unused cores */ + if (cid > 64 || !(lcore_mask & (1L << cid))) + continue; + list_for_each_entry(laddr, &svc->pre_list[cid].laddr_list, list) { + assert(i < *naddr); + (*addrs)[i].af = laddr->af; + (*addrs)[i].addr = laddr->addr; + (*addrs)[i].nconns = rte_atomic32_read(&laddr->conn_counts); + i++; + } + } + } else { + *naddr = 0; + *addrs = NULL; + } + + rte_rwlock_write_unlock(&svc->laddr_lock); + return EDPVS_OK; +} + +/* if success, it depend on caller to free @addrs by rte_free() */ +static int dp_vs_laddr_getall(struct dp_vs_service *svc, + struct dp_vs_laddr_entry **addrs, size_t *naddr) +{ + int err = EDPVS_OK; + + if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) { + err = __dp_vs_laddr_getall_port_mode(svc, addrs, naddr); + } else { + err = __dp_vs_laddr_getall_addr_mode(svc, addrs, naddr); + } + + return err; +} + +static int __dp_vs_laddr_flush_port_mode(struct dp_vs_service *svc) { struct dp_vs_laddr *laddr, *next; int err = EDPVS_OK; @@ -440,7 +676,55 @@ int dp_vs_laddr_flush(struct dp_vs_service *svc) err = EDPVS_BUSY; } } + rte_rwlock_write_unlock(&svc->laddr_lock); + return err; +} + +static int __dp_vs_laddr_flush_addr_mode(struct dp_vs_service *svc) +{ + struct dp_vs_laddr *laddr, *next; + int cid = 0; + int err = EDPVS_OK; + + if (!svc) + return EDPVS_INVAL; + + rte_rwlock_write_lock(&svc->laddr_lock); + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + /* skip master and unused cores */ + if (cid > 64 || !(lcore_mask & (1L << cid))) + continue; + list_for_each_entry_safe(laddr, next, &svc->pre_list[cid].laddr_list, list) { + if (rte_atomic32_read(&laddr->refcnt) == 0) { + list_del(&laddr->list); + rte_free(laddr); + svc->pre_list[cid].num_laddrs--; + } else { + char buf[64]; + + if (inet_ntop(laddr->af, &laddr->addr, buf, sizeof(buf)) == NULL) + snprintf(buf, sizeof(buf), "::"); + + RTE_LOG(DEBUG, IPVS, "%s: laddr %s is in use.\n", __func__, buf); + err = EDPVS_BUSY; + } + } + } + + rte_rwlock_write_unlock(&svc->laddr_lock); + return err; +} + +int dp_vs_laddr_flush(struct 
dp_vs_service *svc) +{ + int err = EDPVS_OK; + + if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) { + err = __dp_vs_laddr_flush_port_mode(svc); + } else { + err = __dp_vs_laddr_flush_addr_mode(svc); + } return err; } @@ -571,6 +855,8 @@ int dp_vs_laddr_init(void) if ((err = sockopt_register(&laddr_sockopts)) != EDPVS_OK) return err; + /* enabled lcore should not change after init */ + netif_get_slave_lcores(NULL, &lcore_mask); return EDPVS_OK; } diff --git a/src/ipvs/ip_vs_proto_tcp.c b/src/ipvs/ip_vs_proto_tcp.c index 27c04476c..fe333e18d 100644 --- a/src/ipvs/ip_vs_proto_tcp.c +++ b/src/ipvs/ip_vs_proto_tcp.c @@ -37,6 +37,7 @@ #include #include #include "ipvs/redirect.h" +#include "ipvs/sync.h" static int g_defence_tcp_drop = 0; @@ -923,6 +924,9 @@ static int tcp_state_trans(struct dp_vs_proto *proto, struct dp_vs_conn *conn, } } +#ifdef CONFIG_DPVS_SYNC + dp_vs_sync_conn_handler(conn, new_state); +#endif return EDPVS_OK; } diff --git a/src/ipvs/ip_vs_service.c b/src/ipvs/ip_vs_service.c index 60a8637ee..16b55089c 100644 --- a/src/ipvs/ip_vs_service.c +++ b/src/ipvs/ip_vs_service.c @@ -25,6 +25,7 @@ #include "ipvs/sched.h" #include "ipvs/laddr.h" #include "ipvs/blklst.h" +#include "ipvs/sync.h" #include "ctrl.h" #include "route.h" #include "route6.h" @@ -471,6 +472,7 @@ int dp_vs_add_service(struct dp_vs_service_conf *u, { int ret = 0; int size; + int cid = 0; struct dp_vs_scheduler *sched = NULL; struct dp_vs_service *svc = NULL; @@ -522,6 +524,11 @@ int dp_vs_add_service(struct dp_vs_service_conf *u, svc->num_laddrs = 0; svc->laddr_curr = &svc->laddr_list; + for (cid = 0; cid < RTE_MAX_LCORE; cid++) { + INIT_LIST_HEAD(&svc->pre_list[cid].laddr_list); + svc->pre_list[cid].laddr_curr = &svc->pre_list[cid].laddr_list; + svc->pre_list[cid].num_laddrs = 0; + } INIT_LIST_HEAD(&svc->dests); rte_rwlock_init(&svc->sched_lock); @@ -944,6 +951,11 @@ static int dp_vs_set_svc(sockoptid_t opt, const void *user, size_t len) if (opt == DPVS_SO_SET_FLUSH) return dp_vs_flush(); +#ifdef CONFIG_DPVS_SYNC + if (opt == DPVS_SO_SET_CONN_SYNC) + return dp_vs_sync_conn_start(); +#endif + memcpy(arg, user, len); usvc_compat = (struct dp_vs_service_user *)arg; udest_compat = (struct dp_vs_dest_user *)(usvc_compat + 1); diff --git a/src/ipvs/ip_vs_sync.c b/src/ipvs/ip_vs_sync.c new file mode 100644 index 000000000..b5a06084f --- /dev/null +++ b/src/ipvs/ip_vs_sync.c @@ -0,0 +1,1094 @@ +/* + * DPVS is a software load balancer (Virtual Server) based on DPDK. + * + * Copyright (C) 2017 iQIYI (www.iqiyi.com). + * All Rights Reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ *
+ */
+#include <assert.h>
+#include "netif.h"
+#include "ipvs/sync_msg.h"
+#include "ipvs/sync.h"
+#include "ipvs/proto.h"
+#include "ipvs/proto_tcp.h"
+#include "parser/parser.h"
+
+/* the sync_buff list head */
+struct list_head dp_vs_sync_queue[DP_VS_SYNC_MAX];
+
+/* current sync_buff for accepting new conn entries */
+struct dp_vs_sync_buff *dp_vs_sync_curr_buff[DP_VS_SYNC_MAX];
+
+struct dp_vs_sync_core g_dp_vs_sync_fwd_core;
+struct dp_vs_sync_conf g_dp_vs_sync_conf;
+static uint64_t cycles_per_sec = 0;
+
+struct rte_ring *g_dp_vs_sync_tx_ring[DP_VS_SYNC_MAX];
+struct rte_ring *g_dp_vs_sync_rx_ring[DPVS_MAX_LCORE];
+int g_dp_vs_sync_send_fd[DP_VS_SYNC_MAX];
+
+uint64_t g_start_cycles;
+uint8_t g_req_timeout;
+struct dpvs_timer g_req_timer;
+
+struct dp_vs_sync_peer g_dp_vs_sync_fetch;
+struct dp_vs_sync_peer g_dp_vs_sync_request;
+
+static inline void dp_vs_sync_msg_dump(const char *info, int issend,
+                                       struct dp_vs_sync_head *head)
+{
+    RTE_LOG(INFO, SYNC, "%s(syncid:%d %s %d) %s size %d\n",
+            info ? info : "",
+            g_dp_vs_sync_conf.syncid, issend ? "->" : "<-", head->syncid,
+            head->type == DP_VS_SYNC_NEGO_INFO ? "nego info" : "conn sync",
+            head->size);
+}
+
+static inline void dp_vs_sync_conn_dump(const char *msg, struct dp_vs_sync_conn *conn)
+{
+    char cbuf[64], vbuf[64], lbuf[64], dbuf[64];
+    const char *caddr, *vaddr, *laddr, *daddr;
+
+    caddr = inet_ntop(conn->af, &conn->caddr, cbuf, sizeof(cbuf)) ? cbuf : "::";
+    vaddr = inet_ntop(conn->af, &conn->vaddr, vbuf, sizeof(vbuf)) ? vbuf : "::";
+    laddr = inet_ntop(conn->af, &conn->laddr, lbuf, sizeof(lbuf)) ? lbuf : "::";
+    daddr = inet_ntop(conn->af, &conn->daddr, dbuf, sizeof(dbuf)) ? dbuf : "::";
+
+    RTE_LOG(INFO, SYNC, "%s [%d] %s %s:%u %s:%u %s:%u %s:%u from lcore: %u\n",
+            msg ? msg : "", rte_lcore_id(), inet_proto_name(conn->proto),
+            caddr, ntohs(conn->cport), vaddr, ntohs(conn->vport),
+            laddr, ntohs(conn->lport), daddr, ntohs(conn->dport), conn->lcore);
+}
+
+/*
+ * Add a dp_vs_conn to the session sync tx ring.
+ */
+void dp_vs_sync_conn_enqueue(struct dp_vs_conn *cp, dp_vs_sync_type type)
+{
+    struct dp_vs_sync_conn *s;
+    int ret;
+
+    if (!(s = rte_zmalloc(NULL,
+                          sizeof(struct dp_vs_sync_conn),
+                          RTE_CACHE_LINE_SIZE))) {
+        RTE_LOG(WARNING, SYNC, "%s: no memory for a new dp_vs_sync_conn.\n", __func__);
+        return;
+    }
+
+    /* copy members */
+    s->af    = cp->af;
+    s->proto = cp->proto;
+    s->cport = cp->cport;
+    s->vport = cp->vport;
+    s->lport = cp->lport;
+    s->dport = cp->dport;
+
+    s->caddr  = cp->caddr;
+    s->vaddr  = cp->vaddr;
+    s->laddr  = cp->laddr;
+    s->daddr  = cp->daddr;
+    s->qid    = cp->qid;
+    s->lcore  = cp->lcore;
+    s->syncid = g_dp_vs_sync_conf.syncid;
+
+    rte_memcpy(&s->fnat_seq, &cp->fnat_seq, sizeof(struct dp_vs_seq));
+    s->rs_end_seq = cp->rs_end_seq;
+    s->rs_end_ack = cp->rs_end_ack;
+
+    s->flags = cp->flags & ~DPVS_CONN_F_HASHED;
+    s->state = cp->state;
+
+    ret = rte_ring_enqueue(g_dp_vs_sync_tx_ring[type], s);
+    if (ret) {
+        if (-ENOBUFS == ret)
+            RTE_LOG(WARNING, SYNC, "%s: session %s sync tx ring quota exceeded.\n",
+                    __func__, (DP_VS_SYNC_UNICAST == type) ? "unicast" : "mcast");
+        else
+            RTE_LOG(WARNING, SYNC, "%s: session %s sync tx ring enqueue failed. ret = %d\n",
+                    __func__, (DP_VS_SYNC_UNICAST == type) ? "unicast" : "mcast", ret);
+        rte_free(s);
+    } else
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+        RTE_LOG(INFO, SYNC, "%s: session %s sync tx ring enqueue succeeded.\n",
+                __func__, (DP_VS_SYNC_UNICAST == type) ? "unicast" : "mcast");
+#endif
+    return;
+}
+
+static void dp_vs_sync_buff_release(struct dp_vs_sync_buff *sb)
+{
+    rte_free(sb->mesg);
+    rte_free(sb);
+}
+
+static inline struct dp_vs_sync_buff *dp_vs_sync_buff_create(void)
+{
+    struct dp_vs_sync_buff *sb;
+
+    if (!(sb = rte_zmalloc(NULL,
+                           sizeof(struct dp_vs_sync_buff),
+                           RTE_CACHE_LINE_SIZE)))
+        return NULL;
+
+    if (!(sb->mesg = rte_zmalloc(NULL,
+                                 g_dp_vs_sync_conf.send_mesg_maxlen,
+                                 RTE_CACHE_LINE_SIZE))) {
+        rte_free(sb);
+        return NULL;
+    }
+
+    sb->mesg->head.syncid = g_dp_vs_sync_conf.syncid;
+    sb->mesg->head.type   = DP_VS_SYNC_CONN_INFO;
+    sb->mesg->head.size   = DP_VS_SYNC_MESG_HEADER_LEN;
+    sb->mesg->nr_conns    = 0;
+    sb->head = (unsigned char *)sb->mesg + DP_VS_SYNC_MESG_HEADER_LEN;
+    sb->end  = (unsigned char *)sb->mesg + g_dp_vs_sync_conf.send_mesg_maxlen;
+    sb->firstuse = rte_get_timer_cycles();
+
+    return sb;
+}
+
+static inline void dp_vs_sync_buff_enqueue(struct dp_vs_sync_buff *sb,
+                                           dp_vs_sync_type type)
+{
+    list_add_tail(&sb->list, &dp_vs_sync_queue[type]);
+}
+
+static struct dp_vs_sync_buff *dp_vs_sync_buff_dequeue(dp_vs_sync_type type)
+{
+    struct dp_vs_sync_buff *sb;
+
+    if (list_empty(&dp_vs_sync_queue[type])) {
+        sb = NULL;
+    } else {
+        sb = list_entry(dp_vs_sync_queue[type].next, struct dp_vs_sync_buff, list);
+        list_del(&sb->list);
+    }
+
+    return sb;
+}
+
+static void dp_vs_sync_conn_append_to_buff(struct dp_vs_sync_conn *sync_conn,
+                                           dp_vs_sync_type type)
+{
+    struct dp_vs_sync_mesg *m;
+    int len = sizeof(struct dp_vs_sync_conn);
+
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+    dp_vs_sync_conn_dump("sync conn append to buff", sync_conn);
+#endif
+
+    if (!dp_vs_sync_curr_buff[type]) {
+        if (!(dp_vs_sync_curr_buff[type] = dp_vs_sync_buff_create())) {
+            RTE_LOG(INFO, SYNC, "%s: create sync buffer failed.\n", __func__);
+            return;
+        }
+    }
+
+    m = dp_vs_sync_curr_buff[type]->mesg;
+    rte_memcpy(dp_vs_sync_curr_buff[type]->head, sync_conn, len);
+    m->nr_conns++;
+    m->head.size += len;
+    dp_vs_sync_curr_buff[type]->head += len;
+
+    /**
+     * if curr_sync_buff has no space for the next one,
+     * append it to dp_vs_sync_queue
+     */
+    if (dp_vs_sync_curr_buff[type]->head + len > dp_vs_sync_curr_buff[type]->end) {
+        dp_vs_sync_buff_enqueue(dp_vs_sync_curr_buff[type], type);
+        dp_vs_sync_curr_buff[type] = NULL;
+    }
+
+    return;
+}
+
+/**
+ * get curr_sync_buff if it has been created for more
+ * than g_sync_buff_delay seconds
+ */
+static struct dp_vs_sync_buff *
+dp_vs_sync_expired_curr_sync_buff(dp_vs_sync_type type)
+{
+    struct dp_vs_sync_buff *sb = NULL;
+    uint64_t time_diff = 0;
+
+    if (NULL == dp_vs_sync_curr_buff[type])
+        return NULL;
+
+    time_diff = rte_get_timer_cycles() - dp_vs_sync_curr_buff[type]->firstuse;
+    if (time_diff >= g_dp_vs_sync_conf.sync_buff_delay * cycles_per_sec) {
+        sb = dp_vs_sync_curr_buff[type];
+        dp_vs_sync_curr_buff[type] = NULL;
+    }
+
+    return sb;
+}
+
+static int dp_vs_sync_process_conn(const char *buffer, const int buflen)
+{
+    struct dp_vs_sync_mesg *m = (struct dp_vs_sync_mesg *)buffer;
+    struct dp_vs_sync_conn *s = NULL;
+    char *p = NULL;
+    int i, res;
+
+    p = (char *)(buffer + sizeof(struct dp_vs_sync_mesg));
+
+    for (i = 0; i < m->nr_conns; i++) {
+        if (!(s = rte_zmalloc(NULL, sizeof(struct dp_vs_sync_conn), RTE_CACHE_LINE_SIZE))) {
+            RTE_LOG(ERR, SYNC, "%s: alloc sync conn node failed\n", __func__);
+            return EDPVS_NOMEM;
+        }
+
+        rte_memcpy(s, p, sizeof(struct dp_vs_sync_conn));
+        res = rte_ring_enqueue(g_dp_vs_sync_rx_ring[get_lcoreid(s->qid)], s);
+        if (res) {
+            if (unlikely(-EDQUOT == res)) {
RTE_LOG(WARNING, SYNC, "%s: session sync rx ring of lcore %d quota exceeded\n", + __func__, get_lcoreid(s->qid)); + } else if (res < 0) { + RTE_LOG(WARNING, SYNC, "%s: session sync rx ring of lcore %d enqueue failed\n", + __func__, get_lcoreid(s->qid)); + } + rte_free(s); + } + + p += sizeof(struct dp_vs_sync_conn); +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + RTE_LOG(DEBUG, SYNC, "%s: current conn %d(total conn %d) from core %d queue %d enqueue rx rings[%d] %s\n", + __func__, i + 1, m->nr_conns, s->lcore, s->qid, get_lcoreid(s->qid), + res ? "failed" : "succeed"); +#endif + } + + return EDPVS_OK; +} + +static int dp_vs_sync_send_msg(int type, char* msg, int len, + struct sockaddr_in* addr) +{ + if (DP_VS_SYNC_UNICAST== type) { + return send_unicast_msg(g_dp_vs_sync_send_fd[type], + msg, len, addr); + } else { + return send_mcast_msg(g_dp_vs_sync_send_fd[type], msg, len); + } +} + +/** + * Process received multicast message and create the corresponding + * dp_vs_conn entries. + */ +static int dp_vs_sync_process_rx_msg(const char *buffer, const int buflen, + struct sockaddr_in* remote_addr) +{ + struct dp_vs_sync_head *head = (struct dp_vs_sync_head *)buffer; + +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + dp_vs_sync_msg_dump("recv", 0, head); +#endif + + if (buflen != head->size) { + RTE_LOG(ERR, SYNC, "%s: recv conn sync message, buflen = %u, m->size = %d\n", + __func__, buflen, head->size); + return EDPVS_INVPKT; + } + + /* syncid sanity check, ignore message sent from itself */ + if (head->syncid == g_dp_vs_sync_conf.syncid) { +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + RTE_LOG(DEBUG, SYNC, "%s: ignoring incoming msg with syncid = %d\n", + __func__, head->syncid); +#endif + return EDPVS_OK; + } + + if (DP_VS_SYNC_NEGO_INFO == head->type) { + dp_vs_sync_recv_nego(buffer, buflen, remote_addr); + } else { + dp_vs_sync_process_conn(buffer, buflen); + } + + return EDPVS_OK; +} + +static int dp_vs_sync_tx_loop(dp_vs_sync_type type) +{ + uint16_t nb_rb = 0; + uint16_t index = 0; + struct dp_vs_sync_conn *conns[NETIF_MAX_PKT_BURST]; + struct dp_vs_sync_conn *conn; + struct dp_vs_sync_buff *sb; + + nb_rb = rte_ring_dequeue_burst(g_dp_vs_sync_tx_ring[type], (void **)conns, + NETIF_MAX_PKT_BURST, NULL); + for (index = 0; index < nb_rb; index++) { + conn = conns[index]; + dp_vs_sync_conn_append_to_buff(conn, type); + rte_free(conn); + } + + while ((sb = dp_vs_sync_buff_dequeue(type))) { +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + dp_vs_sync_msg_dump("send sync dequeue conn", 1, &(sb->mesg->head)); +#endif + dp_vs_sync_send_msg(type, (char *)sb->mesg, sb->mesg->head.size, + &g_dp_vs_sync_request.addr); + dp_vs_sync_buff_release(sb); + } + + if ((sb = dp_vs_sync_expired_curr_sync_buff(type))) { +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + dp_vs_sync_msg_dump("send curr sync buff conn", 1, &(sb->mesg->head)); +#endif + dp_vs_sync_send_msg(type, (char *)sb->mesg, sb->mesg->head.size, + &g_dp_vs_sync_request.addr); + dp_vs_sync_buff_release(sb); + } + + return EDPVS_OK; +} + +static int dp_vs_session_sync_tx_loop(void) +{ + int index = 0; + if (!g_dp_vs_sync_conf.sync_enable) + return EDPVS_OK; + + if (!cycles_per_sec) + cycles_per_sec = rte_get_timer_hz(); + + g_dp_vs_sync_send_fd[DP_VS_SYNC_MCAST] = create_mcast_send_sock(); + if (g_dp_vs_sync_send_fd[DP_VS_SYNC_MCAST] == -1) { + RTE_LOG(ERR, SYNC, "%s: failed to create mcast send sock.\n", __func__); + return -1; + } + + g_dp_vs_sync_send_fd[DP_VS_SYNC_UNICAST] = create_send_unicast_sock(); + if (g_dp_vs_sync_send_fd[DP_VS_SYNC_UNICAST] == -1) { + RTE_LOG(ERR, SYNC, "%s: failed to 
create unicast send sock.\n", __func__); + return -1; + } + + for(;;) { + for (index = 0; index < DP_VS_SYNC_MAX; index++) { + dp_vs_sync_tx_loop(index); + } + } + + return 0; +} + +static int dp_vs_sync_unicast_rx_loop(int sockfd, char *buffer, const size_t buflen) +{ + int len = 0; + struct sockaddr_in remote_addr; + memset(buffer, 0, buflen); + + len = receive_unicast_msg(sockfd, buffer, buflen, &remote_addr); + if (len <= 0) { + return EDPVS_OK; + } + dp_vs_sync_process_rx_msg(buffer, len, &remote_addr); + + return EDPVS_OK; +} + +static int dp_vs_sync_mcast_rx_loop(int sockfd, char *buffer, const size_t buflen) +{ + int len = 0; + struct sockaddr_in remote_addr; + memset(buffer, 0, buflen); + + len = receive_mcast_msg(sockfd, buffer, buflen, &remote_addr); + if (len <= 0) { + return EDPVS_OK; + } + dp_vs_sync_process_rx_msg(buffer, len, &remote_addr); + + return EDPVS_OK; +} + +static int dp_vs_session_sync_rx_loop(void) +{ + int res = 0; + int unicast_fd = 0; + int mcast_fd = 0; + char* buff = NULL; + fd_set fdsr; + int maxsock; + struct timeval tv; + int ret = 0; + + if (!g_dp_vs_sync_conf.sync_enable) + return EDPVS_OK; + + mcast_fd = create_mcast_receive_sock(); + if (mcast_fd == -1) { + RTE_LOG(ERR, SYNC, "%s: failed to create receive sock.\n", __func__); + return -1; + } + + res = add_mcast_group(mcast_fd); + if (res < 0) { + RTE_LOG(ERR, SYNC, "%s: failed to add multicast group.\n", __func__); + return -1; + } + + unicast_fd = create_receive_unicast_sock(); + if (!(buff = rte_zmalloc(NULL, + g_dp_vs_sync_conf.recv_mesg_maxlen, + RTE_CACHE_LINE_SIZE))) { + RTE_LOG(ERR, SYNC, "%s: alloc sync recv buffer failed\n", __func__); + return EDPVS_NOMEM; + } + + maxsock = MAX(unicast_fd, mcast_fd); + for (;;) { + FD_ZERO(&fdsr); + FD_SET(mcast_fd,&fdsr); + FD_SET(unicast_fd, &fdsr); + tv.tv_sec = 0; + tv.tv_usec = 0; + ret = select(maxsock +1, &fdsr, NULL, NULL, &tv); + if (ret < 0) { + RTE_LOG(ERR, SYNC, "%s: select error ret %d\n", __func__, ret); + } else if (ret == 0) { + continue; + } + + if (FD_ISSET(mcast_fd, & fdsr)) { +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + RTE_LOG(INFO, SYNC, "%s: mcastfd ready\n", __func__); +#endif + dp_vs_sync_mcast_rx_loop(mcast_fd, buff, g_dp_vs_sync_conf.recv_mesg_maxlen); + } + + if (FD_ISSET(unicast_fd, &fdsr)) { +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + RTE_LOG(INFO, SYNC, "%s: unicastfd ready\n", __func__); +#endif + dp_vs_sync_unicast_rx_loop(unicast_fd, buff, g_dp_vs_sync_conf.recv_mesg_maxlen); + } + } + + rte_free(buff); + return 0; +} + +int dp_vs_sync_lcore_process_rx_msg(lcoreid_t cid) +{ + uint16_t idx = 0; + uint16_t nb_rb = 0; + struct dp_vs_sync_conn *conns[CONN_SYNC_MAX_PKT_BURST]; + struct dp_vs_sync_conn *conn = NULL; + struct dp_vs_proto *pp = NULL; + struct dp_vs_conn *cp = NULL; + struct dp_vs_dest *dest = NULL; + +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + char sbuf[64], dbuf[64]; +#endif + + if (!g_dp_vs_sync_conf.sync_enable) + return EDPVS_OK; + + nb_rb = rte_ring_dequeue_burst(g_dp_vs_sync_rx_ring[cid], (void **)conns, + CONN_SYNC_MAX_PKT_BURST, NULL); + for (idx = 0; idx < nb_rb; idx++) { + conn = conns[idx]; + cp = dp_vs_conn_get(conn->af, + conn->proto, + &conn->caddr, + &conn->vaddr, + conn->cport, + conn->vport, + NULL /*direct*/, + false); +#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG + RTE_LOG(DEBUG, SYNC, "conn lookup: [%d] %s %s/%d -> %s/%d %s\n", + rte_lcore_id(), inet_proto_name(conn->proto), + inet_ntop(conn->af, &(conn->caddr), sbuf, sizeof(sbuf)) ? 
sbuf : "::", ntohs(conn->cport),
+                inet_ntop(conn->af, &(conn->vaddr), dbuf, sizeof(dbuf)) ? dbuf : "::", ntohs(conn->vport),
+                cp ? "already exists" : "does not exist");
+#endif
+        if (!cp) {
+            dest = dp_vs_find_dest(conn->af,
+                                   &conn->daddr,
+                                   conn->dport,
+                                   &conn->vaddr,
+                                   conn->vport,
+                                   conn->proto);
+            if (!dest) {
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+                RTE_LOG(DEBUG, SYNC, "dest lookup: [%d] %s %s/%d -> %s/%d %s\n",
+                        rte_lcore_id(), inet_proto_name(conn->proto),
+                        inet_ntop(conn->af, &(conn->vaddr), dbuf, sizeof(dbuf)) ? dbuf : "::", ntohs(conn->vport),
+                        inet_ntop(conn->af, &(conn->daddr), sbuf, sizeof(sbuf)) ? sbuf : "::", ntohs(conn->dport),
+                        "does not exist");
+#endif
+                continue;
+            }
+            cp = dp_vs_conn_copy_from_sync(conn, dest);
+        } else { /* the connection already exists; change its state */
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+            RTE_LOG(DEBUG, SYNC, "conn lookup: [%d] %s %s/%d -> %s/%d state %d->%d\n",
+                    rte_lcore_id(), inet_proto_name(conn->proto),
+                    inet_ntop(conn->af, &(conn->caddr), sbuf, sizeof(sbuf)) ? sbuf : "::", ntohs(conn->cport),
+                    inet_ntop(conn->af, &(conn->vaddr), dbuf, sizeof(dbuf)) ? dbuf : "::", ntohs(conn->vport),
+                    cp->state, conn->state);
+#endif
+            cp->state = conn->state;
+            pp = dp_vs_proto_lookup(cp->proto);
+            if (pp && pp->timeout_table)
+                cp->timeout.tv_sec = pp->timeout_table[cp->state];
+            else
+                cp->timeout.tv_sec = 60;
+
+            cp->timeout.tv_usec = 0;
+            if (cp->flags & DPVS_CONN_F_TEMPLATE)
+                dpvs_timer_update(&cp->timer, &cp->timeout, true);
+            else
+                dpvs_timer_update(&cp->timer, &cp->timeout, false);
+        }
+
+        dp_vs_conn_put_no_reset(cp);
+        rte_free(conn);
+    }
+
+    return EDPVS_OK;
+}
+
+static int dp_vs_sync_conn_expire(void *priv)
+{
+    struct dp_vs_conn *conn = priv;
+    assert(conn);
+
+#ifdef CONFIG_DPVS_SYNC_DEBUG
+    char sbuf[64], dbuf[64];
+    dp_vs_sync_conn_dump("sync conn expire", conn);
+#endif
+
+    if (conn->flags & DPVS_CONN_F_TEMPLATE) {
+        dpvs_timer_cancel(&conn->conn_sync_timer, true);
+    } else {
+        dpvs_timer_cancel(&conn->conn_sync_timer, false);
+    }
+
+    dp_vs_sync_conn_enqueue(conn, DP_VS_SYNC_MCAST);
+    return DTIMER_STOP;
+}
+
+int dp_vs_sync_conn_handler(struct dp_vs_conn *conn, int new_state)
+{
+    bool global = false;
+
+    if (!g_dp_vs_sync_conf.sync_enable)
+        return EDPVS_OK;
+
+    if (conn->flags & DPVS_CONN_F_TEMPLATE) {
+        global = true;
+    }
+
+    dpvs_timer_cancel(&conn->conn_sync_timer, global);
+
+    if (new_state == DPVS_TCP_S_ESTABLISHED) {
+        conn->conn_sync_timeout.tv_sec  = g_dp_vs_sync_conf.sync_conn_elapse;
+        conn->conn_sync_timeout.tv_usec = 0;
+        dpvs_time_rand_delay(&conn->conn_sync_timeout, 1000000);
+        if (conn->flags & DPVS_CONN_F_TEMPLATE) {
+            dpvs_timer_sched(&conn->conn_sync_timer, &conn->conn_sync_timeout,
+                             dp_vs_sync_conn_expire, conn, true);
+        } else {
+            dpvs_timer_sched(&conn->conn_sync_timer, &conn->conn_sync_timeout,
+                             dp_vs_sync_conn_expire, conn, false);
+        }
+    } else if (new_state == DPVS_TCP_S_FIN_WAIT ||
+               new_state == DPVS_TCP_S_TIME_WAIT ||
+               new_state == DPVS_TCP_S_CLOSE ||
+               new_state == DPVS_TCP_S_CLOSE_WAIT ||
+               new_state == DPVS_TCP_S_LAST_ACK) {
+        dp_vs_sync_conn_enqueue(conn, DP_VS_SYNC_MCAST);
+    }
+
+    return EDPVS_OK;
+}
+
+int dp_vs_sync_set_rx_core(lcoreid_t cid)
+{
+    g_dp_vs_sync_conf.sync_rx_lcore = cid;
+    RTE_LOG(INFO, SYNC, "%s: conn sync receive core id %d.\n",
+            __func__, g_dp_vs_sync_conf.sync_rx_lcore);
+    return EDPVS_OK;
+}
+
+int dp_vs_sync_set_tx_core(lcoreid_t cid)
+{
+    g_dp_vs_sync_conf.sync_tx_lcore = cid;
+    RTE_LOG(INFO, SYNC, "%s: conn sync send core id %d.\n",
+            __func__,
g_dp_vs_sync_conf.sync_tx_lcore); + return EDPVS_OK; +} + +static int dp_vs_sync_head_init(struct dp_vs_sync_nego* info) +{ + int len = 0; + len = sizeof(struct dp_vs_sync_nego); + memset(info, 0, len); + + info->head.syncid = g_dp_vs_sync_conf.syncid; + info->head.type = DP_VS_SYNC_NEGO_INFO; + info->head.size = len; + + return EDPVS_OK; +} + +static char* dp_vs_sync_code2str(int code) +{ + switch (code) { + case DP_VS_SYNC_INFO_REPLY_CODE: + return "reply"; + case DP_VS_SYNC_INFO_PROBE_CODE: + return "probe"; + case DP_VS_SYNC_INFO_FETCH_CODE: + return "fetch"; + case DP_VS_SYNC_INFO_DONE_CODE: + return "done"; + default: + return "unknown"; + } +} + +static int dp_vs_sync_send_nego(uint8_t peer_syncid, int code, + int type, struct sockaddr_in* remote_addr) +{ + struct dp_vs_sync_nego full_req; + int len = 0; + + len = sizeof(struct dp_vs_sync_nego); + dp_vs_sync_head_init(&full_req); + full_req.code = code; + full_req.peer_syncid = peer_syncid; + full_req.uptime = rte_get_timer_cycles() - g_start_cycles; + + RTE_LOG(INFO, SYNC, "(syncid:%d -> %d)send sync %s" + " uptime %ld remote addr %s\n", + full_req.head.syncid, full_req.peer_syncid, + dp_vs_sync_code2str(full_req.code), + full_req.uptime, + remote_addr ? inet_ntoa(remote_addr->sin_addr) : "null"); + + dp_vs_sync_send_msg(type, (char *)&full_req, len, remote_addr); + + return EDPVS_OK; +} + +static int dp_vs_sync_send_fetch_code(void* arg) +{ + dpvs_timer_cancel(&g_req_timer, true); + g_req_timeout = 1; + dp_vs_sync_send_nego(g_dp_vs_sync_fetch.syncid, + DP_VS_SYNC_INFO_FETCH_CODE, DP_VS_SYNC_UNICAST, + &g_dp_vs_sync_fetch.addr); + + return DTIMER_STOP; +} + +static void dp_vs_sync_full_is_all_end(void) +{ + int cid = 0; + + for (cid = 0; cid < DPVS_MAX_LCORE; cid++) { + if ((cid == rte_get_master_lcore()) || !is_lcore_id_valid(cid)) + continue; + + if (!DP_VS_SYNC_FULL_IS_END(cid)) + break; + } + + if (cid >= DPVS_MAX_LCORE) { +#ifdef CONFIG_DPVS_SYNC_DEBUG + RTE_LOG(INFO, SYNC, "(syncid:%d -> %d) full sync is complete.\n", + g_dp_vs_sync_conf.syncid, g_dp_vs_sync_request.syncid); +#endif + dp_vs_sync_send_nego(g_dp_vs_sync_request.syncid, + DP_VS_SYNC_INFO_DONE_CODE, DP_VS_SYNC_UNICAST, + &g_dp_vs_sync_request.addr); + } + + return; +} + +int dp_vs_sync_full_start(lcoreid_t cid) +{ + g_dp_vs_sync_fwd_core.fwd_core[cid].start = true; + g_dp_vs_sync_fwd_core.fwd_core[cid].end = false; + g_dp_vs_sync_fwd_core.fwd_core[cid].last_index = 0; + return EDPVS_OK; +} + +int dp_vs_sync_full_end(lcoreid_t cid) +{ + g_dp_vs_sync_fwd_core.fwd_core[cid].end = true; + g_dp_vs_sync_fwd_core.fwd_core[cid].start = false; + + dp_vs_sync_full_is_all_end(); + return EDPVS_OK; +} + +static int dp_vs_sync_recv_probe_code(struct dp_vs_sync_nego* req, + struct sockaddr_in* remote_addr) +{ + return dp_vs_sync_send_nego(req->head.syncid, DP_VS_SYNC_INFO_REPLY_CODE, + DP_VS_SYNC_UNICAST, remote_addr); +} + +static int dp_vs_sync_recv_reply_code(struct dp_vs_sync_nego* req, + struct sockaddr_in* remote_addr) +{ + if (req->peer_syncid != g_dp_vs_sync_conf.syncid || g_req_timeout) + return EDPVS_OK; + + if (req->uptime > g_dp_vs_sync_fetch.uptime) { + g_dp_vs_sync_fetch.uptime = req->uptime; + g_dp_vs_sync_fetch.syncid = req->head.syncid; + memcpy(&g_dp_vs_sync_fetch.addr, remote_addr, + sizeof(g_dp_vs_sync_fetch.addr)); + } + + return EDPVS_OK; +} + +static int dp_vs_sync_recv_fetch_code(struct dp_vs_sync_nego* req, + struct sockaddr_in* remote_addr) +{ + int cid = 0; + + if (req->peer_syncid != g_dp_vs_sync_conf.syncid) + return EDPVS_OK; + + for (cid = 0; cid 
+        if ((cid == rte_get_master_lcore()) || !is_lcore_id_valid(cid))
+            continue;
+        dp_vs_sync_full_start(cid);
+    }
+
+    g_dp_vs_sync_request.syncid = req->head.syncid;
+    memcpy(&g_dp_vs_sync_request.addr.sin_addr,
+           &remote_addr->sin_addr, sizeof(g_dp_vs_sync_request.addr.sin_addr));
+
+    return EDPVS_OK;
+}
+
+static int dp_vs_sync_recv_done_code(struct dp_vs_sync_nego* req,
+                                     struct sockaddr_in* remote_addr)
+{
+    if (req->peer_syncid != g_dp_vs_sync_conf.syncid)
+        return EDPVS_OK;
+
+    RTE_LOG(INFO, SYNC, "%s:(syncid %d <- %d) full sync is complete.\n",
+            __func__, g_dp_vs_sync_conf.syncid, req->head.syncid);
+
+    return EDPVS_OK;
+}
+
+int dp_vs_sync_recv_nego(const char * buf, int len,
+                         struct sockaddr_in* remote_addr)
+{
+    struct dp_vs_sync_nego* req = (struct dp_vs_sync_nego*)buf;
+
+    if (len != sizeof(struct dp_vs_sync_nego)) {
+        RTE_LOG(ERR, SYNC, "%s: recv request sync message len error "
+                "(actual length = %d, expected length = %zu)\n",
+                __func__, len, sizeof(struct dp_vs_sync_nego));
+        return EDPVS_INVPKT;
+    }
+
+    RTE_LOG(INFO, SYNC, "(syncid:%d <- %d) recv sync %s"
+            " uptime %ld remote addr %s\n",
+            g_dp_vs_sync_conf.syncid, req->head.syncid,
+            dp_vs_sync_code2str(req->code), req->uptime,
+            inet_ntoa(remote_addr->sin_addr));
+
+    switch (req->code) {
+    case DP_VS_SYNC_INFO_REPLY_CODE:
+        dp_vs_sync_recv_reply_code(req, remote_addr);
+        break;
+    case DP_VS_SYNC_INFO_PROBE_CODE:
+        dp_vs_sync_recv_probe_code(req, remote_addr);
+        break;
+    case DP_VS_SYNC_INFO_FETCH_CODE:
+        dp_vs_sync_recv_fetch_code(req, remote_addr);
+        break;
+    case DP_VS_SYNC_INFO_DONE_CODE:
+        dp_vs_sync_recv_done_code(req, remote_addr);
+        break;
+    default:
+        RTE_LOG(ERR, SYNC, "(syncid:%d <- %d) recv unknown sync code %d"
+                " uptime %ld remote addr %s\n",
+                g_dp_vs_sync_conf.syncid, req->head.syncid,
+                req->code, req->uptime,
+                inet_ntoa(remote_addr->sin_addr));
+        return EDPVS_INVPKT;
+    }
+
+    return EDPVS_OK;
+}
+
+int dp_vs_sync_conn_start(void)
+{
+    struct timeval tv;
+    static int start_full_sync = 0;
+
+    if (start_full_sync) {
+        RTE_LOG(DEBUG, SYNC, "%s:(syncid:%d) already started conn sync.\n",
+                __func__, g_dp_vs_sync_conf.syncid);
+        return 0;
+    }
+
+    start_full_sync = 1;
+    dp_vs_sync_send_nego(0, DP_VS_SYNC_INFO_PROBE_CODE,
+            DP_VS_SYNC_MCAST, NULL);
+    tv.tv_sec = 2;
+    tv.tv_usec = 0;
+    dpvs_timer_sched(&g_req_timer, &tv,
+            dp_vs_sync_send_fetch_code, NULL, true);
+
+#ifdef CONFIG_DPVS_SYNC_DEBUG
+    RTE_LOG(INFO, SYNC, "%s:(syncid:%d) start conn sync.\n",
+            __func__, g_dp_vs_sync_conf.syncid);
+#endif
+    return EDPVS_OK;
+}
+
+void dp_vs_sync_run_loop(lcoreid_t cid)
+{
+    if (cid == g_dp_vs_sync_conf.sync_rx_lcore) {
+        dp_vs_session_sync_rx_loop();
+    } else if (cid == g_dp_vs_sync_conf.sync_tx_lcore) {
+        dp_vs_session_sync_tx_loop();
+    }
+}
+
+char* dp_vs_sync_laddr_ifname(void)
+{
+    return g_dp_vs_sync_conf.laddr_ifname;
+}
+
+static int dp_vs_set_sync_mesg_maxlen(void)
+{
+    int num = 0;
+    int mtu = 0;
+
+    mtu = get_sock_mtu();
+
+    /* reserve room for IP/UDP headers plus 20 bytes of headroom */
+    num = (mtu - sizeof(struct ipv4_hdr) - sizeof(struct udp_hdr) -
+           DP_VS_SYNC_MESG_HEADER_LEN - 20) / DP_VS_SYNC_CONN_SIZE;
+
+    g_dp_vs_sync_conf.send_mesg_maxlen =
+        DP_VS_SYNC_MESG_HEADER_LEN + DP_VS_SYNC_CONN_SIZE * num;
+    RTE_LOG(INFO, SYNC, "%s: send_mesg_maxlen is %d.\n", __func__,
+            g_dp_vs_sync_conf.send_mesg_maxlen);
+
+    g_dp_vs_sync_conf.recv_mesg_maxlen = mtu - sizeof(struct ipv4_hdr) - sizeof(struct udp_hdr);
+    RTE_LOG(INFO, SYNC, "%s: recv_mesg_maxlen is %d.\n", __func__,
+            g_dp_vs_sync_conf.recv_mesg_maxlen);
+
+    return 0;
+}
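dp_vs_set_sync_mesg_maxlen() sizes the send buffer so that a whole batch of connection entries fits into one UDP datagram. A worked example of the arithmetic; the real DP_VS_SYNC_MESG_HEADER_LEN and DP_VS_SYNC_CONN_SIZE depend on DPVS-internal structs, so the 8 and 128 below are assumptions for illustration only:

```c
#include <stdio.h>

int main(void)
{
    int mtu     = 1500;  /* the "mtu" keyword under session_sync/socket */
    int ip_hdr  = 20;    /* sizeof(struct ipv4_hdr)                     */
    int udp_hdr = 8;     /* sizeof(struct udp_hdr)                      */
    int hdr_len = 8;     /* assumed DP_VS_SYNC_MESG_HEADER_LEN          */
    int conn_sz = 128;   /* assumed DP_VS_SYNC_CONN_SIZE                */
    int slack   = 20;    /* headroom subtracted by the code above       */

    int num = (mtu - ip_hdr - udp_hdr - hdr_len - slack) / conn_sz;
    printf("conns per datagram = %d, send_mesg_maxlen = %d\n",
           num, hdr_len + conn_sz * num);    /* prints 11 and 1416 */
    return 0;
}
```
+
+static int dp_vs_sync_conf_init(void)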
+{
+    if (!g_dp_vs_sync_conf.sync_buff_delay) {
+        g_dp_vs_sync_conf.sync_buff_delay = DP_VS_SYNC_DELAY_SECONDS;
+
+        RTE_LOG(INFO, SYNC, "%s: sync buffer delay time is %d.\n", __func__,
+                g_dp_vs_sync_conf.sync_buff_delay);
+    }
+
+    if (!g_dp_vs_sync_conf.sync_conn_elapse) {
+        g_dp_vs_sync_conf.sync_conn_elapse = DP_VS_SYNC_DELAY_SECONDS;
+
+        RTE_LOG(INFO, SYNC, "%s: sync conn delay time is %d.\n", __func__,
+                g_dp_vs_sync_conf.sync_conn_elapse);
+    }
+
+    dp_vs_set_sync_mesg_maxlen();
+    return EDPVS_OK;
+}
+
+int dp_vs_sync_init(void)
+{
+    char ring_name[128];
+    uint8_t cid;
+    int index = 0;
+
+    for (cid = 0; cid < DPVS_MAX_LCORE; cid++) {
+        snprintf(ring_name, sizeof(ring_name), "session_sync_rx_ring_%d", cid);
+        g_dp_vs_sync_rx_ring[cid] = rte_ring_create(ring_name, CONN_SYNC_RING_SIZE,
+                rte_socket_id(), RING_F_SP_ENQ);
+        if (unlikely(!g_dp_vs_sync_rx_ring[cid])) {
+            RTE_LOG(ERR, SYNC, "%s: Failed to create rx ring on lcore %d.\n",
+                    __func__, cid);
+            return EDPVS_DPDKAPIFAIL;
+        }
+    }
+
+    for (index = 0; index < DP_VS_SYNC_MAX; index++) {
+        INIT_LIST_HEAD(&dp_vs_sync_queue[index]);
+        dp_vs_sync_curr_buff[index] = NULL;
+
+        snprintf(ring_name, sizeof(ring_name), "session_sync_tx_ring_%d", index);
+        g_dp_vs_sync_tx_ring[index] = rte_ring_create(ring_name, CONN_SYNC_RING_SIZE,
+                rte_socket_id(), RING_F_SC_DEQ);
+        if (unlikely(!g_dp_vs_sync_tx_ring[index])) {
+            RTE_LOG(ERR, SYNC, "[%s] Failed to create tx ring.\n", __func__);
+            return EDPVS_DPDKAPIFAIL;
+        }
+    }
+
+    g_start_cycles = rte_get_timer_cycles();
+
+    dp_vs_sync_conf_init();
+
+    return EDPVS_OK;
+}
+
+int dp_vs_sync_term(void)
+{
+    lcoreid_t cid = 0;
+    int index = 0;
+
+    for (cid = 0; cid < DPVS_MAX_LCORE; cid++) {
+        rte_ring_free(g_dp_vs_sync_rx_ring[cid]);
+    }
+
+    for (index = 0; index < DP_VS_SYNC_MAX; index++) {
+        rte_ring_free(g_dp_vs_sync_tx_ring[index]);
+    }
+
+    return EDPVS_OK;
+}
+
+static void dp_vs_sync_enable_handler(vector_t tokens)
+{
+    g_dp_vs_sync_conf.sync_enable = 1;
+    RTE_LOG(INFO, SYNC, "%s: sync_session_enable ON.\n", __func__);
+}
+
+static void dp_vs_sync_elapse_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int delay = 0;
+    assert(str);
+
+    delay = atoi(str);
+    if (delay > 30 || delay < 0) {
+        RTE_LOG(WARNING, SYNC, "invalid sync_conn_elapse %s, using default %d\n",
+                str, DP_VS_SYNC_DELAY_SECONDS);
+        delay = DP_VS_SYNC_DELAY_SECONDS;
+    } else {
+        RTE_LOG(INFO, SYNC, "sync_conn_elapse = %d\n", delay);
+    }
+
+    g_dp_vs_sync_conf.sync_conn_elapse = delay;
+
+    FREE_PTR(str);
+}
+
+static void dp_vs_sync_buff_delay_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int delay = 0;
+    assert(str);
+
+    delay = atoi(str);
+    if (delay > 30 || delay < 0) {
+        RTE_LOG(WARNING, SYNC, "invalid sync_buff_delay %s, using default %d\n",
+                str, DP_VS_SYNC_DELAY_SECONDS);
+        delay = DP_VS_SYNC_DELAY_SECONDS;
+    } else {
+        RTE_LOG(INFO, SYNC, "sync_buff_delay = %d\n", delay);
+    }
+
+    g_dp_vs_sync_conf.sync_buff_delay = delay;
+
+    FREE_PTR(str);
+}
+
+static void dp_vs_sync_conn_count_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int count = 0;
+    assert(str);
+
+    count = atoi(str);
+    if (count < 0) {
+        RTE_LOG(WARNING, SYNC, "invalid count of sync per time %s, using default %d\n",
+                str, DP_VS_SYNC_CONN_CNT_PER_TIME);
+        count = DP_VS_SYNC_CONN_CNT_PER_TIME;
+    } else {
+        RTE_LOG(INFO, SYNC, "sync_per_time_cnt = %d\n", count);
+    }
+
+    g_dp_vs_sync_conf.sync_per_time_cnt = count;
+
+    FREE_PTR(str);
+}
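dp_vs_sync_init() above gives each lcore its own single-producer rx ring and each sync channel a single-consumer tx ring, which keeps enqueue and dequeue lock-free. A minimal sketch of that rte_ring pattern (ring name, size, and payload are illustrative; DPVS uses CONN_SYNC_RING_SIZE and its own message buffers):

```c
#include <stdio.h>
#include <rte_eal.h>
#include <rte_lcore.h>
#include <rte_ring.h>

int main(int argc, char **argv)
{
    if (rte_eal_init(argc, argv) < 0)
        return 1;

    struct rte_ring *r = rte_ring_create("demo_sync_ring", 1024,
                                         rte_socket_id(), RING_F_SP_ENQ);
    if (!r)
        return 1;

    int msg = 42;                         /* stands in for a sync message */
    if (rte_ring_enqueue(r, &msg) == 0) { /* lock-free producer side      */
        void *obj = NULL;
        if (rte_ring_dequeue(r, &obj) == 0)
            printf("dequeued %d\n", *(int *)obj);
    }

    rte_ring_free(r);
    return 0;
}
```
+
+static void dp_vs_sync_laddr_ifname_handler(vector_t tokens)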
+{
+    char *str = set_value(tokens);
+    assert(str);
+
+    /* copy the trailing NUL as well */
+    rte_memcpy(g_dp_vs_sync_conf.laddr_ifname, str, strlen(str) + 1);
+    RTE_LOG(INFO, SYNC, "%s: laddr_ifname is %s\n",
+            __func__, g_dp_vs_sync_conf.laddr_ifname);
+
+    FREE_PTR(str);
+}
+
+static void dp_vs_sync_syncid_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int id = 0;
+    assert(str);
+
+    id = atoi(str);
+    if (id > 65534 || id < 1) {
+        RTE_LOG(WARNING, SYNC, "invalid dp_vs_syncid %s, using default %d\n",
+                str, 1);
+        g_dp_vs_sync_conf.syncid = 1;
+    } else {
+        RTE_LOG(INFO, SYNC, "dp_vs_syncid = %d\n", id);
+        g_dp_vs_sync_conf.syncid = id;
+    }
+
+    FREE_PTR(str);
+}
+
+void install_session_sync_keywords(void)
+{
+    install_keyword_root("session_sync", NULL);
+    install_keyword("sync_session_enable", dp_vs_sync_enable_handler, KW_TYPE_INIT);
+    install_keyword("sync_session_elapse", dp_vs_sync_elapse_handler, KW_TYPE_INIT);
+    install_keyword("sync_buff_delay", dp_vs_sync_buff_delay_handler, KW_TYPE_INIT);
+    install_keyword("sync_conn_count", dp_vs_sync_conn_count_handler, KW_TYPE_INIT);
+    install_keyword("laddr_ifname", dp_vs_sync_laddr_ifname_handler, KW_TYPE_INIT);
+    install_keyword("sync_id", dp_vs_sync_syncid_handler, KW_TYPE_INIT);
+    install_keyword("socket", NULL, KW_TYPE_NORMAL);
+    install_sublevel();
+    install_session_sync_sock_keywords();
+    install_sublevel_end();
+}
diff --git a/src/ipvs/ip_vs_sync_msg.c b/src/ipvs/ip_vs_sync_msg.c
new file mode 100644
index 000000000..1b835b7f7
--- /dev/null
+++ b/src/ipvs/ip_vs_sync_msg.c
@@ -0,0 +1,337 @@
+#include "ipvs/sync_msg.h"
+#include "parser/parser.h"
+
+#define DEFAULT_MCAST_ADDR "224.0.1.100"
+#define DEFAULT_MCAST_PORT 8088
+#define DEFAULT_MCAST_TTL 20
+#define DEFAULT_SOCK_MTU 1500
+
+static uint32_t sock_mtu = DEFAULT_SOCK_MTU;
+static uint32_t mcast_ttl = DEFAULT_MCAST_TTL;
+static uint16_t mcast_port = DEFAULT_MCAST_PORT;
+char mcast_addr_str[16];
+static struct sockaddr_in mcast_addr;
+
+#define DEFAULT_UNICAST_PORT 8089
+static uint16_t unicast_port = DEFAULT_UNICAST_PORT;
+
+/* Set up sending multicast socket over UDP */
+int create_mcast_send_sock(void)
+{
+    /* Create a socket */
+    int fd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (fd == -1) {
+        RTE_LOG(ERR, MCAST, "%s: failed to create socket.\n", __func__);
+        return fd;
+    }
+    /* Add mcast info */
+    memset(&mcast_addr, 0, sizeof(mcast_addr));
+    mcast_addr.sin_family = AF_INET;
+    mcast_addr.sin_addr.s_addr = inet_addr(mcast_addr_str);
+    mcast_addr.sin_port = htons(mcast_port);
+
+    /* Set multicast ttl */
+    u_char ttl = mcast_ttl;
+    setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, (void *)&ttl, sizeof(ttl));
+
+    return fd;
+}
+
+int create_mcast_receive_sock(void)
+{
+    struct sockaddr_in local_addr;
+
+    /* Create a socket */
+    int yes = 1;
+    int fd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (fd == -1) {
+        RTE_LOG(ERR, MCAST, "%s: failed to create socket.\n", __func__);
+        return fd;
+    }
+
+    /* Allow address reuse */
+    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0) {
+        RTE_LOG(ERR, MCAST, "%s: failed to set SO_REUSEADDR.\n", __func__);
+        goto error;
+    }
+
+    memset(&local_addr, 0, sizeof(local_addr));
+    local_addr.sin_family = AF_INET;
+    local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+    local_addr.sin_port = htons(mcast_port);  /* bind the configured port, not the default */
+
+    /* Enable loopback of our own transmissions */
+    if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &yes, sizeof(yes)) < 0) {
+        RTE_LOG(ERR, MCAST, "%s: failed to set IP_MULTICAST_LOOP.\n", __func__);
+        goto error;
+    }
+
+    /* Bind local addr */
+    if (bind(fd, (struct sockaddr *)&local_addr, sizeof(local_addr)) < 0) {
+        RTE_LOG(ERR, MCAST, "%s: failed to bind local_addr.\n", __func__);
+        goto error;
+    }
+
+    return fd;
+
+error:
+    close(fd);
+    return -1;
+}
+
+static void fill_in_mreq(struct ip_mreq *mreq)
+{
+    mreq->imr_multiaddr.s_addr = inet_addr(mcast_addr_str);
+    mreq->imr_interface.s_addr = htonl(INADDR_ANY);
+}
+
+int add_mcast_group(int sockfd)
+{
+    struct ip_mreq mreq;
+    int err;
+    fill_in_mreq(&mreq);
+    err = setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
+    if (err < 0) {
+        RTE_LOG(ERR, MCAST, "%s: failed to add multicast group.\n", __func__);
+        close(sockfd);
+    }
+    return err;
+}
+
+int drop_mcast_group(int sockfd)
+{
+    struct ip_mreq mreq;
+    int err;
+    fill_in_mreq(&mreq);
+    err = setsockopt(sockfd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &mreq, sizeof(mreq));
+    if (err < 0) {
+        RTE_LOG(ERR, MCAST, "%s: failed to drop mcast group.\n", __func__);
+    }
+    close(sockfd);
+    return err;
+}
+
+int send_mcast_msg(int sockfd, char *buffer, int len)
+{
+    int res = sendto(sockfd, buffer, len, 0,
+                     (struct sockaddr*)&mcast_addr,
+                     sizeof(mcast_addr));
+    if (res < 0)
+        RTE_LOG(ERR, MCAST, "%s: mcast send to %s failed.\n",
+                __func__, inet_ntoa(mcast_addr.sin_addr));
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+    else
+        RTE_LOG(DEBUG, MCAST, "%s: send %d bytes msg to %s success.\n",
+                __func__, res, inet_ntoa(mcast_addr.sin_addr));
+#endif
+    return res;
+}
+
+int receive_mcast_msg(int sockfd, char *buffer, const size_t buflen,
+                      struct sockaddr_in* remote_addr)
+{
+    /* receive a multicast packet */
+    struct sockaddr_in addr;
+    socklen_t l = sizeof(addr);
+    int len = recvfrom(sockfd, buffer, buflen, 0, (struct sockaddr *)&addr, &l);
+    if (len == -1) {
+        RTE_LOG(ERR, MCAST, "%s: failed to recv msg.\n", __func__);
+    } else {
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+        RTE_LOG(DEBUG, MCAST, "%s: recv %d bytes msg from %s.\n", __func__,
+                len, inet_ntoa(addr.sin_addr));
+#endif
+        if (remote_addr != NULL)
+            memcpy(remote_addr, &addr, l);
+    }
+    return len;
+}
+
+static void mcast_addr_str_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    assert(str);
+
+    /* copy the trailing NUL as well */
+    rte_memcpy(mcast_addr_str, str, strlen(str) + 1);
+    RTE_LOG(INFO, MCAST, "%s: mcast_addr = %s\n", __func__,
+            mcast_addr_str);
+
+    FREE_PTR(str);
+}
+
+static void mcast_port_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int port;   /* int, so out-of-range values are not truncated before the check */
+
+    assert(str);
+    port = atoi(str);
+
+    if (port > 65535 || port < 1024) {
+        RTE_LOG(WARNING, MCAST, "invalid mcast_port %s, using default %d\n",
+                str, DEFAULT_MCAST_PORT);
+    } else {
+        RTE_LOG(INFO, MCAST, "%s: mcast_port = %d\n", __func__, port);
+        mcast_port = port;
+    }
+
+    FREE_PTR(str);
+}
+
+static void mcast_ttl_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int ttl;    /* int, so negative values are not truncated before the check */
+
+    assert(str);
+    ttl = atoi(str);
+
+    if (ttl > 255 || ttl < 0) {
+        RTE_LOG(WARNING, MCAST, "invalid mcast_ttl %s, using default %d\n",
+                str, DEFAULT_MCAST_TTL);
+        mcast_ttl = DEFAULT_MCAST_TTL;
+    } else {
+        RTE_LOG(INFO, MCAST, "%s: mcast_ttl = %d\n", __func__, ttl);
+        mcast_ttl = ttl;
+    }
+
+    FREE_PTR(str);
+}
+
+static void install_session_sync_mcast_keywords(void)
+{
+    install_keyword("mcast_addr", mcast_addr_str_handler, KW_TYPE_NORMAL);
+    install_keyword("mcast_port", mcast_port_handler, KW_TYPE_NORMAL);
+    install_keyword("mcast_ttl", mcast_ttl_handler, KW_TYPE_NORMAL);
+}
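The helpers above wrap standard UDP multicast sockets. The following self-contained demo shows the same plumbing end to end on one host: join 224.0.1.100:8088 (the defaults from this patch), send one datagram to the group, and read it back. Error handling is trimmed for brevity:

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
    int yes = 1;
    struct sockaddr_in grp = {
        .sin_family      = AF_INET,
        .sin_port        = htons(8088),
        .sin_addr.s_addr = inet_addr("224.0.1.100"),
    };
    struct sockaddr_in any = {
        .sin_family      = AF_INET,
        .sin_port        = htons(8088),
        .sin_addr.s_addr = htonl(INADDR_ANY),
    };
    struct ip_mreq mreq = {
        .imr_multiaddr.s_addr = inet_addr("224.0.1.100"),
        .imr_interface.s_addr = htonl(INADDR_ANY),
    };

    int rx = socket(AF_INET, SOCK_DGRAM, 0);            /* receiver */
    setsockopt(rx, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    bind(rx, (struct sockaddr *)&any, sizeof(any));
    setsockopt(rx, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));

    int tx = socket(AF_INET, SOCK_DGRAM, 0);            /* sender */
    sendto(tx, "probe", 5, 0, (struct sockaddr *)&grp, sizeof(grp));

    char buf[64];
    ssize_t n = recvfrom(rx, buf, sizeof(buf), 0, NULL, NULL);
    printf("received %zd bytes\n", n);

    close(tx);
    close(rx);
    return 0;
}
```
+
+int create_receive_unicast_sock(void)
+{
+    struct sockaddr_in sin;
+    int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);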
+    if (fd == -1) {
+        return -1;
+    }
+
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_addr.s_addr = htonl(INADDR_ANY);
+    sin.sin_port = htons(unicast_port);
+    if (bind(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1)
+        goto eexit;
+    return fd;
+
+eexit:
+    close(fd);
+    return -1;
+}
+
+int create_send_unicast_sock(void)
+{
+    int fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+    if (fd == -1) {
+        return -1;
+    }
+
+    return fd;
+}
+
+int send_unicast_msg(int sockfd, char *buffer, int len,
+                     struct sockaddr_in* remote_addr)
+{
+    struct sockaddr_in addr;
+
+    memset(&addr, 0, sizeof(addr));
+    addr.sin_family = AF_INET;
+    addr.sin_port = htons(unicast_port);
+    memcpy(&addr.sin_addr, &remote_addr->sin_addr, sizeof(addr.sin_addr));
+
+    int res = sendto(sockfd, buffer, len, 0,
+                     (struct sockaddr*)&addr,
+                     sizeof(addr));
+    if (res < 0)
+        RTE_LOG(ERR, UNICAST, "%s: unicast send to %s failed.\n",
+                __func__, inet_ntoa(remote_addr->sin_addr));
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+    else
+        RTE_LOG(DEBUG, UNICAST, "%s: unicast send %d bytes msg to %s success.\n",
+                __func__, res, inet_ntoa(remote_addr->sin_addr));
+#endif
+    return res;
+}
+
+int receive_unicast_msg(int sockfd, char *buffer, const size_t buflen,
+                        struct sockaddr_in* remote_addr)
+{
+    struct sockaddr_in addr;
+    socklen_t l = sizeof(addr);
+    int len = recvfrom(sockfd, buffer, buflen, 0, (struct sockaddr *)&addr, &l);
+    if (len == -1) {
+        RTE_LOG(ERR, UNICAST, "%s: unicast failed to recv msg.\n", __func__);
+    } else {
+#ifdef CONFIG_DPVS_CONN_SYNC_DEBUG
+        RTE_LOG(INFO, UNICAST, "%s: unicast recv %d bytes msg from %s.\n", __func__,
+                len, inet_ntoa(addr.sin_addr));
+#endif
+        if (remote_addr != NULL)
+            memcpy(remote_addr, &addr, l);
+    }
+    return len;
+}
+
+static void unicast_port_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    int port;   /* int, so out-of-range values are not truncated before the check */
+
+    assert(str);
+    port = atoi(str);
+
+    if (port > 65535 || port < 1024) {
+        RTE_LOG(WARNING, UNICAST, "invalid unicast_port %s, using default %d\n",
+                str, DEFAULT_UNICAST_PORT);
+    } else {
+        RTE_LOG(INFO, UNICAST, "%s: unicast_port = %d\n", __func__, port);
+        unicast_port = port;
+    }
+
+    FREE_PTR(str);
+}
+
+static void install_session_sync_unicast_keywords(void)
+{
+    install_keyword("unicast_port", unicast_port_handler, KW_TYPE_NORMAL);
+}
+
+int get_sock_mtu(void)
+{
+    return sock_mtu;
+}
+
+static void sock_mtu_handler(vector_t tokens)
+{
+    char *str = set_value(tokens);
+    uint32_t mtu;
+
+    assert(str);
+    mtu = atoi(str);
+
+    if (mtu > 0 && mtu < 65536) {
+        RTE_LOG(INFO, MCAST, "sock_mtu = %d\n", mtu);
+        sock_mtu = mtu;
+    } else {
+        RTE_LOG(WARNING, MCAST, "invalid sock_mtu %s, using default %d\n",
+                str, DEFAULT_SOCK_MTU);
+        sock_mtu = DEFAULT_SOCK_MTU;
+    }
+
+    FREE_PTR(str);
+}
+
+void install_session_sync_sock_keywords(void)
+{
+    install_keyword("mtu", sock_mtu_handler, KW_TYPE_NORMAL);
+    install_session_sync_mcast_keywords();
+    install_session_sync_unicast_keywords();
+}
diff --git a/src/netif.c b/src/netif.c
index 21e4544e2..56da4bca5 100644
--- a/src/netif.c
+++ b/src/netif.c
@@ -37,11 +37,13 @@
 #include "timer.h"
 #include "parser/parser.h"
 #include "neigh.h"
+#include "sa_pool.h"
 #include
 #include
 #include
 #include
+#include "ipvs/sync.h"

 #define NETIF_PKTPOOL_NB_MBUF_DEF 65535
 #define NETIF_PKTPOOL_NB_MBUF_MIN 1023
@@ -138,6 +140,8 @@ static struct list_head worker_list;     /* lcore configurations from cfgfile */
 #define NETIF_PORT_TABLE_MASK (NETIF_PORT_TABLE_BUCKETS - 1)
 static struct list_head port_tab[NETIF_PORT_TABLE_BUCKETS]; /* hashed by id */
 static struct list_head port_ntab[NETIF_PORT_TABLE_BUCKETS]; /* hashed by name */
+uint32_t lcore_ids[RTE_MAX_LCORE];
+
 /* Note: Lockless, NIC can only be registered on initialization stage and
  * unregistered on cleanup stage */
@@ -650,7 +654,8 @@ static void worker_type_handler(vector_t tokens)
             struct worker_conf_stream, worker_list_node);
     assert(str);

-    if (!strcmp(str, "master") || !strcmp(str, "slave")) {
+    if (!strcmp(str, "master") || !strcmp(str, "slave")
+            || !strcmp(str, "sync-rx") || !strcmp(str, "sync-tx")) {
         RTE_LOG(INFO, NETIF, "%s:type = %s\n", current_worker->name, str);
         strncpy(current_worker->type, str, sizeof(current_worker->type));
     } else {
@@ -678,6 +683,12 @@ static void cpu_id_handler(vector_t tokens)
         cpu_id = atoi(str);
         RTE_LOG(INFO, NETIF, "%s:cpu_id = %d\n", current_worker->name, cpu_id);
         current_worker->cpu_id = cpu_id;
+
+        if (!strcmp(current_worker->type, "sync-rx"))
+            dp_vs_sync_set_rx_core(cpu_id);
+
+        if (!strcmp(current_worker->type, "sync-tx"))
+            dp_vs_sync_set_tx_core(cpu_id);
     }

     FREE_PTR(str);
@@ -1149,20 +1160,28 @@ static void isol_rxq_del(struct rx_partner *isol_rxq, bool force);
 static void config_lcores(struct list_head *worker_list)
 {
     int ii, tk;
-    int cpu_id_min, cpu_left;
+    int cpu_id_min, cpu_left, cpu_cnt;
     lcoreid_t id = 0;
     portid_t pid;
     struct netif_port *port;
     struct queue_conf_stream *queue;
-    struct worker_conf_stream *worker, *worker_min;
-
-    cpu_left = list_elems(worker_list);
-    list_for_each_entry(worker, worker_list, worker_list_node) {
+    struct worker_conf_stream *worker, *worker_next, *worker_min;
+
+    /**
+     * Move non-slave workers to the tail of worker_list.
+     * cpu_cnt:  number of workers
+     * cpu_left: number of slave workers
+     **/
+    cpu_cnt = cpu_left = list_elems(worker_list);
+    list_for_each_entry_safe(worker, worker_next, worker_list, worker_list_node) {
         if (strcmp(worker->type, "slave")) {
            list_move_tail(&worker->worker_list_node, worker_list);
            cpu_left--;
         }
+        if (--cpu_cnt <= 0)
+            break;
     }
+
     while (cpu_left > 0) {
         cpu_id_min = DPVS_MAX_LCORE;
         worker_min = NULL;
@@ -1289,6 +1308,7 @@ void netif_get_slave_lcores(uint8_t *nb, uint64_t *mask)
     while (lcore_conf[i].nports > 0) {
         slave_lcore_nb++;
         slave_lcore_mask |= (1L << lcore_conf[i].id);
+        lcore_ids[i] = lcore_conf[i].id;
         i++;
     }

@@ -1633,6 +1653,21 @@ portid_t netif_max_pid;
 queueid_t netif_max_qid;

 struct port_queue_lcore_map pql_map[NETIF_MAX_PORTS];

+lcoreid_t get_lcoreid(queueid_t qid)
+{
+    struct netif_port *dev = NULL;
+    char* ifname = NULL;
+
+    ifname = dp_vs_sync_laddr_ifname();
+    dev = netif_port_get_by_name(ifname);
+    if (!dev) {
+        RTE_LOG(WARNING, IPVS, "%s: dpdk device not found\n", __func__);
+        return 16;  /* fall back to an out-of-range lcore id */
+    }
+
+    return pql_map[dev->id].rx_qid[qid];
+}
+
 static int build_port_queue_lcore_map(void)
 {
     int i, j, k;
@@ -2309,6 +2344,9 @@ void lcore_process_packets(struct netif_queue_conf *qconf, struct rte_mbuf **mbu
     /* L2 filter */
     for (i = 0; i < count; i++) {
         struct rte_mbuf *mbuf = mbufs[i];
+#ifdef CONFIG_DPVS_SYNC
+        mbuf->hash.usr = qconf->id; /* used in session synchronization */
+#endif
         struct netif_port *dev = netif_port_get(mbuf->port);

         if (unlikely(!dev)) {
@@ -2413,6 +2451,11 @@ static void lcore_job_recv_fwd(void *arg)
     cid = rte_lcore_id();
     assert(LCORE_ID_ANY != cid);

+#ifdef CONFIG_DPVS_SYNC
+    dp_vs_sync_lcore_process_rx_msg(cid);
+    dp_vs_conn_lcore_tx(cid);
+#endif
+
     for (i = 0; i < lcore_conf[lcore2index[cid]].nports; i++) {
         pid = lcore_conf[lcore2index[cid]].pqs[i].id;
         assert(pid <= bond_pid_end);
@@ -3476,7 +3519,8 @@ int netif_port_start(struct netif_port *port)
     }

     // device configure
-    if ((ret = netif_port_fdir_dstport_mask_set(port)) != EDPVS_OK)
+    if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE &&
+            (ret = netif_port_fdir_dstport_mask_set(port)) != EDPVS_OK)
         return ret;
     ret = rte_eth_dev_configure(port->id, port->nrxq, port->ntxq, &port->dev_conf);
     if (ret < 0 ) {
@@ -3768,9 +3812,10 @@ static struct rte_eth_conf default_port_conf = {
                 .dst_ip = { 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF },
             },
             .src_port_mask = 0x0000,
+            .dst_port_mask = 0x0000,

             /* to be changed according to slave lcore number in use */
-            .dst_port_mask = 0x00F8,
+            // .dst_port_mask = 0x00F8,

             .mac_addr_byte_mask = 0x00,
             .tunnel_type_mask = 0,
@@ -4008,6 +4053,9 @@ static int netif_loop(void *dummy)

     assert(LCORE_ID_ANY != cid);

+#ifdef CONFIG_DPVS_SYNC
+    dp_vs_sync_run_loop(cid);
+#endif
     try_isol_rxq_lcore_loop();

     if (0 == lcore_conf[lcore2index[cid]].nports) {
         RTE_LOG(INFO, NETIF, "[%s] Lcore %d has nothing to do.\n", __func__, cid);
diff --git a/src/sa_pool.c b/src/sa_pool.c
index 35da2fcde..ab40300f7 100644
--- a/src/sa_pool.c
+++ b/src/sa_pool.c
@@ -69,6 +69,10 @@
 #define SAPOOL_MIN_HASH_SZ 1
 #define SAPOOL_MAX_HASH_SZ 128

+#define LPORT_LCORE_MAPPING_POOL_MODE_NAME "lport_lcore_mapping"
+#define LADDR_LCORE_MAPPING_POOL_MODE_NAME "laddr_lcore_mapping"
+#define INVALID_QUEUE_ID (0xFFFF)
+
 enum {
     SA_F_USED = 0x01,
 };
@@ -99,8 +103,8 @@ struct sa_entry_pool {
     /* another way is use total_used/free_cnt in sa_pool,
      * so that we need not travels the hash to get stats.
      * we use cnt here, since we may need per-pool stats. */
-    rte_atomic16_t used_cnt;
-    rte_atomic16_t free_cnt;
+    rte_atomic32_t used_cnt;
+    rte_atomic32_t free_cnt;
     uint32_t miss_cnt;
 };
@@ -139,8 +143,79 @@ static uint8_t sa_nlcore;
 static uint64_t sa_lcore_mask;

 static uint8_t sa_pool_hash_size = SAPOOL_DEF_HASH_SZ;
+uint8_t sa_pool_mode = LPORT_LCORE_MAPPING_POOL_MODE;
+extern uint32_t lcore_ids[RTE_MAX_LCORE];

-static int __add_del_filter(int af, struct netif_port *dev, lcoreid_t cid,
+static int __add_del_filter_addr_mode(int af, struct netif_port *dev, lcoreid_t cid,
+        const union inet_addr *dip, uint32_t filter_id[MAX_FDIR_PROTO],
+        bool add, queueid_t queue)
+{
+    struct rte_eth_fdir_filter filt = {
+        .action.behavior = RTE_ETH_FDIR_ACCEPT,
+        .action.report_status = RTE_ETH_FDIR_REPORT_ID,
+        .soft_id = filter_id[0],
+    };
+
+    if (af == AF_INET) {
+        filt.input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_OTHER;
+        filt.input.flow.ip4_flow.dst_ip = dip->in.s_addr;
+    } else if (af == AF_INET6) {
+        filt.input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV6_OTHER;
+        memcpy(filt.input.flow.ipv6_flow.dst_ip, &dip->in6, sizeof(struct in6_addr));
+    } else {
+        return EDPVS_NOTSUPP;
+    }
+
+    int err;
+    enum rte_filter_op op;
+#ifdef CONFIG_DPVS_SAPOOL_DEBUG
+    char ipaddr[64];
+#endif
+
+    if (dev->netif_ops && dev->netif_ops->op_filter_supported) {
+        if (dev->netif_ops->op_filter_supported(dev, RTE_ETH_FILTER_FDIR) < 0) {
+            if (dev->nrxq <= 1)
+                return EDPVS_OK;
+            RTE_LOG(ERR, SAPOOL, "%s: FDIR is not supported by device %s. Only"
+                    " single rxq can be configured.\n", __func__, dev->name);
+            return EDPVS_NOTSUPP;
+        }
+    } else {
+        RTE_LOG(ERR, SAPOOL, "%s: FDIR support of device %s is not known.\n",
+                __func__, dev->name);
+        return EDPVS_INVAL;
+    }
+
+    if (queue == INVALID_QUEUE_ID) {
+        err = netif_get_queue(dev, cid, &queue);
+        if (err != EDPVS_OK)
+            return err;
+    }
+
+    filt.action.rx_queue = queue;
+
+    /* Use UPDATE instead of ADD so that re-sending an identical filter
+     * configuration does not fail with a collision error. */
+    op = add ? RTE_ETH_FILTER_UPDATE : RTE_ETH_FILTER_DELETE;
+
+    err = netif_fdir_filter_set(dev, op, &filt);
+    if (err != EDPVS_OK)
+        return err;
+
+#ifdef CONFIG_DPVS_SAPOOL_DEBUG
+    RTE_LOG(DEBUG, SAPOOL, "FDIR: %s %s %s TCP/UDP "
+            "ip %s queue %d lcore %2d filterID %d\n",
+            add ? "add" : "del", dev->name,
+            af == AF_INET ? "IPv4" : "IPv6",
+            inet_ntop(af, dip, ipaddr, sizeof(ipaddr)) ? : "::",
+            queue, cid, filter_id[0]);
+#endif
+
+    return err;
+}
+
+static int __add_del_filter_port_mode(int af, struct netif_port *dev, lcoreid_t cid,
                             const union inet_addr *dip, __be16 dport,
                             uint32_t filter_id[MAX_FDIR_PROTO], bool add)
 {
@@ -201,7 +276,11 @@ static int __add_del_filter(int af, struct netif_port *dev, lcoreid_t cid,
         return err;

     filt[0].action.rx_queue = filt[1].action.rx_queue = queue;
-    op = add ? RTE_ETH_FILTER_ADD : RTE_ETH_FILTER_DELETE;
+
+    /* Use UPDATE instead of ADD so that re-sending an identical filter
+     * configuration does not fail with a collision error. */
+    op = add ? RTE_ETH_FILTER_UPDATE : RTE_ETH_FILTER_DELETE;

     netif_mask_fdir_filter(af, dev, &filt[0]);
     netif_mask_fdir_filter(af, dev, &filt[1]);
@@ -230,18 +309,28 @@ static int __add_del_filter(int af, struct netif_port *dev, lcoreid_t cid,
     return err;
 }

+static int __add_del_filter(int af, struct netif_port *dev, lcoreid_t cid,
+                            const union inet_addr *dip, __be16 dport,
+                            uint32_t filter_id[MAX_FDIR_PROTO], bool add, queueid_t queue)
+{
+    if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE)
+        return __add_del_filter_port_mode(af, dev, cid, dip, dport, filter_id, add);
+    else
+        return __add_del_filter_addr_mode(af, dev, cid, dip, filter_id, add, queue);
+}
+
 static inline int sa_add_filter(int af, struct netif_port *dev, lcoreid_t cid,
                                 const union inet_addr *dip, __be16 dport,
-                                uint32_t filter_id[MAX_FDIR_PROTO])
+                                uint32_t filter_id[MAX_FDIR_PROTO], queueid_t queue)
 {
-    return __add_del_filter(af, dev, cid, dip, dport, filter_id, true);
+    return __add_del_filter(af, dev, cid, dip, dport, filter_id, true, queue);
 }

 static inline int sa_del_filter(int af, struct netif_port *dev, lcoreid_t cid,
                                 const union inet_addr *dip, __be16 dport,
-                                uint32_t filter_id[MAX_FDIR_PROTO])
+                                uint32_t filter_id[MAX_FDIR_PROTO], queueid_t queue)
 {
-    return __add_del_filter(af, dev, cid, dip, dport, filter_id, false);
+    return __add_del_filter(af, dev, cid, dip, dport, filter_id, false, queue);
 }

 static int sa_pool_alloc_hash(struct sa_pool *ap, uint8_t hash_sz,
@@ -264,13 +353,13 @@ static int sa_pool_alloc_hash(struct sa_pool *ap, uint8_t hash_sz,
         INIT_LIST_HEAD(&pool->used_enties);
         INIT_LIST_HEAD(&pool->free_enties);

-        rte_atomic16_set(&pool->used_cnt, 0);
-        rte_atomic16_set(&pool->free_cnt, 0);
+        rte_atomic32_set(&pool->used_cnt, 0);
+        rte_atomic32_set(&pool->free_cnt, 0);

         for (port = ap->low; port <= ap->high; port++) {
             struct sa_entry *sa;
-
-            if (fdir->mask &&
+            if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE &&
+                fdir->mask &&
                 ((uint16_t)port & fdir->mask) != ntohs(fdir->port_base))
                 continue;

@@ -278,7 +367,7 @@ static int sa_pool_alloc_hash(struct sa_pool *ap, uint8_t hash_sz,
             sa->addr = ap->ifa->addr;
             sa->port = htons((uint16_t)port);
             list_add_tail(&sa->list, &pool->free_enties);
-            rte_atomic16_inc(&pool->free_cnt);
+            rte_atomic32_inc(&pool->free_cnt);
         }
     }
@@ -292,12 +381,54 @@ static int sa_pool_free_hash(struct sa_pool *ap)
     return EDPVS_OK;
 }

-int sa_pool_create(struct inet_ifaddr *ifa, uint16_t low, uint16_t high)
+static int __sa_pool_create(struct inet_ifaddr *ifa, lcoreid_t cid,
+                            uint16_t low, uint16_t high)
 {
+    uint32_t filtids[MAX_FDIR_PROTO];
+    struct sa_fdir *fdir = &sa_fdirs[cid];
     struct sa_pool *ap;
     int err;
-    lcoreid_t cid;

+    ap = rte_zmalloc(NULL, sizeof(struct sa_pool), 0);
+    if (!ap) {
+        err = EDPVS_NOMEM;
+        goto errout;
+    }
+
+    ap->ifa = ifa;
+    ap->low = low;
+    ap->high = high;
+    rte_atomic32_set(&ap->refcnt, 0);
+
+    err = sa_pool_alloc_hash(ap, sa_pool_hash_size, fdir);
+    if (err != EDPVS_OK) {
+        rte_free(ap);
+        goto errout;
+    }
+
+    /* if adding the filter fails, wasting some soft-ids is acceptable */
+    filtids[0] = fdir->soft_id++;
+    filtids[1] = fdir->soft_id++;
+    err = sa_add_filter(ifa->af, ifa->idev->dev, cid, &ifa->addr,
+                        fdir->port_base, filtids, INVALID_QUEUE_ID);
+    if (err != EDPVS_OK) {
+        sa_pool_free_hash(ap);
+        rte_free(ap);
+        goto errout;
+    }
+    ap->filter_id[0] = filtids[0];
+    ap->filter_id[1] = filtids[1];
+
+    ifa->sa_pools[cid] = ap;
+    return EDPVS_OK;
+errout:
+    return err;
+}
+
+int sa_pool_create(struct inet_ifaddr *ifa, uint16_t low, uint16_t high)
+{
+    int err;
+    lcoreid_t cid;
+    static unsigned idx = 0;    /* round-robin index over slave lcores */

     low = low ? : DEF_MIN_PORT;
     high = high ? : DEF_MAX_PORT;
@@ -306,47 +437,25 @@ int sa_pool_create(struct inet_ifaddr *ifa, uint16_t low, uint16_t high)
         return EDPVS_INVAL;
     }

-    for (cid = 0; cid < RTE_MAX_LCORE; cid++) {
-        uint32_t filtids[MAX_FDIR_PROTO];
-        struct sa_fdir *fdir = &sa_fdirs[cid];
+    if (SA_POOL_MODE == LADDR_LCORE_MAPPING_POOL_MODE) {
+        /* bind this local address to exactly one slave lcore, round-robin */
+        cid = lcore_ids[(idx++) % sa_nlcore];
+        err = __sa_pool_create(ifa, cid, low, high);

-        /* skip master and unused cores */
-        if (cid > 64 || !(sa_lcore_mask & (1L << cid)))
-            continue;
-        assert(rte_lcore_is_enabled(cid) && cid != rte_get_master_lcore());
-
-        ap = rte_zmalloc(NULL, sizeof(struct sa_pool), 0);
-        if (!ap) {
-            err = EDPVS_NOMEM;
+        if (idx >= sa_nlcore)
+            idx = 0;
+        if (err != EDPVS_OK)
             goto errout;
-        }
-
-        ap->ifa = ifa;
-        ap->low = low;
-        ap->high = high;
-        rte_atomic32_set(&ap->refcnt, 0);
-
-        err = sa_pool_alloc_hash(ap, sa_pool_hash_size, fdir);
-        if (err != EDPVS_OK) {
-            rte_free(ap);
-            goto errout;
-        }
-
-        /* if add filter failed, waste some soft-id is acceptable. */
-        filtids[0] = fdir->soft_id++;
-        filtids[1] = fdir->soft_id++;
+    } else if (SA_POOL_MODE == LPORT_LCORE_MAPPING_POOL_MODE) {
+        for (cid = 0; cid < RTE_MAX_LCORE; cid++) {
+            /* skip master and unused cores */
+            if (cid > 64 || !(sa_lcore_mask & (1L << cid)))
+                continue;
+            assert(rte_lcore_is_enabled(cid) && cid != rte_get_master_lcore());
+            err = __sa_pool_create(ifa, cid, low, high);

-        err = sa_add_filter(ifa->af, ifa->idev->dev, cid, &ifa->addr,
-                            fdir->port_base, filtids);
-        if (err != EDPVS_OK) {
-            sa_pool_free_hash(ap);
-            rte_free(ap);
-            goto errout;
+            if (err != EDPVS_OK)
+                goto errout;
         }
-        ap->filter_id[0] = filtids[0];
-        ap->filter_id[1] = filtids[1];
-
-        ifa->sa_pools[cid] = ap;
     }

 #ifdef CONFIG_DPVS_SAPOOL_DEBUG
@@ -383,7 +492,7 @@ int sa_pool_destroy(struct inet_ifaddr *ifa)
         }

         sa_del_filter(ifa->af, ifa->idev->dev, cid, &ifa->addr,
-                      fdir->port_base, ap->filter_id);
+                      fdir->port_base, ap->filter_id, INVALID_QUEUE_ID);
         sa_pool_free_hash(ap);
         rte_free(ap);
         ifa->sa_pools[cid] = NULL;
@@ -443,8 +552,8 @@ static inline int sa_pool_fetch(struct sa_entry_pool *pool,
     if (!ent) {
 #ifdef CONFIG_DPVS_SAPOOL_DEBUG
         RTE_LOG(DEBUG, SAPOOL, "%s: no entry (used/free %d/%d)\n", __func__,
-                rte_atomic16_read(&pool->used_cnt),
-                rte_atomic16_read(&pool->free_cnt));
+                rte_atomic32_read(&pool->used_cnt),
+                rte_atomic32_read(&pool->free_cnt));
 #endif
         pool->miss_cnt++;
         return EDPVS_RESOURCE;
@@ -464,8 +573,8 @@ static inline int sa_pool_fetch(struct sa_entry_pool *pool,
     ent->flags |= SA_F_USED;
     list_move_tail(&ent->list, &pool->used_enties);

-    rte_atomic16_inc(&pool->used_cnt);
-    rte_atomic16_dec(&pool->free_cnt);
+    rte_atomic32_inc(&pool->used_cnt);
+    rte_atomic32_dec(&pool->free_cnt);

 #ifdef CONFIG_DPVS_SAPOOL_DEBUG
     RTE_LOG(DEBUG, SAPOOL, "%s: %s:%d fetched!\n", __func__,
@@ -515,8 +624,8 @@ static inline int sa_pool_release(struct sa_entry_pool *pool,
     ent->flags &= (~SA_F_USED);
     list_move_tail(&ent->list, &pool->free_enties);

-    rte_atomic16_dec(&pool->used_cnt);
-    rte_atomic16_inc(&pool->free_cnt);
+    rte_atomic32_dec(&pool->used_cnt);
+    rte_atomic32_inc(&pool->free_cnt);

 #ifdef CONFIG_DPVS_SAPOOL_DEBUG
     RTE_LOG(DEBUG, SAPOOL, "%s: %s:%d released!\n", __func__,
@@ -767,6 +876,36 @@ int sa_release(const struct netif_port *dev,
     return err;
 }

+int sa_bind_conn(int af, struct netif_port *dev, lcoreid_t cid,
+                 const union inet_addr *dip,
+                 __be16 dport, queueid_t queue)
+{
+    uint32_t filtids[MAX_FDIR_PROTO];
+    int err;
+#ifdef CONFIG_DPVS_SAPOOL_DEBUG
+    char ipaddr[64];
+#endif
+
+    /* fixed (arbitrary) soft-ids for this per-connection filter */
+    filtids[0] = 1;
+    filtids[1] = 2;
+
+    err = sa_add_filter(af, dev, cid, dip, dport, filtids, queue);
+    if (err != EDPVS_OK) {
+        RTE_LOG(WARNING, SAPOOL, "%s: add fdir filter failed.\n", __func__);
+        return err;
+    }
+
+#ifdef CONFIG_DPVS_SAPOOL_DEBUG
+    RTE_LOG(DEBUG, SAPOOL, "FDIR: %s %s %s "
+            "ip %s queue %d lcore %2d\n",
+            "bind", dev->name,
+            af == AF_INET ? "IPv4" : "IPv6",
+            inet_ntop(af, dip, ipaddr, sizeof(ipaddr)) ? : "::",
: "::", + queue, cid); +#endif + return EDPVS_OK; +} + int sa_pool_stats(const struct inet_ifaddr *ifa, struct sa_pool_stats *stats) { struct dpvs_msg *req, *reply; @@ -830,8 +969,8 @@ static int sa_msg_get_stats(struct dpvs_msg *msg) pool = &ifa->this_sa_pool->pool_hash[hash]; assert(pool); - stats->used_cnt += rte_atomic16_read(&pool->used_cnt); - stats->free_cnt += rte_atomic16_read(&pool->free_cnt); + stats->used_cnt += rte_atomic32_read(&pool->used_cnt); + stats->free_cnt += rte_atomic32_read(&pool->free_cnt); stats->miss_cnt += pool->miss_cnt; } @@ -894,6 +1033,22 @@ int sa_pool_term(void) /* * config file */ +static void pool_mode_handler(vector_t tokens) +{ + char *str = set_value(tokens); + assert(str); + + if (!strcmp(str, LADDR_LCORE_MAPPING_POOL_MODE_NAME)) + sa_pool_mode = LADDR_LCORE_MAPPING_POOL_MODE; + else if (!strcmp(str, LPORT_LCORE_MAPPING_POOL_MODE_NAME)) + sa_pool_mode = LPORT_LCORE_MAPPING_POOL_MODE; + else + RTE_LOG(WARNING, SAPOOL, "invalid pool_mode %s, use default %s\n", + str, LPORT_LCORE_MAPPING_POOL_MODE_NAME); + + FREE_PTR(str); +} + static void sa_pool_hash_size_conf(vector_t tokens) { char *str = set_value(tokens); @@ -915,5 +1070,6 @@ static void sa_pool_hash_size_conf(vector_t tokens) void install_sa_pool_keywords(void) { install_keyword_root("sa_pool", NULL); + install_keyword("pool_mode", pool_mode_handler, KW_TYPE_INIT); install_keyword("pool_hash_size", sa_pool_hash_size_conf, KW_TYPE_INIT); } diff --git a/tools/ipvsadm/ipvsadm.c b/tools/ipvsadm/ipvsadm.c index 274d01e0a..c1aff2681 100644 --- a/tools/ipvsadm/ipvsadm.c +++ b/tools/ipvsadm/ipvsadm.c @@ -144,7 +144,8 @@ #define CMD_ADDBLKLST (CMD_NONE+18) #define CMD_DELBLKLST (CMD_NONE+19) #define CMD_GETBLKLST (CMD_NONE+20) -#define CMD_MAX CMD_GETBLKLST +#define CMD_CONN_SYNC (CMD_NONE+21) +#define CMD_MAX CMD_CONN_SYNC #define NUMBER_OF_CMD (CMD_MAX - CMD_NONE) static const char* cmdnames[] = { @@ -284,6 +285,7 @@ enum { TAG_SORT, TAG_NO_SORT, TAG_PERSISTENCE_ENGINE, + TAG_CONN_SYNC, TAG_SOCKPAIR, }; @@ -392,6 +394,8 @@ parse_options(int argc, char **argv, struct ipvs_command_entry *ce, TAG_START_DAEMON, NULL, NULL }, { "stop-daemon", '\0', POPT_ARG_STRING, &optarg, TAG_STOP_DAEMON, NULL, NULL }, + { "conn-sync", '\0', POPT_ARG_NONE, &optarg, + TAG_CONN_SYNC, NULL, NULL }, { "add-laddr", 'P', POPT_ARG_NONE, NULL, 'P', NULL, NULL }, { "del-laddr", 'Q', POPT_ARG_NONE, NULL, 'Q', NULL, NULL }, { "get-laddr", 'G', POPT_ARG_NONE, NULL, 'G', NULL, NULL }, @@ -485,6 +489,9 @@ parse_options(int argc, char **argv, struct ipvs_command_entry *ce, case 'C': set_command(&ce->cmd, CMD_FLUSH); break; + case TAG_CONN_SYNC: + set_command(&ce->cmd, CMD_CONN_SYNC); + break; case 'L': case 'l': set_command(&ce->cmd, CMD_LIST); @@ -947,6 +954,10 @@ static int process_options(int argc, char **argv, int reading_stdin) result = ipvs_flush(); break; + case CMD_CONN_SYNC: + result = ipvs_conn_sync(); + break; + case CMD_ADD: result = ipvs_add_service(&ce.svc); break; @@ -1562,9 +1573,15 @@ static void print_conn_entry(const ipvs_conn_entry_t *conn_entry, ntohs(conn_entry->dport), conn_entry->proto, format))) goto exit; - printf("[%d]%-3s %-6s %-11s %-18s %-18s %-18s %s\n", - conn_entry->lcoreid, proto_str, time_str, conn_entry->state, - cname, vname, lname, dname); + if (conn_entry->syncid) { + printf("[%d]%-3s %-6s %-11s %-18s %-18s %-18s %-18s syncid:%d\n", + conn_entry->lcoreid, proto_str, time_str, conn_entry->state, + cname, vname, lname, dname, conn_entry->syncid); + } else { + printf("[%d]%-3s %-6s %-11s %-18s %-18s 
%-18s %s\n", + conn_entry->lcoreid, proto_str, time_str, conn_entry->state, + cname, vname, lname, dname); + } exit: if (cname) free(cname); diff --git a/tools/keepalived/keepalived/libipvs-2.6/dp_vs.h b/tools/keepalived/keepalived/libipvs-2.6/dp_vs.h index a222ca3f4..8c2446856 100644 --- a/tools/keepalived/keepalived/libipvs-2.6/dp_vs.h +++ b/tools/keepalived/keepalived/libipvs-2.6/dp_vs.h @@ -22,6 +22,7 @@ enum{ DPVS_SO_SET_EDITDEST, DPVS_SO_SET_DELDEST, DPVS_SO_SET_GRATARP, + DPVS_SO_SET_CONN_SYNC, }; enum{ diff --git a/tools/keepalived/keepalived/libipvs-2.6/libipvs.c b/tools/keepalived/keepalived/libipvs-2.6/libipvs.c index 8e532d937..3d45121c3 100644 --- a/tools/keepalived/keepalived/libipvs-2.6/libipvs.c +++ b/tools/keepalived/keepalived/libipvs-2.6/libipvs.c @@ -159,6 +159,10 @@ int ipvs_flush(void) return dpvs_setsockopt(DPVS_SO_SET_FLUSH, NULL, 0); } +int ipvs_conn_sync(void) +{ + return dpvs_setsockopt(DPVS_SO_SET_CONN_SYNC, NULL, 0); +} int ipvs_add_service(ipvs_service_t *svc) { diff --git a/tools/keepalived/keepalived/libipvs-2.6/libipvs.h b/tools/keepalived/keepalived/libipvs-2.6/libipvs.h index 4c46fa2f0..3c21bf4fe 100644 --- a/tools/keepalived/keepalived/libipvs-2.6/libipvs.h +++ b/tools/keepalived/keepalived/libipvs-2.6/libipvs.h @@ -92,6 +92,7 @@ extern unsigned int ipvs_version(void); /* flush all the rules */ extern int ipvs_flush(void); +extern int ipvs_conn_sync(void); /* add a virtual service */ extern int ipvs_add_service(ipvs_service_t *svc);