这是大学的多核程序设计课程内容,所以在这里就简单的总结一下。如果你之前有过多线程方面的编程经验,完全可以忽略本文的内容,它非常的初级。

首先说明一下,本人在Linux虚拟机上编写多线程程序,包含头文件

#include <pthread.h> 

一、线程的创建

在Linux下创建的线程的API接口是pthread_create(),它的完整定义是:

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void*), void *arg); 

1 . 线程句柄 thread:当一个新的线程调用成功之后,就会通过这个参数将线程的句柄返回给调用者,以便对这个线程进行管理。
2 . 线程属性 attr: pthread_create()接口的第二个参数用于设置线程的属性。这个参数是可选的,当不需要修改线程的默认属性时,给它传递NULL就行。
3 . 入口函数 start_routine(): 当你的程序调用了这个接口之后,就会产生一个线程,而这个线程的入口函数就是start_routine()。如果线程创建成功,这个接口会返回0。
4 . 入口函数参数 *arg : start_routine()函数有一个参数,这个参数就是pthread_create的最后一个参数arg。这种设计可以在线程创建之前就帮它准备好一些专有数据

  • 线程对象:存储线程信息,对用户不透明
  • 线程对象的数据类型:pthread_t
  • 创建线程:pthread_create

二、等待函数

int pthread_join( pthread_t  thread, void** ret_val_ p);

pthread_join()这个接口的第一个参数就是新创建线程的句柄了,而第二个参数就会去接受线程的返回值。pthread_join()接口会阻塞主进程的执行,直到合并的线程执行结束。由于线程在结束之后会将0返回给系统,那么pthread_join()获得的线程返回值自然也就是0。

小例子(代码抄的,我解读)

#include <stdio.h>  
#include <pthread.h>  
void* thread( void *arg )  
{  
   printf( "This is a thread and arg = %d.\n", *(int*)arg); 
  //将arg强制类型转换为指针,指向变量的值 
    *(int*)arg = 0;  
    return arg;  
}  
int main( int argc, char *argv[] )  
{  
    pthread_t th;  
    int ret;  
    int arg = 10;  
    int *thread_ret = NULL;  
    ret = pthread_create( &th, NULL, thread, &arg ); 
    //成功返回0 
    if( ret != 0 )
    {  
        printf( "Create thread error!\n");  
        return -1;  
    }  
    printf( "This is the main process.\n" );  
    pthread_join( th, (void**)&thread_ret );  
   //二级指针强制类型转换,而后引用
    printf( "thread_ret = %d.\n", *thread_ret );  
    return 0;  
}  

三、线程间的通信和同步

虽然线程本地存储可以避免线程访问共享数据,但是线程之间的大部分数据始终还是共享的。在涉及到对共享数据进行读写操作时,就必须使用同步机制,Linux提供的线程同步机制主要有互斥锁和条件变量。

临界区

临界区:当多个线程访问共享数据时,为保正数据的完整性,将共享数据保护起来

访问临界区的原则:

  1. 一次最多只能一个线程停留在临界区内;
  2. 不能让一个线程无限地停留在临界区内。

忙等待

flag = 0;   主线程初始化
………..
y = Compute(my_rank);
while (flag != my_rank);
x = x + y;
flag++;

执行临界区代码的顺序:线程0,线程1,线程2…

互斥量

互斥量:限制每次只有一个线程能进入临界区。
互斥量数据类型:pthread_mutex_t

互斥量初始化
int pthread_mutex_init(pthread_mutex_t∗ mutex_p,
const pthread_mutexattr_t∗ attr_p);
释放互斥量
int pthread_mutex_destroy(pthread_mutex_t∗ mutex_p);
加锁(阻塞和非阻塞)
int pthread_mutex_lock(pthread_mutex_t∗ mutex_p );
int pthread_mutex_trylock(pthread_mutex_t *mutex_p)
解锁
int pthread_mutex_unlock(pthread_mutex_t∗ mutex_p );

当线程数多于核数时候,忙等待效率降低。
忙等待:强调访问临界区的顺序
互斥量:访问临界区的顺序随机的

多个矩阵相乘

voidThread_work(void∗ rank) 
{
      long my_rank = (long) rank;
      matrix_t my_mat = Allocate_matrix(n);
      Generate_matrix(my_mat);
      pthread_mutex_lock(&mutex);
      Multiply_matrix(product_mat, my_mat);
      pthread_mutex_unlock(&mutex);
      Free_matrix(&my_mat);
      return NULL;
}

使用互斥量和忙等待来实现路障的方法;

使用一个通过互斥量保护的计数器;

当计数器表明,所有线程都进入过临界区, 线程就可以离开了。

信号量

信号量可以用于临界区,保护共享资源。
信号量的特性如下:

  • 信号量有一个非负整数
  • 要访问共享资源的 线程必须获取一个信号量,则信号量减1。
  • 当信号量为0时,试图访问共享资源的线程将处于等待状态。
  • 离开共享资源的 线程释放信号量,则信号量加1。

信号量可以认为是一种特殊类型的 unsigned int 无符号整型变量,可以赋值为 0,1,2,3 等,一般只赋0(对应上锁的互斥量)/1(未上锁的互斥量)。要把一个二元互斥量用作互斥量时候=,需要把信号量的值初始化为
1,即开锁状态。在要保护的临界区前调用函数 sem_wait,线程执行到 sem_wait 函数时,如果信号量为 0,线程就会被阻塞,否则减1 后进去临界区。执行完临界区的操作后,再调用 sem_post 对信号量的值加 1,使得在 sem_wait中阻塞的其他线程能够继续运行。

void* Send_msg(void* rank)
{  
	long my_rank = (long) rank;  
	long dest = (my_rank + 1) % thread_count;  
	char∗  my_msg = malloc(MSG_MAX∗sizeof(char));    			
	sprintf(my_msg, "Hello to %ld from %ld", dest, my_rank);  	  
	messages[dest] = my_msg;  
	sem_post(&semaphores[dest])//信号量为 0,线程就会被阻塞,否则减1 后进去临界区。				       
	sem_wait(&semaphores[my_rank]);  
	printf("Thread %ld > %s n", my_rank, messages[my_rank]);     return NULL;
} 

不同信号量的语法

int sem_init(sem_t∗ semaphore_p, int shared, unsigned initial_val );
int sem_destroy(sem_t∗ semaphore_p);
int sem_post(sem_t∗ semaphore_p); 
int sem_wait(sem_t∗ semaphore_p);

注意:信号量不是 Pthreads 线程库的一部分,所以在使用信号量的程序开头加头文件

#include <semaphore.h>

路障

作用

使线程之间同步,并保证它们运行到了同一个位置。没有线程可以越过设置的路障,直到所有线程都抵达这里。

使用忙等待和互斥量实现路障

#include <stdio.h>
#include <pthread.h>
#pragma comment(lib, "pthreadVC2.lib")

const int thread = 8;
int count;
pthread_mutex_t pmt;

void* work(void* rank)
{
    const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
    pthread_mutex_lock(&pmt);   // 进入读写区,上锁,计数器加一,解锁
    printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
    count++;
    pthread_mutex_unlock(&pmt);
    while (count < thread);     // 使用忙等待来等所有的线程都达到栅栏
    printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);
    return ;
}

int main()
{
    pthread_t pth[thread];
    int i;
    long long list[thread];
    pthread_mutex_init(&pmt, NULL);
    for (i = count = 0; i < thread; i++)
    {
        list[i] = i;
        pthread_create(&pth[i], NULL, work, (void *)list[i]);
    }
    for (i = 0; i < thread; i++)
        pthread_join(pth[i], NULL);
    pthread_mutex_destroy(&pmt);
    printf("\nfinish.\n");
    getchar();
    return 0;
}

条件变量

一个条件变量允许停止一个线程,直到某个事件发生;当条件被满足时,另一个线程可以激活这个线程;

条件变量总是和互斥量绑在一起。

条件变量数据类型:pthread_cond_t

条件变量初始化

int pthread_cond_init(pthread_cond_t* cond_var_p, const pthread_condattr_t* attr)

释放条件变量

int pthread_cond_destroy (pthread_cond_t* cond_var_p)

解锁一个阻塞的线程

int pthread_cond_signal(pthread_cond_t∗ cond_var_p);

解锁所有被阻塞的线程

int pthread_cond_broadcast(pthread_cond_t∗ cond_var_p);

通过互斥量阻塞线程

int pthread_cond_wait(pthread_cond_t∗ cond_var_p,pthread_mutex_t∗ mutex_p);

使用条件变量来实现路障

#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#pragma comment(lib, "pthreadVC2.lib")

const int thread = 8;
int count;
pthread_mutex_t mutex;
pthread_cond_t cond;

void* work(void* rank)
{
    const long long localRank = (long long)rank, dest = (localRank + 1) % thread;
    printf("Thread %2d reached the barrier.\n", localRank); fflush(stdout);
    pthread_mutex_lock(&mutex);         // 上锁
    count++;
    if (count == thread)                // 最后一个进入的线程
    {
        count = 0;                      // 计数器清零
        pthread_cond_broadcast(&cond);  // 广播所有线程继续向下执行
    }
    else
        for (; pthread_cond_wait(&cond, &mutex) != 0;);// 等待其他线程
    pthread_mutex_unlock(&mutex);       // 条件变量阻塞解除后会自动将互斥量上锁,需要手工解除

    printf("Thread %2d passed the barrier.\n", localRank); fflush(stdout);
    return ;
}

int main()
{
    pthread_t pth[thread];
    int i;
    long long list[thread];
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond, NULL);
    for (i = count = 0; i < thread; i++)
    {
        list[i] = i;
        pthread_create(&pth[i], NULL, work, (void *)list[i]);
    }
    for (i = 0; i < thread; i++)
        pthread_join(pth[i], NULL);
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
    printf("\nfinish.\n");
    getchar();
    return 0;
}

四、读写锁

读写锁有点像互斥量,但提供两个方法。

第 1 个用来对读上锁,而第 2 个用来对写上锁;

很多线程都可以获得读锁,但只有一个线程可以获得写锁。

如果有线程获得了读锁,那么其他线程无法获得写锁。

初始化
int pthread_rwlock_init(pthread_rwlock_t∗ rwlock_p,
const pthread_rwlockattr_t∗ attr_p );
读加锁
int pthread_rwlock_rdlock(pthread_rwlock_t∗ rwlock_p);
写加锁
int pthread_rwlock_wrlock(pthread_rwlock_t∗ rwlock_p);
解锁
int pthread_rwlock_unlock(pthread_rwlock_t∗ rwlock_p);
销毁
int pthread_rwlock_destroy(pthread_rwlock_t∗ rwlock_p );

详细例子

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