I/O多路复用 - select、poll、epoll讲解(epoll工作图解介绍+红黑树)_epoll 红黑树-程序员宅基地

技术标签: c语言  运维  linux  服务器  unix  UNIX环境高级编程  


当有多个多种事件同一段时间发生时,多进程/多线程模式可以解决,但是创建进程和创建线程都是需要时间开销的。在编写服务器客户端程序的时候,如果服务器性能一般,但是客户端连接的又太多的时候,这会造成很大的代价。

该篇要说的多路复用就是解决这中问题的办法之一。通过多路复用来监听,是否有事件发生,并把发生的事件是什么告诉服务器端,然后执行相关操作。这种方法叫做多路复用。

多路复用有三种,下面一一介绍:select、poll以及epoll多路复用。


1- select多路复用

(1)工作原理

我们先讲解select的工作原理,然后再用代码讲解进行实践。

可以理解为select监视并等待多个文件描述符的属性发生变化,它监视的属性分3类,分别是read-fds(文件描述符有数据到来可读)、write_fds(文件描述符可写)、和except_fds(文件描述符异常)。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时( timeout 指定等待时间)发生函数才返回。当select()函数返回后,可以通过遍历 fdset,来找到究竟是哪些文件描述符就绪。

函数原型:

#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
int select(int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *timeout);
  • nfds:
    指待测试的fd的总个数,它的值是待测试的最大文件描述符加1。Linux内核从0开始到max_fd-1扫描文件描述符,如果有数据出现事件(读、写、异常)将会返回;假设需要监测的文件描述符是8,9,10,那么Linux内核实际也要监测0-7,此时真正带测试的文件描述符是0-10总共11个,即max(8,9,10)+1,所以第一个参数是所有要监听的文件描述符中最大的+1;
  • read_fds、write_fds、except_fds:
    中间三个参数read_fds、write_fds和except_fds指定要让内核测试读、写和异常条件的fd集合,如果不需要测试的可以设置为NULL;
  • timeout:
    最后一个参数是设置select的超时时间,如果设置为NULL则永不超时。timeval结构体: struct timeval { long tv_sec; /* seconds 秒*/ long tv_usec; /* microseconds 微妙*/ };

操作文件描述符的几个函数:

在这里插入图片描述

注意:

在Linux内核有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数中,这也意味着select所用到的FD_SET是有限的,也正是这个原因select()默认只能同时处理1024个客户端的连接请求: /linux/posix_types.h: #define __FD_SETSIZE 1024

(2)select-服务器编程

socket_server_socket.c服务器代码:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<ctype.h>
#include<time.h>
#include<pthread.h>
#include<getopt.h>
#include<libgen.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>

#define  ARRAY_SIZE(x)      ( sizeof(x)/sizeof(x[0]) )//数组的大小/第一个元素的大小=元素的个数

static inline void msleep(unsigned long ms);
static inline void print_usage(char *programname);
int socket_server_init( char *listen_ip, int listen_port);

