快捷搜索:  汽车  科技

fastdfs如何实现安全访问(文件传输原理与网络IO模型)

fastdfs如何实现安全访问(文件传输原理与网络IO模型)FDFS_STORAGE_STAGE_NIO_INIT、FDFS_STORAGE_STAGE_NIO_recv、3.dio线程不会直接给nio线程设置各种读写事件,⽽是通过1.accept线程在接受新的连接后,封装成一个任务对象,选择worker线程,写入管道pipe,pipe是能够触发work线程中的epoll。(在接受新连接前,在work线程中就已经把pipe加入了监听)2.文件io线程,通过从队列中取任务,然后写入到磁盘中。那么如何通知文件IO线程呢,直接往文件处理队列(阻塞队列)中添加即可,在文件io线程中就能取到(锁和条件变量,取消阻塞)。没有直接在文件io线程中检测网络io事件,主要是为了将功能解耦合,让work线程(处理网络io)去做这件事,每个线程只做自己的事,使得逻辑清楚。

一、fastdfs网络IO模型的结构

fdfs文件服务器主要有3种线程,accept线程、work线程(网络io处理)、dio线程(处理文件)

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(1)

accept新连接,有个专门的accept线程去处理。每个线程池处理自己的事,比如在业务中,还要设计一个视频解码的功能,要另开个线程池,处理专门的任务。而不是把所有逻辑都放在一个线程池里面。

nio是net io的意思 (网络io)

dio是data io的意思 (文件io)

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(2)

1.accept线程在接受新的连接后,封装成一个任务对象,选择worker线程,写入管道pipe,pipe是能够触发work线程中的epoll。(在接受新连接前,在work线程中就已经把pipe加入了监听)

2.文件io线程,通过从队列中取任务,然后写入到磁盘中。

那么如何通知文件IO线程呢,直接往文件处理队列(阻塞队列)中添加即可,在文件io线程中就能取到(锁和条件变量,取消阻塞)。没有直接在文件io线程中检测网络io事件,主要是为了将功能解耦合,让work线程(处理网络io)去做这件事,每个线程只做自己的事,使得逻辑清楚。

3.dio线程不会直接给nio线程设置各种读写事件,⽽是通过

FDFS_STORAGE_STAGE_NIO_INIT、FDFS_STORAGE_STAGE_NIO_recv、

FDFS_STORAGE_STAGE_NIO_SEND、FDFS_STORAGE_STAGE_NIO_CLOSE、

FDFS_STORAGE_STAGE_DIO_THREAD等状态 通过pipe通知nio线程响应storage_recv_notify_read

进⾏io事件的处理。

只要文件io线程读取完了文件处理队列中所有的数据,那么就会请求fd加入可写事件(通过pipe向work线程发送信号,触发epoll去做这件事),让网络io去读取新的事件。

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(3)

不同线程之间通过以下方式进行通信

1.队列 (阻塞队列,锁 条件变量)

2.管道(通过pipe创建)

二、服务端的一些逻辑

新连接:

首先通过accept线程去接受新的连接incomingsock,让后去获取它的信息,并从对象池取出一个pTask,将新的连接fd以及它的信息封装成pTask。通过管道将pTask发送出去,在work线程epoll中就已经监听了该管道的读fd,accept线程发送的pipe,在work线程中epoll能检测到pipe事件,并读取信息。然后将该客户端fd读事件加入到epoll中。

上传:

work线程的epoll检测到客户端发送的信息,通过解析协议,获取它的数据,如果其中的CMD是上传的标志,那么就会执行client_sock_read–>storage_deal_task

–>storage_upload_File–>storage_write_to_file

然后会将协议解析的数据和上传回调函数(保存到服务器磁盘)dio_write_file,上传完成的回调函数storage_upload_file_done_callback等信息都封装到pTask,然后将它加入队列中。

然后dio线程中队列就能检测到新的任务,就会执行deal_func也就是dio_write_file,进行写入磁盘。

相关视频推荐

linux下的epoll实战揭秘——支撑亿级IO的底层基石

手把手带你实现epoll组件,为tcp并发的实现epoll

学习地址:C/C Linux服务器开发/后台架构师【零声教育】-学习视频教程-腾讯课堂

