快捷搜索:  汽车  科技

如何实现一个简单的线程池(进程池和线程池)

如何实现一个简单的线程池(进程池和线程池)

如何实现一个简单的线程池(进程池和线程池)(1)

#ifndef HTTPCONNECTION_H #define HTTPCONNECTION_H #include <unistd.h> #include <signal.h> #include <sys/types.h> #include <sys/epoll.h> #include <fcntl.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <sys/stat.h> #include <string.h> #include <pthread.h> #include <stdio.h> #include <sys/mman.h> #include <stdarg.h> #include "locker.h" class http_conn { public: /*文件名的最大长度*/ static const int FILENAME_LEN = 200; /*读缓冲区的大小*/ static const int READ_BUFFER_SIZE = 2048; /*写缓冲区的大小*/ static const int WRITE_BUFFER_SIZE = 1024; /*HTTP请求方法,但我们仅支持GET*/ enum METHOD{ GET = 0 POST HEAD PUT DELETE TRACE OPTIONS CONNECT PATCH}; /*解析客户请求时,主状态机所处的状态*/ enum CHECK_STATE{CHECK_STATE_REQUESTLINE = 0 CHECK_STATE_HEADER CHECK_STATE_CONTENT}; /*服务器处理HTTP请求的可能结果*/ enum HTTP_CODE{NO_REQUEST GET_REQUEST BAD_REQUEST NO_RESOURCE FORBIDDEN_REQUEST FILE_REQUEST INTERNAL_ERROR CLOSED_CONNECTION}; /*行的读取状态*/ enum LINE_STATUS{LINE_OK = 0 LINE_BAD LINE_OPEN}; public: http_conn(){}; ~http_conn(){}; public: /*初始化新接受的连接*/ void init(int sockfd const sockaddr_in &addr); /*关闭连接*/ void close_conn(bool real_close = true); /*处理客户请求*/ void process(); /*非阻塞读操作*/ bool read(); /*非阻塞写操作*/ bool write(); private: /*初始化连接*/ void init(); /*解析HTTP请求*/ HTTP_CODE process_read(); /*填充HTTP应答*/ bool process_write(HTTP_CODE ret); /*下面这一组函数被process_read调用以分析HTTP请求*/ HTTP_CODE parse_request_line(char* text); HTTP_CODE parse_headers(char* text); HTTP_CODE parse_content(char* text); HTTP_CODE do_request(); char* get_line(){return m_read_buf m_start_line;} LINE_STATUS parse_line(); /*下面这一组函数被process_write调用以填充HTTP应答*/ void unmap(); bool add_response(const char* format ...); bool add_content(const char* connect); bool add_status_line(int status const char* title); bool add_headers(int content_length); bool add_content_length(int content_length); bool add_linger(); bool add_blank_line(); public: /*所有socket上的事件都被注册到一个epoll内核时间表, 所以将epoll文件描述符设置为静态的*/ static int m_epollfd; /*统计用户数量*/ static int m_user_count; private: /*该HTTP连接的socket和对方的socket地址*/ int m_sockfd; sockaddr_in m_address; /*读缓冲区*/ char m_read_buf[READ_BUFFER_SIZE]; /*标识读缓冲中已经读入的客户数据的最后一个字节的下一个位置*/ int m_read_idx; /*当前正在分析的字符在读缓冲中的位置*/ int m_checked_idx; /*当前正在解析的行的起始位置*/ int m_start_line; /*写缓冲区*/ char m_write_buf[WRITE_BUFFER_SIZE]; /*写缓冲区中待发送的字节数*/ int m_write_idx; /*主状态机当前所处的状态*/ CHECK_STATE m_check_state; /*请求方法*/ METHOD m_method; /*客户请求的目标文件的完整路径, 其内容等于doc_root m_url doc_root是网站根目录*/ char m_real_file[FILENAME_LEN]; /*客户请求的目标文件的文件名*/ char* m_url; /*HTTP 协议版本号,我们仅支持HTTP/1.1*/ char* m_version; /*主机名*/ char* m_host; /*HTTP请求的消息体长度*/ int m_connect_length; /*HTTP请求是否要求保持连接*/ bool m_linger; /*客户请求的目标我文件被mmap到内存中的起始位置*/ char* m_file_address; /*目标文件的状态,通过它我们可以判断文件是否存在、是否为目录、 是否为可读,并获取文件大小等信息*/ struct stat m_file_stat; /*我们将用writev来执行写操作,所以定义下面两个成员 其中m_iv_count表示写内存块的数量*/ struct iovec m_iv[2]; int m_iv_count; }; #endif