int main(int argc, char *argv[])
{
    
	char     *programname = NULL;
	int        server_port = 0;
	int        daemon_run = 0;
	int        fds_array[1024];
	fd_set   rdset;
	int        maxfd = 0;
	int        found;
	int        listenfd,connfd;
	int        opt;
	int        rv;
	int        i,j;
	char     buf[1024];

	struct option long_option[] =
	{
    
		{
    "daemon", no_argument, NULL, 'b'},
		{
    "port", required_argument, NULL, 'p'},
		{
    "help", no_argument, NULL, 'h'},
		{
    NULL, 0, NULL, 0}
	};

	programname = basename(argv[0]);

	while( (opt = getopt_long(argc, argv, "bp:h", long_option, NULL))  != -1)
	{
    
		switch(opt)
		{
    
			case 'b':
				daemon_run = 1;
				break;

			case 'p':
				server_port = atoi(optarg);
				break;

			case 'h':
				print_usage(programname);
				return EXIT_SUCCESS;//EXIT_SUCCESS- successful execution of a
				//program (程序的成功执行)   EXIT_FAILUEEunsuccessful execution of a program (程序的不成功执行) 

			default:
					break;//default只有在case匹配失败的时候才会执行,位置没关系
		}
	}

	if( !server_port )
	{
    
		print_usage(programname);
		return -1;
	}

	if( (listenfd = socket_server_init(NULL, server_port)) < 0)
	{
    
		printf("ERROR: %s server listen on port %d failure\n", argv[0], server_port);
		return -2;
	}
	printf("%s server start to listen on port %d\n", programname, server_port);

	if(daemon_run)
	{
    
		daemon(0, 0);
	}

	for( i = 0; i < ARRAY_SIZE(fds_array); i++)
	{
    
		fds_array[i] = -1;//将数组里面的内容都设为-1,为什么不设为0,因为文件描述符也可能为0
	}
	fds_array[0] = listenfd;//刚开始只有fds_array【0】有内容listenfd
	
	for( ; ; )
	{
    
		FD_ZERO(&rdset);//清空rdset集合中的值
		for( i = 0; i < ARRAY_SIZE(fds_array); i++)
		{
    
			if(fds_array[i] < 0)//初始值为-1,<0表示还未被占用
			{
    
				continue;
			}
			maxfd = fds_array[i] >maxfd ? fds_array[i] : maxfd;//三目运算符
			FD_SET(fds_array[i], &rdset);//将数组中的描述符全部放到rdset集合中去,第一次将listenfd放到集合中去
		}

		rv = select(maxfd+1, &rdset, NULL, NULL, NULL);//不关心写,异常和超时;
		if(rv < 0)
		{
    
			printf("select get timeout");
			continue;
		}

		if( FD_ISSET(listenfd, &rdset) )//判断集合中是否有listenfd
		{
    
			if( (connfd = accept( listenfd, (struct sockaddr*)NULL, NULL)) < 0)
			{
    
				printf("accept new client failure:%s\n", strerror(errno));
				continue;
			}
			found = 0;
			for(i=0; i<ARRAY_SIZE(fds_array); i++)
			{
    
				if(fds_array[i] < 0)
				{
    
					printf("accept new client[%d] and add it into array\n", connfd);
					fds_array[i] = connfd;
					found =1;
					break;
				}
			}

			if( !found )
			{
    
				printf("accept new client[%d] but full, so refuse it\n", connfd);
				close(connfd);
			}
		}

		else/*data arrive from already connected client*/
		{
    
			for(i=0; i<ARRAY_SIZE(fds_array); i++)
			{
    
				if(fds_array[i]<0 || !FD_ISSET(fds_array[i], &rdset) )
				{
    
					continue;
				}
                memset(buf, 0, sizeof(buf));
				if( (rv = read(fds_array[i], buf, sizeof(buf))) <= 0 )
				{
    
					printf("socket[%d] read failure or get disconnect.\n", fds_array[i]);
					close(fds_array[i]);
					fds_array[i] = -1;
				}
				else
				{
    
					printf("socket[%d] read get %d bytes data:%s\n", fds_array[i], rv, buf);
					for(j=0; j<rv; j++)
					{
    
						buf[j] = toupper(buf[j]);
					}

					if( (write(fds_array[i], buf, rv)) < 0)
					{
    
						printf("socket[%d] write failure: %s\n", fds_array[i], strerror(errno));
						close(fds_array[i]);
						fds_array[i] = -1;
					}
				}
			}
		}
	}
Cleanup:
	close(listenfd);
	return 0;
}

static inline void print_usage(char *programname)//inline函数也称为内联函数或内嵌函数,_inline定义的类的内联函数,函数代码被放入符号调用表,使用时直接展开,不需要调用,即在编译期间将所调用的函数的代码直接嵌入到主调函数中,是一种以空间换时间的函数。如果不加static,则表示该函数有可能会被其他编译单元所调用,所以一定会产生函数本身的代码。所以加了static,一般可令可执行文件变小。
{
    
	printf("Usage: %s [OPTION]...\n", programname);
	printf("%s is a socket server program, which used to verify client and echo back string from it\n", programname);
	printf(" -b[daemon] set program running on background");
	printf(" -p[port] socket server port address\n");
	printf(" -h[help] Display this help information\n");

	printf("\nExample: %s -b -p 8900\n", programname);
		return ;
}

int socket_server_init( char *listen_ip, int listen_port)
{
    
	struct sockaddr_in     serveraddr;
	int      listenfd;
	int      rv = 0;
	int      on = 1;

	if( (listenfd = socket (AF_INET, SOCK_STREAM, 0))< 0)
	{
    
		printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		return -1;
	}
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&serveraddr, 0, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_port = htons(listen_port);//主机字节序转换为网络字节序,s——short

	if( !listen_ip )
	{
    
		serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//监听所有的IP
	}
	else
	{
    
		if( inet_pton(AF_INET, listen_ip, &serveraddr.sin_addr) <= 0 )//inet_pton函数原型如下[将"点分十进制" -> "整数"]
		{
    
			printf("inet_pton() set listen IP address failure.\n");
			rv = -2;
			goto Cleanup;
		}
	}

	if(bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0 )
	{
    
		printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
		rv = -3;
		goto Cleanup;
	}

	if(listen(listenfd, 13) < 0)
	{
    
		printf("Use bind() to bind the TCP socket failure:%s\n", strerror(errno));
		rv = -4;
		goto Cleanup;
	}
Cleanup:
	if(rv < 0)
	{
    
		close(listenfd);
	}
	else
	{
    
		rv = listenfd;
	}
	return rv;
}

