Redis源碼:內(nèi)存管理與事件處理

Redis內(nèi)存管理

Redis內(nèi)存管理相關(guān)文件為zmalloc.c/zmalloc.h,其只是對C中內(nèi)存管理函數(shù)做了簡單的封裝,屏蔽了底層平臺的差異,并增加了內(nèi)存使用情況統(tǒng)計的功能。

void *zmalloc(size_t size) {
    // 多申請的一部分內(nèi)存用于存儲當(dāng)前分配了多少自己的內(nèi)存
    void *ptr = malloc(size+PREFIX_SIZE);

    if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
    update_zmalloc_stat_alloc(zmalloc_size(ptr));
    return ptr;
#else
    *((size_t*)ptr) = size;
    // 內(nèi)存分配統(tǒng)計
    update_zmalloc_stat_alloc(size+PREFIX_SIZE);
    return (char*)ptr+PREFIX_SIZE;
#endif
}
內(nèi)存結(jié)構(gòu)圖示

Redis事件處理

Redis的事件類型分為時間事件和文件事件,文件事件也就是網(wǎng)絡(luò)連接事件。時間事件的處理是在epoll_wait返回處理文件事件后處理的,每次epoll_wait的超時時間都是Redis最近的一個定時器時間。

Redis在進(jìn)行事件處理前,首先會進(jìn)行初始化,初始化的主要邏輯在main/initServer函數(shù)中。初始化流程主要做的工作如下:

  1. 設(shè)置信號回調(diào)函數(shù)。
  2. 創(chuàng)建事件循環(huán)機(jī)制,即調(diào)用epoll_create。
  3. 創(chuàng)建服務(wù)監(jiān)聽端口,創(chuàng)建定時事件,并將這些事件添加到事件機(jī)制中。
void initServer(void) {
    int j;

    // 設(shè)置信號對應(yīng)的處理函數(shù)
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();
    ...

    createSharedObjects();
    adjustOpenFilesLimit();
    // 創(chuàng)建事件循環(huán)機(jī)制,及調(diào)用epoll_create創(chuàng)建epollfd用于事件監(jiān)聽
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    /* Open the TCP listening socket for the user commands. */
    // 創(chuàng)建監(jiān)聽服務(wù)端口,socket/bind/listen
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    ...

    /* Create the Redis databases, and initialize other internal state. */
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].eviction_pool = evictionPoolAlloc();
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
    }
    ...

    /* Create the serverCron() time event, that's our main way to process
     * background operations. 創(chuàng)建定時事件 */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    // 將事件加入到事件機(jī)制中,調(diào)用鏈為 aeCreateFileEvent/aeApiAddEvent/epoll_ctl
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");

    /* Open the AOF file if needed. */
    if (server.aof_state == AOF_ON) {
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            serverLog(LL_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    ...
}

事件處理流程
事件處理函數(shù)鏈:aeMain / aeProcessEvents / aeApiPoll / epoll_wait。

常見的事件機(jī)制處理流程是:調(diào)用epoll_wait等待事件來臨,然后遍歷每一個epoll_event,提取epoll_event中的events和data域,data域常用來存儲fd或者指針,不過一般的做法是提取出events和data.fd,然后根據(jù)fd找到對應(yīng)的回調(diào)函數(shù),fd與對應(yīng)回調(diào)函數(shù)之間的映射關(guān)系可以存儲在特定的數(shù)據(jù)結(jié)構(gòu)中,比如數(shù)組或者哈希表,然后調(diào)用事件回調(diào)函數(shù)來處理。
Redis中用了一個數(shù)組來保存fd與回調(diào)函數(shù)的映射關(guān)系,使用數(shù)組的優(yōu)點就是簡單高效,但是數(shù)組一般使用在建立的連接不太多情況,而Redis正好符合這個情況,一般Redis的文件事件大都是客戶端建立的連接,而客戶端的連接個數(shù)是一定的,該數(shù)量通過配置項maxclients來指定。

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
        // 從eventLoop->events數(shù)組中查找對應(yīng)的回調(diào)函數(shù)
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;
        int rfired = 0;

        /* note the fe->mask & mask & ... code: maybe an already processed
         * event removed an element that fired and we still didn't
         * processed, so we check if the event is still valid. */
        if (fe->mask & mask & AE_READABLE) {
            rfired = 1;
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        }
        if (fe->mask & mask & AE_WRITABLE) {
            if (!rfired || fe->wfileProc != fe->rfileProc)
                fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        }
        processed++;
    }
    ...
}

文件事件的監(jiān)聽

Redis監(jiān)聽端口的事件回調(diào)函數(shù)鏈?zhǔn)牵?code>acceptTcpHandler / acceptCommonHandler / createClient / aeCreateFileEvent / aeApiAddEvent / epoll_ctl。
在Reids監(jiān)聽事件處理流程中,會將客戶端的連接fd添加到事件機(jī)制中,并設(shè)置其回調(diào)函數(shù)為readQueryFromClient,該函數(shù)負(fù)責(zé)處理客戶端的命令請求。

命令處理流程

命令處理流程鏈?zhǔn)牵?code>readQueryFromClient / processInputBuffer / processCommand / call / 對應(yīng)命令的回調(diào)函數(shù)(c->cmd->proc),比如get key命令的處理回調(diào)函數(shù)為getCommand。getCommand的執(zhí)行流程是先到client對應(yīng)的數(shù)據(jù)庫字典中根據(jù)key來查找數(shù)據(jù),然后根據(jù)響應(yīng)消息格式將查詢結(jié)果填充到響應(yīng)消息中。

如何添加自定義命令

如何在Redis中添加自定的命令呢?其中只需要改動以下幾個地方就行了,比如自定義命令hello xxx,然后返回redis: xxx,因為hello xxxget key類似,所以就依葫蘆畫瓢。

首先在redisCommandTable數(shù)組中添加自定義的命令,redisCommandTable數(shù)組定義在server.c中。

struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    ...
    {"hello",helloCommand,2,"rF",0,NULL,1,1,1,0,0},
};

然后在getCommand定義處后面添加``helloCommand的定義,getCommand定義在t_string.c`中。

void getCommand(client *c) {
    getGenericCommand(c);
}

void helloCommand(client *c)
{
    char buff[512] = {0};
    robj *o = NULL;

    sprintf(buff, "%s %s", "redis: ", c->argv[1]);
    o = createObject(OBJ_STRING, sdsnewlen(buff, strlen(buff)));

    addReplyBulk(c, o);
    sdsfree(o);
}

最后在server.h中添加helloCommand的聲明。

void getCommand(client *c);
void helloCommand(client *c);

至此為止,代碼已經(jīng)編寫完畢,編譯redis然后運行redis-server。

hello命令運行結(jié)果

參考資料:

  1. redis源碼分析(1)內(nèi)存管理
  2. redis源碼
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • epoll概述 epoll是linux中IO多路復(fù)用的一種機(jī)制,I/O多路復(fù)用就是通過一種機(jī)制,一個進(jìn)程可以監(jiān)視多...
    發(fā)仔很忙閱讀 11,105評論 4 35
  • 最近在看 UNIX 網(wǎng)絡(luò)編程并研究了一下 Redis 的實現(xiàn),感覺 Redis 的源代碼十分適合閱讀和分析,其中 ...
    Draveness閱讀 10,863評論 8 56
  • 本文的討論,暫時忽略redis數(shù)據(jù)結(jié)構(gòu)和算法層面的東西。 目錄 redis如此之快的原因 redis server...
    tafeng閱讀 10,539評論 0 6
  • 花間詞,人在花間,極寫男女歡情之好。 花間派領(lǐng)袖,以我之愚見,唯有柳永,韋莊,溫庭筠三人而已。 情色,客觀...
    秦吉_閱讀 536評論 0 2
  • 【內(nèi)楗第三】 原文: (3.1)君臣上下之事,有遠(yuǎn)有親,近而疏,就之不用,去之反求。日進(jìn)前而不御,遙聞聲而相思。事...
    海納百川vs王者之風(fēng)閱讀 403評論 0 0

友情鏈接更多精彩內(nèi)容