如何实现一个简单的线程池(进程池和线程池)
如何实现一个简单的线程池(进程池和线程池)
#ifndef HTTPCONNECTION_H
#define HTTPCONNECTION_H
#include <unistd.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/mman.h>
#include <stdarg.h>
#include "locker.h"
/**
 * State and operations for one HTTP client connection.
 *
 * The object owns a read buffer into which the request is received, a small
 * state machine that parses the request in place, and a write buffer that
 * holds the response status line and headers. A requested file is mmap'ed
 * and sent together with the headers through writev().
 */
class http_conn
{
public:
    /* Maximum length of the resolved file path (m_real_file). */
    static const int FILENAME_LEN = 200;
    /* Size of the read buffer, in bytes. */
    static const int READ_BUFFER_SIZE = 2048;
    /* Size of the write buffer, in bytes. */
    static const int WRITE_BUFFER_SIZE = 1024;

    /* HTTP request methods; only GET is accepted by the parser. */
    enum METHOD {
        GET = 0, POST, HEAD, PUT, DELETE,
        TRACE, OPTIONS, CONNECT, PATCH
    };

    /* State of the main state machine while a request is being parsed. */
    enum CHECK_STATE {
        CHECK_STATE_REQUESTLINE = 0,
        CHECK_STATE_HEADER,
        CHECK_STATE_CONTENT
    };

    /* Possible outcomes of processing an HTTP request. */
    enum HTTP_CODE {
        NO_REQUEST, GET_REQUEST, BAD_REQUEST,
        NO_RESOURCE, FORBIDDEN_REQUEST, FILE_REQUEST,
        INTERNAL_ERROR, CLOSED_CONNECTION
    };

    /* Result of trying to extract one line from the read buffer. */
    enum LINE_STATUS { LINE_OK = 0, LINE_BAD, LINE_OPEN };

public:
    /*
     * Start with "no socket", "nothing to send" and "no mapping" so that
     * close_conn(), write() and unmap() never act on indeterminate members
     * when they run before the first init().
     */
    http_conn() : m_sockfd(-1), m_write_idx(0), m_file_address(0), m_iv_count(0) {}
    ~http_conn() {}

public:
    /* Initialise a newly accepted connection. */
    void init(int sockfd, const sockaddr_in& addr);
    /* Close the connection. */
    void close_conn(bool real_close = true);
    /* Handle the client request. */
    void process();
    /* Non-blocking read. */
    bool read();
    /* Non-blocking write. */
    bool write();

private:
    /* Reset the per-request parsing and response state. */
    void init();
    /* Parse the HTTP request. */
    HTTP_CODE process_read();
    /* Fill in the HTTP response. */
    bool process_write(HTTP_CODE ret);

    /* Helpers used by process_read() to analyse the request. */
    HTTP_CODE parse_request_line(char* text);
    HTTP_CODE parse_headers(char* text);
    HTTP_CODE parse_content(char* text);
    HTTP_CODE do_request();
    /* Start of the line currently being parsed inside the read buffer. */
    char* get_line() { return m_read_buf + m_start_line; }
    LINE_STATUS parse_line();

    /* Helpers used by process_write() to build the response. */
    void unmap();
    bool add_response(const char* format, ...);
    bool add_content(const char* content);
    bool add_status_line(int status, const char* title);
    bool add_headers(int content_length);
    bool add_content_length(int content_length);
    bool add_linger();
    bool add_blank_line();

public:
    /* Events of every socket are registered with one epoll instance,
       so its descriptor is shared by all connections. */
    static int m_epollfd;
    /* Number of connected users. */
    static int m_user_count;

private:
    /* Socket of this connection and the peer's address. */
    int m_sockfd;
    sockaddr_in m_address;

    /* Read buffer. */
    char m_read_buf[READ_BUFFER_SIZE];
    /* Index one past the last byte of client data stored in the read buffer. */
    int m_read_idx;
    /* Index of the character currently being examined in the read buffer. */
    int m_checked_idx;
    /* Index at which the line currently being parsed starts. */
    int m_start_line;

    /* Write buffer. */
    char m_write_buf[WRITE_BUFFER_SIZE];
    /* Number of bytes in the write buffer waiting to be sent. */
    int m_write_idx;

    /* Current state of the main state machine. */
    CHECK_STATE m_check_state;
    /* Request method. */
    METHOD m_method;

    /* Full path of the requested file: doc_root followed by m_url,
       where doc_root is the site root directory. */
    char m_real_file[FILENAME_LEN];
    /* File name requested by the client (points into the read buffer). */
    char* m_url;
    /* HTTP version string; only HTTP/1.1 is accepted. */
    char* m_version;
    /* Host name. */
    char* m_host;
    /* Length of the request body (value of the Content-Length header). */
    int m_connect_length;
    /* Whether the request asked to keep the connection alive. */
    bool m_linger;

    /* Address at which the requested file is mmap'ed; 0 when nothing is mapped. */
    char* m_file_address;
    /* stat() result for the target file: tells whether it exists, whether it
       is a directory, whether it is readable, and how large it is. */
    struct stat m_file_stat;
    /* The response is sent with writev(); m_iv_count is the number of
       memory blocks in use. */
    struct iovec m_iv[2];
    int m_iv_count;
};
#endif
#include "http_conn.h"
#include <errno.h>
#include <stdlib.h>
#include <sys/uio.h>
/* Reason phrases and response bodies for the status codes the server emits. */
const char* ok_200_title = "OK";
const char* error_400_title = "Bad Request";
const char* error_400_form = "Your request has bad syntax or is inherently impossible to satisfy.\n";
const char* error_403_title = "Forbidden";
const char* error_403_form = "You do not have permission to get file from this server.\n";
const char* error_404_title = "Not found";
/* Typo fix: the body used to start with "THe". */
const char* error_404_form = "The requested file was not found on this server.\n";
const char* error_500_title = "Internal Error";
const char* error_500_form = "There was an unusual problem serving the requested file.\n";
/* Root directory of the web site. */
const char* doc_root = "./";
/**
 * Switch a file descriptor to non-blocking mode.
 *
 * @param fd  descriptor to modify
 * @return    the file status flags that were in effect before the change,
 *            or -1 if either fcntl() call failed (errno is left set)
 */
int setnonblocking(int fd)
{
    const int old_option = fcntl(fd, F_GETFL);
    if (old_option == -1) {
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) == -1) {
        return -1;
    }
    return old_option;
}
/**
 * Register fd with the epoll instance for edge-triggered read events and
 * peer-hangup notification, then make the descriptor non-blocking.
 *
 * @param epollfd   epoll instance to register with
 * @param fd        descriptor to watch
 * @param one_shot  when true, EPOLLONESHOT is added so the descriptor is
 *                  reported once and must be re-armed (see modfd)
 */
void addfd(int epollfd, int fd, bool one_shot)
{
    epoll_event event = {};   /* zero the whole data union, not only .fd */
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
    if (one_shot) {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}
/**
 * Remove fd from the epoll instance and close it.
 *
 * @param epollfd  epoll instance the descriptor was registered with
 * @param fd       descriptor to unregister and close
 */
void removefd(int epollfd, int fd)
{
    epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);
    close(fd);
}
/**
 * Re-arm an already registered descriptor.
 *
 * The descriptor is modified to wait for `ev`, always combined with
 * edge-triggered, one-shot and peer-hangup flags.
 *
 * @param epollfd  epoll instance the descriptor is registered with
 * @param fd       descriptor to re-arm
 * @param ev       event mask to wait for (e.g. EPOLLIN or EPOLLOUT)
 */
