Condition Variables for Synchronization

  1. A condition variable allows a thread to block itself until specified data reaches a predefined state.
  2. A condition variable is associated with this predicate. When the predicate becomes true, the condition variable is used to signal one or more threads waiting on the condition.
  3. A single condition variable may be associated with more than one predicate.
  4. A condition variable always has a mutex associated with it. A thread locks this mutex and tests the predicate defined on the shared variable.
  5. If the predicate is not true, the thread waits on the condition variable associated with the predicate using the function pthread_cond_wait.

Keep two overheads in mind: locking overheads and idling overheads due to blocked threads.

Designing Barriers Using Condition Variables

barrier variable

barrier.h

/*
 * State for the linear barrier: a single mutex/condition-variable pair and
 * one counter shared by every thread that calls barrier().
 */
struct barrier_variable {
    pthread_mutex_t lock;   /* held while count is examined or modified */
    pthread_cond_t  cond;   /* threads blocked in the barrier wait here */
    int             count;  /* arrival counter, zeroed by init_barrier() */
};

/*
 * State for the tree barrier: three parallel arrays with one entry per
 * tree node. init_tree_barrier() allocates and initializes them.
 */
struct tree_barrier_variable {
    pthread_mutex_t *lock;   /* lock[i] protects count[i] */
    pthread_cond_t  *cond;   /* cond[i] is waited on at node i */
    int             *count;  /* count[i] = arrivals recorded at node i */
};

var1.c

#include <pthread.h>

#include <stdio.h>

#include "barrier.h"

/* Number of entries in the shared input array. */
#define NUM_ELEMENTS 16

/* Number of worker threads main creates; also the barrier's thread count. */
#define NUM_THREADS 4

/* Barrier the workers pass between the summation and the variance phase. */
struct barrier_variable barrier1;

/* Accumulates each worker's sum of squared deviations from the mean. */
float variance_sum;

/* Accumulates each worker's partial sum of its slice of the array. */
int global_sum;

/* Input data; main fills it before the workers are started. */
int array[NUM_ELEMENTS];

/* Mutex the workers hold while adding to global_sum. */
pthread_mutex_t global_sum_lock;

/* Thread start routine; `s` points to a struct array_descriptor. */
void *sum_and_variance(void *s);

/* Describes the contiguous slice of the input array handed to one worker. */
struct array_descriptor {
    int *array;   /* first element of the slice */
    int  length;  /* number of elements in the slice */
};

/* Defined later in this file; declared here so the call below does not
 * rely on an implicit function declaration. */
int init_barrier(struct barrier_variable *barrier);

/*
 * Computes the mean and variance of `array` using NUM_THREADS workers.
 *
 * Every element is set to 2, the shared accumulators are cleared, and each
 * worker is given a contiguous slice of the array. After all workers have
 * been joined the mean and variance are printed.
 *
 * Returns 0 on success, 1 if a worker thread could not be created.
 */
int main(void)
{
    pthread_t sum_threads[NUM_THREADS];
    struct array_descriptor desc[NUM_THREADS];
    float average, variance;
    int i, first, past_end, snapshot;

    for (i = 0; i < NUM_ELEMENTS; i++)
        array[i] = 2;

    global_sum = 0;
    variance_sum = 0.0;

    init_barrier(&barrier1);
    pthread_mutex_init(&global_sum_lock, NULL);

    for (i = 0; i < NUM_THREADS; i++) {
        /* Computing both slice boundaries the same way keeps the slices
         * contiguous and covers every element even when NUM_ELEMENTS is
         * not a multiple of NUM_THREADS. */
        first = i * NUM_ELEMENTS / NUM_THREADS;
        past_end = (i + 1) * NUM_ELEMENTS / NUM_THREADS;
        desc[i].array = &array[first];
        desc[i].length = past_end - first;

        if (pthread_create(&sum_threads[i], NULL, sum_and_variance,
                           (void *)&desc[i]) != 0) {
            /* The workers already running wait for NUM_THREADS arrivals
             * at the barrier and could never be joined, so leave the
             * process instead of continuing. */
            fprintf(stderr, "pthread_create failed for worker %d\n", i);
            return 1;
        }

        /* Workers may already be updating global_sum; read it under the
         * same lock they use. */
        pthread_mutex_lock(&global_sum_lock);
        snapshot = global_sum;
        pthread_mutex_unlock(&global_sum_lock);
        printf("%d %d\n", i, snapshot);
    }

    for (i = 0; i < NUM_THREADS; i++)
        pthread_join(sum_threads[i], NULL);

    /* All workers have been joined, so the accumulators are final. */
    average = ((float) global_sum) / ((float) NUM_ELEMENTS);
    variance = variance_sum / NUM_ELEMENTS;
    printf("mean %f\nvariance %f\n", average, variance);

    pthread_mutex_destroy(&global_sum_lock);
    return 0;
}

void *sum_and_variance(void *s)

{

int i, *array, partial_sum;

float average, partial_variance_sum;

partial_sum = 0;

partial_variance_sum = 0.0;

array = ((struct array_descriptor *) s) -> array;

for (i=0; i<((struct array_descriptor *) s) -> length; i++)

partial_sum += array[i];

printf("here global_sum %d\n",global_sum);

pthread_mutex_lock(&global_sum_lock);

global_sum += partial_sum;

pthread_mutex_unlock(&global_sum_lock);

printf("there global_sum %d\n",global_sum);

barrier(&barrier1, NUM_THREADS);

printf("and global_sum %d\n",global_sum);

average = ((float) global_sum) / ((float) NUM_ELEMENTS);

for (i=0; i<((struct array_descriptor *) s) -> length; i++)

partial_variance_sum += (average - array[i]) * (average - array[i]);

variance_sum += partial_variance_sum;

pthread_exit(0);

}

/*
 * Prepares a barrier variable for use with barrier().
 *
 * barrier: the barrier variable to initialize.
 *
 * Returns 0 on success, otherwise the error number reported by
 * pthread_mutex_init or pthread_cond_init. On failure nothing is left
 * initialized.
 */
int init_barrier(struct barrier_variable *barrier)
{
    int rc;

    rc = pthread_mutex_init(&barrier->lock, NULL);
    if (rc != 0)
        return rc;

    rc = pthread_cond_init(&barrier->cond, NULL);
    if (rc != 0) {
        pthread_mutex_destroy(&barrier->lock);
        return rc;
    }

    barrier->count = 0;
    return 0;
}

/*
 * Blocks the caller until thread_count threads have called barrier() on
 * the same barrier variable, then lets all of them continue.
 *
 * barrier:      barrier variable set up by init_barrier().
 * thread_count: number of participating threads; every participant must
 *               pass the same value.
 *
 * The single counter encodes two phases so that every wait has a
 * predicate and a spurious wakeup cannot release a thread early:
 *   0 .. thread_count-1               threads are still arriving
 *   thread_count .. 2*thread_count-1  threads are leaving
 * The last thread to leave resets the counter to 0, which makes the
 * barrier reusable.
 *
 * Returns 0 once the caller has passed the barrier, -1 if thread_count is
 * less than 1.
 */
int barrier(struct barrier_variable *barrier, int thread_count)
{
    if (thread_count < 1)
        return -1;

    pthread_mutex_lock(&barrier->lock);

    /* A previous use of the barrier is still draining; wait for its last
     * thread to reset the counter before counting as an arrival. */
    while (barrier->count >= thread_count)
        pthread_cond_wait(&barrier->cond, &barrier->lock);

    printf("barrier.count %d \n", barrier->count);

    barrier->count++;

    if (barrier->count == thread_count) {
        /* Last arrival: release everybody waiting below. */
        pthread_cond_broadcast(&barrier->cond);
    } else {
        while (barrier->count < thread_count)
            pthread_cond_wait(&barrier->cond, &barrier->lock);
    }

    /* Leaving phase: the last thread out re-arms the barrier and wakes
     * any thread already waiting to enter the next use. */
    barrier->count++;
    if (barrier->count == 2 * thread_count) {
        barrier->count = 0;
        pthread_cond_broadcast(&barrier->cond);
    }

    pthread_mutex_unlock(&barrier->lock);
    return 0;
}

treebarrier.c

#include <pthread.h>

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <sys/types.h>

#include <sys/times.h>

#include "barrier.h"

/* Number of threads created for each barrier measurement. */
#define NUM 1024

/* Level count passed to tree_barrier as k; 2^TLEVEL equals NUM. */
#define TLEVEL 10

/* Per-thread argument for t_barrier. */
struct thdesc {
    int pnum;  /* index of the thread, assigned by main */
};

/* Linear barrier shared by the l_barrier threads. */
struct barrier_variable barrierl;

/* Tree barrier shared by the t_barrier threads. */
struct tree_barrier_variable barriert;

/* One descriptor per thread; main stores the thread index in pnum. */
struct thdesc desc[NUM];

/* Thread start routine for the tree barrier; `s` points to a struct thdesc. */
void *t_barrier(void *s);

/* Thread start routine for the linear barrier; `s` is not used. */
void *l_barrier(void *s);

main()

{

int i;

float time1,time2;

pthread_t pt[NUM];

pthread_attr_t attr;

pthread_attr_init (&attr);

pthread_attr_setscope (&attr,PTHREAD_SCOPE_SYSTEM);

init_tree_barrier(&barriert,NUM);

init_barrier(&barrierl);

second_(&time1);

for(i=0;i<NUM;i++){

desc[i].pnum=i;

pthread_create(&pt[i], &attr, t_barrier, (void *)&desc[i]);

}

for(i=0;i<NUM;i++)

pthread_join(pt[i],NULL);

second_(&time2);

printf("tree barrier time %f \n",time2-time1);

second_(&time1);

for(i=0;i<NUM;i++){

pthread_create(&pt[i], &attr, l_barrier, NULL);

}

for(i=0;i<NUM;i++)

pthread_join(pt[i],NULL);

second_(&time2);

printf("linear barrier time %f \n",time2-time1);

}

void *l_barrier(void *s)

{

int i;

/* for(i=0; i<50; i++)*/

barrier(&barrierl,NUM);

}

void *t_barrier(void *s)

{

int i, t_num;

t_num=((struct thdesc *)s)->pnum;

/* for(i=0; i<50; i++)*/

tree_barrier(&barriert,t_num,NUM,TLEVEL);

}

/*
 * Allocates and initializes the node arrays of a tree barrier.
 *
 * t_barrier: the tree barrier variable to set up.
 * n:         number of participating threads; n - 1 nodes are created.
 *
 * Each node gets an initialized mutex, an initialized condition variable
 * and a zero count. The arrays are owned by *t_barrier.
 *
 * Returns 0 on success. Returns -1 if n is less than 1 or an allocation
 * fails; in that case the three array pointers are set to NULL.
 */
int init_tree_barrier(struct tree_barrier_variable *t_barrier, int n)
{
    size_t nodes, i;

    t_barrier->lock = NULL;
    t_barrier->cond = NULL;
    t_barrier->count = NULL;

    if (n < 1)
        return -1;
    nodes = (size_t)n - 1;

    t_barrier->lock = malloc(nodes * sizeof *t_barrier->lock);
    t_barrier->cond = malloc(nodes * sizeof *t_barrier->cond);
    t_barrier->count = malloc(nodes * sizeof *t_barrier->count);

    /* malloc(0) may legitimately return NULL, so only treat NULL as a
     * failure when nodes were actually requested. */
    if (nodes > 0 &&
        (t_barrier->lock == NULL || t_barrier->cond == NULL ||
         t_barrier->count == NULL)) {
        free(t_barrier->lock);
        free(t_barrier->cond);
        free(t_barrier->count);
        t_barrier->lock = NULL;
        t_barrier->cond = NULL;
        t_barrier->count = NULL;
        return -1;
    }

    for (i = 0; i < nodes; i++) {
        pthread_mutex_init(&t_barrier->lock[i], NULL);
        pthread_cond_init(&t_barrier->cond[i], NULL);
        t_barrier->count[i] = 0;
    }

    return 0;
}

/*
 * Tree barrier: threads meet pairwise at nodes of a tree stored in the
 * lock/cond/count arrays of *t_barrier.
 *
 * t_barrier: node arrays, indexed by `spot` below.
 * t_num:     index of the calling thread (main in this file passes 0..NUM-1).
 * n:         thread count; determines where each level starts in the arrays.
 * k:         maximum number of levels to climb (this file passes TLEVEL).
 *
 * Arrival phase (first loop): at each level the thread locks its node and
 * increments the node's count. If it is not the second arrival it waits on
 * the node's condition variable and stops climbing once woken. The second
 * arrival keeps the node's mutex locked and moves up one level.
 *
 * Release phase (second loop): walking back down from the level below the
 * one where the thread stopped, each node it passed as second arrival has
 * its count reset, its mutex unlocked and its condition variable signalled.
 *
 * The function has no declared return type and returns no value.
 *
 * NOTE(review): the wait is not guarded by a predicate on count[spot]; the
 * loop only repeats when pthread_cond_wait returns non-zero. A spurious
 * wakeup would let the waiter continue early -- confirm whether that matters
 * here before reusing this code.
 */
tree_barrier(struct tree_barrier_variable *t_barrier, int t_num, int n, int k)

{

int i, j, spot, level, level_ind, powertwo;

powertwo = 1;

level = 0;

/* Level 0: threads 2m and 2m+1 share node m. */
level_ind = t_num / 2;

spot = level + level_ind;

for (i=0; i<k; i++) {

/* powertwo tracks 2^(i+1) while climbing. */
powertwo *= 2;

pthread_mutex_lock(&(t_barrier->lock[spot]));

(t_barrier->count[spot])++;

if ((t_barrier->count[spot]) != 2) {

/* First arrival at this node: sleep until the node is signalled. */
while (pthread_cond_wait(&(t_barrier->cond[spot]),

&(t_barrier->lock[spot])) != 0);

pthread_mutex_unlock(&(t_barrier->lock[spot]));

/* Undo this level's doubling so powertwo == 2^i for the release loop. */
powertwo = powertwo / 2;

break;

}

/* Second arrival: the node's mutex stays locked; compute the parent node.
   Each level starts halfway between the current level start and n. */
level = (n + level) / 2;

level_ind = level_ind / 2;

spot = level + level_ind;

}

/* i is the level where this thread waited, or k if it never waited. */
for (j=i; j>0; j--) {

level_ind = t_num / powertwo;

powertwo = powertwo / 2;

/* Start offset of the level being released. */
level = n * (powertwo - 1) / powertwo;

spot = level + level_ind;

t_barrier->count[spot] = 0;

/* This mutex was left locked by the arrival loop above. */
pthread_mutex_unlock(&(t_barrier->lock[spot]));

pthread_cond_signal(&(t_barrier->cond[spot]));

}

}

/*
 * Stores the user CPU time consumed so far by the calling process, in
 * seconds, into *time.
 *
 * time: receives the reading; set to 0 if the reading cannot be taken.
 *
 * times() reports clock ticks; the tick rate is obtained from
 * sysconf(_SC_CLK_TCK) rather than assumed.
 *
 * Returns 0 on success, -1 if times() or sysconf() fails.
 */
int second_(float *time)
{
    struct tms buffer;
    long ticks_per_second = sysconf(_SC_CLK_TCK);

    if (ticks_per_second <= 0 || times(&buffer) == (clock_t)-1) {
        *time = 0.0f;
        return -1;
    }

    *time = (float)((double)buffer.tms_utime / (double)ticks_per_second);
    return 0;
}

/*
 * Prepares a barrier variable for use with barrier().
 *
 * barrier: the barrier variable to initialize.
 *
 * Returns 0 on success, otherwise the error number reported by
 * pthread_mutex_init or pthread_cond_init. On failure nothing is left
 * initialized.
 */
int init_barrier(struct barrier_variable *barrier)
{
    int rc;

    rc = pthread_mutex_init(&barrier->lock, NULL);
    if (rc != 0)
        return rc;

    rc = pthread_cond_init(&barrier->cond, NULL);
    if (rc != 0) {
        pthread_mutex_destroy(&barrier->lock);
        return rc;
    }

    barrier->count = 0;
    return 0;
}

/*
 * Blocks the caller until thread_count threads have called barrier() on
 * the same barrier variable, then lets all of them continue.
 *
 * barrier:      barrier variable set up by init_barrier().
 * thread_count: number of participating threads; every participant must
 *               pass the same value.
 *
 * The single counter encodes two phases so that every wait has a
 * predicate and a spurious wakeup cannot release a thread early:
 *   0 .. thread_count-1               threads are still arriving
 *   thread_count .. 2*thread_count-1  threads are leaving
 * The last thread to leave resets the counter to 0, which makes the
 * barrier reusable.
 *
 * Returns 0 once the caller has passed the barrier, -1 if thread_count is
 * less than 1.
 */
int barrier(struct barrier_variable *barrier, int thread_count)
{
    if (thread_count < 1)
        return -1;

    pthread_mutex_lock(&barrier->lock);

    /* A previous use of the barrier is still draining; wait for its last
     * thread to reset the counter before counting as an arrival. */
    while (barrier->count >= thread_count)
        pthread_cond_wait(&barrier->cond, &barrier->lock);

    barrier->count++;

    if (barrier->count == thread_count) {
        /* Last arrival: release everybody waiting below. */
        pthread_cond_broadcast(&barrier->cond);
    } else {
        while (barrier->count < thread_count)
            pthread_cond_wait(&barrier->cond, &barrier->lock);
    }

    /* Leaving phase: the last thread out re-arms the barrier and wakes
     * any thread already waiting to enter the next use. */
    barrier->count++;
    if (barrier->count == 2 * thread_count) {
        barrier->count = 0;
        pthread_cond_broadcast(&barrier->cond);
    }

    pthread_mutex_unlock(&barrier->lock);
    return 0;
}