static inline void msleep(unsigned long ms)
{
    
	struct timeval   tv;
	tv.tv_sec = ms/1000;
	tv.tv_usec = (ms%1000)*1000;

	select(0, NULL, NULL, NULL, &tv);
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

 ./socket_server_select -p 12345
 
socket_server_select server start to listen on port 12345
accept new client[4] and add it into array
accept new client[5] and add it into array
socket[4] read get 12 bytes data:I am 4 hello
socket[5] read get 12 bytes data:I am 5 hello

在这里插入图片描述


2- poll多路复用

(1)工作原理

我们先讲解poll的工作原理,然后再用代码讲解进行实践。

poll()的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

函数原型:

#include <poll.h>
struct pollfd{
    
 int fd; /* 文件描述符 */
 short events; /* 等待的事件 */
 short revents; /* 实际发生了的事件 */
} ;
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • nfds:
    指定数组中监听的元素个数;
  • timeout:
    指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时,使poll()一直挂起直到一个指定事件发生;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。
  • fds:
    用来指向一个struct pollfd类型的数组,每一个pollfd结构体指定了一个被监视的文件描述符,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域,events域中请求的任何事件都可能在revents域中返回。下表列出指定 events 标志以及测试 revents 标志的一些常值:

在这里插入图片描述
返回值注意事项:
该函数成功调用时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:

  • EBADF:一个或多个结构体中指定的文件描述符无效。
  • EFAULTfds:指针指向的地址超出进程的地址空间。
  • EINTR:请求的事件之前产生一个信号,调用可以重新发起。
  • EINVALnfds:参数超出PLIMIT_NOFILE值。
  • ENOMEM:可用内存不足,无法完成请求。

(2)poll-服务器编程

socket_server_poll.c服务器代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <time.h>
#include <pthread.h>
#include <getopt.h>
#include <libgen.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>

#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))

static inline void print_usage(char *progname);
int socket_server_init(char *listen_ip, int listen_port);

int main(int argc, char **argv)
{
    
    int listenfd, connfd;
    int serv_port = 0;
    int daemon_run = 0;
    char *progname = NULL;
    int opt;
    int rv;
    int i, j;
    int found;
    int max;
    char buf[1024];
    struct pollfd fds_array[1024];
    struct option long_options[] =
    {
     
        {
    "daemon", no_argument, NULL, 'b'},
        {
    "port", required_argument, NULL, 'p'},
        {
    "help", no_argument, NULL, 'h'},
        {
    NULL, 0, NULL, 0}
    }; 
    progname = basename(argv[0]);
    /* Parser the command line parameters */
    while ((opt = getopt_long(argc, argv, "bp:h", long_options, NULL)) != -1)
    {
     
        switch (opt)
        {
     
            case 'b':
                daemon_run=1;
                break;
            case 'p':
                serv_port = atoi(optarg);
                break;
            case 'h': /* Get help information */
                print_usage(progname);
                return EXIT_SUCCESS;
            default:
                break;
        } 
    } 
    if( !serv_port )
    {
     
        print_usage(progname);
        return -1;
    }
    if( (listenfd=socket_server_init(NULL, serv_port)) < 0 )
    {
    
        printf("ERROR: %s server listen on port %d failure\n", argv[0],serv_port);
        return -2;
    }
    printf("%s server start to listen on port %d\n", argv[0],serv_port);
    /* set program running on background */
    if( daemon_run )
    {
    
        daemon(0, 0);
    }
    for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
    {
    
        fds_array[i].fd=-1;
    }
    fds_array[0].fd = listenfd;
    fds_array[0].events = POLLIN;
    max = 0;
    for ( ; ; )
    {
    
        /* program will blocked here */
        rv = poll(fds_array, max+1, -1);
        if(rv < 0)
        {
    
            printf("select failure: %s\n", strerror(errno));
            break;
        }
        else if(rv == 0)
        {
    
            printf("select get timeout\n");
            continue;
        }
        /* listen socket get event means new client start connect now */
        if (fds_array[0].revents & POLLIN)
        {
    
            if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL)) < 0)
            {
    
                printf("accept new client failure: %s\n", strerror(errno));
                continue;
            }
            found = 0;
            for(i=1; i<ARRAY_SIZE(fds_array) ; i++)
            {
    
                if( fds_array[i].fd < 0 )
                {
    
                    printf("accept new client[%d] and add it into array\n", connfd );
                    fds_array[i].fd = connfd;
                    fds_array[i].events = POLLIN;
                    found = 1;
                    break;
                }
            }
            if( !found )
            {
    
                printf("accept new client[%d] but full, so refuse it\n", connfd);
                close(connfd);
                continue;
            }

            max = i>max ? i : max;
            if (--rv <= 0)
                continue;
        }
        else /* data arrive from already connected client */
        {
    
            for(i=1; i<ARRAY_SIZE(fds_array); i++)
            {
    
                memset(buf, 0, sizeof(buf));
                if( fds_array[i].fd < 0 )
                    continue;
                if( (rv=read(fds_array[i].fd, buf, sizeof(buf))) <= 0)
                {
    
                    printf("socket[%d] read failure or get disconncet.\n", fds_array[i].fd);
                    close(fds_array[i].fd);
                    fds_array[i].fd = -1;
                }
                else
                {
    
                    printf("socket[%d] read get %d bytes data: %s\n", fds_array[i].fd, rv, buf);
                    /* convert letter from lowercase to uppercase */
                    for(j=0; j<rv; j++)
                        buf[j]=toupper(buf[j]);
                    if( write(fds_array[i].fd, buf, rv) < 0 )
                    {
    
                        printf("socket[%d] write failure: %s\n", fds_array[i].fd, strerror(errno));
                        close(fds_array[i].fd);
                        fds_array[i].fd = -1;
                    }
                }
            }
        }
    }