#include "http_conn.h" #include <errno.h> #include <stdlib.h> #include <sys/uio.h> const char* ok_200_title = "OK"; const char* error_400_title = "Bad Request"; const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n"; const char* error_403_title = "Forbidden"; const char* error_403_form = "You do not have permission to get file from this server.\n"; const char* error_404_title = "Not found"; const char* error_404_form = "THe requested file was not found on this server.\n"; const char* error_500_title = "Internal Error"; const char* error_500_form = "There was an unusual problem serving the requested file.\n"; /*网站的根目录*/ const char* doc_root = "./"; int setnonblocking(int fd) { int old_option = fcntl(fd F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd F_SETFL new_option); return old_option; } void addfd(int epollfd int fd bool one_shot) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET | EPOLLRDHUP; if(one_shot){ event.events |= EPOLLONESHOT; } epoll_ctl(epollfd EPOLL_CTL_ADD fd &event); setnonblocking(fd); } void removefd(int epollfd int fd) { epoll_ctl(epollfd EPOLL_CTL_DEL fd 0); close(fd); } void modfd(int epollfd int fd int ev) { epoll_event event; event.data.fd = fd; event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP; epoll_ctl(epollfd EPOLL_CTL_MOD fd &event); } int http_conn::m_user_count = 0; int http_conn::m_epollfd = -1; void http_conn::close_conn(bool real_close) { if(real_close && (m_sockfd != -1)){ removefd(m_epollfd m_sockfd); m_sockfd = -1; m_user_count--; } } void http_conn::init(int sockfd const sockaddr_in& addr) { m_sockfd = sockfd; m_address = addr; /*如下两行是为了避免TIME_WAIT状态,仅用于调试, 实际使用应该去掉*/ int reuse = 1; setsockopt(m_sockfd SOL_SOCKET SO_REUSEADDR &reuse sizeof(reuse)); addfd(m_epollfd sockfd true); m_user_count ; init(); } void http_conn::init() { m_check_state = CHECK_STATE_REQUESTLINE; m_linger = false; m_method = GET; m_url = 0; m_version = 0; m_connect_length = 0; m_host = 0; m_start_line = 0; m_checked_idx = 0; m_read_idx = 0; m_write_idx = 0; memset(m_read_buf '\0' READ_BUFFER_SIZE); memset(m_write_buf '\0' WRITE_BUFFER_SIZE); memset(m_real_file '\0' FILENAME_LEN); } /*从状态机*/ http_conn::LINE_STATUS http_conn::parse_line() { char temp; for(;m_checked_idx < m_read_idx; m_checked_idx){ temp = m_read_buf[m_checked_idx]; if(temp == '\r'){ if(m_checked_idx 1 == m_read_idx){ return LINE_OPEN; } else if(m_read_buf[m_checked_idx 1] == '\n'){ m_read_buf[m_checked_idx ] = '\0'; m_read_buf[m_checked_idx ] = '\0'; return LINE_OK; } } else if(temp == '\n'){ if(m_checked_idx > 1 && m_read_buf[m_checked_idx-1] == '\r'){ m_read_buf[m_checked_idx-1] = '\0'; m_read_buf[m_checked_idx ] = '\0'; return LINE_OK; } return LINE_BAD; } } return LINE_OPEN; } /*循环读取客户数据,指导无数据可读或对方关闭连接*/ bool http_conn::read() { if(m_read_idx >= READ_BUFFER_SIZE){ return false; } int bytes_read = 0; while(true){ bytes_read = recv(m_sockfd m_read_buf m_read_idx READ_BUFFER_SIZE - m_read_idx 0); if(bytes_read == -1){ if( errno == EAGAIN || errno == EWOULDBLOCK){ break; } return false; } else if(bytes_read == 0){ return false; } m_read_idx = bytes_read; } return true; } /*解析HTTP请求行,获得请求方法,目标URL,以及HTTP版本号*/ http_conn::HTTP_CODE http_conn::parse_request_line(char* text) { m_url = strpbrk(text " \t"); if(!m_url){ return BAD_REQUEST; } *m_url ='\0'; char* method = text; if(strcasecmp(method "GET") == 0){ m_method = GET; } else{ return BAD_REQUEST; } m_url = strspn(m_url " \t"); m_version = strpbrk(m_url " \t"); if(!m_version){ return BAD_REQUEST; } *m_version = '\0'; m_version = strspn(m_version " \t"); if(strcasecmp(m_version "HTTP/1.1") != 0){ return BAD_REQUEST; } if(strncasecmp(m_url "http://" 7) == 0){ m_url = 7; m_url = strchr(m_url '/'); } if(!m_url || m_url[0] != '/'){ return BAD_REQUEST; } m_check_state = CHECK_STATE_HEADER; return NO_REQUEST; } /*解析HTTP请求的一个头部信息*/ http_conn::HTTP_CODE http_conn::parse_headers(char* text) { /*遇到空行,表示头部字段解析完毕*/ if(text[0] == '\0'){ /*如果HTTP请求有消息体,则还需要读取m_content_leng字节的消息提, 状态机转移到CHECK_STATE_CONTENT*/ if(m_connect_length != 0){ m_check_state = CHECK_STATE_CONTENT; return NO_REQUEST; } /*否则说明我们已经得到了一个完整的HTTP请求*/ return GET_REQUEST; } /*处理Connection 头部信息*/ else if(strncasecmp(text "Connection:" 11) == 0){ text = 11; text = strspn(text " \t"); if(strcasecmp(text "keep-alive") == 0){ m_linger = true; } } /*处理Connect-Length头部信息*/ else if(strncasecmp(text "Content-Length:" 15) == 0){ text = 15; text = strspn(text " \t"); m_connect_length = atol(text); } /*处理Host头部字段*/ else if(strncasecmp(text "Host:" 5) == 0){ text = 5; text = strspn(text " \t"); m_host = text; } else{ printf("oop! unknown header %s\n" text); } return NO_REQUEST; } /*我们没有真正解析HTTP请求的消息体,只是判断它是否被完整的读入了*/ http_conn::HTTP_CODE http_conn::parse_content(char* text) { if(m_read_idx >= (m_connect_length m_checked_idx)){ text[m_connect_length] = '\0'; return GET_REQUEST; } return NO_REQUEST; } /*主状态机*/ http_conn::HTTP_CODE http_conn::process_read() { LINE_STATUS line_status = LINE_OK; HTTP_CODE ret = NO_REQUEST; char *text = 0; while((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || (line_status = parse_line()) == LINE_OK){ text = get_line(); m_start_line = m_checked_idx; printf("got 1 http line: %s\n" text); switch (m_check_state) { case CHECK_STATE_REQUESTLINE:{ ret = parse_request_line(text); if(ret == BAD_REQUEST){ return BAD_REQUEST; } break; } case CHECK_STATE_HEADER:{ ret = parse_headers(text); if(ret == BAD_REQUEST){ return BAD_REQUEST; } else if (ret == GET_REQUEST) { return do_request(); } break; } case CHECK_STATE_CONTENT:{ ret = parse_content(text); if(ret == GET_REQUEST){ return do_request(); } line_status = LINE_OPEN; break; } default:{ return INTERNAL_ERROR; } } } return NO_REQUEST; } /*当得到一个完整、正确的HTTP请求时,我们就分析目标文件的属性 如果目标存在、对所有用户可读,且不是目录, 则使用mmap将其映射到内存地址m_file_address处, 并告诉调用者获取文件成功*/ http_conn::HTTP_CODE http_conn::do_request() { strcpy(m_real_file doc_root); int len = strlen(doc_root); strncpy(m_real_file len m_url FILENAME_LEN - len - 1); if(stat(m_real_file &m_file_stat) < 0){ return NO_RESOURCE; } if(!(m_file_stat.st_mode & S_IROTH)){ return FORBIDDEN_REQUEST; } if (S_ISDIR(m_file_stat.st_mode)){ return BAD_REQUEST; } int fd = open(m_real_file O_RDONLY); m_file_address = (char*)mmap(0 m_file_stat.st_size PROT_READ MAP_PRIVATE fd 0); close(fd); return FILE_REQUEST; } /*对内存映射区执行munmap操作*/ void http_conn::unmap() { if(m_file_address){ munmap(m_file_address m_file_stat.st_size); m_file_address = 0; } } /*写HTTP响应*/ bool http_conn::write() { int temp = 0; int bytes_have_send = 0; int bytes_to_send = m_write_idx; if(bytes_to_send == 0){ modfd(m_epollfd m_sockfd EPOLLIN); init(); return true; } while(1){ temp = writev(m_sockfd m_iv m_iv_count); if(temp <= -1){ /*如果TCP写缓冲没有空间,则等待下一轮EPOLLOUT事件。 虽然在此期间,服务器无法立即接收到同一客户的下一个请求, 但这可以保证连接的完整性*/ if(errno == EAGAIN){ modfd(m_epollfd m_sockfd EPOLLOUT); return true; } unmap(); return false; } bytes_to_send -= temp; bytes_have_send = temp; if(bytes_to_send <= bytes_have_send){ /*发送HTTP响应成功,根据HTTP请求中的Connection字段决定是否立即关闭连接*/ unmap(); if(m_linger){ init(); modfd(m_epollfd m_sockfd EPOLLIN); return true; } else{ modfd(m_epollfd m_sockfd EPOLLIN); return false; } } } } /*往写缓冲中写入待发送的数据*/ bool http_conn::add_response(const char* format ...) { if(m_write_idx >= WRITE_BUFFER_SIZE){ return false; } va_list arg_list; va_start(arg_list format); int len = vsnprintf(m_write_buf m_write_idx WRITE_BUFFER_SIZE - 1 - m_write_idx format arg_list); if(len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx)){ return false; } m_write_idx = len; va_end(arg_list); return true; } bool http_conn::add_status_line(int status const char* title) { return add_response("%s %d %s\r\n" "HTTP/1.1" status title); } bool http_conn::add_headers(int content_len) { add_content_length(content_len); add_linger(); add_blank_line(); } bool http_conn::add_content_length(int content_len) { return add_response("Content-Length: %d\r\n" content_len); } bool http_conn::add_linger() { return add_response( "Connection: %s\r\n" (m_linger == true)?"keep-alive":"close"); } bool http_conn::add_blank_line() { return add_response("%s" "\r\n"); } bool http_conn::add_content(const char* content) { return add_response("%s" content); } /*根据服务器处理HTTP请求的结果,决定返回给客户端的内容*/ bool http_conn::process_write(HTTP_CODE ret) { switch (ret) { case INTERNAL_ERROR:{ add_status_line(500 error_500_title); add_headers(strlen(error_500_form)); if(!add_content(error_500_form)){ return false; } break; } case BAD_REQUEST:{ add_status_line(400 error_400_title); add_headers(strlen(error_400_form)); if(!add_content(error_400_form)){ return false; } break; } case NO_REQUEST:{ add_status_line(404 error_404_title); add_headers(strlen(error_404_form)); if(!add_content(error_404_form)){ return false; } break; } case NO_RESOURCE:{ add_status_line(404 error_404_title); add_headers(strlen(error_404_form)); if(!add_content(error_404_form)){ return false; } break; } case FORBIDDEN_REQUEST:{ add_status_line(403 error_403_title); add_headers(strlen(error_403_form)); if(!add_content(error_403_form)){ return false; } break; } case FILE_REQUEST:{ add_status_line(200 ok_200_title); if(m_file_stat.st_size != 0){ add_headers(m_file_stat.st_size); m_iv[0].iov_base = m_write_buf; m_iv[0].iov_len = m_write_idx; m_iv[1].iov_base = m_file_address; m_iv[1].iov_len = m_file_stat.st_size; m_iv_count = 2; return true; } else{ const char* ok_string = "<html><body></body></html>"; add_headers(strlen(ok_string)); if(!add_content(ok_string)){ return false; } } } default:{ return false; } } m_iv[0].iov_base = m_write_buf; m_iv[0].iov_len = m_write_idx; m_iv_count = 1; return true; } /*由线程池的工作线程调用, 这是处理HTTP请求的入口函数*/ void http_conn::process() { HTTP_CODE read_ret = process_read(); if(read_ret == NO_REQUEST){ modfd(m_epollfd m_sockfd EPOLLIN); return; } bool write_ret = process_write(read_ret); if(!write_ret){ close_conn(); } modfd(m_epollfd m_sockfd EPOLLOUT); }

