Translate

Chủ Nhật, 4 tháng 12, 2016

Condition variable

Created by Thomas.

Condition variables: waiting and signaling

A mutex is for locking and a condition variable is for waiting.

A condition variable is always associated with a mutex. In general, some threads wait on a condition variable for a change in the state of a shared resource, which is protected by the mutex.
When a thread acquires the mutex and changes the state of the shared resource, it releases the mutex and 'signals' the waiting threads via the condition variable.

Note that waiting threads stay asleep until they are woken up.

Condition variables serve two purposes. The first is to let threads wait until some condition is satisfied before they act. The second is to use the CPU efficiently. Without a condition variable, a thread that must react to a change in a shared resource has to check that resource periodically (polling), and every check costs CPU time even when nothing has changed. With a condition variable, the thread can sleep while the state of the shared resource is unchanged. It is woken up only when another thread changes the state of the shared resource and notifies/signals the sleeping thread to handle the job.

The following functions are used. 
?
1
2
/* Block on condition variable cptr. The caller must hold mutex mptr; the
   mutex is released while the thread sleeps and locked again before the
   call returns. */
int pthread_cond_wait(pthread_cond_t *cptr, pthread_mutex_t *mptr);
/* Wake at least one thread that is waiting on condition variable cptr. */
int pthread_cond_signal(pthread_cond_t *cptr);

We explain how a condition variable works using the producer-consumer example below.

This code doesn't use a condition variable.
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/* Protects 'avail', which is shared by the producer threads and main().
   This polling version has no condition variable, so none is declared. */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;

/* Number of produced units that have not been consumed yet. */
static int avail = 0;
 
/*
 * Producer thread: makes one unit available per second.
 *
 * arg     - NUL-terminated decimal string (main() passes one of its argv
 *           entries) giving the number of units to produce; parsed with
 *           atoi(), exactly as main() does when it sums the totals.
 * returns - always NULL.
 */
static void *
threadFunc(void *arg)
{
    int cnt = atoi((const char *) arg);
    int j;

    for (j = 0; j < cnt; j++) {
        sleep(1);

        /* Code to produce a unit omitted */

        pthread_mutex_lock(&mtx);
        avail++;        /* Let consumer know another unit is available */
        /* Print the thread ID as unsigned long: casting pthread_t to int
           truncates it on platforms where it is wider than int. */
        printf("thread:%lu, #items=%d\n", (unsigned long) pthread_self(),
                avail);
        pthread_mutex_unlock(&mtx);
    }

    return NULL;
}
 
int
main(int argc, char *argv[])
{
    pthread_t tid;
    int s, j;
    int totRequired;            /* Total number of units that all threads
                                   will produce */
    int numConsumed;            /* Total units so far consumed */
    int done;
    time_t t;
    t = time(NULL);
    /* Create all threads */
    totRequired = 0;
    for (j = 1; j < argc; j++) {
        totRequired += atoi(argv[j]);
 
        pthread_create(&tid, NULL, threadFunc, argv[j]);
       
    }
    /* Loop to consume available units */
    numConsumed = 0;
    done = 0;
    for (;;) {
        pthread_mutex_lock(&mtx);              
        /* At this point, 'mtx' is locked... */
 
        while (avail > 0) {             /* Consume all available units */
 
            /* Do something with produced unit */
 
            numConsumed ++;
            avail--;
            printf("T=%ld: numConsumed=%d\n", (long) (time(NULL) - t),
                    numConsumed);
 
            done = numConsumed >= totRequired;
        }
        pthread_mutex_unlock(&mtx);     
        if (done)
            break;
        /* Perhaps do other work here that does not require mutex lock */
    }
    exit(0);
}
The output is below.

















The code below uses a condition variable.


?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/* Protects 'avail'; also the mutex main() passes to pthread_cond_wait(). */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by threadFunc() after each avail++; main() waits on it. */
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 
/* Number of produced units that have not been consumed yet. */
static int avail = 0;
 
static void *
threadFunc(void *arg)
{
    int cnt = atoi((char *) arg);
    int j;
 
    for (j = 0; j < cnt; j++) {
        sleep(1);
        /* Code to produce a unit omitted */
        pthread_mutex_lock(&mtx);       
 
        avail++;        /* Let consumer know another unit is available */
  printf("thread:%d, #items=%d\n", (int) pthread_self(), avail);
        pthread_mutex_unlock(&mtx);
        
        pthread_cond_signal(&cond);         /* Wake sleeping consumer */     
    }
    return NULL;
}
 
