Condition Variables for Synchronization
- A condition variable allows a thread to block itself until specified data reaches a predefined state.
- A condition variable is associated with this predicate. When the predicate becomes true, the condition variable is used to signal one or more threads waiting on the condition.
- A single condition variable may be associated with more than one predicate.
- A condition variable always has a mutex associated with it. A thread locks this mutex and tests the predicate defined on the shared variable.
- If the predicate is not true, the thread waits on the condition variable associated with the predicate using the function pthread_cond_wait.
Keep two overheads in mind: locking overheads and idling overheads due to blocked threads.
Designing Barriers Using Condition Variables
barrier variable
barrier.h
/*
 * State of one linear (counting) barrier.
 *
 * The structure is prepared by init_barrier() and used by barrier().
 */
struct barrier_variable {
    pthread_mutex_t lock;   /* held by barrier() while it inspects or updates count */
    pthread_cond_t  cond;   /* threads that arrive early block on this condition    */
    int             count;  /* arrival bookkeeping; set to 0 by init_barrier()      */
};
/*
 * State of a tree barrier.
 *
 * Each member points to an array that init_tree_barrier() allocates with
 * n-1 entries; tree_barrier() indexes all three arrays with the same node
 * number, so entry i of each array belongs to tree node i.
 */
struct tree_barrier_variable {
    pthread_mutex_t *lock;   /* per-node mutex              */
    pthread_cond_t  *cond;   /* per-node condition variable */
    int             *count;  /* per-node arrival counter    */
};
var1.c
#include <pthread.h>
#include <stdio.h>
#include "barrier.h"
#define NUM_ELEMENTS 16
#define NUM_THREADS 4
/* Shared state of the parallel mean/variance computation. */
struct barrier_variable barrier1;   /* separates the summing phase from the variance phase */
float variance_sum;                 /* squared deviations accumulated by the worker threads */
int global_sum;                     /* sum of all elements; updated under global_sum_lock   */
int array[NUM_ELEMENTS];            /* input data, filled in by main()                      */
pthread_mutex_t global_sum_lock;    /* serialises updates of global_sum                     */

/*
 * Prototypes for functions that are called before their definitions.
 * C99 and later do not allow calling an undeclared function, so
 * init_barrier() and barrier() are declared here alongside the worker.
 */
void *sum_and_variance(void *s);
int init_barrier(struct barrier_variable *barrier);
int barrier(struct barrier_variable *barrier, int thread_count);
/*
 * Describes the slice of the input array that one worker thread processes.
 * main() fills one descriptor per thread and passes its address to
 * sum_and_variance().
 */
struct array_descriptor {
    int *array;   /* first element of the slice        */
    int length;   /* number of elements in the slice   */
};
/*
 * Computes the mean and variance of `array` with NUM_THREADS worker threads.
 *
 * Each worker receives an equally sized slice of the array, adds its partial
 * sum to global_sum, waits at barrier1 and then accumulates its share of the
 * squared deviations into variance_sum.  main() joins the workers and prints
 * the results.
 *
 * Returns 0 on success, 1 if a worker thread could not be created.
 */
int main(void)
{
    pthread_t sum_threads[NUM_THREADS];
    struct array_descriptor desc[NUM_THREADS];
    float average, variance;
    int i, rc, sum_snapshot;

    for (i = 0; i < NUM_ELEMENTS; i++)
        array[i] = 2;
    global_sum = 0;
    variance_sum = 0.0;
    init_barrier(&barrier1);
    pthread_mutex_init(&global_sum_lock, NULL);

    for (i = 0; i < NUM_THREADS; i++) {
        desc[i].array = &(array[i * NUM_ELEMENTS / NUM_THREADS]);
        desc[i].length = NUM_ELEMENTS / NUM_THREADS;
        rc = pthread_create(&sum_threads[i], NULL, sum_and_variance, (void *)&desc[i]);
        if (rc != 0) {
            /*
             * The barrier needs all NUM_THREADS workers; with one missing
             * the others would wait forever.  Returning from main() ends
             * the whole process, including workers already started.
             */
            fprintf(stderr, "pthread_create failed for thread %d (error %d)\n", i, rc);
            return 1;
        }

        /* Workers may already be updating global_sum: read it under the lock. */
        pthread_mutex_lock(&global_sum_lock);
        sum_snapshot = global_sum;
        pthread_mutex_unlock(&global_sum_lock);
        printf("%d %d\n", i, sum_snapshot);
    }

    for (i = 0; i < NUM_THREADS; i++)
        pthread_join(sum_threads[i], NULL);

    /* All workers have been joined, so the shared totals are final. */
    average = ((float) global_sum) / ((float) NUM_ELEMENTS);
    variance = variance_sum / NUM_ELEMENTS;
    printf("mean %f\nvariance %f\n", average, variance);
    return 0;
}
void *sum_and_variance(void *s)
{
int i, *array, partial_sum;
float average, partial_variance_sum;
partial_sum = 0;
partial_variance_sum = 0.0;
array = ((struct array_descriptor *) s) -> array;
for (i=0; i<((struct array_descriptor *) s) -> length; i++)
partial_sum += array[i];
printf("here global_sum %d\n",global_sum);
pthread_mutex_lock(&global_sum_lock);
global_sum += partial_sum;
pthread_mutex_unlock(&global_sum_lock);
printf("there global_sum %d\n",global_sum);
barrier(&barrier1, NUM_THREADS);
printf("and global_sum %d\n",global_sum);
average = ((float) global_sum) / ((float) NUM_ELEMENTS);
for (i=0; i<((struct array_descriptor *) s) -> length; i++)
partial_variance_sum += (average - array[i]) * (average - array[i]);
variance_sum += partial_variance_sum;
pthread_exit(0);
}
/*
 * Prepares a barrier for use by barrier().
 *
 * Initialises the mutex and the condition variable with default attributes
 * and clears the arrival counter.
 *
 * Returns 0 on success, otherwise the error number reported by
 * pthread_mutex_init() or pthread_cond_init().  On failure nothing is left
 * initialised.
 */
int init_barrier(struct barrier_variable *barrier)
{
    int rc;

    rc = pthread_mutex_init(&barrier->lock, NULL);
    if (rc != 0)
        return rc;

    rc = pthread_cond_init(&barrier->cond, NULL);
    if (rc != 0) {
        pthread_mutex_destroy(&barrier->lock);
        return rc;
    }

    barrier->count = 0;
    return 0;
}
/*
 * Linear barrier: blocks the caller until thread_count threads have called
 * barrier() on the same barrier_variable, then releases all of them.
 *
 * barrier       barrier state prepared by init_barrier()
 * thread_count  number of participating threads; must be below 2^30
 *
 * pthread_cond_wait() may return without a matching broadcast (spurious
 * wakeup), so a waiter must re-check a predicate.  The predicate used here
 * is a phase bit stored in the top usable bit of barrier->count: the last
 * arriving thread flips it, and waiters sleep until it differs from the
 * value they saw on arrival.  The low bits still count the arrivals of the
 * current episode and are reset to 0 on release, so the barrier is reusable.
 *
 * Always returns 0.
 */
int barrier(struct barrier_variable *barrier, int thread_count)
{
    enum { BARRIER_PHASE_BIT = 1 << 30 };
    int arrival_phase;

    pthread_mutex_lock(&barrier->lock);

    /* Diagnostic: number of threads that arrived before this one. */
    printf("barrier.count %d \n", barrier->count & ~BARRIER_PHASE_BIT);

    arrival_phase = barrier->count & BARRIER_PHASE_BIT;
    barrier->count++;

    if ((barrier->count & ~BARRIER_PHASE_BIT) == thread_count) {
        /* Last arrival: clear the counter, flip the phase, wake everyone. */
        barrier->count = arrival_phase ^ BARRIER_PHASE_BIT;
        pthread_cond_broadcast(&barrier->cond);
    } else {
        /* Sleep until the phase changes; this also absorbs spurious wakeups. */
        while ((barrier->count & BARRIER_PHASE_BIT) == arrival_phase)
            pthread_cond_wait(&barrier->cond, &barrier->lock);
    }

    pthread_mutex_unlock(&barrier->lock);
    return 0;
}
treebarrier.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/times.h>
#include <unistd.h>
#include "barrier.h"
#define NUM 1024
#define TLEVEL 10
/*
 * Per-thread argument for t_barrier(): main() stores the thread's index in
 * pnum and passes the descriptor's address to pthread_create().
 */
struct thdesc {
    int pnum;   /* index of the thread, 0 .. NUM-1 */
};
/* Shared state of the barrier timing experiment. */
struct barrier_variable barrierl;        /* linear barrier used by l_barrier() */
struct tree_barrier_variable barriert;   /* tree barrier used by t_barrier()   */
struct thdesc desc[NUM];                 /* one argument record per thread     */

/*
 * Prototypes for functions that are called before their definitions.
 * C99 and later do not allow calling an undeclared function, so every
 * helper used by main() and the thread functions is declared here.
 */
void *t_barrier(void *s);
void *l_barrier(void *s);
int init_barrier(struct barrier_variable *barrier);
int barrier(struct barrier_variable *barrier, int thread_count);
int init_tree_barrier(struct tree_barrier_variable *t_barrier, int n);
int tree_barrier(struct tree_barrier_variable *t_barrier, int t_num, int n, int k);
int second_(float *time);
/*
 * Times one episode of the tree barrier and one episode of the linear
 * barrier, each with NUM system-scope threads, and prints the two readings
 * obtained from second_().
 *
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main(void)
{
    int i, rc;
    float time1, time2;
    pthread_t pt[NUM];
    pthread_attr_t attr;

    pthread_attr_init(&attr);
    pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
    init_tree_barrier(&barriert, NUM);
    init_barrier(&barrierl);

    /* --- tree barrier --- */
    second_(&time1);
    for (i = 0; i < NUM; i++) {
        desc[i].pnum = i;
        rc = pthread_create(&pt[i], &attr, t_barrier, (void *)&desc[i]);
        if (rc != 0) {
            /*
             * The barrier needs all NUM participants; with one missing the
             * others would wait forever.  Returning from main() ends the
             * whole process, including the threads already started.
             */
            fprintf(stderr, "pthread_create failed for tree thread %d (error %d)\n", i, rc);
            return 1;
        }
    }
    for (i = 0; i < NUM; i++)
        pthread_join(pt[i], NULL);
    second_(&time2);
    printf("tree barrier time %f \n", time2 - time1);

    /* --- linear barrier --- */
    second_(&time1);
    for (i = 0; i < NUM; i++) {
        rc = pthread_create(&pt[i], &attr, l_barrier, NULL);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for linear thread %d (error %d)\n", i, rc);
            return 1;
        }
    }
    for (i = 0; i < NUM; i++)
        pthread_join(pt[i], NULL);
    second_(&time2);
    printf("linear barrier time %f \n", time2 - time1);

    pthread_attr_destroy(&attr);
    return 0;
}
/*
 * Thread body for the linear-barrier measurement: takes part in one episode
 * of the global barrier `barrierl` together with NUM-1 other threads.
 *
 * s is unused.  Always returns NULL (the original fell off the end of a
 * function declared to return void *).
 *
 * The original source carried a disabled `for (i = 0; i < 50; i++)` loop
 * around the barrier call.
 */
void *l_barrier(void *s)
{
    (void)s;
    barrier(&barrierl, NUM);
    return NULL;
}
void *t_barrier(void *s)
{
int i, t_num;
t_num=((struct thdesc *)s)->pnum;
/* for(i=0; i<50; i++)*/
tree_barrier(&barriert,t_num,NUM,TLEVEL);
}
/*
 * Allocates and initialises the per-node state of a tree barrier for n
 * threads.
 *
 * The tree has n-1 nodes; each gets a mutex, a condition variable (both
 * with default attributes) and an arrival counter set to 0.
 *
 * t_barrier  structure to fill in
 * n          number of participating threads
 *
 * Returns 0 on success and -1 if memory could not be allocated, in which
 * case all three pointers are left NULL.  For n < 2 there are no nodes:
 * the pointers are set to NULL and 0 is returned.
 *
 * The arrays are owned by t_barrier; no release function exists in this
 * file, so they live until the process exits.
 */
int init_tree_barrier(struct tree_barrier_variable *t_barrier, int n)
{
    size_t node_count, i;

    t_barrier->lock = NULL;
    t_barrier->cond = NULL;
    t_barrier->count = NULL;
    if (n < 2)
        return 0;

    node_count = (size_t)n - 1;
    t_barrier->lock = malloc(node_count * sizeof *t_barrier->lock);
    t_barrier->cond = malloc(node_count * sizeof *t_barrier->cond);
    t_barrier->count = malloc(node_count * sizeof *t_barrier->count);
    if (t_barrier->lock == NULL || t_barrier->cond == NULL || t_barrier->count == NULL) {
        /* Undo the allocations that did succeed so nothing is leaked. */
        free(t_barrier->lock);
        free(t_barrier->cond);
        free(t_barrier->count);
        t_barrier->lock = NULL;
        t_barrier->cond = NULL;
        t_barrier->count = NULL;
        return -1;
    }

    for (i = 0; i < node_count; i++) {
        pthread_mutex_init(&t_barrier->lock[i], NULL);
        pthread_cond_init(&t_barrier->cond[i], NULL);
        t_barrier->count[i] = 0;
    }
    return 0;
}
/*
 * Tree barrier for n threads arranged as a binary tree with k levels
 * (the caller in this file uses n = 1024, k = 10).
 *
 * t_barrier  state prepared by init_tree_barrier(t_barrier, n)
 * t_num      index of the calling thread, 0 .. n-1
 * n          number of participating threads
 * k          number of tree levels
 *
 * Node layout: level 0 occupies indices 0 .. n/2-1, level 1 starts at n/2,
 * level 2 at 3n/4, and so on; threads 2i and 2i+1 share leaf node i.
 *
 * Arrival phase: the thread locks its node and registers.  The first of
 * the two arrivals at a node waits on the node's condition variable.  The
 * second arrival keeps the node's mutex locked and climbs to the parent.
 *
 * Release phase: a thread walks back down through the nodes it won (whose
 * mutexes it still holds), resets each node, unlocks it and signals the
 * thread waiting there, which in turn releases the nodes it won.
 *
 * pthread_cond_wait() may return spuriously, so the waiter re-checks a
 * predicate: a phase bit kept in the top usable bit of the node counter.
 * The releasing thread flips it, and the waiter sleeps until it differs
 * from the value seen on arrival.  Without this check a spurious wakeup
 * before the partner arrived let the waiter leave the barrier early.
 * Flipping a phase, rather than testing for a counter of 0, keeps the
 * barrier correct if a released thread re-enters before the waiter wakes.
 *
 * Always returns 0.
 */
int tree_barrier(struct tree_barrier_variable *t_barrier, int t_num, int n, int k)
{
    enum { NODE_PHASE_BIT = 1 << 30 };
    int i, j, spot, level, level_ind, powertwo, arrival_phase;

    powertwo = 1;
    level = 0;
    level_ind = t_num / 2;
    spot = level + level_ind;

    /* Arrival phase: climb while this thread is the second to arrive. */
    for (i = 0; i < k; i++) {
        powertwo *= 2;
        pthread_mutex_lock(&(t_barrier->lock[spot]));
        arrival_phase = t_barrier->count[spot] & NODE_PHASE_BIT;
        (t_barrier->count[spot])++;
        if ((t_barrier->count[spot] & ~NODE_PHASE_BIT) != 2) {
            /* First arrival: sleep until the node's phase is flipped. */
            while ((t_barrier->count[spot] & NODE_PHASE_BIT) == arrival_phase)
                pthread_cond_wait(&(t_barrier->cond[spot]),
                                  &(t_barrier->lock[spot]));
            pthread_mutex_unlock(&(t_barrier->lock[spot]));
            powertwo = powertwo / 2;
            break;
        }
        /* Second arrival: keep this node locked and move to the parent. */
        level = (n + level) / 2;
        level_ind = level_ind / 2;
        spot = level + level_ind;
    }

    /* Release phase: revisit the i nodes this thread won, top-down. */
    for (j = i; j > 0; j--) {
        level_ind = t_num / powertwo;
        powertwo = powertwo / 2;
        level = n * (powertwo - 1) / powertwo;
        spot = level + level_ind;
        /* Clear the arrival count and flip the phase to release the waiter. */
        t_barrier->count[spot] =
            (t_barrier->count[spot] & NODE_PHASE_BIT) ^ NODE_PHASE_BIT;
        pthread_mutex_unlock(&(t_barrier->lock[spot]));
        pthread_cond_signal(&(t_barrier->cond[spot]));
    }
    return 0;
}
/*
 * Stores in *time the user CPU time consumed so far by the calling process,
 * in seconds, as reported by times().
 *
 * The tick count is converted with the system's clock-tick rate from
 * sysconf(_SC_CLK_TCK) rather than a fixed 60 ticks per second, which gave
 * wrong results on systems with a different rate.
 *
 * Returns 0 on success.  If the tick rate or the process times cannot be
 * obtained, *time is set to 0 and -1 is returned.
 */
int second_(float *time)
{
    struct tms buffer;
    long ticks_per_second;

    *time = 0.0f;

    ticks_per_second = sysconf(_SC_CLK_TCK);
    if (ticks_per_second <= 0)
        return -1;
    if (times(&buffer) == (clock_t)-1)
        return -1;

    *time = (float)((double)buffer.tms_utime / (double)ticks_per_second);
    return 0;
}
/*
 * Prepares a barrier for use by barrier().
 *
 * Initialises the mutex and the condition variable with default attributes
 * and clears the arrival counter.
 *
 * Returns 0 on success, otherwise the error number reported by
 * pthread_mutex_init() or pthread_cond_init().  On failure nothing is left
 * initialised.
 */
int init_barrier(struct barrier_variable *barrier)
{
    int rc;

    rc = pthread_mutex_init(&barrier->lock, NULL);
    if (rc != 0)
        return rc;

    rc = pthread_cond_init(&barrier->cond, NULL);
    if (rc != 0) {
        pthread_mutex_destroy(&barrier->lock);
        return rc;
    }

    barrier->count = 0;
    return 0;
}
/*
 * Linear barrier: blocks the caller until thread_count threads have called
 * barrier() on the same barrier_variable, then releases all of them.
 *
 * barrier       barrier state prepared by init_barrier()
 * thread_count  number of participating threads; must be below 2^30
 *
 * pthread_cond_wait() may return without a matching broadcast (spurious
 * wakeup), so a waiter must re-check a predicate.  The predicate used here
 * is a phase bit stored in the top usable bit of barrier->count: the last
 * arriving thread flips it, and waiters sleep until it differs from the
 * value they saw on arrival.  The low bits still count the arrivals of the
 * current episode and are reset to 0 on release, so the barrier is reusable.
 *
 * Always returns 0.
 */
int barrier(struct barrier_variable *barrier, int thread_count)
{
    enum { BARRIER_PHASE_BIT = 1 << 30 };
    int arrival_phase;

    pthread_mutex_lock(&barrier->lock);

    arrival_phase = barrier->count & BARRIER_PHASE_BIT;
    barrier->count++;

    if ((barrier->count & ~BARRIER_PHASE_BIT) == thread_count) {
        /* Last arrival: clear the counter, flip the phase, wake everyone. */
        barrier->count = arrival_phase ^ BARRIER_PHASE_BIT;
        pthread_cond_broadcast(&barrier->cond);
    } else {
        /* Sleep until the phase changes; this also absorbs spurious wakeups. */
        while ((barrier->count & BARRIER_PHASE_BIT) == arrival_phase)
            pthread_cond_wait(&barrier->cond, &barrier->lock);
    }

    pthread_mutex_unlock(&barrier->lock);
    return 0;
}
