redis版本更新内容(Redis6.0.6源码阅读redis启动流程)
redis版本更新内容(Redis6.0.6源码阅读redis启动流程)redis-server ../redis.conf --port 6379 复制代码来指定配置文件的地址,以及可以填写一些参数,最终通过loadServerConfig方法来将配置文件以及自己填的参数解析if (argc >= 2) { j = 1; sds options = sdsempty(); char *configfile = NULL; //输出版本号以及帮助等 if (strcmp(argv[1] "-v") == 0 || strcmp(argv[1] "--version") == 0) version(); if (strcmp(argv[1] "--help") == 0 ||
前言本片文章将搭建Redis源码debug环境,通过分析main函数流程,了解到redis的网络模型以及事件驱动模型,将redis架构清晰展现。
正文源码搭建源码下载地址:redis-github
下载clion然后导入项目,切换redis6分支,创建以下文件:
文件名: CMAKELists.txt
- 跟目录:
CMake_minimum_required(VERSION 3.0 FATAL_ERROR)
project(redis VERSION 4.0)
set(CMAKE_BUILD_TYPE "Debug")
get_filename_component(REDIS_ROOT "${CMAKE_CURRENT_SOURCE_DIR}" ABSOLUTE)
add_subdirectory(deps)
add_subdirectory(src/modules)
set(SRC_SERVER_TMP
src/acl.c
src/adlist.c
src/ae.c
src/anet.c
src/ae_kqueue.c
src/dict.c
src/sds.c
src/zmalloc.c
src/lzf_c.c
src/lzf_d.c
src/pqsort.c
src/zipmap.c
src/sha1.c
src/ziplist.c
src/release.c
src/networking.c
src/util.c
src/object.c
src/db.c
src/replication.c
src/rdb.c
src/t_string.c
src/t_list.c
src/t_set.c
src/t_zset.c
src/evict.c
src/defrag.c
src/module.c
src/quicklist.c
src/expire.c
src/childinfo.c
src/redis-check-aof.c
src/redis-check-rdb.c
src/lazyfree.c
src/geohash.c
src/rax.c
src/geohash_helper.c
src/siphash.c
src/geo.c
src/t_hash.c
src/config.c
src/aof.c
src/pubsub.c
src/multi.c
src/debug.c
src/sort.c
src/intset.c
src/syncio.c
src/cluster.c
src/crc16.c
src/endianconv.c
src/slowlog.c
src/scripting.c
src/bio.c
src/rio.c
src/rand.c
src/memtest.c
src/crc64.c
src/bitops.c
src/sentinel.c
src/notify.c
src/setproctitle.c
src/crcspeed.c
src/blocked.c
src/hyperloglog.c
src/latency.c
src/sparkline.c
src/t_stream.c
src/lolwut.c
src/lolwut5.c
src/listpack.c
src/localtime.c
src/timeout.c
src/connection.c
src/tls.c
src/tracking.c
src/lolwut6.c
src/gopher.c
src/sha256.c
)
set(SRC_SERVER src/server.c ${SRC_SERVER_TMP})
set(SRC_CLI
src/anet.c
src/sds.c
src/adlist.c
src/redis-cli.c
src/zmalloc.c
src/release.c
src/anet.c
src/ae.c
src/crc64.c
src/crc16.c
src/dict.c
src/siphash.c
src/crcspeed.c
)
set(EXECUTABLE_OUTPUT_PATH src)
link_directories(deps/linenoise/ deps/lua/src deps/hiredis)
add_executable(redis-server ${SRC_SERVER})
target_include_directories(redis-server
PRIVATE ${REDIS_ROOT}/deps/linenoise
PRIVATE ${REDIS_ROOT}/deps/hiredis
PRIVATE ${REDIS_ROOT}/deps/lua/src)
target_link_libraries(redis-server
PRIVATE pthread
PRIVATE m
PRIVATE lua
PRIVATE linenoise
PRIVATE hiredis)
add_executable(redis-cli ${SRC_CLI})
target_include_directories(redis-cli
PRIVATE ${REDIS_ROOT}/deps/linenoise
PRIVATE ${REDIS_ROOT}/deps/hiredis
PRIVATE ${REDIS_ROOT}/deps/lua/src)
target_link_libraries(redis-cli
PRIVATE pthread
PRIVATE m
PRIVATE linenoise
PRIVATE hiredis
)
复制代码
- deps/CMakeLists.txt
add_subdirectory(hiredis)
add_subdirectory(linenoise)
add_subdirectory(lua)
复制代码
- deps/hiredis/CMakeLists.txt
add_library(hiredis STATIC
hiredis.c
net.c
dict.c
sds.c
async.c
read.c
)
复制代码
- deps/linenoise/CMakeLists.txt
add_library(linenoise linenoise.c)
复制代码
- deps/lua/CMakeLists.txt
set(LUA_SRC
src/lapi.c src/lcode.c src/ldebug.c src/ldo.c src/ldump.c src/lfunc.c
src/lgc.c src/llex.c src/lmem.c
src/lobject.c src/lopcodes.c src/lparser.c src/lstate.c src/lstring.c
src/ltable.c src/ltm.c
src/lundump.c src/lvm.c src/lzio.c src/strbuf.c src/fpconv.c
src/lauxlib.c src/lbaselib.c src/ldblib.c src/liolib.c src/lmathlib.c
src/loslib.c src/ltablib.c
src/lstrlib.c src/loadlib.c src/linit.c src/lua_cjson.c
src/lua_struct.c
src/lua_cmsgpack.c
src/lua_bit.c
)
add_library(lua STATIC ${LUA_SRC})
复制代码
- src/modules/CMakeLists.txt
cmake_minimum_required(VERSION 3.9)
set(CMAKE_BUILD_TYPE "Debug")
add_library(helloworld SHARED helloworld.c)
set_target_properties(helloworld PROPERTIES PREFIX "" SUFFIX ".so")
add_library(hellotype SHARED hellotype.c)
set_target_properties(hellotype PROPERTIES PREFIX "" SUFFIX ".so")
add_library(helloblock SHARED helloblock.c)
set_target_properties(helloblock PROPERTIES PREFIX "" SUFFIX ".so")
add_library(testmodule SHARED testmodule.c)
set_target_properties(testmodule PROPERTIES PREFIX "" SUFFIX ".so")
复制代码
修改文件:ae_kqueue.c,将include修改为:
#include "unistd.h"
#include "ae.h"
#include "zmalloc.h"
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
复制代码
执行cmake . (自行安装cmake) 执行make 等待成功后配置运行
int main(int argc char **argv) {
struct timeval tv;
int j;
#ifdef REDIS_TEST
if (argc == 3 && !strcasecmp(argv[1] "test")) {
if (!strcasecmp(argv[2] "ziplist")) {
return ziplistTest(argc argv);
} else if (!strcasecmp(argv[2] "quicklist")) {
quicklistTest(argc argv);
} else if (!strcasecmp(argv[2] "intset")) {
return intsetTest(argc argv);
} else if (!strcasecmp(argv[2] "zipmap")) {
return zipmapTest(argc argv);
} else if (!strcasecmp(argv[2] "sha1test")) {
return sha1Test(argc argv);
} else if (!strcasecmp(argv[2] "util")) {
return utilTest(argc argv);
} else if (!strcasecmp(argv[2] "endianconv")) {
return endianconvTest(argc argv);
} else if (!strcasecmp(argv[2] "crc64")) {
return crc64Test(argc argv);
} else if (!strcasecmp(argv[2] "zmalloc")) {
return zmalloc_test(argc argv);
}
return -1; /* test not found */
}
#endif
/* We need to initialize our libraries and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc argv);
#endif
setlocale(LC_COLLATE "");
//设置时间环境变量
tzset();
//内存超出的handler(记录日志)
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
//随机数初始化
srand(time(NULL)^getpid());
//精确时间
gettimeofday(&tv NULL);
crc64_init();
//hash算法种子
uint8_t hashseed[16];
getRandomBytes(hashseed sizeof(hashseed));
dictSetHashFunctionSeed(hashseed);
//是否以哨兵模式启动
server.sentinel_mode = checkForSentinelMode(argc argv);
//初始化配置文件
initServerConfig();
ACLInit();
moduleInitModulesSystem();
tlsInit();
//将执行命令以及参数保存
server.executable = getAbsolutePath(argv[0]);
server.exec_argv = zmalloc(sizeof(char*)*(argc 1));
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j ) server.exec_argv[j] = zstrdup(argv[j]);
//初始化哨兵配置
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
//检查aof和rdb错误
if (strstr(argv[0] "redis-check-rdb") != NULL)
redis_check_rdb_main(argc argv NULL);
else if (strstr(argv[0] "redis-check-aof") != NULL)
redis_check_aof_main(argc argv);
复制代码
主要是执行了initServerConfig方法,将服务的配置进行设置了默认值
if (argc >= 2) {
j = 1;
sds options = sdsempty();
char *configfile = NULL;
//输出版本号以及帮助等
if (strcmp(argv[1] "-v") == 0 ||
strcmp(argv[1] "--version") == 0) version();
if (strcmp(argv[1] "--help") == 0 ||
strcmp(argv[1] "-h") == 0) usage();
if (strcmp(argv[1] "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]) 50);
exit(0);
} else {
fprintf(stderr "Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr "Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
//获取配置文件地址
if (argv[j][0] != '-' || argv[j][1] != '-') {
configfile = argv[j];
server.configfile = getAbsolutePath(configfile);
zfree(server.exec_argv[j]);
server.exec_argv[j] = zstrdup(server.configfile);
j ;
}
//解析参数 比如 --port 6379 等一些配置
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') {
//跳过 --check-rdb没有参数
if (!strcmp(argv[j] "--check-rdb")) {
j ;
continue;
}
if (sdslen(options)) options = sdscat(options "\n");
options = sdscat(options argv[j] 2);
options = sdscat(options " ");
} else {
/* Option argument */
options = sdscatrepr(options argv[j] strlen(argv[j]));
options = sdscat(options " ");
}
j ;
}
//哨兵配置只能从文件读取 不允许参数
if (server.sentinel_mode && configfile && *configfile == '-') {
serverLog(LL_WARNING
"Sentinel config from STDIN not allowed.");
serverLog(LL_WARNING
"Sentinel needs config file on disk to save state. Exiting...");
exit(1);
}
resetServerSaveParams();
//加载配置文件 和命令行读入的参数到server配置中
loadServerConfig(configfile options);
sdsfree(options);
}
复制代码
然后是解析输入的参数,可以通过:
redis-server ../redis.conf --port 6379
复制代码
来指定配置文件的地址,以及可以填写一些参数,最终通过loadServerConfig方法来将配置文件以及自己填的参数解析
//Redis信息
serverLog(LL_WARNING "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
serverLog(LL_WARNING
"Redis version=%s bits=%d commit=%s modified=%d pid=%d just started"
REDIS_VERSION
(sizeof(long) == 8) ? 64 : 32
redisGitSHA1()
strtol(redisGitDirty() NULL 10) > 0
(int)getpid());
if (argc == 1) {
serverLog(LL_WARNING "Warning: no config file specified using the default config. In order to specify a config file use %s /path/to/%s.conf" argv[0] server.sentinel_mode ? "sentinel" : "redis");
} else {
serverLog(LL_WARNING "Configuration loaded");
}
//后台模式模式
server.supervised = redisIsSupervised(server.supervised_mode);
int background = server.daemonize && !server.supervised;
if (background) daemonize();
//初始化服务器
initServer();
复制代码
主要是initServer这个方法,inintServer首先初始化了一些list,这一段就不放上来了,继续往下看
//常用字符串等共享对象
createSharedObjects();
//提高最大打开文件数量
adjustOpenFilesLimit();
//创建事件循环
server.el = aeCreateEventLoop(server.maxclients CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING
"Failed creating the event loop. Error message: '%s'"
strerror(errno));
exit(1);
}
复制代码
可以看到这里调用aeCreateEventLoop创建了服务的事件循环器,这也是整个服务的基石
aeEventLooptypedef struct aeEventLoop {
int maxfd; /* 当前最高文件描述符 */
int setsize; //event数量
long long timeEventNextId; //定时任务自增id
time_t lastTime;
aeFileEvent *events; //事件数组
aeFiredEvent *fired; /* 准备就绪的事件*/
aeTimeEvent *timeEventHead; //定时任务头节点
int stop;
void *apidata;
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
复制代码
这是事件循环的结构体,主要是*events来存放对应的事件
typedef struct aeFileEvent {
int mask; /* 三种flags AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
复制代码
aeFileEvent主要是读事件和写事件,根据事件的不同调用方法,按照普通的流程来说应该是先读后写,但是如果mask为AE_BARRIER的时候,表示先写后读
typedef struct aeTimeEvent {
long long id;
long when_sec; //秒
long when_ms; //毫秒
aeTimeProc *timeProc; //对应执行方法
aeEventFinalizerProc *finalizerProc; //回收方法
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount;
} aeTimeEvent;
复制代码
这是事件循环里面的定时事件,多个定时事件组成一个链表
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i )
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
复制代码
创建事件循环代码,可以看到主要调用aeApiCreate,并且初始化了setsize个事件,setsize表示最大client数量
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
复制代码
aeApiCreate调用了底层io实现,一共支持4种:
- epoll:linux环境下主要使用
- kqueue:mac环境下使用
- evport:Solaris 10环境
- select:几乎所有都可以使用
关于什么是select,什么又是epoll老生常谈的话题了,这里就不解释了....
下面用epoll当作主要的IO方式
在java里面也可以使用epoll来作为io方式,写过javaNio的都知道,可以注册自己想要监听的事件,比如accept、read,当事件到来的时候可以根据事件的类型分别处理,在C语言里面相关操作:
epoll_create 创建一个epoll对象
epoll_ctl 往epoll对象中增加/删除某一个流的某一个事件比如
epoll_ctl(epollfd EPOLL_CTL_ADD socket EPOLLIN);//有缓冲区内有数据时epoll_wait返回
epoll_ctl(epollfd EPOLL_CTL_DEL socket EPOLLOUT);//缓冲区可写入时epoll_wait返回epoll_wait(epollfd ...)等待直到注册的事件发生
复制代码
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop int fd int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
//合并旧的mask
mask |= eventLoop->events[fd].mask;
//设置事件 读/写
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
//新增事件
if (epoll_ctl(state->epfd op fd &ee) == -1) return -1;
return 0;
}
复制代码
创建epoll以及新增事件
int aeCreateFileEvent(aeEventLoop *eventLoop int fd int mask
aeFileProc *proc void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
//核心
if (aeApiAddEvent(eventLoop fd mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
复制代码
这是aeCreateFileEvent方法,在很多地方都用到了,比如aof的重写。通过aeApiAddEvent向epoll注册一个对应的事件
static int aeApiPoll(aeEventLoop *eventLoop struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval numevents = 0;
retval = epoll_wait(state->epfd state->events eventLoop->setsize
tvp ? (tvp->tv_sec*1000 tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j ) {
int mask = 0;
struct epoll_event *e = state->events j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
复制代码
调用epoll_wait来等待事件的到来,然后将事件放入fired数组中
总结:redis使用了不同的I/O模型来处理事件,可以自由的处理事件的添加删除等,最终调用aeApiPoll获取达到的事件,后续其他地方会处理对应的方法。
//创建默认16个数据库
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
//绑定端口
if (server.port != 0 &&
listenToPort(server.port server.ipfd &server.ipfd_count) == C_ERR)
exit(1);
if (server.tls_port != 0 &&
listenToPort(server.tls_port server.tlsfd &server.tlsfd_count) == C_ERR)
exit(1);
//unixsocket 不配置port 打开unixsokcet配置即可
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr server.unixsocket
server.unixsocketperm server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING "Opening Unix socket: %s" server.neterr);
exit(1);
}
anetNonBlock(NULL server.sofd);
}
//没有listen任何端口
if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) {
serverLog(LL_WARNING "Configured to not listen anywhere exiting.");
exit(1);
}
复制代码
继续是绑定端口以及unixsocket
//创建16个数据库
for (j = 0; j < server.dbnum; j ) {
server.db[j].dict = dictCreate(&dbDictType NULL);
server.db[j].expires = dictCreate(&keyptrDictType NULL);
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later (void (*)(void*))sdsfree);
}
复制代码
初始化数据库
//创建定时器
if (aeCreateTimeEvent(server.el 1 serverCron NULL NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
复制代码
这里创建了serverCron定时器,serverCron是整个系统的定时任务处理
long long aeCreateTimeEvent(aeEventLoop *eventLoop long long milliseconds
aeTimeProc *proc void *clientData
aeEventFinalizerProc *finalizerProc)
{
//获取自增id
long long id = eventLoop->timeEventNextId ;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds &te->when_sec &te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
复制代码
添加定时任务并没有使用到IO,而是创建了定时任务放入到链表的头部
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}
复制代码
遍历整个定时链表,搜到出来最快执行的任务
//创建事件处理 用于处理tcp或unix连接
for (j = 0; j < server.ipfd_count; j ) {
if (aeCreateFileEvent(server.el server.ipfd[j] AE_READABLE
acceptTcpHandler NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
复制代码
这里创建了很多个事件处理器,用于处理redis客户端的连接
void acceptTcpHandler(aeEventLoop *el int fd void *privdata int mask) {
int cport cfd max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr fd cip sizeof(cip) &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING
"Accepting client connection: %s" server.neterr);
return;
}
serverLog(LL_VERBOSE "Accepted %s:%d" cip cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd) 0 cip);
}
}
复制代码
继续看acceptCommonHandler
static void acceptCommonHandler(connection *conn int flags char *ip) {
client *c;
UNUSED(ip);
//客户端太多了
if (listLength(server.clients) getClusterConnectionsCount()
>= server.maxclients)
{
char *err;
if (server.cluster_enabled)
err = "-ERR max number of clients cluster "
"connections reached\r\n";
else
err = "-ERR max number of clients reached\r\n";
//写入错误信息
if (connWrite(conn err strlen(err)) == -1) {
}
server.stat_rejected_conn ;
connClose(conn);
return;
}
/* 创建客户端 */
if ((c = createClient(conn)) == NULL) {
char conninfo[100];
serverLog(LL_WARNING
"Error registering fd event for the new client: %s (conn: %s)"
connGetLastError(conn)
connGetInfo(conn conninfo sizeof(conninfo)));
connClose(conn); /* May be already closed just ignore errors */
return;
}
c->flags |= flags;
if (connAccept(conn clientAcceptHandler) == C_ERR) {
char conninfo[100];
if (connGetState(conn) == CONN_STATE_ERROR)
serverLog(LL_WARNING
"Error accepting a client connection: %s (conn: %s)"
connGetLastError(conn) connGetInfo(conn conninfo sizeof(conninfo)));
freeClient(connGetPrivateData(conn));
return;
}
}
复制代码
首先判断了最大客户端数量,超出数量返回失败,然后调用createClient创建了客户端
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn server.tcpkeepalive);
connSetReadHandler(conn readQueryFromClient);
connSetPrivateData(conn c);
}
复制代码
只看这一段,给客户端连接设置了读处理器,readQueryFromClient
static inline int connSetReadHandler(connection *conn ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn func);
}
.set_read_handler = connSocketSetReadHandler
复制代码
set_read_handler是connSocketSetReadHandler方法
static int connSocketSetReadHandler(connection *conn ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el conn->fd AE_READABLE);
else
if (aeCreateFileEvent(server.el conn->fd
AE_READABLE conn->type->ae_handler conn) == AE_ERR) return C_ERR;
return C_OK;
}
复制代码
看到这里就明白了,redis创建了多个tcp事件,当客户端第一次连接的时候会创建client实例,并且设置ReadHandler,ReadHandler实际上向epoll注册了读事件,当有命令可读的时候调用readQueryFromClient
readQueryFromClient最后会调用到了processCommand方法也就是执行命令,同时在redis6里面是支持多线程的,但是通过前面的源码可以看出来:dict并没有进行加锁,那么这个多线程是怎么回事呢?看下回分解redis多线程模型
总结
作者:LZ
链接:https://juejin.cn/post/6979458051556245535
来源:掘金