Memcached Source Code Analysis - Network Model (1)

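Memcached Source Code Analysis - Network Model (1)
Memcached Source Code Analysis - Command Parsing (2)
Memcached Source Code Analysis - Data Storage (3)
Memcached Source Code Analysis - Create/Read/Update/Delete Operations (4)
Memcached Source Code Analysis - Slab Memory Storage (5)
Memcached Source Code Analysis - LRU Eviction Algorithm (6)
Memcached Source Code Analysis - Response Messages (7)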

Introduction

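I started writing about Memcached for a simple reason: I wanted to understand how its multithreading differs from Redis's. It turned out that, although the code is written in C, it is not hard to read at all, and there are plenty of excellent articles by experienced engineers to refer to, so I decided to write this series.
My MyBatis series isn't finished yet, but source-code reading can be a bit free-form, and alternating between the two series seems workable. As always, out of respect for original work, I list the reference articles at the end of the post so that readers can consult the original, more expert write-ups.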


Memcached Network Model

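  • 1. Memcached is built primarily on the Libevent network event library.

  • 2. Memcached's network model has two parts: the main thread and the worker threads. The main thread accepts client connections; the worker threads take over those connections and handle the actual business logic. By default, 4 worker threads are started (configurable with -t).

    1. The main thread and the worker threads communicate mainly through pipes. When the main thread accepts a client connection, it picks a worker thread round-robin and writes to that worker's pipe. When the worker detects data written to its pipe, it runs the logic that takes over the client connection.
    2. Each worker thread is also driven by Libevent's event mechanism: when a client sends data, a read operation is triggered.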


      [Figure: Memcached network model]
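To make the split concrete, here is a minimal sketch of the main-thread/worker pattern described above. It is an illustration under stated assumptions, not Memcached's code: worker_t, worker_loop and run_round_robin are hypothetical names, and each worker blocks in read() on its pipe instead of waiting inside a libevent loop. What it does mirror from Memcached's dispatch_conn_new is the round-robin formula (last_thread + 1) % n and the one-byte 'c' wake-up written to the chosen worker's pipe.

```c
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_WORKERS 16

/* Hypothetical worker descriptor: one pipe per worker. */
typedef struct {
    pthread_t tid;
    int notify_receive_fd;   /* read end: the worker waits on it       */
    int notify_send_fd;      /* write end: the dispatcher writes to it */
    int handled;             /* number of 'c' wake-ups processed       */
} worker_t;

static void *worker_loop(void *arg) {
    worker_t *w = arg;
    char buf[1];
    /* read() blocks until a byte arrives; it returns 0 once the write end is closed */
    while (read(w->notify_receive_fd, buf, 1) == 1) {
        if (buf[0] == 'c')
            w->handled++;    /* a real worker would take over a connection here */
    }
    return NULL;
}

/* Start nworkers, send njobs wake-ups round-robin, then shut everything down.
 * Fills per_worker[i] and returns the total number of wake-ups handled. */
static int run_round_robin(int nworkers, int njobs, int per_worker[]) {
    worker_t w[MAX_WORKERS] = {0};
    int last_thread = -1, total = 0;

    assert(nworkers > 0 && nworkers <= MAX_WORKERS);
    for (int i = 0; i < nworkers; i++) {          /* create all pipes first */
        int fds[2];
        if (pipe(fds) != 0) {
            for (int k = 0; k < i; k++) {
                close(w[k].notify_receive_fd);
                close(w[k].notify_send_fd);
            }
            return -1;
        }
        w[i].notify_receive_fd = fds[0];
        w[i].notify_send_fd = fds[1];
    }
    for (int i = 0; i < nworkers; i++) {
        int rc = pthread_create(&w[i].tid, NULL, worker_loop, &w[i]);
        assert(rc == 0);
        (void)rc;
    }
    for (int j = 0; j < njobs; j++) {
        int tid = (last_thread + 1) % nworkers;   /* round-robin choice */
        last_thread = tid;
        if (write(w[tid].notify_send_fd, "c", 1) != 1)
            break;                                /* give up; shut down below */
    }
    for (int i = 0; i < nworkers; i++) {
        close(w[i].notify_send_fd);               /* EOF makes worker_loop return */
        pthread_join(w[i].tid, NULL);
        close(w[i].notify_receive_fd);
        per_worker[i] = w[i].handled;
        total += w[i].handled;
    }
    return total;
}
```

With 3 workers and 7 wake-ups, run_round_robin should report a 3/2/2 split, because the round-robin counter starts at worker 0.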


libevent的知識(shí)鋪墊

Memcached's code relies on libevent throughout, so let's first lay some simple groundwork; all the libevent-related logic that follows maps onto this example. The steps are:

  • 1. pEventBase = event_init(); initializes the libevent library.
  • 2. event_set(&event, sock, EV_READ | EV_PERSIST, MyCallBack, (void*)0); fills in a struct event: watch sock for reads, keep the event registered after it fires (EV_PERSIST), and call MyCallBack.
  • 3. event_base_set(pEventBase, &event); assigns the struct event to the specified event_base.
  • 4. event_add(&event, 0); adds the event to the set of monitored events (a NULL timeout means no timeout).
  • 5. event_base_loop(pEventBase, 0); runs the event loop. It calls the underlying select, poll, epoll, etc., and when a monitored event occurs it invokes the callback specified in the event structure.
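#include <event.h>        // libevent 1.x-compatible API
#include <sys/socket.h>

// Event callback handler
static void MyCallBack(const int fd, const short which, void *arg) {}

int main(int argc, char** argv)
{
    // Initialize libevent (event_init() is the legacy API; libevent 2.x prefers event_base_new())
    struct event_base *pEventBase;
    pEventBase = event_init();
    int sock = socket(……);   // socket arguments, bind() and listen() omitted

    struct event event;
    event_set(&event, sock, EV_READ | EV_PERSIST, MyCallBack, (void*)0);
    event_base_set(pEventBase, &event);
    event_add(&event, 0);
    event_base_loop(pEventBase, 0);

    return 0;
}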
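libevent hides the actual waiting inside event_base_loop. As a rough mental model only, and deliberately swapping libevent for plain poll(2), the register-then-dispatch cycle looks like the sketch below. mini_base, mini_add, mini_loop_once, count_cb and demo_pipe_event are hypothetical names, not libevent functions; the callback simply has the same (fd, short, void *) shape as MyCallBack.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

typedef void (*event_cb)(int fd, short revents, void *arg);

#define MAX_EVENTS 8

/* Hypothetical mini "event base": (fd, callback, arg) entries watched for reads. */
struct mini_event { int fd; event_cb cb; void *arg; };

struct mini_base {
    struct mini_event ev[MAX_EVENTS];
    int n;
};

/* Roughly event_set + event_base_set + event_add; entries always persist. */
static int mini_add(struct mini_base *b, int fd, event_cb cb, void *arg) {
    assert(cb != NULL);
    if (b->n == MAX_EVENTS)
        return -1;
    b->ev[b->n++] = (struct mini_event){ fd, cb, arg };
    return 0;
}

/* One loop iteration: wait up to timeout_ms, then run the callback of every
 * readable fd. Returns the number of callbacks run, or -1 on poll() error. */
static int mini_loop_once(struct mini_base *b, int timeout_ms) {
    struct pollfd pfd[MAX_EVENTS];
    for (int i = 0; i < b->n; i++)
        pfd[i] = (struct pollfd){ .fd = b->ev[i].fd, .events = POLLIN };

    int ready = poll(pfd, (nfds_t)b->n, timeout_ms);
    if (ready <= 0)
        return ready;

    int fired = 0;
    for (int i = 0; i < b->n; i++) {
        if (pfd[i].revents & (POLLIN | POLLHUP)) {
            b->ev[i].cb(pfd[i].fd, pfd[i].revents, b->ev[i].arg);
            fired++;
        }
    }
    return fired;
}

/* Example callback in the shape of MyCallBack: consume one byte and count it. */
static void count_cb(int fd, short revents, void *arg) {
    char c;
    (void)revents;
    if (read(fd, &c, 1) == 1)
        ++*(int *)arg;
}

/* Register a pipe's read end, run one idle iteration, then make it readable. */
static int demo_pipe_event(void) {
    int fds[2], hits = 0;
    struct mini_base base = { .n = 0 };
    if (pipe(fds) != 0)
        return -1;
    mini_add(&base, fds[0], count_cb, &hits);
    int idle = mini_loop_once(&base, 0);        /* nothing written yet */
    int ok = write(fds[1], "x", 1) == 1;
    if (ok)
        mini_loop_once(&base, 1000);            /* fd readable: count_cb runs */
    close(fds[0]);
    close(fds[1]);
    return (ok && idle == 0) ? hits : -1;
}
```

demo_pipe_event should return 1: the first iteration finds nothing readable, and the second runs count_cb once after a byte is written into the pipe. libevent additionally handles timeouts, signals and backend selection (select, poll, epoll, kqueue), none of which this sketch attempts.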


Main Thread Initialization Logic

Memcached's main-thread initialization is fairly simple: it starts the listening master thread and the worker threads that do the actual work. The workers are started by memcached_thread_init, which is analyzed in the worker-thread initialization section below; here we focus on the listening master thread.
The master thread's startup is simply server-side socket initialization combined with libevent initialization. The process is:

  • server_sockets: iterates over all the interfaces to listen on and binds each one.
  • server_socket: puts a single socket into the listening state.
  • conn_new: registers the socket with libevent.
  • event_handler: the callback for the listening socket.
  • Finally, event_base_loop puts libevent into its event loop.
int main (int argc, char **argv) {
    int retval = EXIT_SUCCESS;
    /* ... argument parsing and other initialization omitted ... */

#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    /* EVENT_BASE_FLAG_NOLOCK: do not allocate locks for this event_base */
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    main_base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    /* Otherwise, use older API */
    main_base = event_init();
#endif

    /* start the worker threads (see the worker-thread section) */
#ifdef EXTSTORE
    slabs_set_storage(storage);
    memcached_thread_init(settings.num_threads, storage);
    init_lru_crawler(storage);
#else
    memcached_thread_init(settings.num_threads, NULL);
    init_lru_crawler(NULL);
#endif

    /* create the listening TCP sockets and register them with main_base */
    if (settings.port && server_sockets(settings.port, tcp_transport,
                                        portnumber_file)) {
        vperror("failed to listen on TCP port %d", settings.port);
        exit(EX_OSERR);
    }

    /* enter the event loop */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }

    /* ... cleanup omitted ... */
    return retval;
}



server_sockets parses the interface list (settings.inter) and binds every listening address in turn by calling server_socket(p, the_port, transport, portnumber_file).

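static int server_sockets(int port, enum network_transport transport,
                          FILE *portnumber_file) {
    if (settings.inter == NULL) {
        // no -l option given: listen on all interfaces
        return server_socket(settings.inter, port, transport, portnumber_file);
    } else {
        // tokenize them and bind to each one of them..
        char *b;
        int ret = 0;
        char *list = strdup(settings.inter);
        if (list == NULL) {
            fprintf(stderr, "Failed to allocate memory for parsing server interface string\n");
            return 1;
        }
        for (char *p = strtok_r(list, ";,", &b);
             p != NULL;
             p = strtok_r(NULL, ";,", &b)) {
            int the_port = port;
            // parsing of "host:port" / "[ipv6]:port" (which may change the_port) omitted
            ret |= server_socket(p, the_port, transport, portnumber_file);
        }
        free(list);
        return ret;
    }
}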
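The interface-splitting loop can be tried in isolation. The sketch below assumes, as the code above does, that settings.inter is a list separated by ';' or ','; for_each_interface, bind_fn, struct collect and collect_iface are hypothetical names, and the callback stands in for server_socket. The host:port and [IPv6] handling of the real function is left out.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Callback invoked once per interface token; non-zero means failure. */
typedef int (*bind_fn)(const char *iface, void *ctx);

static int for_each_interface(const char *inter, bind_fn fn, void *ctx) {
    char *saveptr = NULL;
    int ret = 0;
    assert(inter != NULL && fn != NULL);
    char *list = strdup(inter);          /* strtok_r writes into its input */
    if (list == NULL)
        return 1;
    for (char *p = strtok_r(list, ";,", &saveptr);
         p != NULL;
         p = strtok_r(NULL, ";,", &saveptr)) {
        ret |= fn(p, ctx);               /* accumulate failures like server_sockets */
    }
    free(list);
    return ret;
}

/* Example callback: count the interfaces and remember the last one seen. */
struct collect { int count; char last[64]; };

static int collect_iface(const char *iface, void *ctx) {
    struct collect *c = ctx;
    c->count++;
    snprintf(c->last, sizeof c->last, "%s", iface);
    return 0;
}
```

strtok_r is used instead of strtok because it keeps its position in a caller-supplied pointer rather than in hidden static state, and strdup is needed because strtok_r writes NUL bytes into the string it scans.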



server_socket initializes a single listening socket. Its main jobs are setting the relevant socket options, binding the socket (and, for TCP, calling listen), and associating the socket with libevent via conn_new.

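static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags =1;

    // resolve interface/port into candidate addresses (NULL interface = all interfaces)
    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        return 1;
    }

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            continue;
        }

        // TODO: setting socket options (uses flags and ling) is omitted here

        // bind the socket; an address already in use is skipped, other errors abort
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        }
        success++;
        if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
            perror("listen()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
        }

        // only TCP matters here; the UDP implementation is ignored
        if (IS_UDP(transport)) {
            // UDP handling omitted
        } else {
            // register the listening socket with main_base in state conn_listening
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            // keep a linked list of all listening connections
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}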
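Stripped of Memcached's conn bookkeeping, the per-interface work in server_socket is the standard POSIX server sequence. Here is a minimal sketch assuming TCP only; open_listener is a hypothetical name, and unlike server_socket, which keeps every address getaddrinfo returns (for example both IPv4 and IPv6), it stops at the first address that binds successfully.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Resolve iface/port, then socket + SO_REUSEADDR + bind + listen on the first
 * candidate that works. Returns the listening fd, or -1 on failure. */
static int open_listener(const char *iface, const char *port, int backlog) {
    struct addrinfo hints, *ai, *next;
    int sfd = -1, flags = 1;

    assert(port != NULL);
    memset(&hints, 0, sizeof hints);
    hints.ai_flags = AI_PASSIVE;       /* iface == NULL means "any address" */
    hints.ai_family = AF_UNSPEC;       /* IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM;   /* TCP only in this sketch */

    if (getaddrinfo(iface, port, &hints, &ai) != 0)
        return -1;

    for (next = ai; next != NULL; next = next->ai_next) {
        sfd = socket(next->ai_family, next->ai_socktype, next->ai_protocol);
        if (sfd == -1)
            continue;
        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof flags);
        if (bind(sfd, next->ai_addr, next->ai_addrlen) == 0 &&
            listen(sfd, backlog) == 0)
            break;                     /* success: keep this fd */
        close(sfd);
        sfd = -1;
    }
    freeaddrinfo(ai);
    return sfd;
}
```

Calling open_listener("127.0.0.1", "0", 16) asks the kernel for an ephemeral port, which is convenient for tests. Memcached additionally makes the socket non-blocking and sets further options such as SO_KEEPALIVE, SO_LINGER and TCP_NODELAY before handing it to conn_new.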



conn_new performs the libevent configuration, including event_set and event_base_set. Note that event_set binds the callback event_handler, which runs whenever the socket becomes readable; for a listening socket, that means a new connection has arrived.

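conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;

    if (sfd < 0 || sfd >= max_fds) {
        return NULL;
    }

    // conns[] is indexed by fd: a conn is allocated on first use and reused afterwards
    c = conns[sfd];
    if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        // buffer allocation omitted
        c->sfd = sfd;
        conns[sfd] = c;
    }

    c->transport = transport;
    c->state = init_state;   // conn_listening for listeners, conn_new_cmd for clients

    // libevent setup: event_handler fires whenever sfd becomes readable
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats_state.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}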
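The lookup c = conns[sfd] relies on a table indexed directly by file descriptor. A minimal sketch of that idea, with hypothetical names (mini_conn, conn_table_init, conn_get_or_new) and a deliberately tiny struct, assuming the table is sized once at startup:

```c
#include <assert.h>
#include <stdlib.h>

/* Illustrative connection object; not Memcached's full conn struct. */
typedef struct {
    int sfd;
    int state;      /* stands in for conn_listening / conn_new_cmd etc. */
    int reuses;     /* how many times this slot has been (re)initialized */
} mini_conn;

static mini_conn **conn_table;   /* one slot per possible fd */
static int table_size;

static int conn_table_init(int max_fds) {
    assert(max_fds > 0);
    conn_table = calloc((size_t)max_fds, sizeof *conn_table);
    if (conn_table == NULL)
        return -1;
    table_size = max_fds;
    return 0;
}

/* O(1) lookup by fd; allocate on first use, reinitialize on reuse. */
static mini_conn *conn_get_or_new(int sfd, int init_state) {
    if (sfd < 0 || sfd >= table_size)
        return NULL;                  /* fd outside the preallocated table */
    mini_conn *c = conn_table[sfd];
    if (c == NULL) {
        c = calloc(1, sizeof *c);
        if (c == NULL)
            return NULL;
        c->sfd = sfd;
        conn_table[sfd] = c;
    }
    c->state = init_state;            /* every (re)use resets the state */
    c->reuses++;
    return c;
}
```

Because the kernel hands out the lowest free descriptor number, slots are recycled naturally: a closed fd's object is simply reinitialized the next time that number comes back from accept().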



The heart of the callback event_handler is drive_machine. This function is the state-transition center of Memcached: every operation on a connection is driven through drive_machine.

void event_handler(const int fd, const short which, void *arg) {
    conn *c;

    c = (conn *)arg;
    assert(c != NULL);

    c->which = which;

    /* sanity */
    if (fd != c->sfd) {
        if (settings.verbose > 0)
            fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
        conn_close(c);
        return;
    }

    drive_machine(c);
    return;
}


Worker Thread Initialization Logic

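memcached_thread_init initializes the worker threads. Its three core operations are:

  • Creating the pipe used for communication between the master thread and each worker thread: pipe(fds).
  • setup_thread: sets up the worker thread's libevent parameters.
  • create_worker: starts the worker thread so that it enters its processing loop.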
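void memcached_thread_init(int nthreads, void *arg) {
    int         i;

    // one LIBEVENT_THREAD descriptor per worker (other setup omitted)
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    // create each worker's notify pipe and set up its libevent state
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
        threads[i].notify_receive_fd = fds[0];  // read end: watched by the worker
        threads[i].notify_send_fd = fds[1];     // write end: used by the master
#ifdef EXTSTORE
        threads[i].storage = arg;
#endif

        // initialize the thread's libevent event
        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats_state.reserved_fds += 5;
    }

    // start the threads; each enters its own libevent event loop
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}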



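setup_thread initializes the worker thread's libevent parameters. The points to focus on are:

  • The callback thread_libevent_process, registered on the read end of the notify pipe.
  • Initializing the queue through which the master thread passes new connections to the worker: cq_init(me->new_conn_queue).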
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }

    cq_init(me->new_conn_queue);
}
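cq_init prepares the queue through which the master hands CQ_ITEMs to this worker. Its exact layout is not shown in this article; the sketch below assumes a mutex-protected singly linked FIFO (the master pushes at the tail, the worker pops at the head), with hypothetical names (mini_cq_init, mini_cq_push, mini_cq_pop) and an item reduced to its sfd field.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Reduced queue item; the real CQ_ITEM also carries init_state, event_flags, etc. */
typedef struct cq_item {
    int sfd;
    struct cq_item *next;
} cq_item;

typedef struct {
    cq_item *head;
    cq_item *tail;
    pthread_mutex_t lock;   /* master pushes and worker pops concurrently */
} conn_queue;

static void mini_cq_init(conn_queue *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append at the tail (called by the master thread). */
static void mini_cq_push(conn_queue *cq, cq_item *item) {
    assert(item != NULL);
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;          /* queue was empty */
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Remove from the head (called by the worker); NULL when empty. */
static cq_item *mini_cq_pop(conn_queue *cq) {
    pthread_mutex_lock(&cq->lock);
    cq_item *item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

The lock is needed because the master thread pushes while the worker thread pops. The pipe byte only wakes the worker up; the queue carries the actual data.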



create_worker simply starts the worker thread so that it begins working:

  • create_worker(worker_libevent, &threads[i]) passes in the function worker_libevent.
  • pthread_create starts a thread that runs worker_libevent.
  • Inside worker_libevent, event_base_loop finally starts the worker's libevent loop.
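static void create_worker(void *(*func)(void *), void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    // run func(arg) on a new thread; the thread id is stored in the LIBEVENT_THREAD
    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        exit(1);
    }
}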

static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;
    register_thread_initialized();
    event_base_loop(me->base, 0);
    event_base_free(me->base);
    return NULL;
}


typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
#ifdef EXTSTORE
    void *storage;              /* extstore handle, set in memcached_thread_init */
#endif
} LIBEVENT_THREAD;
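worker_libevent calls register_thread_initialized() before entering event_base_loop. The idea is a startup handshake: the initializing thread waits until every worker has checked in. A minimal sketch of that handshake with a mutex, a condition variable and a counter; reg_lock, reg_cond, reg_count, register_initialized, wait_for_registration, worker_main and start_and_wait are all hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define MAX_THREADS 16

static pthread_mutex_t reg_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t reg_cond = PTHREAD_COND_INITIALIZER;
static int reg_count = 0;   /* workers that have finished their setup */

/* Called by each worker once its setup is done. */
static void register_initialized(void) {
    pthread_mutex_lock(&reg_lock);
    reg_count++;
    pthread_cond_signal(&reg_cond);
    pthread_mutex_unlock(&reg_lock);
}

/* Called by the initializing thread; blocks until nthreads have registered. */
static void wait_for_registration(int nthreads) {
    pthread_mutex_lock(&reg_lock);
    while (reg_count < nthreads)          /* re-check after every wake-up */
        pthread_cond_wait(&reg_cond, &reg_lock);
    pthread_mutex_unlock(&reg_lock);
}

static void *worker_main(void *arg) {
    (void)arg;
    register_initialized();
    /* a real worker would now enter event_base_loop() */
    return NULL;
}

/* Start n workers, block until all have registered, return the count seen. */
static int start_and_wait(int n) {
    pthread_t tids[MAX_THREADS];
    int seen;
    assert(n > 0 && n <= MAX_THREADS);
    for (int i = 0; i < n; i++) {
        int rc = pthread_create(&tids[i], NULL, worker_main, NULL);
        assert(rc == 0);
        (void)rc;
    }
    wait_for_registration(n);
    pthread_mutex_lock(&reg_lock);
    seen = reg_count;
    pthread_mutex_unlock(&reg_lock);
    for (int i = 0; i < n; i++)
        pthread_join(tids[i], NULL);
    return seen;
}
```

The while loop around pthread_cond_wait matters: condition variables can wake spuriously, so the waiter re-checks the counter every time.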



thread_libevent_process receives new connections dispatched by the master thread; each new connection is handled by conn_new.

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    conn *c;
    unsigned int timeout_fd;   // used by other command cases omitted here

    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

    switch (buf[0]) {
    case 'c':
        // one 'c' byte corresponds to one queued item
        item = cq_pop(me->new_conn_queue);

        if (NULL == item) {
            break;
        }
        switch (item->mode) {
            case queue_new_conn:
                // register the new socket with this worker's event_base
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base);
                if (c == NULL) {
                    // could not register the connection: close the socket
                    close(item->sfd);
                } else {
                    c->thread = me;
                }
                break;

            case queue_redispatch:
                conn_worker_readd(item->c);
                break;
        }
        cqi_free(item);
        break;
    // other commands omitted
    }
}


Master/Worker Thread Communication Flow

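This section tries to explain clearly how the master thread hands newly accepted sockets to the worker threads.
When a new connection arrives on a listening socket, event_handler runs drive_machine on the master thread. The listening connection is in state conn_listening, so we only look at that branch for now; ultimately, dispatch_conn_new hands the connection from the master to a worker.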

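static void drive_machine(conn *c) {
    bool stop = false;
    int sfd;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;
#ifdef HAVE_ACCEPT4
    static int  use_accept4 = 1;
#else
    static int  use_accept4 = 0;
#endif

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:
            addrlen = sizeof(addr);
            sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
            if (sfd == -1) {
                // EAGAIN/EWOULDBLOCK, EMFILE, etc.: detailed handling omitted
                stop = true;
                break;
            }
            // a series of socket initialization steps omitted
            if (settings.maxconns_fast &&
                stats_state.curr_conns + stats_state.reserved_fds >= settings.maxconns - 1) {
                // too many connections: send an error and close the socket
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                // hand the accepted socket to a worker thread
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, c->transport);
            }

            stop = true;
            break;

        // other states (conn_new_cmd, conn_read, conn_parse_cmd, ...) omitted
        }
    }
}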



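dispatch_conn_new is fairly simple; it hands a new connection from the master to a worker:

  • Build the CQ_ITEM used for communication: CQ_ITEM *item = cqi_new();
  • Choose the worker round-robin: (last_thread + 1) % settings.num_threads;
  • Push the item (which carries the new socket sfd) onto the worker's queue with cq_push, then wake the worker by writing one byte through its pipe: write(thread->notify_send_fd, buf, 1), where buf[0] = 'c'. The socket itself does not travel through the pipe.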
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}



thread_libevent_process is the callback a worker runs when the master dispatches a new connection. It handles the connection via conn_new, which registers the new socket with the worker thread's own libevent event_base.

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    conn *c;
    unsigned int timeout_fd;   // used by other command cases omitted here

    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

    switch (buf[0]) {
    case 'c':
        // one 'c' byte corresponds to one queued item
        item = cq_pop(me->new_conn_queue);

        if (NULL == item) {
            break;
        }
        switch (item->mode) {
            case queue_new_conn:
                // register the new socket with this worker's event_base
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base);
                if (c == NULL) {
                    // could not register the connection: close the socket
                    close(item->sfd);
                } else {
                    c->thread = me;
                }
                break;

            case queue_redispatch:
                conn_worker_readd(item->c);
                break;
        }
        cqi_free(item);
        break;
    // other commands omitted
    }
}



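conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    conn *c;

    if (sfd < 0 || sfd >= max_fds) {
        return NULL;
    }

    // conns[] is indexed by fd: a conn is allocated on first use and reused afterwards
    c = conns[sfd];
    if (NULL == c) {
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            fprintf(stderr, "Failed to allocate connection object\n");
            return NULL;
        }
        // buffer allocation omitted
        c->sfd = sfd;
        conns[sfd] = c;
    }

    c->transport = transport;
    c->state = init_state;   // conn_listening for listeners, conn_new_cmd for clients

    // libevent setup: event_handler fires whenever sfd becomes readable
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats_state.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}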


References

A Brief Introduction to libevent
Memcached Source Code Analysis - The Libevent-based Network Model (1)