CleanUp:
    close(listenfd);
    return 0;
}
static inline void print_usage(char *progname)
{
    
    printf("Usage: %s [OPTION]...\n", progname);

    printf(" %s is a socket server program, which used to verify client and echo back string from it\n",
            progname);
    printf("\nMandatory arguments to long options are mandatory for short options too:\n");

    printf(" -b[daemon ] set program running on background\n");
    printf(" -p[port ] Socket server port address\n");
    printf(" -h[help ] Display this help information\n");

    printf("\nExample: %s -b -p 8900\n", progname);
    return ;
}
int socket_server_init(char *listen_ip, int listen_port)
{
    
    struct sockaddr_in servaddr;
    int rv = 0;
    int on = 1;
    int listenfd;
    if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
    
        printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
        return -1;
    }
    /* Set socket port reuseable, fix 'Address already in use' bug when socket server restart */
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET; 
    servaddr.sin_port = htons(listen_port);
    if( !listen_ip ) /* Listen all the local IP address */
    {
    
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 
    }
    else /* listen the specified IP address */
    {
    
        if (inet_pton(AF_INET, listen_ip, &servaddr.sin_addr) <= 0)
        {
    
            printf("inet_pton() set listen IP address failure.\n");
            rv = -2;
            goto CleanUp;
        }
    }
    if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
    {
    
        printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
        rv = -3;
        goto CleanUp;
    }
    if(listen(listenfd, 13) < 0)
    {
    
        printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
        rv = -4;
        goto CleanUp;
    }
CleanUp:
    if(rv<0)
        close(listenfd);
    else
        rv = listenfd;
    return rv;
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

./socket_server_poll -p 12345

./socket_server_poll server start to listen on port 12345
accept new client[4] and add it into array
accept new client[5] and add it into array
socket[4] read get 12 bytes data: I am 4 hello
socket[5] read get 12 bytes data: I am 5 hello
socket[4] read get 18 bytes data: I am 4 hello hello
socket[5] read get 18 bytes data: I am 5 hello hello

在这里插入图片描述


3- epoll多路复用

(1)工作原理

我们先讲解epoll的工作原理,然后再用代码讲解进行实践。

在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者poll等IO多路复用的方法来实现并发服务程序。自Linux 2.6内核正式引入epoll以来,epoll已经成为了目前实现高性能网络服务器的必备技术,在大数据、高并发、集群等一些名词唱得火热之年代,select和poll的用武之地越来越有限,风头已经被epoll占尽。

epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分(三个函数):
创建epoll实例:epoll_create()

#include <sys/epoll.h>
int epoll_create(int size);

系统调用epoll_create()创建了一个新的epoll实例,其对应的兴趣列表初始化为空。若成功返回文件描述符,若出错返回-1。参数size指定了我们想要通过epoll实例来检查的文件描述符个数。该参数并不是一个上限,而是告诉内核应该如何为内部数据结构划分初始大小。
作为函数返回值,epoll_create()返回了代表新创建的epoll实例的文件描述符。这个文件描述符在其他几个epoll系统调用中用
来表示epoll实例。当这个文件描述符不再需要时,应该通过close()来关闭。

修改epoll的兴趣列表:epoll_ctl()

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

系统调用epoll_ctl()能够修改由文件描述符epfd所代表的epoll实例中的兴趣列表。若成功返回0,若出错返回-1。

  • epfd:
    第一个参数epfd是epoll_create()的返回值;

  • op:
    第二个参数op用来指定需要执行的操作,它可以是如下几种值:
    在这里插入图片描述

  • fd:
    第三个参数fd指明了要修改兴趣列表中的哪一个文件描述符的设定。

  • ev:
    第四个参数ev是指向结构体epoll_event的指针,结构体的定义如下:

typedef union epoll_data{
    
 void *ptr; /* Pointer to user-defind data */
 int fd; /* File descriptor */
 uint32_t u32; /* 32-bit integer */
 uint64_t u64; /* 64-bit integer */
} epoll_data_t;

struct epoll_event{
    
 uint32_t events; /* epoll events(bit mask) */
 epoll_data_t data; /* User data */
};

事件等待:epoll_wait()

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);

系统调用epoll_wait()返回epoll实例中处于就绪态的文件描述符信息,单个epoll_wait()调用能够返回多个就绪态文件描述符的信息。调用成功后epoll_wait()返回数组evlist中的元素个数,如果在timeout超时间隔内没有任何文件描述符处于就绪态的话就返回0,出错时返回-1并在errno中设定错误码以表示错误原因。

  • epfd:
    第一个参数epfd是epoll_create()的返回值;

  • evlist:
    第二个参数evlist所指向的结构体数组中返回的是有关就绪态文件描述符的信息,数组evlist的空间由调用者负责申请;

  • maxevents:
    第三个参数maxevents指定所evlist数组里包含的元素个数;

  • timeout
    第四个参数timeout用来确定epoll_wait()的阻塞行为,有如下几种:

    timeout = -1:调用将一直阻塞,直到兴趣列表中的文件描述符上有事件产生或者直到捕获到一个信号为止。
    timeout =  0:执行一次非阻塞式地检查,看兴趣列表中的描述符上产生了哪个事件。
    timeout >  0:调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止。  
    

(2)select-服务器编程

socket_server_epoll.c服务器代码:

#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<errno.h>
#include<ctype.h>
#include<time.h>
#include<pthread.h>
#include<getopt.h>
#include<libgen.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/epoll.h>
#include<sys/resource.h>

#define MAX_EVENTS         512
#define  ARRAY_SIZE(x)      ( sizeof(x)/sizeof(x[0]) )//数组的大小/第一个元素的大小=元素的个数

static inline void print_usage(char *programname);
int socket_server_init( char *listen_ip, int listen_port);
void set_socket_rlimit(void);

int main(int argc, char *argv[])
{
    
	char     *programname = NULL;
	int        server_port = 0;
	int        daemon_run = 0;
	int        found;
	int        listenfd,connfd;
	int        opt;
	int        rv;
	int        i,j;
	char     buf[1024];

	int       epollfd;
	struct  epoll_event    event;
	struct  epoll_event    event_array[MAX_EVENTS];
	int       events;

	struct option long_option[] =
	{
    
		{
    "daemon", no_argument, NULL, 'b'},
		{
    "port", required_argument, NULL, 'p'},
		{
    "help", no_argument, NULL, 'h'},
		{
    NULL, 0, NULL, 0}
	};

	programname = basename(argv[0]);

	while( (opt = getopt_long(argc, argv, "bp:h", long_option, NULL))  != -1)
	{
    
		switch(opt)
		{
    
			case 'b':
				daemon_run = 1;
				break;

			case 'p':
				server_port = atoi(optarg);
				break;

			case 'h':
				print_usage(programname);
				return EXIT_SUCCESS;//EXIT_SUCCESS  successful execution of a
						    //program (程序的成功执行)   EXIT_FAILUEE  unsuccessful execution of a program (程序的不成功执行) 

			default:
				break;//default只有在case匹配失败的时候才会执行,位置没关系
		}
	}

	if( !server_port )
	{
    
		print_usage(programname);
		return -1;
	}

	set_socket_rlimit();

	if( (listenfd = socket_server_init(NULL, server_port)) < 0)
	{
    
		printf("ERROR: %s server listen on port %d failure\n", argv[0], server_port);
		return -2;
	}
	printf("%s server start to listen on port %d\n", argv[0], server_port);

	if(daemon_run < 0)
	{
    
		daemon(0, 0);
	}

	if( (epollfd = epoll_create(MAX_EVENTS)) < 0 )
			/*listen socket get event means new client start connect now*/

	{
    
		printf("epoll_create() failure: %s\n", strerror(errno));
		return -3;
	}
	event.events = EPOLLIN;
	event.data.fd = listenfd;

	if( epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0 )
	{
    
		printf("epoll:%d,listenfd:%d\n", epollfd, listenfd);
		printf("epoll add listen socket failure: %s\n", strerror(errno));
		return -4;
	}

	for( ; ; )
	{
    
		/*program will broked here*/
		events = epoll_wait( epollfd, event_array, MAX_EVENTS, -1);
		if( events < 0 )
		{
    
			printf("epoll failure: %s\n", strerror(errno));
			break;
		}
		else if( events == 0)
		{
    
			printf("epoll get timeout\n");
			continue;
		}

		for( i=0; i<events; i++)
		{
    
			if( (event_array[i].events&EPOLLERR) || (event_array[i].events&EPOLLHUP) )
			{
    
				printf("epoll_wait get error on fd[%d]: %s\n", event_array[i].data.fd, strerror(errno) );
				epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
				close(event_array[i].data.fd);
			}

			/*listen socket get event means new client start connect now*/
			if( event_array[i].data.fd == listenfd )
			{
    
				if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL ) )< 0 )
				{
    
					printf("accept new client failure:%s\n", strerror(errno) );
					continue;
				}
				event.data.fd = connfd;
				event.events = EPOLLIN;
				if( epoll_ctl( epollfd, EPOLL_CTL_ADD, connfd, &event) < 0 )
				{
    
					printf("epoll add client socket failure: %s\n", strerror(errno) );
					close(event_array[i].data.fd);
					continue;
				}
				printf("epoll add new client socket[%d] ok.\n",connfd);
			}

			else
			{
    
                memset(buf, 0, sizeof(buf));
				if( (rv = read(event_array[i].data.fd, buf, sizeof(buf))) < 0 )
				{
    
					printf("socket[%d] read failure or get disconnected and will be removed.\n", event_array[i].data.fd);
					epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
					close(event_array[i].data.fd);
					continue;
				}

				else
				{
    
					printf("socket[%d] read get %d bytes data: %s\n", event_array[i].data.fd, rv, buf);

					for(j=0; j<rv; j++)
					{
    
						buf[j] = toupper(buf[j]);
					}
					if(write(event_array[i].data.fd, buf, rv) < 0)
					{
    
						printf("socket[%d] write failure: %s\n", event_array[i].data.fd, strerror(errno) );
						epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
						close(event_array[i].data.fd);
					}
				}
			}
		}
	}	
CleanUp:
	close(listenfd);
	return 0;
}

static inline void print_usage(char *programname)
{
    
	printf("Usage: %s [OPTION]...\n", programname);
	printf("%s is a socket server program, which used to verify client and echo back string from it\n", programname);
	printf(" -b[daemon] set program running on background");
	printf(" -p[port] socket server port address\n");
	printf(" -h[help] Display this help information\n");

	printf("\nExample: %s -b -p 8900\n", programname);
	return ;
}

