/* tthreads.h */
#ifndef tthreads_included
#define tthreads_included
#include "setjmp.h"
typedef struct Thread_s {/* a Thread */
void (*initFn)(void *);
void * initArg;
jmp_buf context;
int tid;
struct Thread_s *next;/* for readyq */
int enqueued;/* used as bool, for checking */
} Thread;
/* Thread Queue */
typedef struct {
Thread *head, *tail;
int count;
} ThreadQ;
void initThreadQ(ThreadQ *tq);
extern void enqueueThread(ThreadQ *tq, Thread *t);
extern Thread *dequeueThread(ThreadQ *tq);
extern ThreadQ readyQ;/* ready queue (fcfs) */
/* initialize thread system; all threads put on "freshQ" */
extern void makeThreads(int n);
/* start new thread running (*initFn)(initArg) */
extern int initAndScheduleThread(void (*initFn)(void *), void *initArg) ;
/* switch context (from readyQ) & enqueue self */
/* start threads, returns when all threads have terminated */
extern void startThreads();
/* thread termination, or can just return */
extern void terminateThread();
extern int getTid();/* get thread id */
extern void yield();/* volunteer for preemption */
extern void contextSwitch(ThreadQ *resched);
#endif /* tthreads_included */
#include "stdio.h"
#include "assert2.h"
#include "tthreads.h"
Thread *threads, *currentThread = (Thread *)0;
static int numThreads = 0;
ThreadQ readyQ;
static ThreadQ freshQ;/* queue of fresh (never run) threads */
jmp_buf mainContext;/* context of main thread */
void initThreadQ(ThreadQ *tq) {
tq->head = tq->tail = (Thread*)0;
tq->count = 0;
}
void enqueueThread(ThreadQ *tq, Thread *t) {
tq->count++;
assert2(!t->enqueued, "Attempted to enqueue an already already enqueued thread!");
assert2(!t->next, "Attempted to enqueue thread with a next????");
if (tq->head) {/* queue not empty */
assert2(tq->count > 1, "corrupt queue count!");
tq->tail->next = t;
tq->tail = t;
} else {/* queue was empty */
assert2(tq->count == 1, "corrupt queue count!");
tq->head = tq->tail = t;/* insert singleton */
}
t->enqueued = 1;
}
Thread *dequeueThread(ThreadQ *tq) { /* return 0 if no thread enqueued */
Thread *t = (Thread *)0;/* thread to return */
if (tq->head) {/* if Q non empty */
t = tq->head;/* get thread */
tq->head = t->next;/* remove from queue */
}
if (t) {
assert2(tq->count > 0, "corrupt queue count!");
tq->count--;
assert2(t->enqueued, "Thread dequeued twice without reqeueing????");
t->enqueued = 0;
t->next = (Thread *)0;/* forget next (not on queue) */
} else {
assert2(tq->count == 0, "corrupt queue count!");
}
return t;
}
/*
Yield cpu.
Reschedule onto <tq> if not zero.
System terminates if no thread is ready.
*/
void contextSwitch(ThreadQ *tq) {
assert2(currentThread, "Can't context switch away from main!");
if (setjmp(currentThread->context)) /* save context, return 0 unless restore */
return;
if (tq) /* don't resched if tq=0 */
enqueueThread(tq, currentThread);
currentThread = dequeueThread(&readyQ); /* next thread to run */
if (currentThread) {
longjmp(currentThread->context, 1);/* restore other thread's context */
} else {/* no ready threads -- terminate */
assert2(freshQ.count == numThreads, "At least one unterminated thread orphaned! (would deadlock)");
longjmp(mainContext, 1);/* done, return to startThreads */
}
}
void terminateThread() {/* kill self */
assert2(currentThread, "TerminateThread called from outside thread context!");
currentThread->initFn = (void *)0;/* make ready for new work */
contextSwitch(&freshQ); /* put back on queue of fresh threads */
}
/*
Start executing ready threads.
Returns when all threads have terminated.
*/
void startThreads()
{
assert2(!currentThread, "Threads already started!");
if (!setjmp(mainContext)) {/* start threads */
currentThread = dequeueThread(&readyQ);
assert2(currentThread, "No threads to run???");
longjmp(currentThread->context, 1);
} else { /* all threads terminated */
free(threads);
threads = currentThread = (Thread *)0;
initThreadQ(&readyQ); initThreadQ(&freshQ);
}
}
/* mutually recursive helper functions for makeThreads() */
void _makeThreads2(int, char*, jmp_buf*); /* forward declaration */
void _makeThreads1(int n, jmp_buf *initContext)/* allocate 64k on stack */
{
char stackSpace[(1<16)];/* reserve 64k on stack */
_makeThreads2(n, stackSpace, initContext); /* ref to stackSpace (don't optimize away) */
}
void _makeThreads2(int n, char *ignored, jmp_buf *initContext) /* save context; 2nd arg is ignored */
{
Thread *t = &threads[n];
t->tid = n;/* set tid */
enqueueThread(&freshQ, t);
/* initial context */
if (setjmp(t->context)) {/* setjmp returns 0 unless target of longjmp */
for (;;) {
assert2(t->initFn, "thread not initialized");
(t->initFn)(t->initArg); /* will continue here first time run */
/* thread terminated! */
terminateThread();/* kill self */
}
}
if (n > 0)/* make other threads! */
_makeThreads1(n-1, initContext);
else
longjmp(*initContext, 1);/* "return" to makeThreads() via longjmp */
}
/*
Create pool of <n> threads (only call once)
New threads are blocked until they are initialized.
Initialize new threads using initAndScheduleThread()
*/
void makeThreads(int n) {
jmp_buf initContext;
assert2(!threads, "makeThreads called multiple times!");
numThreads = n;
threads = (Thread *)calloc(sizeof(Thread), n);
assert2(threads, "allocation of thread memory failed");
if (!setjmp(initContext))
_makeThreads1(n-1, &initContext);
}
/*
Prepare a thread for execution.
Returns tid
Only call once per thread
*/
int initAndScheduleThread(void (*initFn)(void *), void *initArg)
{
Thread *t = dequeueThread(&freshQ);
assert2(threads, "Threads not yet created (call makeThreads())");
assert2(t, "No available threads.");
int tid = t->tid;
assert2(tid >= 0, "tid must not be negative!");
assert2(tid < numThreads, "tid out of range!");
assert2(!t->initFn, "Thread initialized more than once!");
t->initFn = initFn; t->initArg = initArg;
enqueueThread(&readyQ, t);
return tid;
}
int getTid() {
assert2(currentThread, "getTid can not be called outside of a thread!");
return currentThread->tid;
}
void yield() /* voluntarily offer to preempt */
{
if (readyQ.count)/* if readyQ not empty */
contextSwitch(&readyQ);/* make self ready */
}
// threadTest.c
#include "tthreads.h"
#include "stdio.h"
typedef struct {
int workerNum;
} TWork;
void worker(TWork *w)
{
int workerNum = w->workerNum;
int i;
if (workerNum) {/* start worker <workerNum>-1 */
TWork *nextWork = malloc(sizeof(TWork));
nextWork->workerNum = workerNum -1;
initAndScheduleThread((void*)(void*)worker, nextWork);
yield();/* vollunteer for preemption */
}
for (i=3*workerNum; i >= 0; i--) { /* iterate 3*<workerNum> times */
fprintf(stderr, "workerNum=%d, at iteration %d (tid=%d)\n", workerNum, i, getTid());
yield();/* vollunteer for preemption */
}
fprintf(stderr, "worker %d (tid=%d) terminating\n", workerNum, getTid());
free(w);
return;
}
int main()
{
int numThreads = 3, i;
makeThreads(numThreads);
TWork *work = malloc(sizeof(TWork));
work->workerNum = numThreads-1;
initAndScheduleThread((void*)(void*)worker, work);
startThreads();/* start working! */
printf("Threads Terminated\n");
makeThreads(numThreads);
work = malloc(sizeof(TWork));
work->workerNum = numThreads-1;
initAndScheduleThread((void*)(void*)worker, work);
startThreads();/* start working! */
printf("Threads Terminated\n");
return 0;
}
// setJmpDemo.c
#include "setjmp.h"
jmp_buf jb;
main()
{
int n = 0;
int r = setjmp(jb);
printf("setjmp returned %d\n", r);
if (r == 0)
foo();
}
foo()
{
longjmp(jb, 10);
}
// mutex.h
#include "tthreads.h"
typedef struct {
int numAvailable;
ThreadQ waitingThreads;
} Mutex;
void mutexInit(Mutex *m, int available);
void mutexLock(Mutex *m);
void mutexUnlock(Mutex *m);
// mutex.c
#include "mutex.h"
void mutexInit(Mutex *m, int available) {
initThreadQ(&m->waitingThreads);
m->numAvailable = available;
}
void mutexLock(Mutex *m) {
yield();/* just to mix things up a bit! */
while (m->numAvailable == 0) {
contextSwitch(&m->waitingThreads); /* block */
}
m->numAvailable = 0;
yield();/* just to mix things up a bit! */
}
void
mutexUnlock(Mutex *m) {
yield();/* just to mix things up a bit! */
m->numAvailable = 1;
if (m->waitingThreads.count != 0) /* make another thread ready */
enqueueThread(&readyQ, dequeueThread(&m->waitingThreads));
yield();/* just to mix things up a bit! */
}
// mutexTest.c
#include "tthreads.h"
#include "stdio.h"
#include "mutex.h"
Mutex m;
void worker(void *ignored)
{
int i;
for (i = 0; i < 2; i++) {
fprintf(stderr, "==thread %d outside critical section!\n", getTid());
yield();
mutexLock(&m);
fprintf(stderr, ".thread %d in critical section!\n", getTid());
yield();/* mix things up */
fprintf(stderr, "..thread %d still in critical section!\n", getTid());
yield();/* mix things up */
fprintf(stderr, "...thread %d about to leave critical section!\n", getTid());
mutexUnlock(&m);
fprintf(stderr, "==thread %d outside critical section!\n", getTid());
yield();
}
}
int main()
{
mutexInit(&m, 1);
makeThreads(2);
initAndScheduleThread((void*)(void*)worker, 0);
initAndScheduleThread((void*)(void*)worker, 0);
startThreads();
return 0;
}