快捷搜索:  汽车  科技

redis版本更新内容(Redis6.0.6源码阅读redis启动流程)

redis版本更新内容(Redis6.0.6源码阅读redis启动流程)redis-server ../redis.conf --port 6379 复制代码来指定配置文件的地址,以及可以填写一些参数,最终通过loadServerConfig方法来将配置文件以及自己填的参数解析if (argc >= 2) { j = 1; sds options = sdsempty(); char *configfile = NULL; //输出版本号以及帮助等 if (strcmp(argv[1] "-v") == 0 || strcmp(argv[1] "--version") == 0) version(); if (strcmp(argv[1] "--help") == 0 ||

前言

本片文章将搭建Redis源码debug环境,通过分析main函数流程,了解到redis的网络模型以及事件驱动模型,将redis架构清晰展现。

正文源码搭建

源码下载地址:redis-github

下载clion然后导入项目,切换redis6分支,创建以下文件:

文件名: CMAKELists.txt

  • 跟目录:

CMake_minimum_required(VERSION 3.0 FATAL_ERROR) project(redis VERSION 4.0) set(CMAKE_BUILD_TYPE "Debug") get_filename_component(REDIS_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE) add_subdirectory(deps) add_subdirectory(src/modules) set(SRC_SERVER_TMP src/acl.c src/adlist.c src/ae.c src/anet.c src/ae_kqueue.c src/dict.c src/sds.c src/zmalloc.c src/lzf_c.c src/lzf_d.c src/pqsort.c src/zipmap.c src/sha1.c src/ziplist.c src/release.c src/networking.c src/util.c src/object.c src/db.c src/replication.c src/rdb.c src/t_string.c src/t_list.c src/t_set.c src/t_zset.c src/evict.c src/defrag.c src/module.c src/quicklist.c src/expire.c src/childinfo.c src/redis-check-aof.c src/redis-check-rdb.c src/lazyfree.c src/geohash.c src/rax.c src/geohash_helper.c src/siphash.c src/geo.c src/t_hash.c src/config.c src/aof.c src/pubsub.c src/multi.c src/debug.c src/sort.c src/intset.c src/syncio.c src/cluster.c src/crc16.c src/endianconv.c src/slowlog.c src/scripting.c src/bio.c src/rio.c src/rand.c src/memtest.c src/crc64.c src/bitops.c src/sentinel.c src/notify.c src/setproctitle.c src/crcspeed.c src/blocked.c src/hyperloglog.c src/latency.c src/sparkline.c src/t_stream.c src/lolwut.c src/lolwut5.c src/listpack.c src/localtime.c src/timeout.c src/connection.c src/tls.c src/tracking.c src/lolwut6.c src/gopher.c src/sha256.c ) set(SRC_SERVER src/server.c ${SRC_SERVER_TMP}) set(SRC_CLI src/anet.c src/sds.c src/adlist.c src/redis-cli.c src/zmalloc.c src/release.c src/anet.c src/ae.c src/crc64.c src/crc16.c src/dict.c src/siphash.c src/crcspeed.c ) set(EXECUTABLE_OUTPUT_PATH src) link_directories(deps/linenoise/ deps/lua/src deps/hiredis) add_executable(redis-server ${SRC_SERVER}) target_include_directories(redis-server PRIVATE ${REDIS_ROOT}/deps/linenoise PRIVATE ${REDIS_ROOT}/deps/hiredis PRIVATE ${REDIS_ROOT}/deps/lua/src) target_link_libraries(redis-server PRIVATE pthread PRIVATE m PRIVATE lua PRIVATE linenoise PRIVATE hiredis) add_executable(redis-cli ${SRC_CLI}) target_include_directories(redis-cli PRIVATE ${REDIS_ROOT}/deps/linenoise PRIVATE ${REDIS_ROOT}/deps/hiredis PRIVATE ${REDIS_ROOT}/deps/lua/src) target_link_libraries(redis-cli PRIVATE pthread PRIVATE m PRIVATE linenoise PRIVATE hiredis ) 复制代码

  • deps/CMakeLists.txt

add_subdirectory(hiredis) add_subdirectory(linenoise) add_subdirectory(lua) 复制代码

  • deps/hiredis/CMakeLists.txt

add_library(hiredis STATIC hiredis.c net.c dict.c sds.c async.c read.c ) 复制代码

  • deps/linenoise/CMakeLists.txt

add_library(linenoise linenoise.c) 复制代码

  • deps/lua/CMakeLists.txt

set(LUA_SRC src/lapi.c src/lcode.c src/ldebug.c src/ldo.c src/ldump.c src/lfunc.c src/lgc.c src/llex.c src/lmem.c src/lobject.c src/lopcodes.c src/lparser.c src/lstate.c src/lstring.c src/ltable.c src/ltm.c src/lundump.c src/lvm.c src/lzio.c src/strbuf.c src/fpconv.c src/lauxlib.c src/lbaselib.c src/ldblib.c src/liolib.c src/lmathlib.c src/loslib.c src/ltablib.c src/lstrlib.c src/loadlib.c src/linit.c src/lua_cjson.c src/lua_struct.c src/lua_cmsgpack.c src/lua_bit.c ) add_library(lua STATIC ${LUA_SRC}) 复制代码

  • src/modules/CMakeLists.txt

cmake_minimum_required(VERSION 3.9) set(CMAKE_BUILD_TYPE "Debug") add_library(helloworld SHARED helloworld.c) set_target_properties(helloworld PROPERTIES PREFIX "" SUFFIX ".so") add_library(hellotype SHARED hellotype.c) set_target_properties(hellotype PROPERTIES PREFIX "" SUFFIX ".so") add_library(helloblock SHARED helloblock.c) set_target_properties(helloblock PROPERTIES PREFIX "" SUFFIX ".so") add_library(testmodule SHARED testmodule.c) set_target_properties(testmodule PROPERTIES PREFIX "" SUFFIX ".so") 复制代码

修改文件:ae_kqueue.c,将include修改为:

#include "unistd.h" #include "ae.h" #include "zmalloc.h" #include <sys/types.h> #include <sys/event.h> #include <sys/time.h> 复制代码

执行cmake . (自行安装cmake) 执行make 等待成功后配置运行

redis版本更新内容(Redis6.0.6源码阅读redis启动流程)(1)

main方法

int main(int argc char **argv) { struct timeval tv; int j; #ifdef REDIS_TEST if (argc == 3 && !strcasecmp(argv[1] "test")) { if (!strcasecmp(argv[2] "ziplist")) { return ziplistTest(argc argv); } else if (!strcasecmp(argv[2] "quicklist")) { quicklistTest(argc argv); } else if (!strcasecmp(argv[2] "intset")) { return intsetTest(argc argv); } else if (!strcasecmp(argv[2] "zipmap")) { return zipmapTest(argc argv); } else if (!strcasecmp(argv[2] "sha1test")) { return sha1Test(argc argv); } else if (!strcasecmp(argv[2] "util")) { return utilTest(argc argv); } else if (!strcasecmp(argv[2] "endianconv")) { return endianconvTest(argc argv); } else if (!strcasecmp(argv[2] "crc64")) { return crc64Test(argc argv); } else if (!strcasecmp(argv[2] "zmalloc")) { return zmalloc_test(argc argv); } return -1; /* test not found */ } #endif /* We need to initialize our libraries and the server configuration. */ #ifdef INIT_SETPROCTITLE_REPLACEMENT spt_init(argc argv); #endif setlocale(LC_COLLATE ""); //设置时间环境变量 tzset(); //内存超出的handler(记录日志) zmalloc_set_oom_handler(redisOutOfMemoryHandler); //随机数初始化 srand(time(NULL)^getpid()); //精确时间 gettimeofday(&tv NULL); crc64_init(); //hash算法种子 uint8_t hashseed[16]; getRandomBytes(hashseed sizeof(hashseed)); dictSetHashFunctionSeed(hashseed); //是否以哨兵模式启动 server.sentinel_mode = checkForSentinelMode(argc argv); //初始化配置文件 initServerConfig(); ACLInit(); moduleInitModulesSystem(); tlsInit(); //将执行命令以及参数保存 server.executable = getAbsolutePath(argv[0]); server.exec_argv = zmalloc(sizeof(char*)*(argc 1)); server.exec_argv[argc] = NULL; for (j = 0; j < argc; j ) server.exec_argv[j] = zstrdup(argv[j]); //初始化哨兵配置 if (server.sentinel_mode) { initSentinelConfig(); initSentinel(); } //检查aof和rdb错误 if (strstr(argv[0] "redis-check-rdb") != NULL) redis_check_rdb_main(argc argv NULL); else if (strstr(argv[0] "redis-check-aof") != NULL) redis_check_aof_main(argc argv); 复制代码

主要是执行了initServerConfig方法,将服务的配置进行设置了默认值

if (argc >= 2) { j = 1; sds options = sdsempty(); char *configfile = NULL; //输出版本号以及帮助等 if (strcmp(argv[1] "-v") == 0 || strcmp(argv[1] "--version") == 0) version(); if (strcmp(argv[1] "--help") == 0 || strcmp(argv[1] "-h") == 0) usage(); if (strcmp(argv[1] "--test-memory") == 0) { if (argc == 3) { memtest(atoi(argv[2]) 50); exit(0); } else { fprintf(stderr "Please specify the amount of memory to test in megabytes.\n"); fprintf(stderr "Example: ./redis-server --test-memory 4096\n\n"); exit(1); } } //获取配置文件地址 if (argv[j][0] != '-' || argv[j][1] != '-') { configfile = argv[j]; server.configfile = getAbsolutePath(configfile); zfree(server.exec_argv[j]); server.exec_argv[j] = zstrdup(server.configfile); j ; } //解析参数 比如 --port 6379 等一些配置 while(j != argc) { if (argv[j][0] == '-' && argv[j][1] == '-') { //跳过 --check-rdb没有参数 if (!strcmp(argv[j] "--check-rdb")) { j ; continue; } if (sdslen(options)) options = sdscat(options "\n"); options = sdscat(options argv[j] 2); options = sdscat(options " "); } else { /* Option argument */ options = sdscatrepr(options argv[j] strlen(argv[j])); options = sdscat(options " "); } j ; } //哨兵配置只能从文件读取 不允许参数 if (server.sentinel_mode && configfile && *configfile == '-') { serverLog(LL_WARNING "Sentinel config from STDIN not allowed."); serverLog(LL_WARNING "Sentinel needs config file on disk to save state. Exiting..."); exit(1); } resetServerSaveParams(); //加载配置文件 和命令行读入的参数到server配置中 loadServerConfig(configfile options); sdsfree(options); } 复制代码

然后是解析输入的参数,可以通过:

redis-server ../redis.conf --port 6379 复制代码

来指定配置文件的地址,以及可以填写一些参数,最终通过loadServerConfig方法来将配置文件以及自己填的参数解析

//Redis信息 serverLog(LL_WARNING "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo"); serverLog(LL_WARNING "Redis version=%s bits=%d commit=%s modified=%d pid=%d just started" REDIS_VERSION (sizeof(long) == 8) ? 64 : 32 redisGitSHA1() strtol(redisGitDirty() NULL 10) > 0 (int)getpid()); if (argc == 1) { serverLog(LL_WARNING "Warning: no config file specified using the default config. In order to specify a config file use %s /path/to/%s.conf" argv[0] server.sentinel_mode ? "sentinel" : "redis"); } else { serverLog(LL_WARNING "Configuration loaded"); } //后台模式模式 server.supervised = redisIsSupervised(server.supervised_mode); int background = server.daemonize && !server.supervised; if (background) daemonize(); //初始化服务器 initServer(); 复制代码

主要是initServer这个方法,inintServer首先初始化了一些list,这一段就不放上来了,继续往下看

//常用字符串等共享对象 createSharedObjects(); //提高最大打开文件数量 adjustOpenFilesLimit(); //创建事件循环 server.el = aeCreateEventLoop(server.maxclients CONFIG_FDSET_INCR); if (server.el == NULL) { serverLog(LL_WARNING "Failed creating the event loop. Error message: '%s'" strerror(errno)); exit(1); } 复制代码

可以看到这里调用aeCreateEventLoop创建了服务的事件循环器,这也是整个服务的基石

aeEventLoop

typedef struct aeEventLoop { int maxfd; /* 当前最高文件描述符 */ int setsize; //event数量 long long timeEventNextId; //定时任务自增id time_t lastTime; aeFileEvent *events; //事件数组 aeFiredEvent *fired; /* 准备就绪的事件*/ aeTimeEvent *timeEventHead; //定时任务头节点 int stop; void *apidata; aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; int flags; } aeEventLoop; 复制代码

这是事件循环的结构体,主要是*events来存放对应的事件

typedef struct aeFileEvent { int mask; /* 三种flags AE_(READABLE|WRITABLE|BARRIER) */ aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent; 复制代码

aeFileEvent主要是读事件和写事件,根据事件的不同调用方法,按照普通的流程来说应该是先读后写,但是如果mask为AE_BARRIER的时候,表示先写后读

typedef struct aeTimeEvent { long long id; long when_sec; //秒 long when_ms; //毫秒 aeTimeProc *timeProc; //对应执行方法 aeEventFinalizerProc *finalizerProc; //回收方法 void *clientData; struct aeTimeEvent *prev; struct aeTimeEvent *next; int refcount; } aeTimeEvent; 复制代码

这是事件循环里面的定时事件,多个定时事件组成一个链表

aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; for (i = 0; i < setsize; i ) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; } 复制代码

创建事件循环代码,可以看到主要调用aeApiCreate,并且初始化了setsize个事件,setsize表示最大client数量

#ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif 复制代码

aeApiCreate调用了底层io实现,一共支持4种:

  • epoll:linux环境下主要使用
  • kqueue:mac环境下使用
  • evport:Solaris 10环境
  • select:几乎所有都可以使用

关于什么是select,什么又是epoll老生常谈的话题了,这里就不解释了....

下面用epoll当作主要的IO方式

在java里面也可以使用epoll来作为io方式,写过javaNio的都知道,可以注册自己想要监听的事件,比如accept、read,当事件到来的时候可以根据事件的类型分别处理,在C语言里面相关操作:

epoll_create 创建一个epoll对象 epoll_ctl 往epoll对象中增加/删除某一个流的某一个事件比如 epoll_ctl(epollfd EPOLL_CTL_ADD socket EPOLLIN);//有缓冲区内有数据时epoll_wait返回 epoll_ctl(epollfd EPOLL_CTL_DEL socket EPOLLOUT);//缓冲区可写入时epoll_wait返回epoll_wait(epollfd ...)等待直到注册的事件发生 复制代码

static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { zfree(state); return -1; } state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } eventLoop->apidata = state; return 0; } static int aeApiAddEvent(aeEventLoop *eventLoop int fd int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; //合并旧的mask mask |= eventLoop->events[fd].mask; //设置事件 读/写 if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; //新增事件 if (epoll_ctl(state->epfd op fd &ee) == -1) return -1; return 0; } 复制代码

创建epoll以及新增事件

int aeCreateFileEvent(aeEventLoop *eventLoop int fd int mask aeFileProc *proc void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; //核心 if (aeApiAddEvent(eventLoop fd mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; } 复制代码

这是aeCreateFileEvent方法,在很多地方都用到了,比如aof的重写。通过aeApiAddEvent向epoll注册一个对应的事件

static int aeApiPoll(aeEventLoop *eventLoop struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval numevents = 0; retval = epoll_wait(state->epfd state->events eventLoop->setsize tvp ? (tvp->tv_sec*1000 tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j ) { int mask = 0; struct epoll_event *e = state->events j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; } 复制代码

调用epoll_wait来等待事件的到来,然后将事件放入fired数组中

总结:redis使用了不同的I/O模型来处理事件,可以自由的处理事件的添加删除等,最终调用aeApiPoll获取达到的事件,后续其他地方会处理对应的方法。

//创建默认16个数据库 server.db = zmalloc(sizeof(redisDb)*server.dbnum); //绑定端口 if (server.port != 0 && listenToPort(server.port server.ipfd &server.ipfd_count) == C_ERR) exit(1); if (server.tls_port != 0 && listenToPort(server.tls_port server.tlsfd &server.tlsfd_count) == C_ERR) exit(1); //unixsocket 不配置port 打开unixsokcet配置即可 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr server.unixsocket server.unixsocketperm server.tcp_backlog); if (server.sofd == ANET_ERR) { serverLog(LL_WARNING "Opening Unix socket: %s" server.neterr); exit(1); } anetNonBlock(NULL server.sofd); } //没有listen任何端口 if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) { serverLog(LL_WARNING "Configured to not listen anywhere exiting."); exit(1); } 复制代码

继续是绑定端口以及unixsocket

//创建16个数据库 for (j = 0; j < server.dbnum; j ) { server.db[j].dict = dictCreate(&dbDictType NULL); server.db[j].expires = dictCreate(&keyptrDictType NULL); server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType NULL); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType NULL); server.db[j].watched_keys = dictCreate(&keylistDictType NULL); server.db[j].id = j; server.db[j].avg_ttl = 0; server.db[j].defrag_later = listCreate(); listSetFreeMethod(server.db[j].defrag_later (void (*)(void*))sdsfree); } 复制代码

初始化数据库

//创建定时器 if (aeCreateTimeEvent(server.el 1 serverCron NULL NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } 复制代码

这里创建了serverCron定时器,serverCron是整个系统的定时任务处理

long long aeCreateTimeEvent(aeEventLoop *eventLoop long long milliseconds aeTimeProc *proc void *clientData aeEventFinalizerProc *finalizerProc) { //获取自增id long long id = eventLoop->timeEventNextId ; aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; aeAddMillisecondsToNow(milliseconds &te->when_sec &te->when_ms); te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; return id; } 复制代码

添加定时任务并没有使用到IO,而是创建了定时任务放入到链表的头部

static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; aeTimeEvent *nearest = NULL; while(te) { if (!nearest || te->when_sec < nearest->when_sec || (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms)) nearest = te; te = te->next; } return nearest; } 复制代码

遍历整个定时链表,搜到出来最快执行的任务

//创建事件处理 用于处理tcp或unix连接 for (j = 0; j < server.ipfd_count; j ) { if (aeCreateFileEvent(server.el server.ipfd[j] AE_READABLE acceptTcpHandler NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } 复制代码

这里创建了很多个事件处理器,用于处理redis客户端的连接

void acceptTcpHandler(aeEventLoop *el int fd void *privdata int mask) { int cport cfd max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata); while(max--) { cfd = anetTcpAccept(server.neterr fd cip sizeof(cip) &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING "Accepting client connection: %s" server.neterr); return; } serverLog(LL_VERBOSE "Accepted %s:%d" cip cport); acceptCommonHandler(connCreateAcceptedSocket(cfd) 0 cip); } } 复制代码

继续看acceptCommonHandler

static void acceptCommonHandler(connection *conn int flags char *ip) { client *c; UNUSED(ip); //客户端太多了 if (listLength(server.clients) getClusterConnectionsCount() >= server.maxclients) { char *err; if (server.cluster_enabled) err = "-ERR max number of clients cluster " "connections reached\r\n"; else err = "-ERR max number of clients reached\r\n"; //写入错误信息 if (connWrite(conn err strlen(err)) == -1) { } server.stat_rejected_conn ; connClose(conn); return; } /* 创建客户端 */ if ((c = createClient(conn)) == NULL) { char conninfo[100]; serverLog(LL_WARNING "Error registering fd event for the new client: %s (conn: %s)" connGetLastError(conn) connGetInfo(conn conninfo sizeof(conninfo))); connClose(conn); /* May be already closed just ignore errors */ return; } c->flags |= flags; if (connAccept(conn clientAcceptHandler) == C_ERR) { char conninfo[100]; if (connGetState(conn) == CONN_STATE_ERROR) serverLog(LL_WARNING "Error accepting a client connection: %s (conn: %s)" connGetLastError(conn) connGetInfo(conn conninfo sizeof(conninfo))); freeClient(connGetPrivateData(conn)); return; } } 复制代码

首先判断了最大客户端数量,超出数量返回失败,然后调用createClient创建了客户端

client *createClient(connection *conn) { client *c = zmalloc(sizeof(client)); if (conn) { connNonBlock(conn); connEnableTcpNoDelay(conn); if (server.tcpkeepalive) connKeepAlive(conn server.tcpkeepalive); connSetReadHandler(conn readQueryFromClient); connSetPrivateData(conn c); } 复制代码

只看这一段,给客户端连接设置了读处理器,readQueryFromClient

static inline int connSetReadHandler(connection *conn ConnectionCallbackFunc func) { return conn->type->set_read_handler(conn func); } .set_read_handler = connSocketSetReadHandler 复制代码

set_read_handler是connSocketSetReadHandler方法

static int connSocketSetReadHandler(connection *conn ConnectionCallbackFunc func) { if (func == conn->read_handler) return C_OK; conn->read_handler = func; if (!conn->read_handler) aeDeleteFileEvent(server.el conn->fd AE_READABLE); else if (aeCreateFileEvent(server.el conn->fd AE_READABLE conn->type->ae_handler conn) == AE_ERR) return C_ERR; return C_OK; } 复制代码

看到这里就明白了,redis创建了多个tcp事件,当客户端第一次连接的时候会创建client实例,并且设置ReadHandler,ReadHandler实际上向epoll注册了读事件,当有命令可读的时候调用readQueryFromClient

readQueryFromClient最后会调用到了processCommand方法也就是执行命令,同时在redis6里面是支持多线程的,但是通过前面的源码可以看出来:dict并没有进行加锁,那么这个多线程是怎么回事呢?看下回分解redis多线程模型

总结

redis版本更新内容(Redis6.0.6源码阅读redis启动流程)(2)


作者:LZ
链接:https://juejin.cn/post/6979458051556245535
来源:掘金

猜您喜欢: