消息队列的实现分为两种,一种为System V的消息队列,一种是Posix消息队列;
消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反。Posix消息队列与System V消息队列的区别如下:
(1) 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。
(2)当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制。
用户空间的消息缓冲区
为在进程与内核之间传递消息,无论发送进程还是接收进程,都需要在进程空间中用消息缓冲区来暂存消息。该消息缓冲区的结构定义如下:
struct msgbuf {
long mtype; /* 消息的类型 /
char mtext[1]; / 消息正文 */
};
从这个缓冲区的定义可以看到,消息通信的最大特点在于,发送进程可以在域mtype中定义消息的类型,这样就为接收进程提供了一个方便,即接收进程可以根据mtype来判断队列中的一个消息是否为它所等待的消息,从而使接收进程可以有选择地进行接收。
域mtext[]为存放消息正文的数组,发送进程可以根据消息的大小定义该数组的长度
内核空间的消息结构
为便于内核对消息的维护和管理,以及要将大型消息分页存放,所以内存中用于消息通信的数据结构要比进程消息缓冲区的结构稍微复杂一些。内核空间消息结构分为首页结构和一般页结构。
特点:
生命周期随内核,消息队列会一直存在,需要我们显示的调用接口删除或使用命令删除
消息队列可以双向通信
克服了管道只能承载无格式字节流的缺点
消息队列函数
1.msgget
功能:创建和访问一个消息队列
原型:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflag);
参数:
key:某个消息队列的名字,用ftok()产生
msgflag:有两个选项IPC_CREAT和IPC_EXCL,单独使用IPC_CREAT,如果消息队列不存在则创建之,如果存在则打开返回;单独使用IPC_EXCL是没有意义的;两个同时使用,如果消息队列不存在则创建之,如果存在则出错返回。
返回值:成功返回一个非负整数,即消息队列的标识码,失败返回-1
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
调用成功返回一个key值,用于创建消息队列,如果失败,返回-1
2.msgctl
功能:消息队列的控制函数
原型:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
参数:
msqid:由msgget函数返回的消息队列标识码
cmd:有三个可选的值,在此我们使用IPC_RMID
IPC_STAT 把msqid_ds结构中的数据设置为消息队列的当前关联值
IPC_SET 在进程有足够权限的前提下,把消息队列的当前关联值设置为msqid_ds数据结构中给出的值
IPC_RMID 删除消息队列
返回值:
成功返回0,失败返回-1
3.msgsnd
功能:把一条消息添加到消息队列中
原型:
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数:
msgid:由msgget函数返回的消息队列标识码
msgp:指针指向准备发送的消息
msgze:msgp指向的消息的长度(不包括消息类型的long int长整型)
msgflg:默认为0
返回值:成功返回0,失败返回-1
消息结构一方面必须小于系统规定的上限,另一方面必须以一个long int长整型开始,接受者以此来确定消息的类型
struct msgbuf
{
long mtye;
char mtext[1];
};
4.msgrcv
功能:是从一个消息队列接受消息
原型:
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
参数:与msgsnd相同
返回值:成功返回实际放到接收缓冲区里去的字符个数,失败返回-1
此外,我们还需要学习两个重要的命令
前面我们说过,消息队列需要手动删除IPC资源
ipcs:显示IPC资源
ipcrm:手动删除IPC资源
2、消息队列的基本操作
2.1 打开一个消息队列
#include
typedef int mqd_t;
mqd_t mq_open(const char name, int oflag, … / mode_t mode, struct mq_attr *attr */);
返回: 成功时为消息队列描述字,出错时为-1。
功能: 创建一个新的消息队列或打开一个已存在的消息的队列。
2.2 关闭一个消息队列
#include
int mq_close(mqd_t mqdes);
返回: 成功时为0,出错时为-1。
功能: 关闭已打开的消息队列。
2.3 删除一个消息队列
#include
int mq_unlink(const char *name)
返回: 成功时为0,出错时为-1
功能: 从系统中删除消息队列。
这三个函数操作的代码如下:
#include
#include
#include <sys/types.h>
#include <sys/stat.h>
#include
#include
int main(int argc, char* argv[])
{
int flag = O_RDWR | O_CREAT | O_EXCL;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open(“/mq_test”, flag, mode,NULL);
if (mqid == -1)
{
printf(“mqueue create failed!\n”);
return 1;
}
else
{
printf(“mqueue create success!\n”);
}
mq_close(mqid); return 0;
}
#include
#include
int main(int argc, char* argv[])
{
mq_unlink(“/mq_test”);
return 0;
}
关于mqueue更多详细内容可以使用:man mq_overview命令查看,里面有一条需要注意的是,Linux下的Posix消息队列是在vfs中创建的,可以用
mount -t mqueue none /dev/mqueue
将消息队列挂在在/dev/mqueue目录下,便于查看。
2.4 mq_close()和mq_unlink()
mq_close()的功能是关闭消息队列的文件描述符,但消息队列并不从系统中删除,要删除一个消息队列,必须调用mq_unlink();这与文件系统的unlink()机制是一样的。
3、消息队列的属性
#include
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);
均返回:成功时为0, 出错时为-1
每个消息队列有四个属性:
struct mq_attr
{
long mq_flags; /* message queue flag : 0, O_NONBLOCK /
long mq_maxmsg; / max number of messages allowed on queue/
long mq_msgsize; / max size of a message (in bytes)/
long mq_curmsgs; / number of messages currently on queue */
};
4、消息收发
#include
int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
返回:成功时为0,出错为-1
ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
返回:成功时为消息中的字节数,出错为-1
mqsend代码如下:
#include
#include
#include
#include
#include <sys/stat.h>
#include <sys/types.h>
int main(int argc, char* argv[])
{
int flag = O_RDWR;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open(“/mq_test”,flag,mode,NULL);
if (mqid == -1)
{
printf(“open mqueue failed!\n”);
return 1;
}
char *buf = "hello, i am sender!";
mq_send(mqid,buf,strlen(buf),20);
mq_close(mqid);
return 0; }
mqrecv代码如下:
#include
#include
#include
#include
#include <sys/stat.h>
#include <sys/types.h>
int main(int argc, char* argv[])
{
int flag = O_RDWR;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqd_t mqid = mq_open(“/mq_test”,flag,mode,NULL);
if (mqid == -1)
{
printf(“open mqueue failed!\n”);
return 1;
}
struct mq_attr attr;
mq_getattr(mqid,&attr);
char buf[256] = {0};
int priority = 0;
mq_receive(mqid,buf,attr.mq_msgsize,&priority);
printf("%s\n",buf);
mq_close(mqid);
return 0; } 首先我们运行三次send,然后运行四次recv,可见recv的前三次是可以收到消息队列里的三个消息的,当运行第四次的时,系统消息队列里为空,recv就会阻塞;关于非阻塞式mqueue见下文。
5、mq_notify函数
如前文介绍,poxis消息队列运行异步通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:
(1)产生一个信号
(2)创建一个线程来执行一个指定的函数
这种通知通过mq_notify() 函数建立。该函数为指定的消息消息队列建立或删除异步事件通知,
#include
int mq_notify(mqd_t mqdes, const struct sigevent* notification);
(1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。
(2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。
(3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知。
(4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。
(5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。
sigevent结构如下:
union sigval{
int sival_int; /integer value/
void sival_ptr; /pointer value*/
};
struct sigevent{
int sigev_notify; /SIGEV_{NONE, SIGNAL, THREAD}/
int sigev_signo; /signal number if SIGEV_SIGNAL/
union sigval sigev_value;
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
5.1 mq_notify() 使用信号处理程序
一个正确的使用非阻塞mq_receive的信号通知的例子:
#include
#include
#include
#include <sys/stat.h>
#include <sys/types.h>
#include
#include
#include
void sig_usr1(int );
volatile sig_atomic_t mqflag;
int main(int argc, char* argv[])
{
mqd_t mqid = 0;
void *buff;
struct mq_attr attr;
struct sigevent sigev;
sigset_t zeromask,newmask,oldmask;
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL); // 非阻塞式打开mqueue
mq_getattr(mqid,&attr);
buff = malloc(attr.mq_msgsize);
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigemptyset(&oldmask);
sigaddset(&newmask,SIGUSR1); // 初始化信号集
signal(SIGUSR1,sig_usr1); // 信号处理程序
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
int n = mq_notify(mqid,&sigev); // 启用通知
for (;;)
{
sigprocmask(SIG_BLOCK,&newmask,&oldmask);
while(mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;
ssize_t n;
mq_notify(mqid, &sigev); // 重新注册
while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)
printf("SIGUSR1 received, read %ld bytes.\n",(long)n); //读取消息
if(errno != EAGAIN)
printf("mq_receive error\n");
sigprocmask(SIG_UNBLOCK,&newmask,NULL);
}
return 0; }
void sig_usr1(int signo)
{
mqflag = 1;
}
��里为什么使用的是非阻塞式mq_receive,为什么不在信号处理程序中打印接收到的字符请参阅《unp 第二卷》
5.2 mq_notify() 使用线程处理程序
异步事件通知的另一种方式是把sigev_notify设置成SIGEV_THREAD,这会创建一个新线程,该线程调用由sigev_notify_function指定的函数,所用的参数由sigev_value指定,新线程的属性由sigev_notify_attributes指定,要指定线程的默认属性的话,传空指针。新线程是作为脱离线程创建的。
#include
#include
#include
#include <sys/stat.h>
#include <sys/types.h>
#include
#include
#include
#include
mqd_t mqid = 0;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc, char* argv[])
{
int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);
mq_getattr(mqid,&attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_attributes = NULL;
int n = mq_notify(mqid,&sigev);
for (;;)
pause();
return 0; }
static void notify_thread(union sigval arg)
{
ssize_t n;
char* buff;
printf(“notify_thread_started!\n”);
buff = malloc(attr.mq_msgsize);
mq_notify(mqid, &sigev);
while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)
printf(“SIGUSR1 received, read %ld bytes.\n”,(long)n);
if(errno != EAGAIN)
printf(“mq_receive error\n”);
free(buff);
pthread_exit(0);
}