int socket_server_init( char *listen_ip, int listen_port)
{
    
	struct sockaddr_in     serveraddr;
	int      listenfd;
	int      rv = 0;
	int      on = 1;

	if( (listenfd = socket (AF_INET, SOCK_STREAM, 0))< 0)
	{
    
		printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		return -1;
	}
	setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

	memset(&serveraddr, 0, sizeof(serveraddr));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_port = htons(listen_port);//主机字节序转换为网络字节序,s——short

	if( !listen_ip )
	{
    
		serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);//监听所有的IP
	}
	else
	{
    
		if( inet_pton(AF_INET, listen_ip, &serveraddr.sin_addr) <= 0 )//inet_pton函数原型如下[将"点分十进制" -> "整数"]
		{
    
			printf("inet_pton() set listen IP address failure.\n");
			rv = -2;
			goto Cleanup;
		}
	}

	if(bind(listenfd, (struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0 )
	{
    
		printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
		rv = -3;
		goto Cleanup;
	}

	if(listen(listenfd, 13) < 0)
	{
    
		printf("Use bind() to bind the TCP socket failure:%s\n", strerror(errno));
		rv = -4;
		goto Cleanup;
	}

Cleanup:
	if(rv < 0)
	{
    
		close(listenfd);
	}
	else
	{
    
		rv = listenfd;
	}
	return rv;
}

void set_socket_rlimit(void)
{
    
	struct rlimit limit = {
    0};

	getrlimit(RLIMIT_NOFILE, &limit);
	limit.rlim_cur = limit.rlim_max;
	setrlimit(RLIMIT_NOFILE, &limit);

	printf("set socket open fd max count to %ld\n", limit.rlim_max);
}

通过tcp-test-tools软件进行连接,两个客户端都可连接并且发送消息不冲突。

./socket_server_epoll -p 12345

set socket open fd max count to 1048576
./socket_server_epoll server start to listen on port 12345
epoll add new client socket[5] ok.
socket[5] read get 24 bytes data: I am 5 hello hello hello
epoll add new client socket[6] ok.
socket[6] read get 24 bytes data: I am 6 hello hello hello

在这里插入图片描述


4- epoll工作原理底层介绍

(1)工作机制

epoll的实现机制与select/poll机制完全不同。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

  • LT(level triggered)水平触发:是缺省的工作方式。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
  • ET (edge-triggered)边缘触发:是高速工作方式。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

(2)epoll工作图解

在这里插入图片描述

  1. epollfd = epoll_create()内核创建epoll实例(创建红黑树Red_Tree和就绪链表Read_List);
  2. epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event)对红黑树进行操作,添加listenfd节点;
  3. events = epoll_wait( epollfd, event_array, MAX_EVENTS, -1)内核查找红黑树中就绪的socket,放入就绪列表;然后就绪列表将类容复制到struct epoll_event List数组中;
  4. 然后对数组中需要处理的socket直接进行判断读写操作。

至于红黑树具体是什么,怎么实现的,等我弄懂了再分享…好吧还是去看看吧,但是增删改查太麻烦了,有点难受,先理解一下浅层面的吧。

真心想了解的可以参考这篇文章:【数据结构】史上最好理解的红黑树讲解,让你彻底搞懂红黑树

(3)红黑树浅层理解

我们一步一步探索了解一下红黑树…

二叉搜索树
一棵空树或者满足以下性质的二叉树被称之为二叉搜索树:

  • 如果左子树不为空,则左子树所有结点值都小于根结点的值
  • 如果右子树不为空,则右子树所有结点值都大于或等于根结点的值
  • 任意一棵子树也是一棵二叉搜索树

在这里插入图片描述
但是极端情况下的搜索效率很低,于是就有了平衡二叉树(AVL数)

平衡二叉树(AVL数)
平衡树:任意节点的子树的高度差都小于等于1;

平衡二叉树:满足二叉树以及平衡树的特点,就是两者的结合体嘛。

平衡二叉树可以有效的减少二叉树的深度,从而提高了查询。但是效率严格平衡,代价高,还是不推荐。

红黑树(Red Black Tree R-B Tree)
一种特化的平衡二叉树(AVL树),在插入和删除时通过特定操作保持二叉查找树的相对平衡,从而获得较高的查找性能。
在这里插入图片描述
符合二叉树的基本特征,同时还具备以下特点:

  • 节点非黑即红 (每个节点要么是黑色,要么是红色)
  • 其根节点是黑色
  • 叶子节点是黑色(为了简单期间,一般会省略该节点)
  • 相邻节点不同为红色(红色节点的子节点必是黑色)
  • 从一个节点到该节点的下叶子节点的所有路径上包含的黑节点数量相等(这一点是平衡的关键)

记忆方法:黑根黑叶红不邻,同祖等高只数黑


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/m0_51429770/article/details/129364697

智能推荐

字节流和字符流详解-程序员宅基地

文章浏览阅读2.6w次,点赞73次,收藏313次。1.流的概念在编程中是一种抽象的概念,就好比“水流”,从一段流向另一端在程序中所有的数据都是以流的方式进行传输或保存的,程序需要数据的时候要使用输入流读取数据,而当程序需要将一些数据保存起来的时候,就要使用输出流完成。程序中的输入输出都是以流的形式保存的,流中保存的实际上全都是字节文件。2.流的分类按照流向可以分为:输入流(如:键盘,麦克风),输出流(如:显示器,音箱)按照传输单位可以分为:字节流和字符流3.什么是字节流,什么是字符流字节流: 它处理单元为1个字节(byte),操作字节和_字节流和字符流

问题 A: 算法7-12:有向无环图的拓扑排序_读入一个无向图的邻接矩阵(即数组表示),建立无向图-程序员宅基地