需要C/C Linux服务器架构师学习资料加qun812855908获取(资料包括C/C ,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(4)

三、源码阅读

1、fastdfs/storage/fdfs_storage.c

这一部分是进行启动storage服务端

  • 一些初始化工作(如创建socket对象(进行listen),等)
  • 创建work线程(读写网络io)
  • 创建dio线程(dataIO线程,也就是处理文件的,比如将上传的内容写入到磁盘)
  • 创建accept线程(用于接受新的连接)

当然还有写同步的功能,但不在本文介绍。

int main(int argc char *argv[]) { ... sock = socketServer(g_bind_addr g_server_port &result);//socket、bind、listen if (sock < 0) { logCrit("exit abnormally!\n"); delete_pid_file(pidFilename); log_destroy(); return result; } ... if ((result=storage_service_init()) != 0)//storage_service初始化,包含work线程初始化(网络IO部分) { logCrit("file: "__FILE__" line: %d " \ "storage_service_init fail program exit!" __LINE__); g_continue_flag = false; return result; } ... if ((result=storage_dio_init()) != 0)//初始化dio线程(dataIO线程,就也是处理文件的) { logCrit("exit abnormally!\n"); log_destroy(); return result; } log_set_cache(true); bTerminateFlag = false; accept_stage = ACCEPT_STAGE_DOING; storage_accept_loop(sock);//初始化accept线程 accept_stage = ACCEPT_STAGE_DONE; ... }

2、storage_accept_loop

创建accept线程(数量由g_accept_threads决定)

线程的执行函数为accept_thread_entrance

void storage_accept_loop(int server_sock) { if (g_accept_threads > 1) { pthread_t tid; pthread_attr_t thread_attr; int result; int i; if ((result=init_pthread_attr(&thread_attr g_thread_stack_size)) != 0) { logWarning("file: "__FILE__" line: %d " \ "init_pthread_attr fail!" __LINE__); } else { for (i=1; i<g_accept_threads; i ) { if ((result=pThread_create(&tid &thread_attr \ accept_thread_entrance \ (void *)(long)server_sock)) != 0)//创建accept线程 { logError("file: "__FILE__" line: %d " \ "create thread failed startup threads: %d " \ "errno: %d error info: %s" \ __LINE__ i result STRERROR(result)); break; } } pthread_attr_destroy(&thread_attr); } } accept_thread_entrance((void *)(long)server_sock); }

1)accept_thread_entrance

流程:

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(5)

其中获取任务对象是从对象池中获取,然后 将新连接的信息封装到任务对象。

通过轮询的方式指定work线程,然后将任务对象发送(write)给该work线程

源码:

static void *accept_thread_entrance(void* arg) { int server_sock; int incomesock; struct sockaddr_in inaddr; socklen_t sockaddr_len; in_addr_t client_addr; char szClientIp[IP_ADDRESS_SIZE]; long task_addr; struct fast_task_info *pTask; StorageClientInfo *pClientInfo; struct storage_nio_thread_data *pThreadData; server_sock = (long)arg; while (g_continue_flag) { sockaddr_len = sizeof(inaddr); incomesock = accept(server_sock (struct sockaddr*)&inaddr \ &sockaddr_len);//accept if (incomesock < 0) //error { if (!(errno == EINTR || errno == EAGAIN)) { logError("file: "__FILE__" line: %d " \ "accept failed " \ "errno: %d error info: %s" \ __LINE__ errno STRERROR(errno)); } continue; } client_addr = getPeerIpaddr(incomesock \ szClientIp IP_ADDRESS_SIZE); if (g_allow_ip_count >= 0) { if (bsearch(&client_addr g_allow_ip_addrs \ g_allow_ip_count sizeof(in_addr_t) \ cmp_by_ip_addr_t) == NULL) { logError("file: "__FILE__" line: %d " \ "ip addr %s is not allowed to access" \ __LINE__ szClientIp); close(incomesock); continue; } } if (tcpsetnonblockopt(incomesock) != 0) { close(incomesock); continue; } pTask = free_queue_pop(); // 取task对象(从对象池中取) if (pTask == NULL) { logError("file: "__FILE__" line: %d " "malloc task buff fail you should " "increase the parameter \"max_connections\" " "in storage.conf or check your applications " "for connection leaks" __LINE__); close(incomesock); continue; } pClientInfo = (StorageClientInfo *)pTask->arg; // 封装客户端信息 pTask->event.fd = incomesock; // socket fd pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_INIT; // 初始化client的状态 pClientInfo->nio_thread_index = pTask->event.fd % g_work_threads;//通过轮询的方式,发送给对应的work线程 pThreadData = g_nio_thread_data pClientInfo->nio_thread_index;//g_nio_thread_data是一个全局的信息(指针) 加上一个index就可以指向,具体的线程信息 strcpy(pTask->client_ip szClientIp); task_addr = (long)pTask; if (write(pThreadData->thread_data.pipe_fds[1] &task_addr \ sizeof(task_addr)) != sizeof(task_addr))//写入管道 { close(incomesock); free_queue_push(pTask);//如果写入失败,就把对象重新放入对象池中 logError("file: "__FILE__" line: %d " \ "call write failed " \ "errno: %d error info: %s" \ __LINE__ errno STRERROR(errno)); } else { int current_connections; current_connections = __sync_add_and_fetch(&g_storage_stat.connection. current_count 1);//连接数量 1 (CAS) if (current_connections > g_storage_stat.connection.max_count) { g_storage_stat.connection.max_count = current_connections; } g_stat_change_count; } } return NULL; }

注意pipe_fds[1]是管道的写端,pipe_fds[0]是读端。因此如果pipe_fds[0]加入epoll的话,往pipe_fds[1]中写入数据,那么epoll就能监听到。

3、storage_service_init

这部分的主要内容是创建work线程,work线程的执行函数为work_thread_entrance

int storage_service_init() { ... bytes = sizeof(struct storage_nio_thread_data) * g_work_threads;//work线程默认个数是4,也可以从配置文件中读出来 g_nio_thread_data = (struct storage_nio_thread_data *)malloc(bytes); if (g_nio_thread_data == NULL) { logError("file: "__FILE__" line: %d " \ "malloc %d bytes fail errno: %d error info: %s" \ __LINE__ bytes errno STRERROR(errno)); return errno != 0 ? errno : ENOMEM; } memset(g_nio_thread_data 0 bytes); g_storage_thread_count = 0; pDataEnd = g_nio_thread_data g_work_threads; for (pThreadData=g_nio_thread_data; pThreadData<pDataEnd; pThreadData ) { ... if (pipe(pThreadData->thread_data.pipe_fds) != 0)//创建管道 { result = errno != 0 ? errno : EPERM; logError("file: "__FILE__" line: %d " \ "call pipe fail " \ "errno: %d error info: %s" \ __LINE__ result STRERROR(result)); break; } ... if ((result=pthread_create(&tid &thread_attr \ work_thread_entrance pThreadData)) != 0)//创建work线程 { logError("file: "__FILE__" line: %d " \ "create thread failed startup threads: %d " \ "errno: %d error info: %s" \ __LINE__ g_storage_thread_count \ result STRERROR(result)); break; } else { if ((result=pthread_mutex_lock(&g_storage_thread_lock)) != 0) { logError("file: "__FILE__" line: %d " \ "call pthread_mutex_lock fail " \ "errno: %d error info: %s" \ __LINE__ result STRERROR(result)); } g_storage_thread_count ;//创建线程成功,因此 1 if ((result=pthread_mutex_unlock(&g_storage_thread_lock)) != 0) { logError("file: "__FILE__" line: %d " \ "call pthread_mutex_lock fail " \ "errno: %d error info: %s" \ __LINE__ result STRERROR(result)); } } } ... }

1)work_thread_entrance

ioevent_loop是 事件循环所在

static void *work_thread_entrance(void* arg) { int result; struct storage_nio_thread_data *pThreadData; pThreadData = (struct storage_nio_thread_data *)arg; ... // 事件循环所在 ioevent_loop(&pThreadData->thread_data storage_recv_notify_read task_finish_clean_up &g_continue_flag); ioevent_destroy(&pThreadData->thread_data.ev_puller); ... return NULL; }

2)ioevent_loop

事件循环所在

比如将pipe_fds[0]管道的读端fd加入epoll管理

进行epoll_wait

如果事件触发,就执行相应的回调函数

int ioevent_loop(struct nio_thread_data *pThreadData IOEventCallback recv_notify_callback TaskCleanUpCallback clean_up_callback volatile bool *continue_flag) { int result; struct ioevent_notify_entry ev_notify; FastTimerEntry head; struct fast_task_info *task; time_t last_check_time; int count; memset(&ev_notify 0 sizeof(ev_notify)); ev_notify.event.fd = FC_NOTIFY_READ_FD(pThreadData); // socket fd ev_notify.event.callback = recv_notify_callback; // 对应的是 storage_recv_notify_read ev_notify.thread_data = pThreadData; // 自己所属的线程 if (ioevent_attach(&pThreadData->ev_puller pThreadData->pipe_fds[0] IOEVENT_READ &ev_notify) != 0) // 管道添加到epoll管理 { result = errno != 0 ? errno : ENOMEM; logCrit("file: "__FILE__" line: %d " \ "ioevent_attach fail " \ "errno: %d error info: %s" \ __LINE__ result STRERROR(result)); return result; } pThreadData->deleted_list = NULL; last_check_time = g_current_time; while (*continue_flag) { pThreadData->ev_puller.iterator.count = ioevent_poll( // 实际是调用epoll_wait &pThreadData->ev_puller); if (pThreadData->ev_puller.iterator.count > 0) { deal_ioevents(&pThreadData->ev_puller); // 真正有数据来进入该函数(执行回调函数) } else if (pThreadData->ev_puller.iterator.count < 0) { result = errno != 0 ? errno : EINVAL; if (result != EINTR) { logError("file: "__FILE__" line: %d " \ "ioevent_poll fail " \ "errno: %d error info: %s" \ __LINE__ result STRERROR(result)); return result; } } ... } return 0; }

3)storage_recv_notify_read

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(6)

// 这里的socket实际是pipe void storage_recv_notify_read(int sock short event void *arg)// 数据服务器socket事件回调 比如说在上传文件时 接收了一部分之后 调用storage_nio_notify(pTask) { struct fast_task_info *pTask; StorageClientInfo *pClientInfo;//注意这个参数是不同的 一个是跟踪服务器参数 一个是数据服务器参数 long task_addr; // 读取task的地址 int64_t remain_bytes; int bytes; int result; while (1) // 循环读取task任务 { if ((bytes=read(sock &task_addr sizeof(task_addr))) < 0) // 读取task任务 { if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { logError("file: "__FILE__" line: %d " \ "call read failed " \ "errno: %d error info: %s" \ __LINE__ errno STRERROR(errno)); } break; // 没有task可读 } else if (bytes == 0) { logError("file: "__FILE__" line: %d " \ "call read failed end of file" __LINE__); break; } pTask = (struct fast_task_info *)task_addr; // 还原任务 pClientInfo = (StorageClientInfo *)pTask->arg; if (pTask->event.fd < 0) //quit flag 这个是对应的 socket fd { return; } /* //logInfo("=====thread index: %d pTask->event.fd=%d" \ pClientInfo->nio_thread_index pTask->event.fd); */ if (pClientInfo->stage & FDFS_STORAGE_STAGE_DIO_THREAD) { pClientInfo->stage &= ~FDFS_STORAGE_STAGE_DIO_THREAD; } switch (pClientInfo->stage) { case FDFS_STORAGE_STAGE_NIO_INIT: //数据服务器服务端socket接收过来的任务的pClientInfo->stage=FDFS_STORAGE_STAGE_NIO_INIT result = storage_nio_init(pTask); //因此在这里在重新绑定读写事件 //每连接一个客户端 在这里都会触发这个动作 break; case FDFS_STORAGE_STAGE_NIO_RECV: pTask->offset = 0; //在次接受包体时pTask->offset偏移量被重置 remain_bytes = pClientInfo->total_length - \ pClientInfo->total_offset;//任务的长度=包的总长度-包的总偏移量 if (remain_bytes > pTask->size) //总是试图将余下的自己一次接收收完 { pTask->length = pTask->size; // pTask->size 是每次最大的数据接收长度 } else { pTask->length = remain_bytes; } if (set_recv_event(pTask) == 0) { client_sock_read(pTask->event.fd // 通过socket fd读取数据 IOEVENT_READ pTask); // 读取数据 } result = 0; break; case FDFS_STORAGE_STAGE_NIO_SEND: result = storage_send_add_event(pTask); // 数据发送 break; case FDFS_STORAGE_STAGE_NIO_CLOSE: result = EIO; //close this socket break; default: logError("file: "__FILE__" line: %d " \ "invalid stage: %d" __LINE__ \ pClientInfo->stage); result = EINVAL; break; } if (result != 0) { ioevent_add_to_deleted_list(pTask); // 如果出错再将对应的task加入到删除队列进行处理 } } }

(1)client_sock_read

从socket读取数据(也可以是pipe)

大部分内容都是读取数据的操作,最后一步才是关键的

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(7)

recv读取完数据后,分为两种,

1、storage_deal_task(pTask)

这里面可以解析协议中的CMD,后续调用到storage_upload_file进行文件上传的初始化工作

2、data数据处理

读取完的数据交由dio(dataIO线程)进行处理storage_dio_queue_push(pTask),放入队列中,那么dio线程就能读取到 并执行dio_write_file写入磁盘中

static void client_sock_read(int sock short event void *arg) { ... while (1) { if (pClientInfo->total_length == 0) //recv header //初始时pClientInfo->total_length=0 pTask->offset=0 { recv_bytes = sizeof(TrackerHeader) - pTask->offset; } else // 至少读到了10个字节后 sizeof(TrackerHeader) { recv_bytes = pTask->length - pTask->offset; //在次接受上传文件的数据包时 因为发生storage_nio_notify(pTask) } /* logInfo("total_length=%"PRId64" recv_bytes=%d " "pTask->length=%d pTask->offset=%d" pClientInfo->total_length recv_bytes pTask->length pTask->offset); */ bytes = recv(sock pTask->data pTask->offset recv_bytes 0); // 根据buffer情况读取数据 if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { } else if (errno == EINTR) { continue; } else { logError("file: "__FILE__" line: %d " \ "client ip: %s recv failed " \ "errno: %d error info: %s" \ __LINE__ pTask->client_ip \ errno STRERROR(errno)); task_finish_clean_up(pTask); } return; } else if (bytes == 0) { logDebug("file: "__FILE__" line: %d " \ "client ip: %s recv failed " \ "connection disconnected." \ __LINE__ pTask->client_ip); task_finish_clean_up(pTask); return; } if (pClientInfo->total_length == 0) //header { // 要来解析header if (pTask->offset bytes < sizeof(TrackerHeader)) // 还没有读够 header { pTask->offset = bytes; return; } pClientInfo->total_length=buff2long(((TrackerHeader *) \ //确定包data的总长度:比如下载文件时 接收的包 就只有包的长度, 这里不包括header pTask->data)->pkg_len); if (pClientInfo->total_length < 0) { logError("file: "__FILE__" line: %d " \ "client ip: %s pkg length: " \ "%"PRId64" < 0" \ __LINE__ pTask->client_ip \ pClientInfo->total_length); task_finish_clean_up(pTask); return; } //包的总长度=包头 包体的长度 //设想发送的场景:包头 包体 包体 ...(其中在包头里面含有多个包体的总长度) pClientInfo->total_length = sizeof(TrackerHeader); //因为默认的接收缓冲只有K 所以会分次发送, 计算出来包括header的长度 if (pClientInfo->total_length > pTask->size) { pTask->length = pTask->size; //如果包的总长大于包的分配的长度 那么任务长度等于任务分配的长度, 读到对应的数据就去触发dio } else { pTask->length = pClientInfo->total_length; //确定任务的长度 } } pTask->offset = bytes; // offset增加 if (pTask->offset >= pTask->length) //recv current pkg done //接收到当前包完成 { if (pClientInfo->total_offset pTask->length >= \ //上次操作接收的总的偏移量 这次接收的数据长度 如果大于包的总长度 那么说明包接收完毕 pClientInfo->total_length) { /* current req recv done */ pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_SEND; pTask->req_count ; } if (pClientInfo->total_offset == 0) { // 说明还没有开始处理 pClientInfo->total_offset = pTask->length; //数据服务器进行处理 storage_deal_task(pTask); // 解析header 以及我们协议附加的信息 } else { pClientInfo->total_offset = pTask->length; //否则继续写文件 /* continue write to file */ storage_dio_queue_push(pTask); // 比如文件增加 } return; } } return; }

(2)client_sock_write

服务端发送数据write给客户端,对于客户端来说,就是下载文件

一个下载的任务,要分成好几个ptask,让网络io线程去发送(work线程)。

如果ptask的数据发送完了(ptask->offset>=ptask->length),那么看总任务的是不是都发送完了。如果总任务都发送完成了,那么就切换 接受RECV状态。

如果总任务还没发送,那么就加入队列中storage_dio_queue_push(pTask),让dio线程再去读取数据。

static void client_sock_write(int sock short event void *arg) { int bytes; struct fast_task_info *pTask; StorageClientInfo *pClientInfo; pTask = (struct fast_task_info *)arg; pClientInfo = (StorageClientInfo *)pTask->arg; if (pTask->canceled) { return; } if (event & IOEVENT_TIMEOUT) { logError("file: "__FILE__" line: %d " "client ip: %s send timeout offset: %d " "remain bytes: %d" __LINE__ pTask->client_ip pTask->offset pTask->length - pTask->offset); task_finish_clean_up(pTask); return; } if (event & IOEVENT_ERROR) { logDebug("file: "__FILE__" line: %d " "client ip: %s recv error event: %d " "close connection" __LINE__ pTask->client_ip event); task_finish_clean_up(pTask); return; } while (1) { fast_timer_modify(&pTask->thread_data->timer &pTask->event.timer g_current_time g_fdfs_network_timeout); bytes = send(sock pTask->data pTask->offset \ pTask->length - pTask->offset 0); //printf("X sended %d bytes\n" (int)pTask bytes); if (bytes < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { set_send_event(pTask); } else if (errno == EINTR) { continue; } else { logError("file: "__FILE__" line: %d " \ "client ip: %s recv failed " \ "errno: %d error info: %s" \ __LINE__ pTask->client_ip \ errno STRERROR(errno)); task_finish_clean_up(pTask); } return; } else if (bytes == 0) { logWarning("file: "__FILE__" line: %d " \ "send failed connection disconnected." \ __LINE__); task_finish_clean_up(pTask); return; } pTask->offset = bytes; if (pTask->offset >= pTask->length) { if (set_recv_event(pTask) != 0) { return; } pClientInfo->total_offset = pTask->length; if (pClientInfo->total_offset>=pClientInfo->total_length) { if (pClientInfo->total_length == sizeof(TrackerHeader) && ((TrackerHeader *)pTask->data)->status == EINVAL) { logDebug("file: "__FILE__" line: %d "\ "close conn: #%d client ip: %s" \ __LINE__ pTask->event.fd pTask->client_ip); task_finish_clean_up(pTask); return; } /* response done try to recv again */ pClientInfo->total_length = 0; pClientInfo->total_offset = 0; pTask->offset = 0; pTask->length = 0; pClientInfo->stage = FDFS_STORAGE_STAGE_NIO_RECV; } else //continue to send file content { pTask->length = 0; /* continue read from file */ storage_dio_queue_push(pTask); // 继续发送数据 } return; } } }

4)storage_upload_file

如果client_sock_read读取数据,协议中CMD为上传的指令,那么在work进程中(nio)中

执行该函数storage_upload_file,初始化要保存的文件信息,并且storage_upload_file末尾,通过storage_write_to_file将写入磁盘任务,放入队列中storage_dio_queue_push,让dio线程去执行

上传文件是在dio线程中执行dio_write_file,它是作为回调函数执行的。

static int storage_upload_file(struct fast_task_info *pTask bool bAppenderFile) { StorageClientInfo *pClientInfo; StorageFileContext *pFileContext; DisconnectCleanFunc clean_func; char *p; char filename[128]; char file_ext_name[FDFS_FILE_PREFIX_MAX_LEN 1]; int64_t nInPackLen; int64_t file_offset; int64_t file_bytes; int crc32; int store_path_index; int result; int filename_len; pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); // 对应一个文件上下文 nInPackLen = pClientInfo->total_length - sizeof(TrackerHeader); if (nInPackLen < 1 FDFS_PROTO_PKG_LEN_SIZE FDFS_FILE_EXT_NAME_MAX_LEN) { logError("file: "__FILE__" line: %d " \ "cmd=%d client ip: %s package size " \ "%"PRId64" is not correct " \ "expect length >= %d" __LINE__ \ STORAGE_PROTO_CMD_UPLOAD_FILE \ pTask->client_ip nInPackLen \ 1 FDFS_PROTO_PKG_LEN_SIZE \ FDFS_FILE_EXT_NAME_MAX_LEN); return EINVAL; } p = pTask->data sizeof(TrackerHeader); // 跳过header store_path_index = *p ; // store_path_index 解析store path值 if (store_path_index == -1) // { if ((result=storage_get_storage_path_index( \ &store_path_index)) != 0) { logError("file: "__FILE__" line: %d " \ "get_storage_path_index fail " \ "errno: %d error info: %s" __LINE__ \ result STRERROR(result)); return result; } } else if (store_path_index < 0 || store_path_index >= \ g_fdfs_store_paths.count) { logError("file: "__FILE__" line: %d " \ "client ip: %s store_path_index: %d " \ "is invalid" __LINE__ \ pTask->client_ip store_path_index); return EINVAL; } file_bytes = buff2long(p); // 解析处理要上传的文件大小 p = FDFS_PROTO_PKG_LEN_SIZE; if (file_bytes < 0 || file_bytes != nInPackLen - \ (1 FDFS_PROTO_PKG_LEN_SIZE \ FDFS_FILE_EXT_NAME_MAX_LEN)) { logError("file: "__FILE__" line: %d " \ "client ip: %s pkg length is not correct " \ "invalid file bytes: %"PRId64 \ " total body length: %"PRId64 \ __LINE__ pTask->client_ip file_bytes nInPackLen); return EINVAL; } memcpy(file_ext_name p FDFS_FILE_EXT_NAME_MAX_LEN); *(file_ext_name FDFS_FILE_EXT_NAME_MAX_LEN) = '\0'; p = FDFS_FILE_EXT_NAME_MAX_LEN; if ((result=fdfs_validate_filename(file_ext_name)) != 0) // 检验扩展名 { logError("file: "__FILE__" line: %d " \ "client ip: %s file_ext_name: %s " \ "is invalid!" __LINE__ \ pTask->client_ip file_ext_name); return result; } pFileContext->calc_crc32 = true; pFileContext->calc_file_hash = g_check_file_duplicate; // 是否要检测文件唯一性 pFileContext->extra_info.upload.start_time = g_current_time; strcpy(pFileContext->extra_info.upload.file_ext_name file_ext_name); storage_format_ext_name(file_ext_name \ pFileContext->extra_info.upload.formatted_ext_name); pFileContext->extra_info.upload.trunk_info.path. \ store_path_index = store_path_index; pFileContext->extra_info.upload.file_type = _FILE_TYPE_REGULAR; // 常规文件 pFileContext->sync_flag = STORAGE_OP_TYPE_SOURCE_CREATE_FILE; // 创建文件 pFileContext->timestamp2log = pFileContext->extra_info.upload.start_time; // 时间戳 pFileContext->op = FDFS_STORAGE_FILE_OP_WRITE; if (bAppenderFile) { pFileContext->extra_info.upload.file_type |= \ // 是否为追加文件模式 _FILE_TYPE_APPENDER; } else { if (g_if_use_trunk_file && trunk_check_size( \ TRUNK_CALC_SIZE(file_bytes))) { pFileContext->extra_info.upload.file_type |= \ // 附加信息 _FILE_TYPE_TRUNK; // 设置为trunk文件模式 } } if (pFileContext->extra_info.upload.file_type & _FILE_TYPE_TRUNK) // trunk文件 { FDFSTrunkFullInfo *pTrunkInfo; pFileContext->extra_info.upload.if_sub_path_alloced = true; pTrunkInfo = &(pFileContext->extra_info.upload.trunk_info); if ((result=trunk_client_trunk_alloc_space( \ // TRUNK_CALC_SIZE(file_bytes) = trunk header file size TRUNK_CALC_SIZE(file_bytes) pTrunkInfo)) != 0) { return result; } clean_func = dio_trunk_write_finish_clean_up; file_offset = TRUNK_FILE_START_OFFSET((*pTrunkInfo)); pFileContext->extra_info.upload.if_gen_filename = true; trunk_get_full_filename(pTrunkInfo pFileContext->filename \ sizeof(pFileContext->filename)); pFileContext->extra_info.upload.before_open_callback = \ dio_check_trunk_file_when_upload; pFileContext->extra_info.upload.before_close_callback = \ dio_write_chunk_header; pFileContext->open_flags = O_RDWR | g_extra_open_file_flags; } else { char reserved_space_str[32]; if (!storage_check_reserved_space_path(g_fdfs_store_paths.paths \ [store_path_index].total_mb g_fdfs_store_paths.paths \ [store_path_index].free_mb - (file_bytes/FDFS_ONE_MB) \ g_avg_storage_reserved_mb)) { logError("file: "__FILE__" line: %d " \ "no space to upload file " "free space: %d MB is too small file bytes: " \ "%"PRId64" reserved space: %s" \ __LINE__ g_fdfs_store_paths.paths[store_path_index].\ free_mb file_bytes \ fdfs_storage_reserved_space_to_string_ex( \ g_storage_reserved_space.flag \ g_avg_storage_reserved_mb \ g_fdfs_store_paths.paths[store_path_index]. \ total_mb g_storage_reserved_space.rs.ratio \ reserved_space_str)); return ENOSPC; } crc32 = rand(); *filename = '\0'; filename_len = 0; pFileContext->extra_info.upload.if_sub_path_alloced = false; if ((result=storage_get_filename(pClientInfo \ // 获取file id pFileContext->extra_info.upload.start_time \ file_bytes crc32 pFileContext->extra_info.upload.\ formatted_ext_name filename &filename_len \ // 生成的文件需要扩展名 pFileContext->filename)) != 0) { return result; } clean_func = dio_write_finish_clean_up; file_offset = 0; pFileContext->extra_info.upload.if_gen_filename = true; pFileContext->extra_info.upload.before_open_callback = NULL; pFileContext->extra_info.upload.before_close_callback = NULL; pFileContext->open_flags = O_WRONLY | O_CREAT | O_TRUNC \ | g_extra_open_file_flags; } pFileContext->continue_callback = storage_nio_notify; // 处理完毕后 return storage_write_to_file(pTask file_offset file_bytes \ p - pTask->data dio_write_file \ storage_upload_file_done_callback \ clean_func store_path_index); }

4、storage_dio_init

主要是对dio线程进行初始化

线程的执行函数是dio_thread_entrance

int storage_dio_init() { ... for (pThreadData=g_dio_thread_data; pThreadData<pDataEnd; pThreadData ) { ... for (pContext=pThreadData->contexts; pContext<pContextEnd; \ pContext ) { if ((result=blocked_queue_init(&(pContext->queue))) != 0) { return result; } if ((result=pthread_create(&tid &thread_attr \ dio_thread_entrance pContext)) != 0) { logError("file: "__FILE__" line: %d " \ "create thread failed " \ "startup threads: %d " \ "errno: %d error info: %s" \ __LINE__ g_dio_thread_count \ result STRERROR(result)); return result; } else { pthread_mutex_lock(&g_dio_thread_lock); g_dio_thread_count ; pthread_mutex_unlock(&g_dio_thread_lock); } } } pthread_attr_destroy(&thread_attr); return result; }

1)dio_thread_entrance

dio线程要做的事,就是从阻塞队列中,一旦获取数据,就执行回调函数

static void *dio_thread_entrance(void* arg) { ... while (g_continue_flag) { while ((pTask=blocked_queue_pop(&(pContext->queue))) != NULL) { ((StorageClientInfo *)pTask->arg)->deal_func(pTask);//执行回调函数 } } ... }

2)dio_write_file

在dio线程中执行,将上传的文件写入磁盘中

fastdfs如何实现安全访问(文件传输原理与网络IO模型)(8)

int dio_write_file(struct fast_task_info *pTask) { StorageClientInfo *pClientInfo; StorageFileContext *pFileContext; int result; int write_bytes; char *pDataBuff; pClientInfo = (StorageClientInfo *)pTask->arg; pFileContext = &(pClientInfo->file_context); result = 0; do { if (pFileContext->fd < 0) { if (pFileContext->extra_info.upload.before_open_callback!=NULL) { result = pFileContext->extra_info.upload. \ before_open_callback(pTask); if (result != 0) { break; } } if ((result=dio_open_file(pFileContext)) != 0) { break; } } pDataBuff = pTask->data pFileContext->buff_offset; // 跳过header以及附加信息 在deal task的时候赋值的 pFileContext->buff_offset write_bytes = pTask->length - pFileContext->buff_offset; // if (fc_safe_write(pFileContext->fd pDataBuff write_bytes) != write_bytes) { result = errno != 0 ? errno : EIO; logError("file: "__FILE__" line: %d " \ "write to file: %s fail fd=%d write_bytes=%d " \ "errno: %d error info: %s" \ __LINE__ pFileContext->filename \ pFileContext->fd write_bytes \ result STRERROR(result)); } pthread_mutex_lock(&g_dio_thread_lock); g_storage_stat.total_file_write_count ; if (result == 0) { g_storage_stat.success_file_write_count ; } pthread_mutex_unlock(&g_dio_thread_lock); if (result != 0) { break; } if (pFileContext->calc_crc32) { pFileContext->crc32 = CRC32_ex(pDataBuff write_bytes \ pFileContext->crc32); } if (pFileContext->calc_file_hash) { if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH) { CALC_HASH_CODES4(pDataBuff write_bytes \ pFileContext->file_hash_codes) } else { my_md5_update(&pFileContext->md5_context \ (unsigned char *)pDataBuff write_bytes); } } /* logInfo("###dio write bytes: %d pTask->length=%d buff_offset=%d" \ write_bytes pTask->length pFileContext->buff_offset); */ pFileContext->offset = write_bytes; // 增加写入文件的字数数量 if (pFileContext->offset < pFileContext->end) // pFileContext->end实际是指文件的大小。 { pFileContext->buff_offset = 0; // 为什么设置为0?因为下一次传输的数据全部为文件内容了 pFileContext->continue_callback(pTask); // 等待下一次的继续触发,比如 storage_nio_notify } else // 文件已经写入完毕 { if (pFileContext->calc_crc32) { pFileContext->crc32 = CRC32_FINAL( \ pFileContext->crc32); } if (pFileContext->calc_file_hash) { if (g_file_signature_method == STORAGE_FILE_SIGNATURE_METHOD_HASH) { FINISH_HASH_CODES4(pFileContext->file_hash_codes) } else { my_md5_final((unsigned char *)(pFileContext-> \ file_hash_codes) &pFileContext->md5_context); } } if (pFileContext->extra_info.upload.before_close_callback != NULL) { result = pFileContext->extra_info.upload. \ before_close_callback(pTask); } /* file write done close it */ close(pFileContext->fd); pFileContext->fd = -1; if (pFileContext->done_callback != NULL) { pFileContext->done_callback(pTask result);// 比如 storage_upload_file_done_callback } } return 0; } while (0); pClientInfo->clean_func(pTask); if (pFileContext->done_callback != NULL) { pFileContext->done_callback(pTask result); } return result; }

猜您喜欢: