传送文件描述符是高并发网络服务编程的一种常见实现方式。Nebula 高性能通用网络框架即采用了UNIX域套接字传递文件描述符设计和实现
https://github.com/Bwar/Nebula
在Unix 域套接字概述一节中介绍了什么是 Unix 及相关函数,本文将继续介绍 Unix 域套接字在进程间传递描述符的应用。
在进程间传递打开的描述符时通常会采用如下两种方法:
(1)fork 调用返回后,子进程自动共享父进程的所有打开的描述符。
(2)exec 调用执行后,所有描述符通常保持打开状态不变。
第一种方法中,进程先打开一个描述符,再调用 fork,之后父进程关闭这个描述符,子进程则处理该描述符。这样一个打开的描述符就从父进程传递到子进程。不过有时候可能想让子进程打开一个描述符并把他传递给父进程。
使用 Unix 域套接字,可以从一个进程向任一其他进程传递打开的描述符,而无需这两个进程之间存在亲缘关系。这种技术要求首先在这两个进程之间创建一个 Unix 域套接字,然后使用 sendmsg 跨这个套接字发送一个特殊消息。该消息由内核来专门处理,会把打开的描述符从发送进程传递到接收进程。
使用 Unix 域套接字在两个进程之间传递描述符涉及的步骤如下。
(1)创建一个字节流或数据报的 Unix 域套接字。如果目标是让子进程将打开的描述符传递回父进程,则父进程可以预先调用 socketpair 函数创建一个可用于在父子进程之间交换描述符的流管道。如果进程间没有亲缘关系,则服务器进程必须先创建一个 Unix 域字节流套接字(也可以是 Unix 域数据报套接字,不过这没什么好处,而且数据报还存在被丢弃的可能性),然后 bind 一个路径名到该套接字,以允许客户进程 connect 到套接字,发送一个打开某个描述符的请求。
(2)发送进程通过调用返回描述符的任一 Unix 函数(如 open、pipe、mkfifo、socket 和 accept,可以在进程之间传递的描述符不限类型)打开一个描述符。
(3)发送进程创建一个 msghdr 结构,其中含有待传递的描述符。POSIX 规定描述符作为辅助数据(msghdr 结构的 msg_control 成员,见辅助数据)发送。发送进程调用 sendmsg 跨来自步骤 1 的 Unix 域套接字发送该描述符。至此,称这个描述符“在飞行中(in flight)”。即使发送进程在调用 sendmsg 之后但在接收进程调用 recvmsg 之前关闭了该描述符,对于接收进程它仍然保持打开状态。发送一个描述符会使该描述符的引用计数加一。
(4)接收进程调用 recvmsg 在来自步骤 1 的 Unix 域套接字上接收这个描述符。这个描述符在接收进程中的描述符号不同于它在发送进程中的描述符号是正常的,因为传递一个描述符并不是传递一个描述符号,而是涉及在接收进程中创建一个新的描述符,该描述符和发送进程中飞行前的那个描述符指向内核中相同的文件表项。
参考文献:1) 《Unix网络编程》
2) http://book.51cto.com/art/200912/168560.htm
最近学习了使用Unix域套接字在进程间传递文件描述符,仿照参考资料,自己也写了简单的程序来实践这种技术。
其他不多说了,具体理论知识参见参考资料,开始我自己的程序介绍(在OpenSolaris 2009.06平台上测试):
1 程序作用说明:父进程,子进程以及另外一个进程向同一个文件的文件描述符向这个文件中写内容。
具体如下:
1)父进程指定要打开的文件名,打开权限,打开模式;
2)fork一个子进程;
3)子进程调用execl函数来执行程序openfile:该新程序取得指定文件的文件描述符;向指定文件中写入“openfileprog write test”;向父进程返回该文件描述符;
4)父进程收到该文件描述符后,向文件中写“paraent process write ppp”;
5)父进程作为server端建立域socket等待客户端进程连接;
6)客户端进程连接父进程;
7)父进程向该客户端进程返回从子进程得到的文件描述符;
8)客户端进程收到该文件描述符后使用它在文件中写“this is client process ccc”;
其中父子进程传递文件描述符通过建立的一对套接字管道,父进程和客户端进程传递文件描述符通过Unix域套接字。
2 具体代码说明:
1) 首先看openfile程序,这时子进程通过调用execl执行的。调用方法如下:
execl(“./openfileprog”, “openfileprog”, permit, mode, argsockfd, (char *)NULL);
其中参数1: openfile程序路径;
参数2: openfile程序名;
参数3: 待打开文件的权限;
参数4: 待打开文件模式;
参数5: 父进程建立的一对套接字管道的其中之一;
作为openfile程序,主要按照execl传的参数,打开指定文件,取得文件描述符;向该文件中写入内容;然后调用func_send_fd函数通过argsockfd把取得的文件描述符传给父进程。该程序代码如下:
int main(int argc, char argv[]) / openfileprog /
{
int i, fd, ret;
ssize_t size;
size_t buflen;
char data[10];
char buf[] = “openfileprog write test\n”; / 向文件中写入的内容 /
/ execl(“./openfileprog”, permit, mode, argsockfd, (char *)NULL); */
fd = -1;
if((fd = open(“./file”, atoi(argv[1]), atoi(argv[2]))) < 0)
{
printf(“in openfileprog, open failed\n”);
exit(-1);
}
size = -1;
buflen = sizeof(buf);
if((size = write(fd, buf, buflen)) <= 0)
{
printf(“in openfileprog, write failed\n”);
}
/* 把设定的data信息也传给父进程 */
ret = ‘a’;
for(i = 0; i < sizeof(data); i++, ret++)
{
data[i] = ret;
}
data[sizeof(data) - 1] = ‘\0’;
ret = -1;
if(0 > (ret = func_send_fd(atoi(argv[3]), fd, data, 10)))
{
printf(“in openfileprog, func_send_fd failed\n”);
}
close(fd);
return 0;
}
func_send_fd函数负责把取得的文件描述符传出去:
int func_send_fd(int send_sock, int send_fd, void *data, int bytes)
{
struct msghdr msghead;
struct iovec passdata[1];
int ret;
/* 填充msghead结构 */
msghead.msg_accrights = (caddr_t)&send_fd;
msghead.msg_accrightslen = sizeof(send_fd);
msghead.msg_name = NULL;
msghead.msg_namelen = 0;
passdata[0].iov_base = data;
passdata[0].iov_len = bytes;
msghead.msg_iov = passdata;
msghead.msg_iovlen = 1;
/* 发送信息 */
if(0 > (ret = sendmsg(send_sock, &msghead, 0)))
{
printf(“in func_send, send_fd is %d, sendsock is %d, sendmsg failed,errno is %d\n”, send_fd,send_sock,errno);
return -1;
}
return ret;
}
在上述两个函数之前,加上以下必要头文件和宏:
#include
#include
#include
#include
#include
#include
#include
#define SLEEPTIME 3
#define ARGLEN 20
以上作为一个c文件。
2)然后看父进程代码
下面是父进程程序:
int main(int argc, char *argv)
{
int status,sockfd[2];
char permit[ARGLEN];
char mode[ARGLEN];
char argsockfd[ARGLEN];
int recvfd;
char data[20];
int bytes;
int ret,i;
ssize_t size;
int buflen;
pid_t pid,chldpid;
/* 以下几行是使用域套接字必要变量 */
int fdsock, fdaccept;
struct sockaddr_un addr_server;
int len;
const char path[] = “/export/home/temp/test/other_prog/fengxianzhong”;
/* 以下是父进程写入文件的内容 */
char buf[] = “paraent process write ppp\n”;
/* 父进程同时向处理向client发送的数据 */
char datasend[] = “send by myopen\n”;
memset(permit, ‘\0’, sizeof(permit));
memset(mode, ‘\0’, sizeof(mode));
memset(argsockfd, ‘\0’, sizeof(argsockfd));
memset(data, ‘\0’, sizeof(data));
printf(“now it is parent process,now will fork a child process\n”);
sleep(SLEEPTIME);
/* 设置文件权限和打开模式 */
snprintf(permit, sizeof(permit), “%d”,PERMIT);
snprintf(mode, sizeof(mode), “%d”,MODE);
// printf(“in myopen %s, %s\n”, permit, mode);
/* 建立和子进程通信的socket套接字管道 */
ret = socketpair(AF_UNIX,SOCK_STREAM,0,sockfd);
if(0 > ret)
{
printf(“socketpair failed,errno is %d \n”,errno);
}
/* fork 子进程 /
if(0 == (chldpid = fork())) / child process */
{
printf(“now it is child process, sendsock is %d\n”,sockfd[1]);
close(sockfd[0]);
snprintf(argsockfd, sizeof(argsockfd), “%d”, sockfd[1]);
/* 子进程中执行新程序openfile */
execl(“./openfileprog”, “openfileprog”, permit, mode, argsockfd, (char *)NULL);
printf(“execl failed, perimit is %s, mode is %s\n”,permit, mode);
exit(-1);
}
/* paraent process start to write the file opened by child process */
printf(“now it is parent process\n”);
close(sockfd[1]);
bytes = sizeof(data);
/* 等待子进程结束 /
pid = wait(&status);
if((status = WEXITSTATUS(status)) == 0) / child process terminate */
{
printf(“child %d process terminate,now parent will write file …\n”,pid);
}
/* 从子进程取得文件描述符 /
recvfd = -1;
// printf(“recv sock is %d\n”, sockfd[0]);
ret = func_recv_fd(sockfd[0], &recvfd, data, bytes);
if(ret < 0)
{
printf(“paraent recv failed\n”);
}
/
else
{
printf(“fd %d paraent recv %d bytes data is %s\n”, recvfd,strlen(data),data);
}
*/
/* 向文件写入数据 */
size = -1;
buflen = sizeof(buf);
if((size = write(recvfd, buf, buflen)) <= 0)
{
printf(“in openfileprog, write failed\n”);
}
/* 父进程作为server建立域套接字,等待client连接 */
printf(“parent write over! Accept other process ……\n”);
fdsock = socket(AF_UNIX, SOCK_STREAM, 0);
if(-1 == fdsock)
{
printf(“myopen creat socket error!errno is %d\n”, errno);
}
unlink(path);
memset(&addr_server, 0, sizeof(addr_server));
addr_server.sun_family = AF_UNIX;
strcpy(addr_server.sun_path, path);
len = sizeof(struct sockaddr_un);
ret = bind(fdsock, (struct sockaddr*)&addr_server, len);
if(-1 == ret)
{
printf(“in myopen bind error, errorno is %d\n”,errno);
close(fdsock);
unlink(path);
}
ret = listen(fdsock,1);
if(-1 == ret)
{
printf(“in myopen listen error, errorno is %d\n”,errno);
close(fdsock);
unlink(path);
}
fdaccept = accept(fdsock, (struct sockaddr*)&addr_server, &len);
if(-1 == ret)
{
printf(“in myopen accept error, errorno is %d\n”,errno);
close(fdsock);
unlink(path);
}
/* 向已经连接的client传递该文件描述符 */
ret = func_send_fd(fdaccept, recvfd, datasend, sizeof(datasend));
if(0 > ret)
{
printf(“in myopen, func_send_fd failed\n”);
}
printf(“send fd over! Will sleep 10s \n”);
sleep(10);
exit(0);
}
func_recv_fd函数负责从子进程接受文件描述符:
int func_recv_fd(int recv_sock, int *recvfd, void *data, int bytes)
{
struct msghdr msghead;
struct iovec passdata[1];
int ret;
int temp;
int newfd;
struct cmsghdr *msgptr1;
struct cmsghdr *msgptr = NULL;
memset(&msghead, 0, sizeof(msghead));
/* 同func_send_fd ,填充所需要的结构 */
msghead.msg_accrights = (caddr_t)&newfd;
msghead.msg_accrightslen = sizeof(recvfd);
msghead.msg_name = NULL;
msghead.msg_namelen = 0;
passdata[0].iov_base = data;
passdata[0].iov_len = bytes;
msghead.msg_iov = passdata;
msghead.msg_iovlen = 1;
/* 接收信息(文件描述符 )*/
if(0 > (ret = recvmsg(recv_sock, &msghead, 0)))
{
printf(“in func_recv_fd, recvmsg failed\n”);
return -1;
}
if(msghead.msg_accrightslen == sizeof(recvfd))
{
recvfd = newfd; / 文件描述符 */
}
return ret;
}
其中父进程向client进程发送文件描述符也使用了func_send_fd函数,该函数在此c文件中重新写了一遍;其实没有必要这样重复。我们可以把它作为一个库来使用;不过这里暂且这样使用。函数代码参考上面所写的。
在这个c文件中还要加入以下头文件和宏定义:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define SLEEPTIME 3
#define ARGLEN 20
#define MODE S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IROTH /* -rwxr–r– /
#define PERMIT O_RDWR | O_APPEND | O_CREAT / if the file not exit ,creat it , data written to it append */
3)最后看client进程代码:
int main()
{
int sockfd, recvfd,ret;
struct sockaddr_un addr_client;
int length,buflen;
char data[10];
ssize_t size;
const char path[] = “/export/home/temp/test/other_prog/fengxianzhong”;
char buf[] = “this is client process ccc\n” ;
sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if(-1 == sockfd)
{
printf(“client creat socket error!errno is %d\n”, errno);
}
addr_client.sun_family = AF_UNIX;
strcpy(addr_client.sun_path, path);
length = sizeof(addr_client.sun_family) + sizeof(addr_client.sun_path);
ret = connect(sockfd, (struct sockaddr*)&addr_client, length); if(-1 == ret) { printf("in client connect error, errorno is %d\n",errno); close(sockfd); }
ret = func_recv_fd(sockfd, &recvfd, data, sizeof(data));
if(-1 == ret)
{
printf(“in client func_recv_fd failed\n”);
close(sockfd);
}
size = -1;
buflen = sizeof(buf);
if((size = write(recvfd, buf, buflen)) <= 0)
{
printf(“in openfileprog, write failed\n”);
}
printf(“client write over!\n”);
exit(0);
}
其中同样调用了func_recv_fd函数。在这里,我们对它的处理同func_send_fd函数。
不要忘了再加上以下头文件和宏定义:
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define SLEEPTIME 3
#define ARGLEN 20
以上3个部分代码分别作为3个c文件编译即可。
总结:上面的函数如func_recv_fd,func_send_fd代码被重复使用了,这样显得很多余。就像上面所说的可以使用其他方法来避免如此重复。但是我这里重点实践文件描述符的传递,其余的,就不去多追究了。
参考资料中以下几段话值得我们去理解:
进程可以用任何返回描述符的UNIX函数打开一个描述符:例如open()、pipe()、mkfifo()、socket()或者accept()。可以在进程间传递任何类型的描述符。
(3)发送进程建立一个msghdr结构,其中包含要传递的描述符。在POSIX中说明该描述符作为辅助数据发送,但老的实现使用msg_accright成员。(这里,在Opensolaris上,我使用的就是老的成员)发送进程调用sendmsg()通过第一部得到的UNIX域套接字发出套接字。这时这个描述符是在飞行中的。即使在发送进程调用sendmsg()之后,但在接受进程调用recvmsg()之前将描述符关闭,它仍会为接收进程保持打开状态。描述符的发送导致它的访问统计数加1。
(4)接收进程调用recvmsg()在UNIX域套接字上接收套接字。通常接收进程收到的描述符的编号和发送进程中的描述符的编号不同,但这没有问题。传递描述符不是传递描述符的编号,而是在接收进程中建立一个新的描述符,指向内核的文件表中与发送进程发送的描述符相同的项。
迭代服务器
并发服务器,每个客户请求fork一个子进程
预先派生子进程,每个子进程无保护地调用accept
预先派生子进程,使用文件上锁保护accept
预先派生子进程,使用线程互斥锁上锁保护accept
预先派生子进程,父进程向子进程传递套接字描述符
并发服务器,每个客户端请求创建一个线程
预先创建线程服务器,使用互斥锁上锁保护accept
预先创建线程服务器,由主线程调用accept
当系统负载较轻时,传统的并发服务器程序模型就够了。相对于传统的每个客户一次fork设计,预先创建一个进程池或线程池可以减少进程控制CPU时间,大约可减少10倍以上。
某些实现允许多个子进程或线程阻塞在accept上,然而在另一些实现中,我们必须使用文件锁、线程互斥锁或其他类型的锁来确保每次只有一个子进程或线程在accept。
一般来讲,所有子进程或线程都调用accept要比父进程或主线程调用accept后将描述字传递个子进程或线程来得快且简单。
Nebula框架采用无锁设计,进程之前完全不共享数据,不存在需要互斥访问的地方。没错,会存在数据多副本问题,但这些多副本往往只是些配置数据,占用不了太大内存,与加锁解锁带来的代码复杂度及锁开销相比这点内存代价更划算也更简单。
同一个Nebula服务的工作进程间不相互通信,采用进程和线程并无太大差异,之所以采用进程而不是线程的最重要考虑是Nebula是出于稳定性和容错性考虑。Nebula是通用框架,完全业务无关,业务都是通过动态加载的方式或通过将Nebula链接进业务Server的方式来实现。Nebula框架无法预知业务代码的质量,但可以保证在服务因业务代码导致coredump或其他情况时,框架可以实时监控到并立刻拉起服务进程,最大程度保障服务可用性。
决定Nebula采用传递文件描述符方式的最重要一点是:Nebula定位是高性能分布式服务集群解决方案的基础通信框架,其设计更多要为构建分布式服务集群而考虑。集群不同服务节点之间通过TCP通信,而所有逻辑都是Worker进程负责,这意味着节点之间通信需要指定到Worker进程,而如果采用子进程竞争accept的方式无法保证指定的子进程获得资源,那么第一个通信数据包将会路由错误。采用传递文件描述符方式可以很完美地解决这个问题,而且传递文件描述符也非常高效。
#include <sys/types.h>
#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
这两个函数与sendto和recvfrom函数相似,只不过可以传输更复杂的数据结构,不仅可以传输一般数据,还可以传输额外的数据,即文件描述符。下面来看结构体msghdr及其相关结构体 :
struct msghdr {
void msg_name; / optional address /
socklen_t msg_namelen; / size of address /
struct iovec *msg_iov; / scatter/gather array /
size_t msg_iovlen; / # elements in msg_iov /
void *msg_control; / ancillary data, see below /
size_t msg_controllen; / ancillary data buffer len /
int msg_flags; / flags on received message */
};
/* iovec结构体 /
struct iovec {
void *iov_base; / Starting address /
size_t iov_len; / Number of bytes to transfer */
};
/* cmsghdr结构体 /
struct cmsghdr {
socklen_t cmsg_len; / data byte count, including header /
int cmsg_level; / originating protocol /
int cmsg_type; / protocol-specific type /
/ followed by unsigned char cmsg_data[]; */
};
msghdr结构成员说明:
msg_name :即对等方的地址指针,不关心时设为NULL即可.
msg_namelen:地址长度,不关心时设置为0即可.
msg_iov:结构体iovec指针; iovec的成员iov_base 可以认为是传输正常数据时的buf,iov_len 是buf 的大小。
msg_iovlen:iovec类型的元素的个数,每一个缓冲区的起始地址和大小由iovec类型自包含。当有n个iovec结构体时,此值为n。
msg_control:是一个指向cmsghdr 结构体的指针,用来发送或接收控制信息。
msg_controllen :控制信息所占用的字节数。注意,msg_controllen与前面的msg_iovlen不同,msg_iovlen是指的由成员msg_iov所指向的iovec型的数组的元素个数,而msg_controllen,则是所有控制信息所占用的总的字节数。
msg_flags : 用来描述接受到的消息的性质,由调用recvmsg时传入的flags参数设置。
为了对齐,可能存在一些填充字节,跟不同系统的实现有关控制信息的数据部分,是直接存储在cmsghdr结构体的cmsg_type之后的。但中间可能有一些由于对齐产生的填充字节,由于这些填充数据的存在,对于这些控制数据的访问,必须使用Linux提供的一些专用宏来完成:
#include <sys/socket.h>
/* 返回msgh所指向的msghdr类型的缓冲区中的第一个cmsghdr结构体的指针。*/
struct cmsghdr *CMSG_FIRSTHDR(struct msghdr *msgh);
/* 返回传入的cmsghdr类型的指针的下一个cmsghdr结构体的指针。 */
struct cmsghdr *CMSG_NXTHDR(struct msghdr *msgh, struct cmsghdr *cmsg);
/* 根据传入的length大小,返回一个包含了添加对齐作用的填充数据后的大小。 */
size_t CMSG_ALIGN(size_t length);
/* 传入的参数length指的是一个控制信息元素(即一个cmsghdr结构体)后面数据部分的字节数,返回的是这个控制信息的总的字节数,即包含了头部(即cmsghdr各成员)、数据部分和填充数据的总和。*/
size_t CMSG_SPACE(size_t length);
/* 根据传入的cmsghdr指针参数,返回其后面数据部分的指针。*/
size_t CMSG_LEN(size_t length);
/* 传入的参数是一个控制信息中的数据部分的大小,返回的是这个根据这个数据部分大小,需要配置的cmsghdr结构体中cmsg_len成员的值。这个大小将为对齐添加的填充数据也包含在内。*/
unsigned char *CMSG_DATA(struct cmsghdr *cmsg);
具体地说,为msghdr的成员msg_control分配一个cmsghdr的空间,将该cmsghdr结构的cmsg_level设置为SOL_SOCKET,cmsg_type设置为SCM_RIGHTS,并将要传递的文件描述符作为数据部分,调用sendmsg即可。其中SCM表示socket-level control message,SCM_RIGHTS表示我们要传递访问权限。
跟发送部分一样,为控制信息配置好属性,并在其后分配一个文件描述符的数据部分后,在成功调用recvmsg后,控制信息的数据部分就是在接收进程中的新的文件描述符了,接收进程可直接对该文件描述符进行操作。
文件描述符传递并不是将文件描述符数字传递,而是文件描述符对应数据结构。在主进程accept的到的文件描述符7传递到子进程后文件描述符有可能是7,更有可能是7以外的其他数值,但无论是什么数值并不重要,重要的是传递之后的连接跟传递之前的连接是同一个连接。
通常在完成文件描述符传递后,接收进程接管文件描述符,发送进程则应调用close关闭已传递的文件描述符。发送进程关闭描述符并不造成关闭该文件或设备,因为该描述符对应的文件仍被视为由接收者进程打开(即使接收进程尚未接收到该描述符)。
文件描述符传递可经由基于STREAMS的管道,也可经由UNIX域套接字。两种方式在《UNIX网络编程》中均有描述,Nebula采用的UNIX域套接字传递文件描述符。
创建用于传递文件描述符的UNIX域套接字用到socketpair函数:
#include <sys/types.h>
#include <sys/socket.h>
int socketpair(int d, int type, int protocol, int sv[2]);
传入的参数sv为一个整型数组,有两个元素。当调用成功后,这个数组的两个元素即为2个文件描述符。一对连接起来的Unix匿名域套接字就建立起来了,它们就像一个全双工的管道,每一端都既可读也可写。
文件描述符传递方法声明:
static int SendChannelFd(int iSocketFd, int iSendFd, int iCodecType, std::shared_ptr
static int RecvChannelFd(int iSocketFd, int& iRecvFd, int& iCodecType, std::shared_ptr
文件描述符发送方法实现:
/**
@return errno 错误码
*/
int SocketChannel::SendChannelFd(int iSocketFd, int iSendFd, int iCodecType, std::shared_ptr
{
ssize_t n;
struct iovec iov[1];
struct msghdr msg;
tagChannelCtx stCh;
int iError = 0;
stCh.iFd = iSendFd;
stCh.iCodecType = iCodecType;
union
{
struct cmsghdr cm;
char space[CMSG_SPACE(sizeof(int))];
} cmsg;
if (stCh.iFd == -1)
{
msg.msg_control = NULL;
msg.msg_controllen = 0;
}
else
{
msg.msg_control = (caddr_t) &cmsg;
msg.msg_controllen = sizeof(cmsg);
memset(&cmsg, 0, sizeof(cmsg));
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
*(int *) CMSG_DATA(&cmsg.cm) = stCh.iFd; }
msg.msg_flags = 0;
iov[0].iov_base = (char*)&stCh;
iov[0].iov_len = sizeof(tagChannelCtx);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
n = sendmsg(iSocketFd, &msg, 0);
if (n == -1)
{
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION, “sendmsg() failed, errno %d”, errno);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(iError);
}
return(ERR_OK);
}
文件描述符接收方法实现:
/**
@return errno 错误码
*/
int SocketChannel::RecvChannelFd(int iSocketFd, int& iRecvFd, int& iCodecType, std::shared_ptr
{
ssize_t n;
struct iovec iov[1];
struct msghdr msg;
tagChannelCtx stCh;
int iError = 0;
union {
struct cmsghdr cm;
char space[CMSG_SPACE(sizeof(int))];
} cmsg;
iov[0].iov_base = (char*)&stCh;
iov[0].iov_len = sizeof(tagChannelCtx);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
msg.msg_control = (caddr_t) &cmsg;
msg.msg_controllen = sizeof(cmsg);
n = recvmsg(iSocketFd, &msg, 0);
if (n == -1) {
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION, “recvmsg() failed, errno %d”, errno);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(iError);
}
if (n == 0) {
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION, “recvmsg() return zero, errno %d”, errno);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(ERR_CHANNEL_EOF);
}
if ((size_t) n < sizeof(tagChannelCtx))
{
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION, “rrecvmsg() returned not enough data: %z, errno %d”, n, errno);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(iError);
}
if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int)))
{
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION, “recvmsg() returned too small ancillary data”);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(iError);
}
if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS)
{
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION,
“recvmsg() returned invalid ancillary data level %d or type %d”, cmsg.cm.cmsg_level, cmsg.cm.cmsg_type);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(iError);
}
stCh.iFd = *(int *) CMSG_DATA(&cmsg.cm);
if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC))
{
pLogger->WriteLog(neb::Logger::ERROR, FILE, LINE, FUNCTION, “recvmsg() truncated data”);
iError = (errno == 0) ? ERR_TRANSFER_FD : errno;
return(iError);
}
iRecvFd = stCh.iFd;
iCodecType = stCh.iCodecType;
return(ERR_OK);
}
Manager进程的void Manager::CreateWorker()方法创建用于传递文件描述符的UNIX域套接字:
int iControlFds[2];
int iDataFds[2];
if (socketpair(PF_UNIX, SOCK_STREAM, 0, iControlFds) < 0)
{
LOG4_ERROR(“error %d: %s”, errno, strerror_r(errno, m_szErrBuff, 1024));
}
if (socketpair(PF_UNIX, SOCK_STREAM, 0, iDataFds) < 0)
{
LOG4_ERROR(“error %d: %s”, errno, strerror_r(errno, m_szErrBuff, 1024));
}
Manager进程发送文件描述符:
int iCodec = m_stManagerInfo.eCodec; // 将编解码方式和文件描述符一同发送给Worker进程
int iErrno = SocketChannel::SendChannelFd(worker_pid_fd.second, iAcceptFd, iCodec, m_pLogger);
if (iErrno == 0)
{
AddWorkerLoad(worker_pid_fd.first);
}
else
{
LOG4_ERROR(“error %d: %s”, iErrno, strerror_r(iErrno, m_szErrBuff, 1024));
}
close(iAcceptFd); // 发送完毕,关闭文件描述符
Worker进程接收文件描述符:
int iAcceptFd = -1;
int iCodec = 0; // 这里的编解码方式在RecvChannelFd方法中获得
int iErrno = SocketChannel::RecvChannelFd(m_stWorkerInfo.iManagerDataFd, iAcceptFd, iCodec, m_pLogger);
至此,Nebula框架的文件描述符传递分享完毕,下面再看看nginx中的文件描述符传递实现。
nginx中发送文件描述符代码:
ngx_int_t
ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,
ngx_log_t *log)
{
ssize_t n;
ngx_err_t err;
struct iovec iov[1];
struct msghdr msg;
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
union {
struct cmsghdr cm;
char space[CMSG_SPACE(sizeof(int))];
} cmsg;
if (ch->fd == -1) {
msg.msg_control = NULL;
msg.msg_controllen = 0;
} else {
msg.msg_control = (caddr_t) &cmsg;
msg.msg_controllen = sizeof(cmsg);
ngx_memzero(&cmsg, sizeof(cmsg));
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.cm.cmsg_level = SOL_SOCKET;
cmsg.cm.cmsg_type = SCM_RIGHTS;
/*
* We have to use ngx_memcpy() instead of simple
* *(int *) CMSG_DATA(&cmsg.cm) = ch->fd;
* because some gcc 4.4 with -O2/3/s optimization issues the warning:
* dereferencing type-punned pointer will break strict-aliasing rules
*
* Fortunately, gcc with -O1 compiles this ngx_memcpy()
* in the same simple assignment as in the code above
*/
ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int));
}
msg.msg_flags = 0;
#else
if (ch->fd == -1) {
msg.msg_accrights = NULL;
msg.msg_accrightslen = 0;
} else {
msg.msg_accrights = (caddr_t) &ch->fd;
msg.msg_accrightslen = sizeof(int);
}
#endif
iov[0].iov_base = (char *) ch;
iov[0].iov_len = size;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
n = sendmsg(s, &msg, 0);
if (n == -1) {
err = ngx_errno;
if (err == NGX_EAGAIN) {
return NGX_AGAIN;
}
ngx_log_error(NGX_LOG_ALERT, log, err, "sendmsg() failed");
return NGX_ERROR;
}
return NGX_OK; } nginx中接收文件描述符代码:
ngx_int_t
ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
{
ssize_t n;
ngx_err_t err;
struct iovec iov[1];
struct msghdr msg;
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
union {
struct cmsghdr cm;
char space[CMSG_SPACE(sizeof(int))];
} cmsg;
#else
int fd;
#endif
iov[0].iov_base = (char *) ch;
iov[0].iov_len = size;
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = iov;
msg.msg_iovlen = 1;
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
msg.msg_control = (caddr_t) &cmsg;
msg.msg_controllen = sizeof(cmsg);
#else
msg.msg_accrights = (caddr_t) &fd;
msg.msg_accrightslen = sizeof(int);
#endif
n = recvmsg(s, &msg, 0);
if (n == -1) {
err = ngx_errno;
if (err == NGX_EAGAIN) {
return NGX_AGAIN;
}
ngx_log_error(NGX_LOG_ALERT, log, err, "recvmsg() failed");
return NGX_ERROR;
}
if (n == 0) {
ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "recvmsg() returned zero");
return NGX_ERROR;
}
if ((size_t) n < sizeof(ngx_channel_t)) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"recvmsg() returned not enough data: %z", n);
return NGX_ERROR;
}
#if (NGX_HAVE_MSGHDR_MSG_CONTROL)
if (ch->command == NGX_CMD_OPEN_CHANNEL) {
if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"recvmsg() returned too small ancillary data");
return NGX_ERROR;
}
if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS)
{
ngx_log_error(NGX_LOG_ALERT, log, 0,
"recvmsg() returned invalid ancillary data "
"level %d or type %d",
cmsg.cm.cmsg_level, cmsg.cm.cmsg_type);
return NGX_ERROR;
}
/* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */
ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int));
}
if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"recvmsg() truncated data");
}
#else
if (ch->command == NGX_CMD_OPEN_CHANNEL) {
if (msg.msg_accrightslen != sizeof(int)) {
ngx_log_error(NGX_LOG_ALERT, log, 0,
"recvmsg() returned no ancillary data");
return NGX_ERROR;
}
ch->fd = fd;
}
#endif
return n; }
Unix 域套接字是一种client和server在单主机上的 IPC 方法。Unix 域套接字不运行协议处理,不须要加入或删除网络报头,无需验证和,不产生顺序号,无需发送确认报文,比因特网域套接字的效率更高。Unix 域套接字提供字节流(类似于 TCP)和数据报(类似于 UDP)两种接口,UNIX域数据报服务是可靠的,既不会丢失消息也不会传递出错。UNIX域套接字是套接字和管道之间的混合物。
Unix 域套接字编程
地址结构:
struct sockaddr_un{
sa_family_t sun_family; /* AF_UNIX /
char sun_path[108]; / pathname */
};
存放在 sun_path 数组中的路径名必须以空字符结尾。以下是把一个路径名绑定到 Unix 域套接字上实现的程序:
/* 创建一个Unix域套接字,并bind一个路径名 */
#include <sys/socket.h>
#include <sys/un.h>
#include
#include
#include
#include
#include
extern void err_sys(const char *, …);
extern void err_quit(const char *, …);
int main(int argc, char **argv)
{
int sockfd, size;
socklen_t len;
struct sockaddr_un addr1, addr2;
if(argc != 2)
err_quit("usage: %s <pathname>", argv[0]);
bzero(&addr1, sizeof(addr1));
addr1.sun_family = AF_UNIX;
strncpy(addr1.sun_path, argv[1], sizeof(addr1.sun_path)-1);
/* 创建一个Unix域套接字 */
if( (sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
err_sys("socket error");
/* 若路径名在文件系统已存在,则bind会出错;所以先调用unlink删除要绑定的路径名,防止bind出错 */
unlink(argv[1]);
/* 将路径名bind绑定到该套接字上 */
size = offsetof(struct sockaddr_un, sun_path) + strlen(addr1.sun_path);
if(bind(sockfd, (struct sockaddr *)&addr1, size) < 0)
err_sys("bind error");
/* 显示已绑定的路径名 */
len = sizeof(addr2);
getsockname(sockfd, (struct sockaddr *)&addr2, &len);
printf("bound name = %s, returned len = %d\n", addr2.sun_path, len);
exit(0); } $ ./main /tmp/sock bound name = /tmp/sock, returned len = 12
/当该路径名存在,且不使用unlink函数时,会出现下面提示/
$ ./main /tmp/sock
bind error: Address already in use
为了创建一对非命名的,相互连接的 UNXI 域套接字,用户能够使用socketopair函数。事实上现例如以下:
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sockfd[2]);
/* 返回值:若成功则返回0,出错则返回-1 /
/ 说明
由 bind 创建的路径名默认訪问权限应为 0777,并按当前 umask 值进行改动;
路径名必须是一个绝对路径名,避免使用相对路径名。由于它的解析依赖于调用者的当前工作文件夹,若server绑定的是一个相对路径名,则client和server必须在同样的文件夹才干正常工作;
在 connect 调用中指定的路径名必须是一个当前绑定在某个已打开的 Unix 域套接字上的路径名,并且套接字类型必须一致;
调用 connect 连接一个 Unix 域套接字涉及的权限測试等价于调用 open 以仅仅写方式訪问对应的路径名;
Unix 域字节流套接字类似于 TCP 套接字:它们都为进程提供一个无记录边界的字节流接口;
在 Unix 域字节流套接字中,若 connect 调用时发现监听套接字的队列已满,则马上返回 ECONNREFUSED 错误。而 TCP 套接字遇到这样的情况,TCP 监听套接字忽略这些到达的 SYN 连接请求,TCP client则会重发数次 SYN 报文段;
Unix 域数据报套接字类似于 UDP 套接字:它们都提供一个保留记录边界的不可靠数据报;
为一个未绑定路径名的 Unix 套接字发送数据时,不会自己主动给该套接字绑定一个路径名。而 UDP 套接字在给一个未绑定的 UDP 套接字发送数据时,会自己主动为其绑定一个暂时port;
Unix 域字节流编程
server程序:
#include <sys/socket.h>
#include <sys/wait.h>
#include <sys/un.h>
#include
#include
#include
#include
#include
#include
#include
#define QLEN 1024
typedef void Sigfunc(int);
extern void err_sys(const char *, …);
extern void err_quit(const char *, …);
extern void str_echo(int);
static Sigfunc *MySignal(int signo, Sigfunc *func);
static Sigfunc *M_signal(int signo, Sigfunc *func);
static void sig_chld(int);
int main(int argc, char **argv)
{
int sockfd, conndfd, size;
socklen_t len;
pid_t childpid;
struct sockaddr_un cliaddr, servaddr;
if(argc != 2)
err_quit("usage: %s <pathname>", argv[0]);
bzero(&servaddr, sizeof(servaddr));
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path, argv[1]);
/* 创建一个Unix域套接字 */
if( (sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
err_sys("socket error");
/* 若路径名在文件系统已存在,则bind会出错;所以先调用unlink删除要绑定的路径名,防止bind出错 */
unlink(argv[1]);
/* 将路径名bind绑定到该套接字上 */
size = offsetof(struct sockaddr_un, sun_path) + strlen(servaddr.sun_path);
if(bind(sockfd, (struct sockaddr *)&servaddr, size) < 0)
err_sys("bind error");
/* 监听套接字 */
if(listen(sockfd, QLEN) < 0)
{
close(sockfd);
err_sys("listen error");
}
/* 信号处理 */
MySignal(SIGCHLD, sig_chld);
for( ; ;)
{
len = sizeof(cliaddr);
if( (conndfd = accept(sockfd, (struct sockaddr *)&cliaddr, &len)) < 0)
{
if(errno == EINTR)
continue;
else
err_sys("accept error");
}
}
if( (childpid = fork()) == 0)
{
close(sockfd);
str_echo(conndfd);
exit(0);
}
close(conndfd); }
void sig_chld(int signo)
{
pid_t pid;
int stat;
while( (pid = waitpid(-1, &stat, WNOHANG)) > 0)
printf(“child %d terminated\n”, pid);
return;
}
static Sigfunc *MySignal(int signo, Sigfunc *func)
{
Sigfunc *sigfunc;
if( (sigfunc = M_signal(signo, func)) == SIG_ERR)
err_sys(“signal error”);
return (sigfunc);
}
static Sigfunc *M_signal(int signo, Sigfunc *func)
{
struct sigaction act, oact;
/* 设置信号处理函数 */
act.sa_handler = func;
/* 初始化信号集 */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
if(signo == SIGALRM)
{/* 若是SIGALRM信号,则系统不会自己主动重新启动 */ #ifdef SA_INTERRUPT
act.sa_flags |= SA_INTERRUPT; #endif
}
else
{/* 其余信号设置为系统会自己主动重新启动 */ #ifdef SA_RESTART
act.sa_flags |= SA_RESTART; #endif
}
/* 调用 sigaction 函数 */
if(sigaction(signo, &act, &oact) < 0)
return(SIG_ERR);
return(oact.sa_handler); } client程序:
#include <sys/socket.h>
#include <sys/un.h>
#include
#include
#include
#include
#include
extern void err_sys(const char *, …);
extern void err_quit(const char *, …);
extern void str_cli(FILE *, int);
int
main(int argc, char **argv)
{
int sockfd;
struct sockaddr_un servaddr;
if(argc != 2)
err_quit("usage: %s <pathname>", argv[0]);
if( (sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
err_sys("socket error");
bzero(&servaddr, sizeof(servaddr));
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path, argv[1]);
int err;
err = connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
if( err < 0)
err_sys("connect error");
str_cli(stdin, sockfd); /* do it all */
exit(0); } 參考资料:
《Unix 网络编程》
一、UNIX Domain Socket IPC
socket API原本是为网络通讯设计的,但后来在socket的框架上发展出一种IPC机制,就是UNIX Domain Socket。虽然网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。UNIX域套接字与TCP套接字相比较,在同一台主机的传输速度前者是后者的两倍。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。UNIX Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIXDomain Socket也是可靠的,消息既不会丢失也不会顺序错乱。
使用UNIX Domain Socket的过程和网络socket十分相似,也要先调用socket()创建一个socket文件描述符,address family指定为AF_UNIX,type可以选择SOCK_DGRAM或SOCK_STREAM,protocol参数仍然指定为0即可。
UNIX Domain Socket与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。
#define UNIX_PATH_MAX 108
struct sockaddr_un {
sa_family_t sun_family; /* AF_UNIX /
char sun_path[UNIX_PATH_MAX]; / pathname */
};
二、回射/客户服务器程序
通信的流程跟前面说过的tcp/udp 是类似的。下面直接来看服务器serv.c程序:
#include
#include
#include
#include
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include
#include<sys/un.h>
#define ERR_EXIT(m)
do {
perror(m);
exit(EXIT_FAILURE);
} while (0)
void echo_ser(int conn)
{
char recvbuf[1024];
int n;
while (1)
{
memset(recvbuf, 0, sizeof(recvbuf));
n = read(conn, recvbuf, sizeof(recvbuf));
if (n == -1)
{
if (n == EINTR)
continue;
ERR_EXIT("read error");
}
else if (n == 0)
{
printf("client close\n");
break;
}
fputs(recvbuf, stdout);
write(conn, recvbuf, strlen(recvbuf));
}
close(conn); }
/* unix domain socket与TCP套接字相比较,在同一台主机的传输速度前者是后者的两倍。*/
int main(void)
{
int listenfd;
if ((listenfd = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
ERR_EXIT(“socket error”);
unlink("/tmp/test socket"); //地址复用
struct sockaddr_un servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path, "/tmp/test socket");
if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("bind error");
if (listen(listenfd, SOMAXCONN) < 0)
ERR_EXIT("listen error");
int conn;
pid_t pid;
while (1)
{
conn = accept(listenfd, NULL, NULL);
if (conn == -1)
{
if (conn == EINTR)
continue;
ERR_EXIT("accept error");
}
pid = fork();
if (pid == -1)
ERR_EXIT("fork error");
if (pid == 0)
{
close(listenfd);
echo_ser(conn);
exit(EXIT_SUCCESS);
}
close(conn);
}
return 0; } 客户端程序cli.c程序:
#include
#include
#include
#include
#include
#include
#include
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include
#include<sys/un.h>
#define ERR_EXIT(m)
do {
perror(m);
exit(EXIT_FAILURE);
} while (0)
void echo_cli(int conn)
{
char sendbuf[1024] = {0};
char recvbuf[1024] = {0};
while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
{
write(conn, sendbuf, strlen(sendbuf));
read(conn, recvbuf, sizeof(recvbuf));
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
memset(sendbuf, 0, sizeof(sendbuf));
}
close(conn); }
int main(void)
{
int sock;
if ((sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0)
ERR_EXIT(“socket error”);
struct sockaddr_un servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sun_family = AF_UNIX;
strcpy(servaddr.sun_path, "/tmp/test socket");
if (connect(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
ERR_EXIT("connect error");
echo_cli(sock);
return 0; } server 使用fork 的形式来接受多个连接,server调用bind 会创建一个文件,如下所示:
huangcheng@ubuntu:/tmp$ ls -l test\ socket
srwxr-xr-x 1 huangcheng huangcheng 0 2013-08-02 19:30 test socket
即文件类型为s,表示SOCKET文件,与FIFO(命名管道)文件,类型为p,类似,都表示内核的一条通道,读写文件实际是在读写内核通道。程序中调用unlink 是为了在开始执行程序时删除以前创建的文件,以便在重启服务器时不会提示address in use。其他方面与以前说过的回射客户服务器程序没多大区别,不再赘述。
三、UNIX域套接字编程注意点
1、bind成功将会创建一个文件,权限为:0777 & ~umask
2、sun_path最好用一个绝对路径。
3、UNIX域协议支持流式套接口与报式套接口。
4、UNIX域流式套接字connect发现监听队列满时,会立刻返回一个ECONNREFUSED,这和TCP不同,如果监听队列满,会忽略到来的SYN,这导致对方重传SYN。
四、socketpair 函数
功能:创建一个全双工的流管道
int socketpair(int domain, int type, int protocol, int sv[2]);
参数:
domain: 协议家族
type: 套接字类型
protocol:协议类型
sv:返回套接字对
返回值:成功返回0;失败返回-1
实际上socketpair 函数跟pipe 函数是类似的,也只能在同个主机上具有亲缘关系的进程间通信,但pipe 创建的匿名管道是半双工的,而socketpair 可以认为是创建一个全双工的管道。
可以使用socketpair 创建返回的套接字对进行父子进程通信:
#include
#include<sys/types.h>
#include<sys/socket.h>
#include
#include
#include
#include<arpa/inet.h>
#include<netinet/in.h>
#include
#define ERR_EXIT(m)
do {
perror(m);
exit(EXIT_FAILURE);
} while (0)
int main(void)
{
int sockfds[2];
if (socketpair(PF_UNIX, SOCK_STREAM, 0, sockfds) < 0)
ERR_EXIT("sockpair");
pid_t pid;
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
if (pid > 0)
{
int val = 0;
close(sockfds[1]);
while (1)
{
++val;
printf(" sending data: %d\n", val);
write(sockfds[0], &val, sizeof(val));
read(sockfds[0], &val, sizeof(val));
printf("recv data : %d\n", val);
sleep(1);
}
}
else if (pid == 0)
{
int val;
close(sockfds[0]);
while (1)
{
read(sockfds[1], &val, sizeof(val));
++val;
write(sockfds[1], &val, sizeof(val));
}
}
return 0; } 输出如下:
huangcheng@ubuntu:~$ ./a.out
sending data: 1
recv data : 2
sending data: 3
recv data : 4
sending data: 5
recv data : 6
sending data: 7
recv data : 8
sending data: 9
recv data : 10
sending data: 11
recv data : 12
sending data: 13
………………………..
即父进程持有sockfds[0] 套接字进行读写,而子进程持有sockfds[1] 套接字进行读写。
在前面我们介绍了UNIX域套接字编程,更重要的一点是UNIX域套接字可以在同一台主机上各进程之间传递文件描述符。
下面先来看两个函数:
#include <sys/types.h>
#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
它们与sendto 和 recvfrom 函数相似,只不过可以传输更复杂的数据结构,不仅可以传输一般数据,还可以传输额外的数据,即文件描述符。下面来看结构体msghdr :
struct msghdr {
void msg_name; / optional address /
socklen_t msg_namelen; / size of address /
struct iovec *msg_iov; / scatter/gather array /
size_t msg_iovlen; / # elements in msg_iov /
void *msg_control; / ancillary data, see below /
size_t msg_controllen; / ancillary data buffer len /
int msg_flags; / flags on received message */
};
如下图所示:
1、msg_name :即对等方的地址指针,不关心时设为NULL即可;
2、msg_namelen:地址长度,不关心时设置为0即可;
3、msg_iov:是结构体iovec 的指针。
struct iovec {
void iov_base; / Starting address /
size_t iov_len; / Number of bytes to transfer */
};
成员iov_base 可以认为是传输正常数据时的buf,iov_len 是buf 的大小。
4、msg_iovlen:当有n个iovec 结构体时,此值为n;
5、msg_control:是一个指向cmsghdr 结构体的指针
struct cmsghdr {
socklen_t cmsg_len; /* data byte count, including header /
int cmsg_level; / originating protocol /
int cmsg_type; / protocol-specific type /
/ followed by unsigned char cmsg_data[]; */
};
6、msg_controllen :参见下图,即cmsghdr 结构体可能不止一个;
7、flags : 一般设置为0即可;
为了对齐,可能存在一些填充字节,跟每个系统的实现有关,但我们不必关心,可以通过一些函数宏来获取相关的值,如下:
#include <sys/socket.h>
struct cmsghdr *CMSG_FIRSTHDR(struct msghdr *msgh);
struct cmsghdr *CMSG_NXTHDR(struct msghdr *msgh, struct cmsghdr *cmsg);
size_t CMSG_ALIGN(size_t length);
size_t CMSG_SPACE(size_t length);
size_t CMSG_LEN(size_t length);
unsigned char *CMSG_DATA(struct cmsghdr *cmsg);
下面通过封装两个函数,send_fd 和 recv_fd 来进一步认识这些函数宏的作用:
void send_fd(int sock_fd, int send_fd)
{
int ret;
struct msghdr msg;
struct cmsghdr *p_cmsg;
struct iovec vec;
char cmsgbuf[CMSG_SPACE(sizeof(send_fd))];
int *p_fds;
char sendchar = 0;
msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof(cmsgbuf);
p_cmsg = CMSG_FIRSTHDR(&msg);
p_cmsg->cmsg_level = SOL_SOCKET;
p_cmsg->cmsg_type = SCM_RIGHTS;
p_cmsg->cmsg_len = CMSG_LEN(sizeof(send_fd));
p_fds = (int *)CMSG_DATA(p_cmsg);
*p_fds = send_fd; // 通过传递辅助数据的方式传递文件描述符
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = &vec;
msg.msg_iovlen = 1; //主要目的不是传递数据,故只传1个字符
msg.msg_flags = 0;
vec.iov_base = &sendchar;
vec.iov_len = sizeof(sendchar);
ret = sendmsg(sock_fd, &msg, 0);
if (ret != 1)
ERR_EXIT("sendmsg"); }
int recv_fd(const int sock_fd)
{
int ret;
struct msghdr msg;
char recvchar;
struct iovec vec;
int recv_fd;
char cmsgbuf[CMSG_SPACE(sizeof(recv_fd))];
struct cmsghdr *p_cmsg;
int *p_fd;
vec.iov_base = &recvchar;
vec.iov_len = sizeof(recvchar);
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = &vec;
msg.msg_iovlen = 1;
msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof(cmsgbuf);
msg.msg_flags = 0;
p_fd = (int *)CMSG_DATA(CMSG_FIRSTHDR(&msg));
*p_fd = -1;
ret = recvmsg(sock_fd, &msg, 0);
if (ret != 1)
ERR_EXIT("recvmsg");
p_cmsg = CMSG_FIRSTHDR(&msg);
if (p_cmsg == NULL)
ERR_EXIT("no passed fd");
p_fd = (int *)CMSG_DATA(p_cmsg);
recv_fd = *p_fd;
if (recv_fd == -1)
ERR_EXIT("no passed fd");
return recv_fd; } 来解释一下send_fd 函数:
msg.msg_name = NULL;
msg.msg_namelen = 0;
msg.msg_iov = &vec;
msg.msg_iovlen = 1; //主要目的不是传递数据,故只传1个字符
msg.msg_flags = 0;
vec.iov_base = &sendchar;
vec.iov_len = sizeof(sendchar);
这几行中需要注意的是我们现在的目的不是传输正常数据,而是为了传递文件描述符,所以只定义一个1字节的char,其余参照前面对参数的解释可以理解。
现在我们只有一个cmsghdr 结构体,把需要传递的文件描述符send_fd 长度,也就是需要传输的额外数据大小,当作参数传给CMSG_SPACE 宏,可以得到整个结构体的大小,包括一些填充字节,如上图所示,也即
char cmsgbuf[CMSG_SPACE(sizeof(send_fd))];
也就可以进一步得出以下两行:
msg.msg_control = cmsgbuf;
msg.msg_controllen = sizeof(cmsgbuf);
接着,需要填充cmsghdr 结构体,传入msghdr 指针,CMSG_FIRSTHDR宏可以得到首个cmsghdr 结构体的指针,即
p_cmsg = CMSG_FIRSTHDR(&msg);
然后使用指针来填充各字段,如下:
p_cmsg->cmsg_level = SOL_SOCKET;
p_cmsg->cmsg_type = SCM_RIGHTS;
p_cmsg->cmsg_len = CMSG_LEN(sizeof(send_fd));
传入send_fd 的大小,CMSG_LEN宏可以得到cmsg_len 字段的大小。
最后,传入结构体指针 p_cmsg ,宏CMSG_DATA 可以得到准备存放send_fd 的位置指针,将send_fd 放进去,如下:
p_fds = (int*)CMSG_DATA(p_cmsg);
*p_fds = send_fd; // 通过传递辅助数据的方式传递文件描述符
recv_fd 函数就类似了,不再赘述。
可以写个小程序测试一下:
#include
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include
#include
#include
#include
#include
#define ERR_EXIT(m)
do
{
perror(m);
exit(EXIT_FAILURE);
} while(0)
int main(void)
{
int sockfds[2];
/* 只有unix域协议才能在进程间传递文件描述符,如果想要在没有亲缘关系的进程间
* 传递,则不能用socketpair函数,要用socket()函数 */
if (socketpair(PF_UNIX, SOCK_STREAM, 0, sockfds) < 0)
ERR_EXIT(“socketpair”);
pid_t pid;
pid = fork();
if (pid == -1)
ERR_EXIT("fork");
/* 如果是父进程打开的文件描述符,子进程可以共享
* 这里演示的是子进程打开的文件描述符通过封装的函数传给父进程 */
if (pid > 0)
{
close(sockfds[1]);
int fd = recv_fd(sockfds[0]);
char buf[1024] = {0};
read(fd, buf, sizeof(buf));
printf("buf=%s\n", buf);
}
else if (pid == 0)
{
close(sockfds[0]);
int fd;
fd = open("test.txt", O_RDONLY);
if (fd == -1);
send_fd(sockfds[1], fd);
}
return 0; } 我们知道,父进程在fork 之前打开的文件描述符,子进程是可以共享的,但是子进程打开的文件描述符,父进程是不能共享的,上述程序就是举例在子进程中打开了一个文件描述符,然后通过send_fd 函数将文件描述符传递给父进程,父进程可以通过recv_fd 函数接收到这个文件描述符。先建立一个文件test.txt 后输入几个字符,然后运行程序,输出如下:
huangcheng@ubuntu:~$ cat test.txt
ctthuangcheng
huangcheng@ubuntu:~$ ./a.out
buf=ctthuangcheng
huangcheng@ubuntu:~$
证明父进程确实可以打开test.txt 文件。
最后提醒一点,只有unix域协议才能在本机进程间传递文件描述符,如果想要在没有亲缘关系的进程间传递,则不能用socketpair函数,要用socket()函数 才行。