#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <cassert> #include <sys/epoll.h> #include "locker.h" #include "threadpool.h" #include "http_conn.h" #define MAX_FD 65535 #define MAX_EVENT_NUMBER 10000 extern int addfd(int epollfd int fd bool one_shot); extern int removefd(int epollfd int fd); void addsig(int sig void(handler)(int) bool restart = true) { struct sigaction sa; memset(&sa '\0' sizeof(sa)); sa.sa_handler = handler; if(restart){ sa.sa_flags |= SA_RESTART; } sigfillset(&sa.sa_mask); assert(sigaction(sig &sa NULL) != -1); } void show_error(int connfd const char* info) { printf("%s" info); send(connfd info strlen(info) 0); close(connfd); } int main(int argc char const *argv[]) { if(argc < 2){ printf("usage: %s ip_address port_number\n" basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); /*忽略SIGPIPE*/ addsig(SIGPIPE SIG_IGN); /*创建线程池*/ threadpool<http_conn>* pool = NULL; try{ pool = new threadpool<http_conn>(); } catch(...){ return 1; } /*预先为每个可能的客户连接分配一个http_conn对象*/ http_conn* users = new http_conn[MAX_FD]; assert(users); int user_count = 0; int listenfd = socket(PF_INET SOCK_STREAM 0); assert(listenfd >= 0); struct linger tmp = {1 0}; setsockopt(listenfd SOL_SOCKET SO_LINGER &tmp sizeof(tmp)); int ret = 0; struct sockaddr_in address; bzero(&address sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET ip &address.sin_addr); address.sin_port = htons(port); ret = bind(listenfd (struct sockaddr*)&address sizeof(address)); assert(ret >= 0); ret = listen(listenfd 5); assert(ret >= 0); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd listenfd false); http_conn::m_epollfd = epollfd; while(true){ int number = epoll_wait(epollfd events MAX_EVENT_NUMBER -1); if (number < 0 && errno != EINTR){ printf("epoll failure"); break; } for(int i = 0; i < number; i){ int sockfd = events[i].data.fd; if(sockfd == listenfd){ struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd (struct sockaddr*)&client_address &client_addrlength); if(connfd <= 0){ printf("errno is: %d\n" errno); continue; } if(http_conn::m_user_count >= MAX_FD){ show_error(connfd "Internal server busy"); continue; } /*初始化客户连接*/ users[connfd].init(connfd client_address); } else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){ /*如果有异常,直接关闭客户端*/ users[sockfd].close_conn(); } else if(events[i].events & EPOLLIN){ /*根据读的结果,决定是将任务添加到线程池,还是关闭链接*/ if(users[sockfd].read()){ pool->append(users sockfd); } else{ users[sockfd].close_conn(); } } else if(events[i].events & EPOLLOUT){ /*根据写的结果,决定是否关闭连接*/ if(!users[sockfd].write()){ users[sockfd].close_conn(); } } else{} } } close(epollfd); close(listenfd); delete[] users; delete pool; return 0; }

猜您喜欢: