接着上一篇继续分析,上一篇请参考 《Memcached源码阅读之网络监听的建立》,这篇主要分析TCP的连接建立(从前面的代码分析可以看出,这个过程是由主线程驱动的),UDP没有连接建立的过程,所以之间进行连接分发,我们后续分析,现在直接上代码进行讲解。
conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport,
struct event_base *base)
{
conn *c = conn_from_freelist();
//获取一个空闲连接,conn是Memcached内部对网络连接的一个封装
//如果没有空闲的连接
if (NULL == c)
{
if (!(c = (conn *)calloc(1, sizeof(conn))))//申请空间
{
fprintf(stderr, "calloc()\n");
return NULL;
}MEMCACHED_CONN_CREATE(c);
//进行一些初始化
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->suffixlist = 0;
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
c->suffixsize = SUFFIX_LIST_INITIAL;
c->iovsize = IOV_LIST_INITIAL;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
//每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便
c->rbuf = (char *)malloc((size_t)c->rsize);
c->wbuf = (char *)malloc((size_t)c->wsize);
c->ilist = (item **)malloc(sizeof(item *) * c->isize);
c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *) malloc(
sizeof(struct msghdr) * c->msgsize);
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0
|| c->msglist == 0 || c->suffixlist == 0)
{
conn_free(c);
fprintf(stderr, "malloc()\n");
return NULL;
}
STATS_LOCK();
//统计变量更新
stats.conn_structs++;
STATS_UNLOCK();
}
c->transport = transport;
c->protocol = settings.binding_protocol;
if (!settings.socketpath)
{
c->request_addr_size = sizeof(c->request_addr);
}
else
{
c->request_addr_size = 0;
}
//输出一些日志信息
if (settings.verbose > 1)
{
if (init_state == conn_listening)
{
fprintf(stderr, "<%d server listening (%s)\n", sfd,
prot_text(c->protocol));
}
else if (IS_UDP(transport))
{
fprintf(stderr, "<%d server listening (udp)\n", sfd);
}
else if (c->protocol == negotiating_prot)
{
fprintf(stderr, "<%d new auto-negotiating client connection\n",
sfd);
}
else if (c->protocol == ascii_prot)
{
fprintf(stderr, "<%d new ascii client connection.\n", sfd);
}
else if (c->protocol == binary_prot)
{
fprintf(stderr, "<%d new binary client connection.\n", sfd);
}
else
{
fprintf(stderr, "<%d new unknown (%d) client connection\n", sfd,
c->protocol);
assert(false);
}
}
c->sfd = sfd;
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
c->ritem = 0;
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
c->ileft = 0;
c->suffixleft = 0;
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;
c->write_and_go = init_state;
c->write_and_free = 0;
c->item = 0;
c->noreply = false;
//建立sfd描述符上面的event事件,事件回调函数为event_handler
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1)
{
//如果建立libevent事件失败,将创建的conn添加到空闲列表中
if (conn_add_to_freelist(c))
{
conn_free(c);
}
perror("event_add");
return NULL;
}
STATS_LOCK();
//统计信息更新
stats.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();
MEMCACHED_CONN_ALLOCATE(c->sfd);
return c;
}
//获得conn
conn *conn_from_freelist()
{
conn *c;
pthread_mutex_lock(&conn_lock);//操作链表,加锁,保持同步
//freecurr为静态全局变量
if (freecurr > 0)
{
//freeconns是在Memcached启动时初始化的
c = freeconns[--freecurr];
}
else//没有conn
{
c = NULL;
}
pthread_mutex_unlock(&conn_lock);
return c;
}
//添加conn到空闲链表中
bool conn_add_to_freelist(conn *c)
{
bool ret = true;
pthread_mutex_lock(&conn_lock);
//freeconns还有空间
if (freecurr < freetotal)
{
freeconns[freecurr++] = c;//直接添加
ret = false;
}
else
{
//没有多余空间,进行扩容,按目前容量的2倍进行扩容
size_t newsize = freetotal * 2;
conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
if (new_freeconns)
{
freetotal = newsize;
freeconns = new_freeconns;
freeconns[freecurr++] = c;
ret = false;
}
}
pthread_mutex_unlock(&conn_lock);
return ret;
}
//libevent事件回调函数的处理,回调函数被调用时,表明Memcached监听的端口号有网络事件到了
void event_handler(const int fd, const short which, void *arg)
{
conn *c;
c = (conn *)arg;
assert(c != NULL);
c->which = which;
//这种情况应该很少出现
if (fd != c->sfd)
{
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
//进入业务处理状态机
drive_machine(c);
return;
}