Create threads dynamically, giving each client connection a dedicated thread to handle its requests; once processing finishes and the response has been returned, the thread is recycled.
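A minimal sketch of this one-connection-one-thread model (the port, buffer size, and the handle_client helper are illustrative, not taken from the text above):
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/socket.h>

/* each connection gets its own thread; the thread exits ("is recycled") when the client closes */
static void *handle_client(void *arg) {
    int sock = (int)(long)arg;
    char buf[4096];
    ssize_t n;
    while ((n = read(sock, buf, sizeof(buf))) > 0)   /* echo until the peer closes */
        write(sock, buf, n);
    close(sock);
    return NULL;
}

int main(void) {
    int lis = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8384);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(lis, (struct sockaddr *)&addr, sizeof(addr)) == -1) { perror("bind"); return -1; }
    listen(lis, 128);
    for (;;) {
        int cli = accept(lis, NULL, NULL);           /* block until a client connects */
        if (cli < 0) continue;
        pthread_t tid;
        pthread_create(&tid, NULL, handle_client, (void *)(long)cli);
        pthread_detach(tid);                         /* detached: the thread's resources are reclaimed when it exits */
    }
}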
Disadvantages
Each connection pays for thread creation and destruction, a full thread stack, and context switching, so memory use and scheduling overhead grow with the number of clients; the model scales poorly once there are many concurrent connections.
In network programming, the producer-consumer pattern is a classic multi-threaded cooperation pattern. It is mainly used to decouple data production from data consumption, improving the modularity and maintainability of the program.
For example, one small group of threads handles accepting client connections, another group processes requests, and a third group handles responses.
The producer-consumer pattern can be combined with event-driven I/O multiplexing techniques such as epoll.
The main advantage of this architecture is that it decouples concurrency from threads: the server no longer needs as many threads as there are connections.
Disadvantages
The thread groups must share task queues, and those queues have to be protected by locks; besides the locking overhead, the usual hazards of shared state need attention: data corruption, deadlock, and race conditions.
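A minimal sketch of the shared task queue such thread groups might use (a bounded ring buffer guarded by a mutex and two condition variables; the queue size and the task type are illustrative):
#include <pthread.h>

#define QUEUE_CAP 1024

typedef struct {
    int tasks[QUEUE_CAP];              /* e.g. client socket fds waiting to be served */
    int head, tail, count;
    pthread_mutex_t mtx;
    pthread_cond_t not_empty, not_full;
} task_queue_t;

void queue_init(task_queue_t *q) {
    q->head = q->tail = q->count = 0;
    pthread_mutex_init(&q->mtx, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* producer side: e.g. the accept thread pushes newly connected sockets */
void queue_push(task_queue_t *q, int task) {
    pthread_mutex_lock(&q->mtx);
    while (q->count == QUEUE_CAP)
        pthread_cond_wait(&q->not_full, &q->mtx);    /* block while the queue is full */
    q->tasks[q->tail] = task;
    q->tail = (q->tail + 1) % QUEUE_CAP;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mtx);
}

/* consumer side: worker threads pop tasks and process the requests */
int queue_pop(task_queue_t *q) {
    pthread_mutex_lock(&q->mtx);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->mtx);   /* block while the queue is empty */
    int task = q->tasks[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mtx);
    return task;
}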
Handle client connections in an event-driven way. Common implementations include select, poll, epoll, and kqueue. Through the steps of registering read/write/error events, waiting for them to be triggered, handling the events, updating state, and looping, they provide an efficient event-driven mechanism.
In an event-driven model, a state machine is typically used to manage the state of each connection. The state machine helps handle the different stages a connection goes through and the events it receives, enabling efficient concurrent processing.
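A minimal sketch of what such a per-connection state machine could look like (the state names and the on_event entry point are illustrative):
/* states one connection moves through; the event loop drives the transitions */
typedef enum {
    CONN_ACCEPTED,      /* connection accepted, waiting for request data */
    CONN_READING,       /* reading and accumulating the request */
    CONN_PROCESSING,    /* request complete, running the business logic */
    CONN_WRITING,       /* response queued, waiting for the socket to become writable */
    CONN_CLOSING        /* finished or failed, releasing resources */
} conn_state_t;

/* called each time the multiplexer reports an event for this connection */
void on_event(conn_state_t *state, int readable, int writable) {
    switch (*state) {
    case CONN_ACCEPTED:
    case CONN_READING:
        if (readable) {
            /* read(); if the request is now complete: *state = CONN_PROCESSING; */
        }
        break;
    case CONN_PROCESSING:
        /* build the response, then: *state = CONN_WRITING; */
        break;
    case CONN_WRITING:
        if (writable) {
            /* write(); once everything is sent: *state = CONN_CLOSING; */
        }
        break;
    case CONN_CLOSING:
        /* close the fd and reset the connection slot */
        break;
    }
}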
Advantages of the event-driven approach
A small, fixed number of threads can serve a very large number of connections, because no thread blocks on any single socket; the per-connection cost is a small amount of state rather than a whole thread.
Disadvantages of the event-driven approach
The control flow is turned inside out into callbacks and per-connection state machines, which makes the code harder to write and reason about, and any blocking call inside the event loop stalls every connection that loop serves.
The connection table can be implemented as an array indexed by file descriptor
#define CONN_MAXFD 65536
struct connection_st g_conn_table[CONN_MAXFD] = {0};
Connection structure
Adjust BUF_SIZE to fit the actual workload.
Note: if the buffers would exceed the available stack space, allocate the memory dynamically yourself; also keep in mind that adding locks would hurt performance.
#define BUF_SIZE 8192
typedef struct connection_st {
int sock;
int using;
int roff; // number of bytes currently held in the read buffer
char rbuf[BUF_SIZE]; // read buffer
int woff; // number of bytes currently held in the write buffer
char wbuf[BUF_SIZE]; // write buffer
}*connection_t;
Initialize g_conn_table and install the signal handlers
int c;
for (c = 0; c < CONN_MAXFD; ++c) {
g_conn_table[c].sock = c;
}
// sigaction is a more powerful and safer way than signal() to install a signal handler.
struct sigaction act;
memset(&act, 0, sizeof(act));
act.sa_handler = shut_server_handler; // the handler just sets shut_server = 1;
sigaction(SIGINT, &act, NULL); // usually Ctrl+C from the terminal
sigaction(SIGTERM, &act, NULL); // sent by the kill command or by process managers such as init/systemd
Create the listening socket
lisSock = socket(AF_INET, SOCK_STREAM, 0);
// SO_REUSEADDR avoids the port being temporarily unusable because of sockets the kernel keeps in TIME_WAIT.
int reuse = 1;
setsockopt(lisSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
// set non-blocking: F_GETFL returns the flags as its return value, it does not take a pointer
int flag = fcntl(lisSock, F_GETFL, 0);
fcntl(lisSock, F_SETFL, flag | O_NONBLOCK);
struct sockaddr_in lisAddr;
lisAddr.sin_family = AF_INET;
lisAddr.sin_port = htons(8384);
lisAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(lisSock, (struct sockaddr *)&lisAddr, sizeof(lisAddr)) == -1) {
perror("bind");
return -1;
}
listen(lisSock, 8192);
Register lisSock with epoll for EPOLLIN | EPOLLONESHOT.
EPOLLIN: there is data to read (on the listening socket, a pending connection to accept).
EPOLLONESHOT: after epoll_wait has reported the event once, epoll stops reporting events for this descriptor until it is manually re-armed (with EPOLL_CTL_MOD), so only one worker thread handles it at a time.
epfd = epoll_create(65534);
struct epoll_event evReg;
evReg.events = EPOLLIN | EPOLLONESHOT;
evReg.data.fd = lisSock;
epoll_ctl(epfd, EPOLL_CTL_ADD, lisSock, &evReg);
Start the worker threads; workerThread is responsible for reading, writing, and the business logic
int i;
for (i = 0; i < NUM_WORKER; ++i) {
pthread_create(worker + i, NULL, workerThread, NULL);
}
for (i = 0; i < NUM_WORKER; ++i) {
pthread_join(worker[i], NULL);
}
Each worker thread processes whatever epoll_wait returns
void *workerThread(void *arg) {
struct epoll_event event;
struct epoll_event evReg;
while (!shut_server) {
int numEvents = epoll_wait(epfd, &event, 1, 1000);
if (numEvents > 0) {
// a new client connection is pending on the listening socket
if (event.data.fd == lisSock) {
int sock = accept(lisSock, NULL, NULL);
if (sock > 0) { // mark the slot as in use, set non-blocking, and add the socket to epoll
g_conn_table[sock].using = 1;
int flag = fcntl(sock, F_GETFL, 0); // F_GETFL returns the flags; it does not take a pointer
fcntl(sock, F_SETFL, flag | O_NONBLOCK);
evReg.data.fd = sock;
evReg.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &evReg);
}
// re-arm lisSock so that the next incoming connection will be reported
evReg.data.fd = lisSock;
evReg.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, lisSock, &evReg);
} else {
int sock = event.data.fd;
connection_t conn = &g_conn_table[sock];
if (event.events & EPOLLOUT) { // the socket is writable
if (handleWriteEvent(conn) == -1) {
closeConnection(conn);
continue;
}
}
if (event.events & EPOLLIN) { // the socket is readable
if (handleReadEvent(conn) == -1) {
closeConnection(conn);
continue;
}
}
// re-arm the connection in epoll
evReg.events = EPOLLIN | EPOLLONESHOT;
if (conn->woff > 0) { // unsent data remains, so also wait for writability
evReg.events |= EPOLLOUT;
printf("conn->woff %d add write\n",conn->woff);
}
evReg.data.fd = sock;
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->sock, &evReg);
}
}
}
return NULL;
}
Read handler
int handleReadEvent(connection_t conn) {
int ret = read(conn->sock, conn->rbuf + conn->roff, BUF_SIZE - conn->roff);
if (ret > 0) {
conn->roff += ret;
// business logic: build the response, copy it into the write buffer, and send it once the socket is writable
strcpy(conn->wbuf,"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
if (sendData(conn, conn->wbuf, strlen(conn->wbuf)) == -1) { // strlen, not strlen+1: do not send the trailing NUL
printf("send error\n");
}
} else if (ret == 0) {
return -1;
} else {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
}
return 0;
}
Send data: try to write immediately; whatever is left over is stored in the write buffer
int sendData(connection_t conn, char *data, int len) {
int ret = write(conn->sock, data, len);
if (ret > 0){
if (ret == len) {
return 0;
}
int left = len - ret;
if (left > BUF_SIZE) return -1;
memmove(conn->wbuf, data + ret, left); // data may point into wbuf itself, so use memmove
conn->woff = left;
} else {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
if (len > BUF_SIZE) {
return -1;
}
memmove(conn->wbuf, data, len);
conn->woff = len;
}
return 0;
}
int handleWriteEvent(connection_t conn) {
if (conn->woff == 0) return 0;
int ret = write(conn->sock, conn->wbuf, conn->woff);
if (ret == -1) {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
} else {
int left = conn->woff - ret;
if (left > 0) {
memmove(conn->wbuf, conn->wbuf + ret, left);
}
conn->woff = left;
}
return 0;
}
On shutdown, release the resources
for (c = 0; c < CONN_MAXFD; ++c) {
connection_t conn = g_conn_table + c;
if (conn->using) {
epoll_ctl(epfd, EPOLL_CTL_DEL, conn->sock, &evReg);
close(conn->sock);
}
}
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#define BUF_SIZE 4096
typedef struct connection_st {
int sock;
int using;
int roff;
char rbuf[BUF_SIZE];
int woff;
char wbuf[BUF_SIZE];
}*connection_t;
#define CONN_MAXFD 65536
struct connection_st g_conn_table[CONN_MAXFD] = {0};
static volatile sig_atomic_t shut_server = 0; // volatile: written from the signal handler, read by the worker threads
void shut_server_handler(int signo) {
shut_server = 1;
}
int epfd;
int lisSock;
#define NUM_WORKER 64
pthread_t worker[NUM_WORKER];
int sendData(connection_t conn, char *data, int len) {
int ret = write(conn->sock, data, len);
if (ret > 0){
if (ret == len) {
return 0;
}
int left = len - ret;
if (left > BUF_SIZE) return -1;
memmove(conn->wbuf, data + ret, left); // data may point into wbuf itself, so use memmove
conn->woff = left;
} else {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
if (len > BUF_SIZE) {
return -1;
}
memmove(conn->wbuf, data, len);
conn->woff = len;
}
return 0;
}
int handleReadEvent(connection_t conn) {
char *p = NULL;
int ret = read(conn->sock, conn->rbuf + conn->roff, BUF_SIZE - 1 - conn->roff);
if (ret > 0) {
conn->roff += ret;
conn->rbuf[conn->roff] = '\0'; // keep the buffer NUL-terminated so strstr below is safe
p = strstr(conn->rbuf, "/test?conplete=1"); // completeness check: keep reading until the request is complete, then run the business logic
if(p){
strcpy(conn->wbuf,"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n");
}
else{
strcpy(conn->wbuf,"HTTP/1.1 403\r\nContent-Length: 0\r\n\r\n");
}
if (sendData(conn, conn->wbuf, strlen(conn->wbuf)) == -1) { // do not send the trailing NUL
printf("send error\n");
}
} else if (ret == 0) {
return -1;
} else {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
}
return 0;
}
int handleWriteEvent(connection_t conn) {
if (conn->woff == 0) return 0;
int ret = write(conn->sock, conn->wbuf, conn->woff);
if (ret == -1) {
if (errno != EINTR && errno != EAGAIN) {
return -1;
}
} else {
int left = conn->woff - ret;
if (left > 0) {
memmove(conn->wbuf, conn->wbuf + ret, left);
}
conn->woff = left;
}
return 0;
}
void closeConnection(connection_t conn) {
struct epoll_event evReg;
conn->using = 0;
conn->woff = conn->roff = 0;
epoll_ctl(epfd, EPOLL_CTL_DEL, conn->sock, &evReg);
close(conn->sock);
}
void *workerThread(void *arg) {
struct epoll_event event;
struct epoll_event evReg;
while (!shut_server) {
int numEvents = epoll_wait(epfd, &event, 1, 1000);
if (numEvents > 0) {
if (event.data.fd == lisSock) {
int sock = accept(lisSock, NULL, NULL);
if (sock > 0) {
g_conn_table[sock].using = 1;
int flag = fcntl(sock, F_GETFL, 0); // F_GETFL returns the flags; it does not take a pointer
fcntl(sock, F_SETFL, flag | O_NONBLOCK);
evReg.data.fd = sock;
evReg.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &evReg);
}
evReg.data.fd = lisSock;
evReg.events = EPOLLIN | EPOLLONESHOT;
epoll_ctl(epfd, EPOLL_CTL_MOD, lisSock, &evReg);
} else {
int sock = event.data.fd;
connection_t conn = &g_conn_table[sock];
if (event.events & EPOLLOUT) {
if (handleWriteEvent(conn) == -1) {
closeConnection(conn);
continue;
}
}
if (event.events & EPOLLIN) {
if (handleReadEvent(conn) == -1) {
closeConnection(conn);
continue;
}
}
evReg.events = EPOLLIN | EPOLLONESHOT;
if (conn->woff > 0) {
evReg.events |= EPOLLOUT;
printf("conn->woff %d add write\n",conn->woff);
}
evReg.data.fd = sock;
epoll_ctl(epfd, EPOLL_CTL_MOD, conn->sock, &evReg);
}
}
}
return NULL;
}
int main(int argc, char *const argv[]) {
int c=0;
for (c = 0; c < CONN_MAXFD; ++c) {
g_conn_table[c].sock = c;
}
struct sigaction act;
memset(&act, 0, sizeof(act));
act.sa_handler = shut_server_handler;
sigaction(SIGINT, &act, NULL);
sigaction(SIGTERM, &act, NULL);
epfd = epoll_create(65534);
lisSock = socket(AF_INET, SOCK_STREAM, 0);
int reuse = 1;
setsockopt(lisSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
int flag = fcntl(lisSock, F_GETFL, 0);
fcntl(lisSock, F_SETFL, flag | O_NONBLOCK);
struct sockaddr_in lisAddr;
lisAddr.sin_family = AF_INET;
lisAddr.sin_port = htons(8384);
lisAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(lisSock, (struct sockaddr *)&lisAddr, sizeof(lisAddr)) == -1) {
perror("bind");
return -1;
}
listen(lisSock, 8192);
struct epoll_event evReg;
evReg.events = EPOLLIN | EPOLLONESHOT;
evReg.data.fd = lisSock;
epoll_ctl(epfd, EPOLL_CTL_ADD, lisSock, &evReg);
int i;
for (i = 0; i < NUM_WORKER; ++i) {
pthread_create(worker + i, NULL, workerThread, NULL);
}
for (i = 0; i < NUM_WORKER; ++i) {
pthread_join(worker[i], NULL);
}
for (c = 0; c < CONN_MAXFD; ++c) {
connection_t conn = g_conn_table + c;
if (conn->using) {
epoll_ctl(epfd, EPOLL_CTL_DEL, conn->sock, &evReg);
close(conn->sock);
}
}
return 0;
}
A coroutine is a lightweight unit of execution that lets a program switch cooperatively between multiple tasks without an operating-system-level context switch. Coroutines are implemented in different ways in different languages and environments; the State Threads library described below is one common C implementation.
Coroutines are mainly a way to make code faster to write and easier to maintain; whether they also improve performance depends on other factors.
Take, for example, the most common framing structure in protocol parsing, length-data, where the length field (say 2 bytes) indicates the size of the data portion that immediately follows it.
Parsing:
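With a coroutine-style blocking read, that parsing stays strictly sequential: read 2 bytes of length, then read exactly that many bytes of data, and only the current coroutine (not the whole thread) is suspended while data has not arrived. A minimal sketch, assuming the State Threads API introduced further below (read_full and read_frame are illustrative helpers, not library functions):
#include <stdint.h>
#include <stddef.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include "st.h"

/* read exactly len bytes; st_read suspends only this coroutine, not the thread */
static int read_full(st_netfd_t nfd, void *buf, size_t len) {
    size_t off = 0;
    while (off < len) {
        ssize_t n = st_read(nfd, (char *)buf + off, len - off, ST_UTIME_NO_TIMEOUT);
        if (n <= 0) return -1;          /* peer closed the connection or an error occurred */
        off += (size_t)n;
    }
    return 0;
}

/* parse one length-data frame: a 2-byte big-endian length, then the payload */
static int read_frame(st_netfd_t nfd, char *data, size_t cap, size_t *out_len) {
    uint16_t len_be;
    if (read_full(nfd, &len_be, sizeof(len_be)) == -1) return -1;
    size_t len = ntohs(len_be);
    if (len > cap) return -1;           /* frame larger than the caller's buffer */
    if (read_full(nfd, data, len) == -1) return -1;
    *out_len = len;
    return 0;
}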
Coroutines exist to solve the problem that threads are still too coarse-grained. The operating system schedules threads; threads schedule coroutines.
For example, in a network service, calling read blocks the current thread until the socket buffer becomes readable. Note that the whole thread is blocked, so concurrency naturally suffers. If the thread could be split into many finer-grained subtasks, it could switch among them: subtask A calls read, and if the socket is not readable, subtask A blocks and yields, so the thread goes off to run other subtasks. Once the socket becomes readable, the thread wakes subtask A and resumes it right where the read blocked.
Such a subtask is, simply put, just a function, and wrapping one up is easy: save the current function's stack and register state. In short, use threads for CPU-bound work and coroutines for I/O-bound waiting.
Source: https://www.zhihu.com/question/496181340/answer/2626832687
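To make "save the function's stack and register state" concrete, here is a minimal sketch using the POSIX ucontext API (this only illustrates the yield/resume idea; it is not how any particular coroutine library, State Threads included, is necessarily implemented):
#include <ucontext.h>
#include <stdio.h>

static ucontext_t main_ctx, task_ctx;
static char task_stack[64 * 1024];       /* the coroutine's private stack */

static void task(void) {
    printf("task: part 1\n");
    swapcontext(&task_ctx, &main_ctx);   /* yield: save our stack/registers, resume main */
    printf("task: part 2 (resumed)\n");
}

int main(void) {
    getcontext(&task_ctx);
    task_ctx.uc_stack.ss_sp = task_stack;
    task_ctx.uc_stack.ss_size = sizeof(task_stack);
    task_ctx.uc_link = &main_ctx;        /* where to return when task() falls off the end */
    makecontext(&task_ctx, task, 0);

    swapcontext(&main_ctx, &task_ctx);   /* run task until it yields */
    printf("main: task yielded, doing other work\n");
    swapcontext(&main_ctx, &task_ctx);   /* resume task right where it left off */
    printf("main: task finished\n");
    return 0;
}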
Official site: https://state-threads.sourceforge.net/
Introduction
The State Threads Library is a small application library which provides a foundation for writing fast and highly scalable Internet applications (such as web servers, proxy servers, mail transfer agents, and so on, really any network-data-driven application) on UNIX-like platforms. It combines the simplicity of the multithreaded programming paradigm, in which one thread supports each simultaneous connection, with the performance and scalability of an event-driven state machine architecture. In other words, this library offers a threading API for structuring an Internet application as a state machine. For more details, please see the library documentation.
Features
Its API provides a thread-like programming style: each concurrent connection runs in its own "thread", but all of these threads live inside a single process. The underlying implementation resembles the EDSM (event-driven state machine) architecture, with each concurrent connection's session kept in its own memory area.
Advantages of coroutines
You keep the simple, sequential, blocking-style code of the one-thread-per-connection model while the library's event-driven core supplies the scalability. The example below creates a large number of state threads, each sleeping in its own loop, to show how lightweight they are.
#include <stdio.h>
#include <stdlib.h> // for atoi
#include "st.h"
void* do_calc(void* arg){
int sleep_ms = (int)(long)arg * 10; // the loop index was passed as a pointer-sized integer
for(;;){
printf("in sthread #%dms\n", sleep_ms);
st_usleep(sleep_ms * 1000);
}
return NULL;
}
int main(int argc, char** argv){
if(argc <= 1){
printf("Test the concurrence of state-threads!\n");
printf("Usage: %s <sthread_count>\n");
printf("eg. %s 10000\n", argv[0], argv[0]);
return -1;
}
if(st_init() < 0){
printf("st_init error!");
return -1;
}
int i;
int count = atoi(argv[1]);
for(i = 1; i <= count; i++){
if(st_thread_create(do_calc, (void*)(long)i, 0, 0) == NULL){
printf("st_thread_create error!");
return -1;
}
}
st_thread_exit(NULL);
return 0;
}
Compile with: gcc -g tcp-srv.c -o tcp-srv -lst -L. -I.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <st.h>
#define BACKLOG 5
#define IOBUFSIZE 8192
#define PORT 7474
static void *handle_request(void *arg)
{
st_netfd_t cli_nfd = (st_netfd_t) arg;
char buf[IOBUFSIZE];
int nw, nr;
for ( ; ; ) {
nr = (int) st_read(cli_nfd, buf, IOBUFSIZE, ST_UTIME_NO_TIMEOUT);
if (nr <= 0)break;
printf("[recv][%d] %s\n",nr,buf);
nw = st_write(cli_nfd, buf, nr, ST_UTIME_NO_TIMEOUT);
printf("[write] %d\n",nw);
if (nw == nr)break;
}
done:
st_netfd_close(cli_nfd);
return NULL;
}
int main() {
int sock;
struct sockaddr_in address;
struct sockaddr_in cli_addr;
st_netfd_t cli_nfd, srv_nfd;
int n = 0;
// initialize the State Threads library
if (st_init() != 0) {
perror("st_init");
return -1;
}
if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
printf("socket");
exit(1);
}
n = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0) {
printf("setsockopt");
exit(1);
}
// bind the socket to a port
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
if (bind(sock, (struct sockaddr *)&address, sizeof(address)) < 0) {
printf("bind");
exit(1);
}
listen(sock, 128);
if ((srv_nfd = st_netfd_open_socket(sock)) == NULL) {
printf("st_netfd_open");
exit(1);
}
printf("tcp srv start\n");
for ( ; ; ) {
n = sizeof(cli_addr);
cli_nfd = st_accept(srv_nfd, (struct sockaddr *)&cli_addr, &n,ST_UTIME_NO_TIMEOUT);
if (cli_nfd == NULL) {
printf("st_accept\n");
continue;
}
if (st_thread_create(handle_request, cli_nfd, 0, 0) == NULL) {
printf("st_thread_create\n");
continue;
}
}
close(sock);
st_netfd_close(srv_nfd);
return 0;
}
Test command: wget http://127.0.0.1:7474 (the server prints the request it receives and echoes it back, so wget will complain about the malformed response; that is expected for this echo demo)