void modfd(int epollfd, int fd, int ev)
{
    epoll_event event = {};   /* zero the whole data union, not only .fd */
    event.data.fd = fd;
    event.events = ev | EPOLLET | EPOLLONESHOT | EPOLLRDHUP;
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}
/* Shared epoll descriptor; stays -1 until the server assigns the real one. */
int http_conn::m_epollfd = -1;
/* Connection counter, starting at zero. */
int http_conn::m_user_count = 0;
/**
 * Close this connection.
 *
 * Does nothing unless real_close is true and a socket is attached
 * (m_sockfd != -1). Otherwise the socket is removed from epoll and closed,
 * m_sockfd is reset to -1 and the user counter is decremented.
 */
void http_conn::close_conn(bool real_close)
{
    if (!real_close || m_sockfd == -1) {
        return;
    }
    removefd(m_epollfd, m_sockfd);
    m_sockfd = -1;
    m_user_count--;
}
void http_conn::init(int sockfd const sockaddr_in& addr)
{
m_sockfd = sockfd;
m_address = addr;
/*如下两行是为了避免TIME_WAIT状态,仅用于调试,
实际使用应该去掉*/
int reuse = 1;
setsockopt(m_sockfd SOL_SOCKET SO_REUSEADDR &reuse sizeof(reuse));
addfd(m_epollfd sockfd true);
m_user_count ;
init();
}
void http_conn::init()
{
m_check_state = CHECK_STATE_REQUESTLINE;
m_linger = false;
m_method = GET;
m_url = 0;
m_version = 0;
m_connect_length = 0;
m_host = 0;
m_start_line = 0;
m_checked_idx = 0;
m_read_idx = 0;
m_write_idx = 0;
memset(m_read_buf '\0' READ_BUFFER_SIZE);
memset(m_write_buf '\0' WRITE_BUFFER_SIZE);
memset(m_real_file '\0' FILENAME_LEN);
}
/**
 * Line-level ("slave") state machine.
 *
 * Scans the read buffer from m_checked_idx towards m_read_idx looking for a
 * CRLF terminator. On success both terminator bytes are overwritten with
 * '\0' and m_checked_idx is left on the first byte of the next line.
 *
 * @return LINE_OK    a complete line was found
 *         LINE_OPEN  more data is needed (no terminator yet, or the buffer
 *                    ends right after a '\r')
 *         LINE_BAD   a '\r' or '\n' appears outside a CRLF pair
 */
http_conn::LINE_STATUS http_conn::parse_line()
{
    for (; m_checked_idx < m_read_idx; ++m_checked_idx) {
        const char temp = m_read_buf[m_checked_idx];
        if (temp == '\r') {
            if (m_checked_idx + 1 == m_read_idx) {
                /* The '\n' may arrive with the next read. */
                return LINE_OPEN;
            }
            if (m_read_buf[m_checked_idx + 1] == '\n') {
                m_read_buf[m_checked_idx++] = '\0';
                m_read_buf[m_checked_idx++] = '\0';
                return LINE_OK;
            }
            /* A '\r' followed by anything else is malformed. */
            return LINE_BAD;
        }
        if (temp == '\n') {
            if (m_checked_idx >= 1 && m_read_buf[m_checked_idx - 1] == '\r') {
                m_read_buf[m_checked_idx - 1] = '\0';
                m_read_buf[m_checked_idx++] = '\0';
                return LINE_OK;
            }
            return LINE_BAD;
        }
    }
    return LINE_OPEN;
}
/*循环读取客户数据,指导无数据可读或对方关闭连接*/
bool http_conn::read()
{
if(m_read_idx >= READ_BUFFER_SIZE){
return false;
}
int bytes_read = 0;
while(true){
bytes_read = recv(m_sockfd m_read_buf m_read_idx
READ_BUFFER_SIZE - m_read_idx 0);
if(bytes_read == -1){
if( errno == EAGAIN || errno == EWOULDBLOCK){
break;
}
return false;
}
else if(bytes_read == 0){
return false;
}
m_read_idx = bytes_read;
}
return true;
}
/**
 * Parse the HTTP request line to obtain the method, the target URL and the
 * HTTP version. The line is split in place: separators in `text` are
 * overwritten with '\0' and m_url / m_version point into it.
 *
 * @param text  NUL-terminated request line
 * @return NO_REQUEST   the line is valid; the state machine moves on to the
 *                      headers
 *         BAD_REQUEST  the line is malformed, the method is not GET, or the
 *                      version is not HTTP/1.1
 */