int
main(int argc, char *argv[])
{
    pthread_t tid;
    int j;
    int totRequired;            /* Total number of units that all threads
                                   will produce */
    int numConsumed;            /* Total units so far consumed */
    int done;
    time_t t;
 
    t = time(NULL);
 
    /* Create all threads */
 
    totRequired = 0;
    for (j = 1; j < argc; j++) {
        totRequired += atoi(argv[j]);
 
        pthread_create(&tid, NULL, threadFunc, argv[j]);      
    }
    /* Loop to consume available units */
 
    numConsumed = 0;
    done = 0;
    for (;;) {
        pthread_mutex_lock(&mtx);
        
        while (avail == 0) {            /* Wait for something to consume */
  // steps on pthread_cond_wait()
  // 1. after get lock mutex above, check condition variable
  // 2. if condition variable is not signaled yet, auto release mutex and go to sleep status
  // 3. When time that condition variable is signaled, wake up current thread (consumer)
  // and auto lock mutex (it tried to get mutex but not sure it absolutely get the mutex?)
  // and do next steps on shared resource
  // after finish action on shared resource, release mutex.
        printf("consumer: I am waiting\n"); 
  pthread_cond_wait(&cond, &mtx);        
        }
  printf("consumer: I got signal\n");
        /* At this point, 'mtx' is locked... */
        while (avail > 0) {             /* Consume all available units */
            /* Do something with produced unit */
            numConsumed ++;
            avail--;
            printf("T=%ld: numConsumed=%d\n", (long) (time(NULL) - t),
                    numConsumed);
            done = numConsumed >= totRequired;
        }
        pthread_mutex_unlock(&mtx);      
        if (done)
            break;
        /* Perhaps do other work here that does not require mutex lock */
    }
    exit(0);
}

Output is below.

























We have another example.

In this example the main thread starts increasing 'count' only once 'count' is larger than a threshold, so the condition the main thread waits for is "'count' is larger than the threshold". Until then the main thread must wait for 'count' to change (so it goes to sleep). Another thread increases 'count' from the beginning.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/* Protects 'count'; also the mutex main() passes to pthread_cond_wait(). */
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by threadFunc() once count > start; main() waits on it. */
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
 
/* Shared counter incremented by both threadFunc() and main(). */
static int count = 0;
/* Threshold: main() keeps waiting while count <= start. */
int start = 20;
static void *
threadFunc(void *arg)
{
 
    int s, j;
 
    for (;;) {
        sleep(1);
       
        s = pthread_mutex_lock(&mtx);       
 
        count++;        /* Let consumer know another unit is countable */
  printf("Sub-thread:%d, count=%d\n", (int) pthread_self(), count);
        s = pthread_mutex_unlock(&mtx);
        if (count > start)
        s = pthread_cond_signal(&cond);         /* Wake sleeping Main */     
    }
    return NULL;
}
 
int
main(int argc, char *argv[])
{
    pthread_t tid;
    int s, j;
    int totRequired = 100;          
                                        
    int done;
   
 s = pthread_create(&tid, NULL, threadFunc, NULL);
    done = 0;
 
 pthread_mutex_lock(&mtx);
    
 while (count <= start) {           
 // steps on pthread_cond_wait()
 // 1. after get lock mutex above, check condition variable
 // 2. if condition variable is not signaled yet, auto release mutex and go to sleep status
 // 3. When time that condition variable is signaled, wake up current thread (consumer) and auto lock mutex and do next steps on shared resource
 // after finish action on shared resource, release mutex.
 printf("Main thread: waiting here until count=%d\n", start); 
 s = pthread_cond_wait(&cond, &mtx);        
 }
 /* At this point, 'mtx' is locked... */
 printf("Main thread: start running, count=%d\n", count);
 pthread_mutex_unlock(&mtx);
 while (count > 0) {             /* Consume all countable units */
  /* Do something with produced unit */
  pthread_mutex_lock(&mtx);
  count++;
  printf("Main thread: count=%d\n",count);       
  done = count >= totRequired;
  s = pthread_mutex_unlock(&mtx);
  sleep(2);
  if (done)
  break;
 }
                     
    exit(0);
}
Output is below.
























Resources
[1] The Linux programming interface, Michael Kerrisk
[2] Unix network programming Vol 2, W. Richard Stevens

Không có nhận xét nào:

Đăng nhận xét