Created by Thomas.
What is message queue
Message queue allows processes to exchange data in the form of messages. The data is exchanged in unites of whole messages.
Model of client and server using message queue.
Client and server communicate using message queue
Characteristics
1. Message-oriented communication
- Receiver reads messages one at a time
- No partial or multiple message reads
2. Messages have priorities
- Delivered in priority order
3. Message notification feature
MQ API
1. Queue management
- Receiver reads messages one at a time
- No partial or multiple message reads
2. Messages have priorities
- Delivered in priority order
3. Message notification feature
MQ API
1. Queue management
mq_open(): open/create MQ, set attributes
mq_close(): close MQ
mq_unlink(): remove MQ pathname
2. I/O
mq_send(): send message
mq_receive(): receive message
3. other
mq_setattr(), mq_getattr(): set/get MQ attributes
mq_notify(): request notification of msg arrival
Opening a POSIX MQ
mqd = mq_open(name, flags, mode, &attr);
mq_close(): close MQ
mq_unlink(): remove MQ pathname
2. I/O
mq_send(): send message
mq_receive(): receive message
3. other
mq_setattr(), mq_getattr(): set/get MQ attributes
mq_notify(): request notification of msg arrival
Opening a POSIX MQ
1
2
3
4
5
| // create new MQ, exclusive, // for writing mqd = mq_open( "/mymq" , O_CREATE|O_EXCL | O_WRONLY, 0600, NULL); // Open existing queue for reading mqd = mq_open( "/mymq" , O_RDONLY); |
mqd = mq_open(name, flags, mode, &attr);
- Open+create new MQ / open existing MQ
- name has form /somename
- Returns mqd_t, a message queue descriptor
- flags (analogous to open()):
O_CREAT – create MQ if it doesn’t exist
O_EXCL – create MQ exclusively
O_RDONLY, O_WRONLY, O_RDWR – just like file open
O_NONBLOCK – non-blocking I/O
- mode sets permissions
- &attr: attributes for new MQ
+ NULL gives defaults
Message ques have a limited capacity, which is controlled by attributes
By default:
- mq_receive() blocks if no messages in queue
- mq_send() blocks if queue is full
- O_NONBLOCK:
+ EAGAIN error instead of blocking
Sending a message
Receiving a message
nb = mq_receive(mqd, msg_ptr, msg_len, &prio);
- mq_receive() blocks if no messages in queue
- mq_send() blocks if queue is full
- O_NONBLOCK:
+ EAGAIN error instead of blocking
Sending a message
1
2
3
4
5
6
| mqd_t mqd; mqd = mq_open( "/mymq" , O_CREAT | O_WRONLY, 0600, NULL); char *msg = "hello world" ; mq_send(mqd, msg, strlen (msg), 0); |
mq_send(mqd, msg_ptr, msg_len, msgprio);
mqd – MQ descriptor
msg_ptr – pointer to bytes forming message
msg_len – size of message
msgprio – priority – non-negative integer – 0 is lowest priority
Receiving a message
1
2
3
4
5
6
| const int BUF_SIZE = 1000; char buf[BUF_SIZE]; unsigned int prio; ... mqd = mq_open( "/mymq" , O_RDONLY); nbytes = mq_receive(mqd, buf, BUF_LEN, &prio); |
mqd – MQ descriptor
msg_ptr – points to buffer that receives message
msg_len – size of buffer
&prio – receives priority
nb – returns size of message (bytes)
POSIX MQ attributes
msg_ptr – points to buffer that receives message
msg_len – size of buffer
&prio – receives priority
nb – returns size of message (bytes)
POSIX MQ attributes
1
2
3
4
5
6
7
8
9
10
11
| struct mq_attr { long mq_flags; // MQ description flags // 0 or O_NONBLOCK // [mq_getattr(), mq_setattr()] long mq_maxmsg; // Max. # of msgs on queue // [mq_open(), mq_getattr()] long mq_msgsize; // Max. msg size (bytes) // [mq_open(), mq_getattr()] long mq_curmsgs; // # of msgs currently in queue // [mq_getattr()] }; |
1
|
This example start 2 threads and synchronize their operation using a message queue.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
| #define MY_MQ_NAME "/my_mq" pthread_t thread1; pthread_t thread2; static struct mq_attr my_mq_attr; static mqd_t my_mq; typedef struct msgbuf { unsigned int mtype; char mtext[128]; } message_buf; static unsigned int counter; message_buf sbuf; void thread1_main( void ); void thread2_main( void ); void sig_handler( int signum) { if (signum != SIGINT) { printf ( "Received invalid signum = %d in sig_handler()\n" , signum); } printf ( "Received SIGINT. Exiting Application\n" ); pthread_cancel(thread1); pthread_cancel(thread2); mq_close(my_mq); mq_unlink(MY_MQ_NAME); exit (0); } int main( void ) { pthread_attr_t attr; int status; signal (SIGINT, sig_handler); //counter = 0; my_mq_attr.mq_maxmsg = 10; my_mq_attr.mq_msgsize = sizeof (message_buf); my_mq = mq_open(MY_MQ_NAME, \ O_CREAT | O_RDWR | O_NONBLOCK, \ 0666, \ &my_mq_attr); pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 1024*1024); printf ( "Creating thread1\n" ); status = pthread_create(&thread1, &attr, ( void *)&thread1_main, NULL); if (status != 0) { printf ( "Failed to create thread1 with status = %d\n" , status); } printf ( "Creating thread2\n" ); status = pthread_create(&thread2, &attr, ( void *)&thread2_main, NULL); if (status != 0) { printf ( "Failed to create thread2 with status = %d\n" , status); } pthread_join(thread1, NULL); pthread_join(thread2, NULL); sig_handler(SIGINT); return 0; } void thread1_main( void ) { unsigned int exec_period_usecs; int status; exec_period_usecs = 1000000; /*in micro-seconds*/ printf ( "Thread 1 started. Execution period = %d uSecs\n" ,\ exec_period_usecs); while (1) { sbuf.mtype = counter; strcpy (sbuf.mtext, "Hello world" ); status = mq_send(my_mq,&sbuf, sizeof (sbuf), 1); usleep(exec_period_usecs); } } void thread2_main( void ) { unsigned int exec_period_usecs; int status; int recv_counter; exec_period_usecs = 10000; /*in micro-seconds*/ printf ( "Thread 2 started. Execution period = %d uSecs\n" ,\ exec_period_usecs); while (1) { status = mq_receive(my_mq, ( char *)&sbuf, \ sizeof (sbuf), NULL); if (status > 0) { printf ( "RECVd MSG in THRD_2: %d-%s\n" , sbuf.mtype,sbuf.mtext); counter += 1; } usleep(exec_period_usecs); } } |
Resources
http://man7.org/linux/man-pages/man7/mq_overview.7.html
http://www.linuxfocus.org/English/March2003/article287.shtml
https://www.cs.cf.ac.uk/Dave/C/node25.html
https://www.softprayog.in/programming/interprocess-communication-using-posix-message-queues-in-linux