http_conn::HTTP_CODE http_conn::parse_request_line(char* text)
{
    /* First blank ends the method token. */
    m_url = strpbrk(text, " \t");
    if (!m_url) {
        return BAD_REQUEST;
    }
    *m_url++ = '\0';

    const char* method = text;
    if (strcasecmp(method, "GET") != 0) {
        return BAD_REQUEST;
    }
    m_method = GET;

    /* Skip extra blanks, then cut the URL off from the version. */
    m_url += strspn(m_url, " \t");
    m_version = strpbrk(m_url, " \t");
    if (!m_version) {
        return BAD_REQUEST;
    }
    *m_version++ = '\0';
    m_version += strspn(m_version, " \t");
    if (strcasecmp(m_version, "HTTP/1.1") != 0) {
        return BAD_REQUEST;
    }

    /* Absolute-form target: drop "http://host" and keep the path. */
    if (strncasecmp(m_url, "http://", 7) == 0) {
        m_url += 7;
        m_url = strchr(m_url, '/');
    }
    if (!m_url || m_url[0] != '/') {
        return BAD_REQUEST;
    }

    m_check_state = CHECK_STATE_HEADER;
    return NO_REQUEST;
}
/**
 * Parse one header line of the HTTP request.
 *
 * @param text  NUL-terminated header line; an empty string marks the end of
 *              the header section
 * @return GET_REQUEST  the blank line was reached and no body is expected
 *         NO_REQUEST   more input is needed (further headers or the body)
 *         BAD_REQUEST  Content-Length is negative or does not fit into the
 *                      read buffer
 */
http_conn::HTTP_CODE http_conn::parse_headers(char* text)
{
    /* An empty line means the header section is finished. */
    if (text[0] == '\0') {
        /* If the request has a body, m_connect_length more bytes must be
           read and the state machine moves to CHECK_STATE_CONTENT. */
        if (m_connect_length != 0) {
            m_check_state = CHECK_STATE_CONTENT;
            return NO_REQUEST;
        }
        /* Otherwise a complete HTTP request has been received. */
        return GET_REQUEST;
    }
    /* Connection header. */
    else if (strncasecmp(text, "Connection:", 11) == 0) {
        text += 11;
        text += strspn(text, " \t");
        if (strcasecmp(text, "keep-alive") == 0) {
            m_linger = true;
        }
    }
    /* Content-Length header. */
    else if (strncasecmp(text, "Content-Length:", 15) == 0) {
        text += 15;
        text += strspn(text, " \t");
        const long length = atol(text);
        /* The body has to fit into m_read_buf and parse_content() indexes
           the buffer with this value, so reject anything out of range. */
        if (length < 0 || length >= READ_BUFFER_SIZE) {
            return BAD_REQUEST;
        }
        m_connect_length = static_cast<int>(length);
    }
    /* Host header. */
    else if (strncasecmp(text, "Host:", 5) == 0) {
        text += 5;
        text += strspn(text, " \t");
        m_host = text;
    }
    else {
        printf("oop! unknown header %s\n", text);
    }
    return NO_REQUEST;
}
/**
 * The request body is not really parsed; this only checks whether it has
 * been read completely.
 *
 * @param text  start of the body inside the read buffer
 * @return GET_REQUEST  the whole body is in the buffer (it is then
 *                      NUL-terminated in place)
 *         NO_REQUEST   more data is needed
 *         BAD_REQUEST  the recorded body length is negative
 */
http_conn::HTTP_CODE http_conn::parse_content(char* text)
{
    if (m_connect_length < 0) {
        /* Never index in front of the buffer. */
        return BAD_REQUEST;
    }
    if (m_read_idx >= (m_connect_length + m_checked_idx)) {
        text[m_connect_length] = '\0';
        return GET_REQUEST;
    }
    return NO_REQUEST;
}
/**
 * Main state machine.
 *
 * Repeatedly takes a complete line from the read buffer (or, in
 * CHECK_STATE_CONTENT, the body) and hands it to the parser that matches
 * the current state.
 *
 * @return NO_REQUEST      the request is still incomplete
 *         BAD_REQUEST     the request line or a header was rejected
 *         INTERNAL_ERROR  the state machine is in an unknown state
 *         otherwise       the result of do_request() once the request is
 *                         complete
 */
http_conn::HTTP_CODE http_conn::process_read()
{
    LINE_STATUS line_status = LINE_OK;
    HTTP_CODE ret = NO_REQUEST;
    char* text = 0;
    /* In the content state no line splitting is done; otherwise keep going
       for as long as parse_line() yields complete lines. */
    while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK)
           || (line_status = parse_line()) == LINE_OK) {
        text = get_line();
        /* The next line starts where parse_line() stopped. */
        m_start_line = m_checked_idx;
        printf("got 1 http line: %s\n", text);
        switch (m_check_state) {
            case CHECK_STATE_REQUESTLINE: {
                ret = parse_request_line(text);
                if (ret == BAD_REQUEST) {
                    return BAD_REQUEST;
                }
                break;
            }
            case CHECK_STATE_HEADER: {
                ret = parse_headers(text);
                if (ret == BAD_REQUEST) {
                    return BAD_REQUEST;
                }
                else if (ret == GET_REQUEST) {
                    return do_request();
                }
                break;
            }
            case CHECK_STATE_CONTENT: {
                ret = parse_content(text);
                if (ret == GET_REQUEST) {
                    return do_request();
                }
                /* Body incomplete: leave the loop and wait for more data. */
                line_status = LINE_OPEN;
                break;
            }
            default: {
                return INTERNAL_ERROR;
            }
        }
    }
    return NO_REQUEST;
}
/* True when `path` contains a ".." path segment, which would let a request
   climb out of the document root. */
static bool has_parent_dir_segment(const char* path)
{
    for (const char* p = strstr(path, ".."); p; p = strstr(p + 2, "..")) {
        const bool starts_segment = (p == path) || (p[-1] == '/');
        const bool ends_segment = (p[2] == '\0') || (p[2] == '/');
        if (starts_segment && ends_segment) {
            return true;
        }
    }
    return false;
}

/**
 * Once a complete, valid request has been received, inspect the target
 * file. If it exists, is readable by all users and is not a directory, it
 * is mmap'ed at m_file_address and the caller is told the file is ready.
 *
 * @return FILE_REQUEST       the file is ready to be sent (an empty file is
 *                            not mapped; m_file_address is 0 in that case)
 *         NO_RESOURCE        stat() or open() failed
 *         FORBIDDEN_REQUEST  the file is not world-readable, or the URL
 *                            contains a ".." segment
 *         BAD_REQUEST        the target is a directory
 *         INTERNAL_ERROR     mmap() failed
 */
http_conn::HTTP_CODE http_conn::do_request()
{
    /* m_url comes straight from the client: refuse to leave doc_root. */
    if (has_parent_dir_segment(m_url)) {
        return FORBIDDEN_REQUEST;
    }

    strcpy(m_real_file, doc_root);
    const int len = strlen(doc_root);
    strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1);
    m_real_file[FILENAME_LEN - 1] = '\0';

    if (stat(m_real_file, &m_file_stat) < 0) {
        return NO_RESOURCE;
    }
    if (!(m_file_stat.st_mode & S_IROTH)) {
        return FORBIDDEN_REQUEST;
    }
    if (S_ISDIR(m_file_stat.st_mode)) {
        return BAD_REQUEST;
    }

    /* mmap() rejects a zero length, so an empty file is served unmapped. */
    if (m_file_stat.st_size == 0) {
        m_file_address = 0;
        return FILE_REQUEST;
    }

    const int fd = open(m_real_file, O_RDONLY);
    if (fd < 0) {
        return NO_RESOURCE;
    }
    void* mapping = mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    close(fd);   /* the mapping stays valid after the descriptor is closed */
    if (mapping == MAP_FAILED) {
        m_file_address = 0;   /* keep unmap() away from MAP_FAILED */
        return INTERNAL_ERROR;
    }
    m_file_address = static_cast<char*>(mapping);
    return FILE_REQUEST;
}
/*对内存映射区执行munmap操作*/
void http_conn::unmap()
{
if(m_file_address){
munmap(m_file_address m_file_stat.st_size);
m_file_address = 0;
}
}
/*写HTTP响应*/
bool http_conn::write()
{
int temp = 0;
int bytes_have_send = 0;
int bytes_to_send = m_write_idx;
if(bytes_to_send == 0){
modfd(m_epollfd m_sockfd EPOLLIN);
init();
return true;
}
while(1){
temp = writev(m_sockfd m_iv m_iv_count);
if(temp <= -1){
/*如果TCP写缓冲没有空间,则等待下一轮EPOLLOUT事件。
虽然在此期间,服务器无法立即接收到同一客户的下一个请求,
但这可以保证连接的完整性*/
if(errno == EAGAIN){
modfd(m_epollfd m_sockfd EPOLLOUT);
return true;
}
unmap();
return false;
}
bytes_to_send -= temp;
bytes_have_send = temp;
if(bytes_to_send <= bytes_have_send){
/*发送HTTP响应成功,根据HTTP请求中的Connection字段决定是否立即关闭连接*/
unmap();
if(m_linger){
init();
modfd(m_epollfd m_sockfd EPOLLIN);
return true;
}
else{
modfd(m_epollfd m_sockfd EPOLLIN);
return false;
}
}
}
}
/**
 * Append printf-style formatted data to the write buffer.
 *
 * @param format  printf format string, followed by its arguments
 * @return true   the text was appended and m_write_idx advanced
 *         false  the buffer is already full, the text does not fit, or
 *                formatting failed (m_write_idx is left unchanged)
 */
bool http_conn::add_response(const char* format, ...)
{
    if (m_write_idx >= WRITE_BUFFER_SIZE) {
        return false;
    }
    const int room = WRITE_BUFFER_SIZE - 1 - m_write_idx;

    va_list arg_list;
    va_start(arg_list, format);
    const int len = vsnprintf(m_write_buf + m_write_idx, room, format, arg_list);
    va_end(arg_list);   /* must run on every path, including failure */

    if (len < 0 || len >= room) {
        return false;
    }
    m_write_idx += len;
    return true;
}
/**
 * Append the status line "HTTP/1.1 <status> <title>\r\n" to the write buffer.
 *
 * @param status  numeric status code
 * @param title   reason phrase
 * @return result of add_response()
 */
bool http_conn::add_status_line(int status, const char* title)
{
    return add_response("%s %d %s\r\n", "HTTP/1.1", status, title);
}
/**
 * Append the response headers (Content-Length, Connection) followed by the
 * blank line that ends the header section.
 *
 * @param content_len  value for the Content-Length header
 * @return true only if every part fitted into the write buffer
 */
