Concurrent Programming
- Concurrent programming with Process
- Concurrent programming with I/O Multiplexing
- Concurrent programming with Threads
- Concurrency Issues
Modern operating systems provide three basic approaches for building concurrent programs:
- Process
- each logical control flow is a process
- processes are scheduled and maintained by the kernel
- use explicit interprocess communication (IPC) mechanisms for communication
- I/O multiplexing
- the application explicitly schedules its own logical flows in the context of a single process
- logical flows are modeled as state machines that the main program explicitly transitions from state to state as a result of data arriving on file descriptors
- since the program is a single process, all flows share the same address space
- Threads
- think of threads as a hybrid of the other two approaches: scheduled by the kernel like processes, but sharing a single virtual address space like the flows in I/O multiplexing
Concurrent programming with Process
NOTE
- Once a child process is forked (for example, in a server-client model), remember to release unused resources in both the parent and the child: the parent must close its copy of the connected descriptor, and the child must close its copy of the listening descriptor, since neither needs it. The sketch below shows the pattern.
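A minimal sketch of this pattern, with error handling omitted; echo is a hypothetical service routine, and listenfd is assumed to come from the usual socket/bind/listen sequence. A real server would also install a SIGCHLD handler so the parent reaps terminated children.
#include <stdlib.h>
#include <sys/socket.h>
#include <unistd.h>

void echo(int connfd); /* hypothetical service routine */

void serve(int listenfd) {
    struct sockaddr_storage clientaddr;
    socklen_t clientlen;
    while (1) {
        clientlen = sizeof(clientaddr);
        int connfd = accept(listenfd, (struct sockaddr *)&clientaddr, &clientlen);
        if (fork() == 0) {   /* child serves the client */
            close(listenfd); /* child never accepts, so release its copy */
            echo(connfd);
            exit(0);         /* child's connfd is closed on exit */
        }
        close(connfd);       /* parent must release its copy, or descriptors leak */
    }
}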
Pros & Cons
- Pros:
- Separate address spaces eliminate the risk of one process accidentally overwriting the virtual memory of another
- Processes are scheduled automatically by the kernel
- Cons:
- Separate address spaces make it more difficult to share state information; processes must use explicit IPC mechanisms to do so
- Tend to be slower because the overhead for process control and IPC is high
Concurrent programming with I/O Multiplexing
The basic idea is to use the select function to ask the kernel to suspend the process, returning control to the application only after one or more I/O events have occurred, as in the following examples:
- Return when any descriptor in the set {0, 4} is ready for reading.
- Return when any descriptor in the set {1, 2, 7} is ready for writing.
- Timeout if 152.13 seconds have elapsed waiting for an I/O event to occur.
The prototype of select:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
In the simple case, select takes two inputs:
- a descriptor set called readfds
- the cardinality n of readfds (in practice, the nfds argument is one more than the largest descriptor in any set)
select blocks until at least one descriptor in readfds is ready for reading. A descriptor k is ready for reading if and only if a request to read 1 byte from that descriptor would not block.
As a side effect, select modifies the fd_set pointed to by argument readfds to indicate a subset called the ready set, consisting of the descriptors in readfds that are ready for reading. The return value indicates the cardinality of the ready set.
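As an illustration, here is a minimal sketch that waits on both standard input and a hypothetical listening descriptor listenfd. The loop copies read_set into ready_set on every iteration, because select overwrites its argument:
#include <sys/select.h>
#include <unistd.h>

void wait_for_events(int listenfd) {
    fd_set read_set, ready_set;
    FD_ZERO(&read_set);               /* empty the set */
    FD_SET(STDIN_FILENO, &read_set);  /* add stdin */
    FD_SET(listenfd, &read_set);      /* add the listening descriptor */

    while (1) {
        ready_set = read_set;         /* select modifies its argument, so copy first */
        /* nfds = largest descriptor + 1; assumes listenfd > STDIN_FILENO */
        select(listenfd + 1, &ready_set, NULL, NULL, NULL); /* block, no timeout */
        if (FD_ISSET(STDIN_FILENO, &ready_set)) {
            /* reading 1 byte from stdin would not block */
        }
        if (FD_ISSET(listenfd, &ready_set)) {
            /* accept() on listenfd would not block */
        }
    }
}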
Pros & Cons
- Pros:
- Give the programmer more control over the behavior of their programs; e.g., an event-driven concurrent server could give preferred service to some clients
- Since I/O multiplexing runs in the context of a single process, every logical flow has access to the entire address space of the process. This makes it easy to share data between flows
- Can be debugged like any sequential program, using a familiar tool such as GDB
- Often more efficient than process-based designs, since there is no process-control or context-switch overhead between flows
- Cons:
- Coding complexity: event-driven code is considerably more complex than the equivalent process- or thread-based code, and a single-process design cannot fully utilize multiple cores
Concurrent programming with Threads
Introducing threads
General Introduction
- Threads are scheduled by the kernel
- Each thread has its own thread context, including a unique integer thread ID (TID), stack, stack pointer, program counter, general-purpose registers, and condition codes
- All threads share the entire virtual address space of the process, including its code, data, heap, shared libraries, and open files
- The main thread is the first thread to run in the process; peer threads are the threads created after it. Together they form a pool of peers, independent of which threads were created by which other threads
- The code and local data for a thread are encapsulated in a thread routine, which takes a generic pointer as input and returns a generic pointer
Thread creation
A new thread is created by calling pthread_create:
#include <pthread.h>
typedef void *(func)(void *);
int pthread_create(pthread_t *tid, pthread_attr_t *attr,
func *f, void *arg);
When pthread_create returns, argument tid contains the ID of the newly created thread.
The new thread can determine its own thread ID by calling pthread_self:
#include <pthread.h>
pthread_t pthread_self(void);
Pay attention to the arg argument of pthread_create. If arg points to a local variable of the creating thread, there is a race between the new thread reading that variable and later iterations of the creating thread overwriting it.
For example, imagine a loop that continuously creates threads:
int main(){
    int connfd;
    ...
    while(1){
        connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
        pthread_create(&tid, NULL, thread, &connfd);
    }
}

void *thread(void *vargp){
    int connfd = *((int*)vargp);
    ...
}
In this case, there is a race between the assignment to connfd in the next call to accept and the dereference of vargp in the thread routine. If the dereference executes after the next accept has returned, two threads end up using the same connection file descriptor.
To avoid this potentially deadly race, we assign each argument its own dynamically allocated memory block, intended to be used exclusively by the created thread. The corrected code looks like this:
int main(){
    int *connfdp; // pointer to a per-connection block
    ...
    while(1){
        connfdp = (int*)malloc(sizeof(int)); // dynamically allocate a thread-specific argument
        *connfdp = accept(listenfd, (SA *)&clientaddr, &clientlen);
        pthread_create(&tid, NULL, thread, connfdp);
    }
}

void *thread(void *vargp){
    int connfd = *((int*)vargp);
    free(vargp); // the thread is responsible for freeing the allocated block
    ...
}
Thread termination
A thread terminates in one of the following ways:
- The thread terminates implicitly when its top-level thread routine returns
- The thread terminates explicitly by calling the pthread_exit function. If the main thread calls pthread_exit, it waits for all other peer threads to terminate, and then terminates the main thread and the entire process with a return value of thread_return (see the sketch after this list)
#include <pthread.h>
void pthread_exit(void *thread_return);
- The main thread or a peer thread calls the exit function, which terminates the process and all threads associated with it
- Some other peer thread terminates the current thread by calling the pthread_cancel function with the TID of the current thread
#include <pthread.h>
int pthread_cancel(pthread_t tid);
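The difference between exit and pthread_exit in the main thread matters in practice. A minimal sketch, assuming a hypothetical worker routine:
#include <pthread.h>
#include <stdio.h>

void *worker(void *vargp) {
    printf("peer thread still running\n");
    return NULL;
}

int main() {
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    /* exit(0) here would terminate the whole process, possibly before
       the peer runs; pthread_exit terminates only the main thread and
       lets the peer finish first. */
    pthread_exit(NULL);
}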
Reaping terminated threads
Threads wait for other threads to terminate by calling the pthread_join function:
#include <pthread.h>
int pthread_join(pthread_t tid, void **thread_return);
The pthread_join function blocks until thread tid terminates, assigns the generic (void *) pointer returned by the thread routine to the location pointed to by thread_return, and then reaps any memory resources held by the terminated thread.
Note: unlike the Unix wait function, the pthread_join function can only wait for a specific thread to terminate. There is no way to instruct pthread_join to wait for an arbitrary thread to terminate.
Thread detachment
At any point in time, a thread is joinable or detached.
- A joinable thread can be reaped and killed by other threads. Its memory resources are not freed until it is reaped by another thread;
- A detached thread cannot be reaped or killed by other threads. Its memory resources are freed automatically by the system when it terminates.
By default, threads are created joinable. In order to avoid memory leaks, each joinable thread should:
- be explicitly reaped by another thread; or
- be detached by a call to the pthread_detach function:
#include <pthread.h>
int pthread_detach(pthread_t tid);
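A common idiom in threaded servers is for each peer thread to detach itself at the top of its thread routine, so that the main thread never needs to call pthread_join:
#include <pthread.h>

void *thread(void *vargp) {
    pthread_detach(pthread_self()); /* resources are reclaimed automatically on exit */
    /* ... handle the request ... */
    return NULL;
}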
Thread initialization
The pthread_once function allows you to initialize the state associated with a thread routine.
#include <pthread.h>
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control,
void (*init_routine)(void));
The once_control argument is a global or static variable that is always initialized to PTHREAD_ONCE_INIT.
The first time any thread in the process calls pthread_once with a given once_control, it invokes init_routine, a function that takes no arguments and returns nothing (typically used to initialize shared global state).
Subsequent calls to pthread_once with the same once_control variable, from any thread in the process, do nothing.
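A minimal sketch, assuming a hypothetical shared mutex that must be initialized exactly once no matter how many threads run the routine:
#include <pthread.h>

static pthread_once_t once = PTHREAD_ONCE_INIT;
static pthread_mutex_t lock;           /* hypothetical shared state */

static void init_routine(void) {       /* runs at most once per process */
    pthread_mutex_init(&lock, NULL);
}

void *thread(void *vargp) {
    pthread_once(&once, init_routine); /* safe to call from every thread */
    /* ... use lock ... */
    return NULL;
}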
Sharing variables in threaded programs
A variable is shared if and only if multiple threads reference some instance of the variable.
Each thread has its own separate thread context, which includes:
- thread ID (kernel space)
- stack (mem)
- stack pointer (reg)
- program counter (reg)
- condition codes (reg)
- general-purpose registers (reg)
Each thread shares the rest of the process context with the other threads, which includes:
- entire user virtual address space, includes:
- read-only text (code)
- r/w data
- heap
- shared library code and data
- same set of open files
In an operational sense, it is impossible for one thread to read or write the register values of another thread. On the other hand, any thread can access any location in the shared virtual memory, including the stacks of other threads. Thus, registers are never shared, whereas virtual memory is always shared; a thread's stack is usually accessed only by that thread, but nothing prevents other threads from reading or writing it.
Mapping variables to memory
- Global variables: At run time, the read/write area of virtual memory contains exactly one instance of each global variable, which can be referenced by any thread.
- Local automatic variables: At run time, each thread's stack contains its own instance of each local automatic variable.
- Local static variables: Same as global variables: one instance, shared by all threads.
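A small sketch illustrating the three mappings above (the increments are deliberately unsynchronized here, purely for illustration; real code would protect the shared counters):
#include <pthread.h>
#include <stdio.h>

int gcnt = 0;                     /* global: exactly one instance, shared by all threads */

void *thread(void *vargp) {
    static int scnt = 0;          /* local static: one instance, shared by all threads */
    int lcnt = 0;                 /* local automatic: one instance per thread, on its stack */
    gcnt++; scnt++; lcnt++;
    printf("gcnt=%d scnt=%d lcnt=%d\n", gcnt, scnt, lcnt);
    return NULL;
}

int main() {
    pthread_t tid1, tid2;
    pthread_create(&tid1, NULL, thread, NULL);
    pthread_create(&tid2, NULL, thread, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    return 0;                     /* each lcnt ended at 1; gcnt and scnt end at 2 */
}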
Synchronizing threads with semaphores
Machine instruction interleaving
When peer threads run concurrently on a uniprocessor, the machine instructions are completed one after another in some arbitrary interleaved order. Unfortunate orderings can cause nasty synchronization errors.
For example:
volatile int cnt = 0; // volatile keeps cnt out of a register; it does NOT synchronize access

int main(){
    int niters = 100;
    ...
    pthread_create(&tid1, NULL, thread, &niters);
    pthread_create(&tid2, NULL, thread, &niters);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    ...
}

void *thread(void *vargp){
    int i, niters = *((int*)vargp);
    for (i = 0; i < niters; i++)
        cnt++;
    return NULL;
}
In this case, the resulting cnt is not guaranteed to be 2 * niters. This is because the loop body cnt++ consists of three machine instructions (load, update, store), and in general there is no way to predict whether the operating system will choose a correct interleaving of your threads' instructions.
Progress graph
A progress graph models the execution of n concurrent threads as a trajectory through an n-dimensional Cartesian space. Each axis k corresponds to the progress of thread k. Each point represents a state in which each thread has completed its most recently executed instruction. The origin of the graph corresponds to the initial state where none of the threads has yet completed an instruction.
A progress graph models instruction execution as a transition from one state to another. A transition is represented as a directed edge from one point to an adjacent point. Legal transitions move to the right or up. The execution history of a program is modeled as a trajectory through the state space.
Consider an example trajectory for the code above. For each thread i, the instructions (Li, Ui, Si) that manipulate the contents of the shared variable cnt constitute a critical section that should not be interleaved with the critical section of the other thread. The guarantee that critical sections never interleave is known as mutual exclusion.
In order to guarantee correct execution of concurrent program that shares global data structures, we must synchronize the threads so that they always have a safe trajectory.
Semaphore Introduction
A semaphore, s, is a global variable with a nonnegative integer value that can only be manipulated by two special operations, called P and V:
- P(s):
- If s is nonzero, P decrements s and returns immediately;
- If s is zero, then suspend the thread until s becomes nonzero and the thread is restarted by a V operation. After restarting, the P operation decrements s and returns.
- V(s): increment s by 1
NOTE:
- The test and decrement operations in P occur indivisibly;
- The increment operation in V occurs indivisibly;
- V does not define the order in which waiting threads are restarted. Thus, when several threads are waiting at a semaphore, you cannot predict which one will be restarted as a result of the V.
The semaphore operations ensure that a running program can never enter a state where a properly initialized semaphore has a negative value. This property, known as the semaphore invariant, provides a powerful tool for controlling the trajectories of concurrent programs.
The Posix standard defines a variety of functions for semaphores:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value); /* pshared is 0 for semaphores shared between threads */
int sem_wait(sem_t *s); /* P(s) */
int sem_post(sem_t *s); /* V(s) */
Use semaphore for mutual exclusion
The basic idea is to associate a semaphore s, initially 1, with each shared variable (or related set of shared variables) and then surround the corresponding critical section with P(s) and V(s) operations.
A semaphore used in this way to protect shared variables is called a binary semaphore because its value is always 0 or 1. A binary semaphore whose purpose is to provide mutual exclusion is called a mutex.
- Performing a P operation on a mutex is called locking the mutex
- Performing a V operation on a mutex is called unlocking the mutex
- A semaphore that is used as a counter for a set of available resources is called a counting semaphore
Using this method on the cnt example makes the critical region a forbidden area on the progress graph, which ensures that it is impossible for multiple threads to be executing instructions in the enclosed critical region at any point in time. In other words, the semaphore operations ensure mutually exclusive access to the shared variable.
The code should be changed to:
volatile int cnt = 0;  // shared counter
sem_t mutex_cnt;       // 1. semaphore that protects cnt

int main(){
    int niters = 100;
    ...
    sem_init(&mutex_cnt, 0, 1); // 2. mutex_cnt = 1
    ...
    pthread_create(&tid1, NULL, thread, &niters);
    pthread_create(&tid2, NULL, thread, &niters);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    ...
}

void *thread(void *vargp){
    int i, niters = *((int*)vargp);
    for (i = 0; i < niters; i++){
        sem_wait(&mutex_cnt); // 3. P: lock
        cnt++;
        sem_post(&mutex_cnt); // 4. V: unlock
    }
    return NULL;
}
With the changes above, we get a progress graph in which a forbidden region surrounds the unsafe region:
Use semaphore to schedule shared resources
A thread can use semaphore operations to notify another thread that some condition in the program state has become true, as in the one-slot producer-consumer sketch below.
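A minimal sketch of this idea: a semaphore initialized to 0 lets the consumer block until the producer announces that the shared slot is filled. The names are hypothetical, and a real bounded buffer would also need a mutex and an empty-slots semaphore.
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

int item;                     /* one-slot shared buffer */
sem_t full;                   /* 1 when the slot holds an item */

void *producer(void *vargp) {
    item = 42;                /* produce into the slot */
    sem_post(&full);          /* V: the condition "slot is full" is now true */
    return NULL;
}

void *consumer(void *vargp) {
    sem_wait(&full);          /* P: block until the producer has posted */
    printf("consumed %d\n", item);
    return NULL;
}

int main() {
    pthread_t p, c;
    sem_init(&full, 0, 0);    /* initially empty */
    pthread_create(&c, NULL, consumer, NULL);
    pthread_create(&p, NULL, producer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return 0;
}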
Using threads for parallelism
The set of all programs can be partitioned into the disjoint sets of sequential and concurrent programs.
- A sequential program consists of a single logical flow
- A concurrent program consists of multiple concurrent flows. A parallel program is a concurrent program running on multiple processors
Concurrency Issues
Thread safety
A function is said to be thread-safe if and only if it always produces correct results when called repeatedly from multiple concurrent threads
Normally, there are four classes of thread-unsafe functions:
- Class 1: Functions that do not protect shared variables
  Fix:
  - Option 1: Rewrite the callee to protect the shared variables with synchronization operations such as P and V (a mutex)
  - Option 2: Rewrite the caller to associate a mutex with the callee and wrap each call with lock/unlock operations
- Class 2: Functions that keep state in static variables (e.g., variables in the .data section) across multiple invocations
  Fix:
  - Rewrite the callee and caller so that the callee doesn't use any shared data, relying instead on the caller to pass state information in arguments (the callee then becomes a reentrant function; see the rand_r-style sketch in the Reentrancy section below)
- Class 3: Functions that return a pointer to a static variable
  Fix:
  - Option 1: Rewrite the callee and caller so that the caller passes the address of a variable in which the callee stores the result
  - Option 2: Write a thread-safe wrapper using the lock-and-copy technique and replace all calls to the thread-unsafe function with calls to the wrapper. The idea is to associate a mutex with the thread-unsafe function: at each call, lock the mutex, call the function, copy its result to a thread-private memory location, and then unlock the mutex (see the sketch after this list)
- Class 4: Functions that call thread-unsafe functions
  Whether a function f that calls a thread-unsafe function g is itself thread-safe depends on the class of g:
  - If g is Class 1 or Class 3, f can protect its calls to g with a mutex and remain thread-safe
  - If g is Class 2, f is thread-unsafe unless g is rewritten to be thread-safe
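Here is a minimal lock-and-copy sketch for a Class 3 function, modeled on the thread-unsafe ctime, which returns a pointer to a static buffer. The wrapper name and the use of a Pthreads mutex (rather than P/V semaphore wrappers) are choices made for this sketch; the caller must supply a private buffer of at least 26 bytes, the size of ctime's result.
#include <pthread.h>
#include <string.h>
#include <time.h>

static pthread_mutex_t ctime_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Thread-safe lock-and-copy wrapper: privatep must point to >= 26 bytes */
char *ctime_ts(const time_t *timep, char *privatep) {
    pthread_mutex_lock(&ctime_mutex);
    char *sharedp = ctime(timep);  /* result lives in a shared static buffer */
    strcpy(privatep, sharedp);     /* copy it to caller-private storage */
    pthread_mutex_unlock(&ctime_mutex);
    return privatep;
}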
Reentrancy
A reentrant function is characterized by the property that it does not reference any shared data when called by multiple threads. Reentrant functions are one class of thread-safe functions.
There are two flavors of reentrant functions:
- explicitly reentrant function: If all function arguments are passed by value (i.e., no pointers) and all data references are to local automatic stack variables (i.e., no references to static or global variables), then the function is explicitly reentrant. We can assert its reentrancy regardless of how it is called
- implicitly reentrant function: If some arguments are pointers, then we have an implicitly reentrant function. We cannot determine reentrancy by inspecting the function alone; it is reentrant only if the pointers point to nonshared data
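For example, the rand_r-style generator below (modeled on the CSAPP version) makes the Class 2 rand reentrant by moving its static seed into caller-supplied state. Because the state is passed through a pointer, this is an implicitly reentrant function: it is reentrant as long as each thread passes a pointer to its own private seed.
/* Reentrant pseudo-random generator: all state lives in *nextp */
int my_rand_r(unsigned int *nextp) {
    *nextp = *nextp * 1103515245 + 12345;
    return (unsigned int)(*nextp / 65536) % 32768;
}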
Use existing library functions in thread programs
Most Unix functions, including functions defined in standard C library, are thread-safe, with only a few exceptions.
The common exceptions are listed below:
| Thread-unsafe function | Thread-unsafe class | Unix thread-safe version |
|---|---|---|
| rand | 2 | rand_r |
| strtok | 2 | strtok_r |
| asctime | 3 | asctime_r |
| ctime | 3 | ctime_r |
| gethostbyaddr | 3 | gethostbyaddr_r |
| gethostbyname | 3 | gethostbyname_r |
| inet_ntoa | 3 | (none) |
| localtime | 3 | localtime_r |
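As a usage example for the table, strtok_r replaces strtok's hidden static position with a caller-supplied saveptr, so each thread (or each concurrent tokenization) keeps its own state:
#include <stdio.h>
#include <string.h>

int main() {
    char line[] = "foo bar baz";
    char *saveptr;                       /* per-caller state instead of strtok's static */
    for (char *tok = strtok_r(line, " ", &saveptr);
         tok != NULL;
         tok = strtok_r(NULL, " ", &saveptr))
        printf("%s\n", tok);
    return 0;
}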
Races
A race occurs when the correctness of a program depends on one thread reaching point x in its control flow before another thread reaches point y.
Take the following example as an illustration:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define N 4
void *thread(void *vargp);

int main(){
    pthread_t tid[N];
    int i;
    for (i = 0; i < N; i++)
        pthread_create(&tid[i], NULL, thread, &i);
    for (i = 0; i < N; i++)
        pthread_join(tid[i], NULL);
    exit(0);
}

void *thread(void *vargp){
    int myid = *((int*)vargp);
    printf("Hello from thread %d\n", myid);
    return NULL;
}
The output is:
Hello from thread 3
Hello from thread 0
Hello from thread 0
Hello from thread 0
This incorrect result is due to a race between the increment of i in main's creation loop and the dereference of vargp in the thread routine: if a peer thread dereferences vargp after main has already incremented i, it reads the wrong ID, possibly the same ID as another thread.
To eliminate the race, we can use one of the following two approaches:
- Dynamically allocate a separate block for each integer ID, and pass the thread routine a pointer to this block. Notice that the thread is responsible for freeing the block in order to avoid a memory leak;
- Schedule access to the integer ID via a semaphore.
/* Dynamically allocate a thread-specific block for each ID */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define N 4
void *thread(void *vargp);

int main(){
    pthread_t tid[N];
    int i, *id;
    for (i = 0; i < N; i++){
        id = malloc(sizeof(int)); // allocate a thread-specific block
        *id = i;
        pthread_create(&tid[i], NULL, thread, id);
    }
    for (i = 0; i < N; i++)
        pthread_join(tid[i], NULL);
    exit(0);
}

void *thread(void *vargp){
    int myid = *((int*)vargp);
    free(vargp); // free the thread-specific block
    printf("Hello from thread %d\n", myid);
    return NULL;
}
/* Use a semaphore to schedule access to the shared ID */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#define N 4
void *thread(void *vargp);
sem_t sem; // semaphore that schedules access to "i"

int main(){
    pthread_t tid[N];
    int i;
    sem_init(&sem, 0, 0); // init sem with value 0
    for (i = 0; i < N; i++){
        pthread_create(&tid[i], NULL, thread, &i);
        sem_wait(&sem); // P: block until the new thread has copied i
    }
    for (i = 0; i < N; i++)
        pthread_join(tid[i], NULL);
    exit(0);
}

void *thread(void *vargp){
    int myid = *((int*)vargp);
    sem_post(&sem); // V: signal main that i has been copied
    printf("Hello from thread %d\n", myid);
    return NULL;
}
Deadlocks
Semaphores introduce the potential for a nasty kind of run-time error called deadlock, in which a collection of threads is blocked, waiting for a condition that will never be true.
The progress graph for a deadlocked program looks like the following:
From this graph, we can glean some important insights about deadlock:
- The programmer has incorrectly ordered the P and V operations such that the forbidden regions for the two semaphores create a closed, lower-left deadlock region. If a trajectory happens to touch a state in the deadlock region, then deadlock is inevitable. Trajectories can enter deadlock regions, but they can never leave!
- Deadlock is nondeterministic: whether a trajectory enters the deadlock region depends on scheduling, so a program may run correctly many times and then deadlock.
Programs deadlock for many reasons, and avoiding them is a difficult problem in general. However, when binary semaphores are used for mutual exclusion, you can apply the following simple and effective rule to avoid deadlocks:
Mutex lock ordering rule: A program is deadlock-free if, for each pair of mutexes (s, t) in the program, each thread that holds both s and t simultaneously locks them in the same order.
For example, we can fix the deadlock above by locking s first, then t, in each thread, as in the sketch below:
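A sketch of the ordered locking, assuming s and t are binary semaphores initialized to 1 elsewhere:
#include <semaphore.h>

extern sem_t s, t;   /* both initialized to 1 */

void *thread1(void *vargp) {
    sem_wait(&s);    /* always lock s first ... */
    sem_wait(&t);    /* ... then t */
    /* ... use both shared resources ... */
    sem_post(&t);
    sem_post(&s);
    return NULL;
}

void *thread2(void *vargp) {
    sem_wait(&s);    /* same order as thread1, so no deadlock */
    sem_wait(&t);
    /* ... */
    sem_post(&t);
    sem_post(&s);
    return NULL;
}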