文章浏览阅读2.2k次,点赞3次,收藏9次。问题 A: 算法7-12:有向无环图的拓扑排序 时间限制: 1 Sec 内存限制: 32 MB 献花..._读入一个无向图的邻接矩阵(即数组表示),建立无向图

各平台Linux以及windows安装redis6.0.6_redis 6.0 window版本升级-程序员宅基地

文章浏览阅读5.6k次。官网:https://redis.io/中文网:http://www.redis.cn/项目地址:https://github.com/redis/redisLinux关于Linux下手动编译一定要注意gcc版本gcc --version,如果太低可能导致编译失败,升级gcc版本就可以了centos安装前的准备工作# 安装gcc套件yum install cpp binutils glibc glibc-kernheaders glibc-common glibc-devel gcc _redis 6.0 window版本升级

sysctl优化linux网络_net.ipv4.conf.eth0.accept_redirects-程序员宅基地

文章浏览阅读466次。1, 优化网络设备接收队列 net.core.netdev_max_backlog=3000 该文件表示在每个网络接口接收数据包的速率比内核处理这些包的速率快时,允许送到队列的数据包的最大数目。 默认值:Red Hat Linux release 9 (Shrike)默认为300 rhel5 默认为1000 建议值为30002,_net.ipv4.conf.eth0.accept_redirects

基于灰狼优化深度置信网络(GWO-DBN)的数据分类预测,优化参数为隐藏层节点数目,迭代次数,学习率。多特征输入单输出的二分类及多分类模型。程序内注释详细,直接替换数据就可以用。程序语言为ma_灰狼优化bp数据分类-程序员宅基地

文章浏览阅读76次。基于灰狼优化深度置信网络(GWO-DBN)的数据分类预测,优化参数为隐藏层节点数目,迭代次数,学习率。多特征输入单输出的二分类及多分类模型。程序内注释详细,直接替换数据就可以用。程序语言为matlab,程序可出分类效果图,迭代优化图,混淆矩阵图。_灰狼优化bp数据分类

CPP编译流程知识点_cpp文件如何编译-程序员宅基地

文章浏览阅读1k次,点赞18次,收藏18次。介绍cpp编译流程_cpp文件如何编译

随便推点

imp导入IMP-00098: INTERNAL ERROR: impccr2_字符集_部分存储过程创建不了-程序员宅基地

文章浏览阅读9.1k次。Metalink 说Symptom(s) ~~~~~~~~~~ Export from a V734 database, while importing in V920 database gave imp-98 error: IMP-00098: INTERNAL ERROR: impccr2 Cause ~~~~~~~Imp-98 errors were g_imp-00098: internal error: impccr2

指令计数器--Program counter-程序员宅基地

文章浏览阅读3.1k次,点赞2次,收藏3次。别名:指令指针、指令地址寄存器、程序计数器;操作:顺序操作(计数器加一)、分支操作(计数器修改);Theprogram counter(PC), commonly called theinstruction pointer(IP) inIntelx86andItaniummicroprocessors, and sometimes called theinst..._program counter

魅蓝note5 Flyme6.3.0.2A root教程-程序员宅基地

文章浏览阅读2.3k次。下载系统zip卡刷包版本:Flyme 6.3.0.2AMD5:c309932d888642a34d634453b9e14fb1文件大小:1430MB发布时间:2018-02-11开发者:Flyme问题改进• 系统修复相机花屏现象解决截图音效失效问题解决部分机型振动异常的问题解决在支付宝界面卡死的问题解决部分场景下Flyme重启的问题解决微信后台无法收到消息的问题解决部分..._flyme 5.1.8.0a root的方法

java h5实现视频播放_Springboot项目使用html5的video标签完成视频播放功能-程序员宅基地

文章浏览阅读6k次。文件的上传与下载会另外再写一篇博客,本篇博客只是记录视频播放功能的实现过程1.首先引入pom文件: pom.xmlxsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">4.0.0org.springframework.bootspring-boot-..._java h5播放yuv

jenkins详解-程序员宅基地

文章浏览阅读10w+次,点赞57次,收藏643次。Jenkins是一个功能强大的应用程序,允许持续集成和持续交付项目,无论用的是什么平台。这是一个免费的源代码,可以处理任何类型的构建或持续集成。集成Jenkins可以用于一些测试和部署技术。Jenkins是一种软件允许持续集成。_jenkins

asp控件Repeater运用-程序员宅基地

文章浏览阅读91次。双层repeater嵌套 <asp:Repeater ID="rpt_dataRepeatgroup" runat="server" OnItemDataBound="rpt_dataRepeatgroup_ItemDataBound"> <HeaderTemplate> ..._"