bool http_conn::add_headers(int content_len)
{
    /* The original fell off the end of a non-void function. */
    return add_content_length(content_len)
        && add_linger()
        && add_blank_line();
}
/**
 * Append the "Content-Length" header to the write buffer.
 *
 * @param content_len  length of the response body in bytes
 * @return result of add_response()
 */
bool http_conn::add_content_length(int content_len)
{
    return add_response("Content-Length: %d\r\n", content_len);
}
/**
 * Append the "Connection" header: "keep-alive" when m_linger is set,
 * "close" otherwise.
 *
 * @return result of add_response()
 */
bool http_conn::add_linger()
{
    const char* mode = m_linger ? "keep-alive" : "close";
    return add_response("Connection: %s\r\n", mode);
}
/**
 * Append the CRLF that separates the headers from the body.
 *
 * @return result of add_response()
 */
bool http_conn::add_blank_line()
{
    return add_response("%s", "\r\n");
}
/**
 * Append the response body text to the write buffer.
 *
 * @param content  NUL-terminated body text
 * @return result of add_response()
 */
bool http_conn::add_content(const char* content)
{
    return add_response("%s", content);
}
/**
 * Decide what is sent back to the client, based on the result of
 * processing the request, and set up the iovec array for write().
 *
 * @param ret  result produced by process_read()
 * @return true   the response is ready in m_write_buf / m_iv
 *         false  the body did not fit into the write buffer, or `ret` has
 *                no response associated with it
 */
bool http_conn::process_write(HTTP_CODE ret)
{
    switch (ret) {
        case INTERNAL_ERROR: {
            add_status_line(500, error_500_title);
            add_headers(strlen(error_500_form));
            if (!add_content(error_500_form)) {
                return false;
            }
            break;
        }
        case BAD_REQUEST: {
            add_status_line(400, error_400_title);
            add_headers(strlen(error_400_form));
            if (!add_content(error_400_form)) {
                return false;
            }
            break;
        }
        case NO_REQUEST: {
            add_status_line(404, error_404_title);
            add_headers(strlen(error_404_form));
            if (!add_content(error_404_form)) {
                return false;
            }
            break;
        }
        case NO_RESOURCE: {
            add_status_line(404, error_404_title);
            add_headers(strlen(error_404_form));
            if (!add_content(error_404_form)) {
                return false;
            }
            break;
        }
        case FORBIDDEN_REQUEST: {
            add_status_line(403, error_403_title);
            add_headers(strlen(error_403_form));
            if (!add_content(error_403_form)) {
                return false;
            }
            break;
        }
        case FILE_REQUEST: {
            add_status_line(200, ok_200_title);
            if (m_file_stat.st_size != 0) {
                /* Headers come from the write buffer, the body straight
                   from the mapped file. */
                add_headers(m_file_stat.st_size);
                m_iv[0].iov_base = m_write_buf;
                m_iv[0].iov_len = m_write_idx;
                m_iv[1].iov_base = m_file_address;
                m_iv[1].iov_len = m_file_stat.st_size;
                m_iv_count = 2;
                return true;
            }
            /* Empty file: answer with a minimal page from the write buffer. */
            const char* ok_string = "<html><body></body></html>";
            add_headers(strlen(ok_string));
            if (!add_content(ok_string)) {
                return false;
            }
            /* Without this break the case fell through to `default` and the
               finished response was discarded. */
            break;
        }
        default: {
            return false;
        }
    }
    m_iv[0].iov_base = m_write_buf;
    m_iv[0].iov_len = m_write_idx;
    m_iv_count = 1;
    return true;
}
/*由线程池的工作线程调用, 这是处理HTTP请求的入口函数*/
void http_conn::process()
{
HTTP_CODE read_ret = process_read();
if(read_ret == NO_REQUEST){
modfd(m_epollfd m_sockfd EPOLLIN);
return;
}
bool write_ret = process_write(read_ret);
if(!write_ret){
close_conn();
}
modfd(m_epollfd m_sockfd EPOLLOUT);
}
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <cassert>
#include <sys/epoll.h>
#include "locker.h"
#include "threadpool.h"
#include "http_conn.h"
/* Size of the per-descriptor connection table. */
#define MAX_FD 65535
/* Maximum number of events fetched by one epoll_wait() call. */
#define MAX_EVENT_NUMBER 10000
/* epoll helpers defined alongside http_conn; both return void there, so the
   declarations must say void as well. */
extern void addfd(int epollfd, int fd, bool one_shot);
extern void removefd(int epollfd, int fd);
/**
 * Install a signal handler.
 *
 * @param sig      signal number
 * @param handler  handler function (SIG_IGN / SIG_DFL are accepted too)
 * @param restart  when true, SA_RESTART is set for the signal
 *
 * All signals are blocked while the handler runs (sigfillset on sa_mask).
 * A failing sigaction() trips an assert in debug builds.
 */
void addsig(int sig, void(handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if (restart) {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    /* Keep the call outside assert(): with NDEBUG the whole assert
       expression disappears and the handler would never be installed. */
    const int rc = sigaction(sig, &sa, NULL);
    assert(rc != -1);
    (void)rc;
}
/**
 * Report an error to a client and drop it: the message is printed to
 * stdout, sent over the socket, and the socket is closed.
 *
 * @param connfd  connected client socket
 * @param info    NUL-terminated message
 */
void show_error(int connfd, const char* info)
{
    printf("%s", info);
    send(connfd, info, strlen(info), 0);
    close(connfd);
}
int main(int argc char const *argv[])
{
if(argc < 2){
printf("usage: %s ip_address port_number\n" basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
/*忽略SIGPIPE*/
addsig(SIGPIPE SIG_IGN);
/*创建线程池*/
threadpool<http_conn>* pool = NULL;
try{
pool = new threadpool<http_conn>();
}
catch(...){
return 1;
}
/*预先为每个可能的客户连接分配一个http_conn对象*/
http_conn* users = new http_conn[MAX_FD];
assert(users);
int user_count = 0;
int listenfd = socket(PF_INET SOCK_STREAM 0);
assert(listenfd >= 0);
struct linger tmp = {1 0};
setsockopt(listenfd SOL_SOCKET SO_LINGER &tmp sizeof(tmp));
int ret = 0;
struct sockaddr_in address;
bzero(&address sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET ip &address.sin_addr);
address.sin_port = htons(port);
ret = bind(listenfd (struct sockaddr*)&address sizeof(address));
assert(ret >= 0);
ret = listen(listenfd 5);
assert(ret >= 0);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd listenfd false);
http_conn::m_epollfd = epollfd;
while(true){
int number = epoll_wait(epollfd events MAX_EVENT_NUMBER -1);
if (number < 0 && errno != EINTR){
printf("epoll failure");
break;
}
for(int i = 0; i < number; i){
int sockfd = events[i].data.fd;
if(sockfd == listenfd){
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd (struct sockaddr*)&client_address
&client_addrlength);
if(connfd <= 0){
printf("errno is: %d\n" errno);
continue;
}
if(http_conn::m_user_count >= MAX_FD){
show_error(connfd "Internal server busy");
continue;
}
/*初始化客户连接*/
users[connfd].init(connfd client_address);
}
else if(events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)){
/*如果有异常,直接关闭客户端*/
users[sockfd].close_conn();
}
else if(events[i].events & EPOLLIN){
/*根据读的结果,决定是将任务添加到线程池,还是关闭链接*/
if(users[sockfd].read()){
pool->append(users sockfd);
}
else{
users[sockfd].close_conn();
}
}
else if(events[i].events & EPOLLOUT){
/*根据写的结果,决定是否关闭连接*/
if(!users[sockfd].write()){
users[sockfd].close_conn();
}
}
else{}
}
}
close(epollfd);
close(listenfd);
delete[] users;
delete pool;
return 0;
}