Translate

Chủ Nhật, 4 tháng 12, 2016

Overview of POSIX message queue

Created by Thomas.



What is message queue

Message queue allows processes to exchange data in the form of messages. The data is exchanged in unites of whole messages.

Model of client and server using message queue.





Client and server communicate using message queue






Characteristics
1. Message-oriented communication
- Receiver reads messages one at a time
- No partial or multiple message reads

2. Messages have priorities
- Delivered in priority order

3. Message notification feature


MQ API
1. Queue management
mq_open(): open/create MQ, set attributes
mq_close(): close MQ
mq_unlink(): remove MQ pathname
2. I/O
mq_send(): send message
mq_receive(): receive message
3. other
mq_setattr(), mq_getattr(): set/get MQ attributes
mq_notify(): request notification of msg arrival

Opening a POSIX MQ
?
1
2
3
4
5
// create new MQ, exclusive,
// for writing
mqd = mq_open("/mymq", O_CREATE|O_EXCL | O_WRONLY, 0600, NULL);
// Open existing queue for reading
mqd = mq_open("/mymq", O_RDONLY);

mqd = mq_open(name, flags, mode, &attr);
- Open+create new MQ / open existing MQ
- name has form /somename
- Returns mqd_t, a message queue descriptor
- flags (analogous to open()):
O_CREAT – create MQ if it doesn’t exist
O_EXCL – create MQ exclusively
O_RDONLY, O_WRONLY, O_RDWR – just like file open
O_NONBLOCK – non-blocking I/O
- mode sets permissions
- &attr: attributes for new MQ
+ NULL gives defaults

Nonblocking I/O on POSIX MQs
Message ques have a limited capacity, which is controlled by attributes
By default:
- mq_receive() blocks if no messages in queue
- mq_send() blocks if queue is full
- O_NONBLOCK:
+ EAGAIN error instead of blocking

Sending a message
?
1
2
3
4
5
6
mqd_t mqd;
mqd = mq_open("/mymq",
 O_CREAT | O_WRONLY,
 0600, NULL);
char *msg = "hello world";
mq_send(mqd, msg, strlen(msg), 0);
mq_send(mqd, msg_ptr, msg_len, msgprio);
mqd – MQ descriptor
msg_ptr – pointer to bytes forming message
msg_len – size of message
msgprio – priority – non-negative integer – 0 is lowest priority


Receiving a message
?
1
2
3
4
5
6
const int BUF_SIZE = 1000;
char buf[BUF_SIZE];
unsigned int prio;
...
mqd = mq_open("/mymq", O_RDONLY);
nbytes = mq_receive(mqd, buf, BUF_LEN, &prio);
nb = mq_receive(mqd, msg_ptr, msg_len, &prio);
mqd – MQ descriptor
msg_ptr – points to buffer that receives message
msg_len – size of buffer
&prio – receives priority
nb – returns size of message (bytes)

POSIX MQ attributes
?
1
2
3
4
5
6
7
8
9
10
11
struct mq_attr {
 long mq_flags; // MQ description flags
 // 0 or O_NONBLOCK
// [mq_getattr(), mq_setattr()]
 long mq_maxmsg; // Max. # of msgs on queue
 // [mq_open(), mq_getattr()]
 long mq_msgsize; // Max. msg size (bytes)
 // [mq_open(), mq_getattr()]
 long mq_curmsgs; // # of msgs currently in queue
 // [mq_getattr()]
};
?
1
 

Download C code example
This example start 2 threads and synchronize their operation using a message queue.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#define MY_MQ_NAME "/my_mq"
 
pthread_t thread1;
pthread_t thread2;
 
static struct mq_attr my_mq_attr;
static mqd_t my_mq;
 
typedef struct msgbuf {
 unsigned int mtype;
 char mtext[128];
} message_buf;
 
static unsigned int counter;
message_buf sbuf;
 
void thread1_main(void);
void thread2_main(void);
 
void sig_handler(int signum) {
    if (signum != SIGINT) {
        printf("Received invalid signum = %d in sig_handler()\n", signum);       
    }
 
    printf("Received SIGINT. Exiting Application\n");
 
    pthread_cancel(thread1);
    pthread_cancel(thread2);
 
    mq_close(my_mq);
    mq_unlink(MY_MQ_NAME);
 
    exit(0);
}
 
int main(void) {
    pthread_attr_t attr;
    int status;
  
  
    signal(SIGINT, sig_handler);
 
    //counter = 0;
 
    my_mq_attr.mq_maxmsg = 10;
    my_mq_attr.mq_msgsize = sizeof(message_buf);
 
    my_mq = mq_open(MY_MQ_NAME, \
                    O_CREAT | O_RDWR | O_NONBLOCK, \
                    0666, \
                    &my_mq_attr);
 
 
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, 1024*1024);
    
    printf("Creating thread1\n");
    status = pthread_create(&thread1, &attr, (void*)&thread1_main, NULL);
    if (status != 0) {
        printf("Failed to create thread1 with status = %d\n", status);      
    }   
 
    printf("Creating thread2\n");
    status = pthread_create(&thread2, &attr, (void*)&thread2_main, NULL);
    if (status != 0) {
        printf("Failed to create thread2 with status = %d\n", status);
    }   
 
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
 
    sig_handler(SIGINT);
     
    return 0;
}
 
void thread1_main(void) {
    unsigned int exec_period_usecs;
    int status;
 
    exec_period_usecs = 1000000; /*in micro-seconds*/
 
    printf("Thread 1 started. Execution period = %d uSecs\n",\
                                           exec_period_usecs);
    while(1) {
  sbuf.mtype = counter;
  strcpy(sbuf.mtext,"Hello world");
        status = mq_send(my_mq,&sbuf, sizeof(sbuf), 1);
     
        usleep(exec_period_usecs);
    }
}
 
 
void thread2_main(void) {
    unsigned int exec_period_usecs;
    int status;
    int recv_counter;
 
    exec_period_usecs = 10000; /*in micro-seconds*/
 
    printf("Thread 2 started. Execution period = %d uSecs\n",\
                                           exec_period_usecs);
 
    while(1) {
        status = mq_receive(my_mq, (char*)&sbuf, \
                            sizeof(sbuf), NULL);
 
        if (status > 0) {
            printf("RECVd MSG in THRD_2: %d-%s\n", sbuf.mtype,sbuf.mtext);
            counter += 1;
        }
  
        usleep(exec_period_usecs);
    }
}


Resources

http://man7.org/linux/man-pages/man7/mq_overview.7.html
http://www.linuxfocus.org/English/March2003/article287.shtml
https://www.cs.cf.ac.uk/Dave/C/node25.html
https://www.softprayog.in/programming/interprocess-communication-using-posix-message-queues-in-linux