来说线程最后一个内容,今天将补充线程互斥的缺陷,同时我们将学习最常见的一个设计模式,最后我们写出一个线程池,这就是今天的目标.
上一个博客我们谈到了线程互斥,我们知道线程互斥是对的?但是它合理吗?就像我们生活中在食堂吃饭,对于一个窗口在同一时间我们只有允许有一个人进行打饭,这就是互斥.也就是我们所有人来争抢这个窗口,谁的拳头大谁来吃饭,这很不合理,但是我们确实有遵循者互斥的规则.
再比如有一个图书馆,它只允许一个人进去读书,每一天会有一把钥匙放在门口,谁先拿到钥匙,谁就进去读书.例如张三很热爱学习,每一天都早早的起来,他拿到钥匙就进去读书了,随着时间的流逝,慢慢的人都来了,他们都在外面排队,因为张三在里面读书,此时张三读书累了,想出去玩玩,他把闷一锁,钥匙一踹就走了,但是外面的人还在等着,因为他们没有钥匙,张三回来后开门继续学习,此时张三心里面想偷偷懒,出了门,把钥匙往门上一放,正准备离开的时候心里想到我不该如此堕落,又把钥匙抢来了,毕竟他里的钥匙最近,谁都抢不过他,此时张三重复这样的动作,每一次张三一出门外面排队的人都是一阵骚动,都在铆劲准备强钥匙,可是每一次都是张三里的最近,抢不过张三,那么请问这个张三错了吗?没有,毕竟它符合规则,拿到钥匙才能进去,但是它是不合理的.
线程互斥也是如此,它是对的,可以保证我们访问临界资源的安全,但是他有一点缺陷,故我们需要线程同步做补充,就那张三为例子,张三一旦把钥匙放下了,想要再次获取钥匙,可以去后面老老实实的排队,我们让众多的线程有几乎相同的概率访问临界资源,或者我们这样说线程访问某一个临界资源具有一定的顺序性,这就是线程同步.
那么我们是如何做到线程同步的呢?Linux为我们提供了很多的接口,我们先认识一下,见识见识是如何同步的.你会发现这些接口和我们的加锁接口是很相似的.
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
我们先来和大家写一个代码,这样大家就可以明白了,我们仔细观察上面的接口,我们发现线程同步是要和线程互斥搭配使用的,至于原因等到后面再谈.我们这里定义两个全局变量.
// 定义一个条件变量,这个我们使用接口来进行初始化
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
那么我们想要实现什么样的代码呢?这里我们想要主线程通过手动输入来控制多个线程访问临界资源,那么这里就要开始了.
int main()
{// 第二个参数直接置为NULLpthread_cond_init(&cond, nullptr);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, waitCommand, nullptr);pthread_create(&t2, nullptr, waitCommand, nullptr);pthread_create(&t3, nullptr, waitCommand, nullptr);// 控制语句pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_cond_destroy(&cond);return 0;
}
先把线程控制的函数写出来,这里都是谈过的.
void *waitCommand(void *args)
{while (true){cout << "thread id " << pthread_self() << " running" << endl;}
}
此时我们先来验证一下,自己写的代码是不是正确的.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PeW0etlJ-1679066099980)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221228_153504.gif)]
此时我们就发现了上面的代码是正确的,但是现象和我们想的一样,总是同一个线程拿到锁,这就是互斥的不合理之处.我们如果想要按照一定顺序访问临界资源,这个时候就要使用线程同步了.我们先看一个接口.
[bit@Qkj 12_25]$ man pthread_cond_wait
这个接口很有意思,如果我们线程想要访问临界资源,可以,你直接开始.但是一旦我门遇到这个接口,我们需要在这里等待,注意我们是在wait的阻塞队列中进行等待(先这样理解),此时直到有信号说这个线程你可以运行了,我们在放出线程.我们看一下发送信号的函数.
[bit@Qkj 12_25]$ man pthread_cond_signal
其中signal函数是唤醒阻塞队列中排在最前面的线程,broadcast是唤醒阻塞队列中所有的线程,我们都和大家演示一下现象.
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void *waitCommand(void *args)
{while (true){// 让所有的线程等待被唤醒 注意锁的使用还没开始// 所有的线程都会在这里等着,给我排队.等着一个一个被叫醒pthread_cond_wait(&cond, &mutex);cout << "thread id " << pthread_self() << " running" << endl;}
}int main()
{pthread_cond_init(&cond, nullptr);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, waitCommand, nullptr);pthread_create(&t2, nullptr, waitCommand, nullptr);pthread_create(&t3, nullptr, waitCommand, nullptr);while (true){// 控制char n = 0;cout << "请输入你的commmand: ";cin >> n; // cin cout 交叉使用缓冲区会被强制刷新if (n == 'n'){// cout << "aaaaaaaaaaaa" << endl;// 唤醒一个线程pthread_cond_signal(&cond);}else{break;}sleep(1);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_cond_destroy(&cond);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Lp0JWHjy-1679066099982)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221228_153504_1.gif)]
我们继续测试一下全部唤醒的接口
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void *waitCommand(void *args)
{while (true){// 让所有的线程等待被唤醒 注意锁的使用还没开始// 所有的线程都会在这里等着,给我排队.等着一个一个被叫醒pthread_cond_wait(&cond, &mutex);cout << "thread id " << pthread_self() << " running" << endl;}
}int main()
{pthread_cond_init(&cond, nullptr);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, waitCommand, nullptr);pthread_create(&t2, nullptr, waitCommand, nullptr);pthread_create(&t3, nullptr, waitCommand, nullptr);while (true){// 控制char n = 0;cout << "请输入你的commmand: ";cin >> n; // cin cout 交叉使用缓冲区会被强制刷新if (n == 'n'){pthread_cond_broadcast(&cond);}else{break;}sleep(1);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_cond_destroy(&cond);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-di2i1Ycc-1679066099983)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221228_153504_2.gif)]
现在我们已经初步认识到线程同步接口的基本用法了,那么如果我们想一起退出这些线程该如何做?此时我们是不是应该在退出的时候把他们都唤醒,让后直接join就可以了,我们需要一个全局变量.
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
volatile bool quit = false; // 为了线程退出 volatile前面已经谈过了
void *waitCommand(void *args)
{while (!quit){pthread_cond_wait(&cond, &mutex);cout << "thread id " << pthread_self() << " running" << endl;}cout << "thread id " << pthread_self() << " ending" << endl;
}int main()
{pthread_cond_init(&cond, nullptr);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, waitCommand, nullptr);pthread_create(&t2, nullptr, waitCommand, nullptr);pthread_create(&t3, nullptr, waitCommand, nullptr);while (true){// 控制char n = 0;cout << "请输入你的commmand: ";cin >> n; // cin cout 交叉使用缓冲区会被强制刷新if (n == 'n'){// 启动所有的阻塞线程pthread_cond_broadcast(&cond);}else{quit = true;pthread_cond_broadcast(&cond);break;}sleep(1);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_cond_destroy(&cond);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m6SMRtNt-1679066099984)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221228_153504_3.gif)]
我们发现只有一个线程退出了,其余的线程还在那里阻塞住了,我们先说解决方法,原因后面谈,我们在线程退出前把锁给释放了,这样就可以了
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
volatile bool quit = false;
void *waitCommand(void *args)
{while (!quit){pthread_cond_wait(&cond, &mutex);cout << "thread id " << pthread_self() << " running" << endl;}cout << "thread id " << pthread_self() << " ending" << endl;// 这里解锁 -- 后面我们会知道原因的pthread_mutex_unlock(&mutex);
}int main()
{pthread_cond_init(&cond, nullptr);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, waitCommand, nullptr);pthread_create(&t2, nullptr, waitCommand, nullptr);pthread_create(&t3, nullptr, waitCommand, nullptr);while (true){// 控制char n = 0;cout << "请输入你的commmand: ";cin >> n; // cin cout 交叉使用缓冲区会被强制刷新if (n == 'n'){// cout << "aaaaaaaaaaaa" << endl;// 唤醒一个线程//pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);}else{quit = true;//sleep(5);pthread_cond_broadcast(&cond);break;}sleep(1);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);pthread_cond_destroy(&cond);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kj1kweuf-1679066099984)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221228_153504_4.gif)]
此时关于线程互同步我们已经谈完了,就上面剩下一个小问题,这个暂时想不谈.我们想在可以使用线程互斥来完成一个处理任务的小程序了,为了避免上面线程退出造成的问题,我们直接线程分离.
pthread_cond_t cond;
// 定义一个互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;// 定义一个全局退出变量
volatile bool quit = false;vector> funcs;
void show()
{cout << "hello print()" << endl;
}
void print()
{cout << "hello show()" << endl;
}
void *waitCommand(void *args)
{pthread_detach(pthread_self()); // 线程分离,之所以在这里,主要我们也不想等待了,反正主线程还需要一段时间结束,够我们线程分离的while (!quit){pthread_cond_wait(&cond, &mutex);for (auto f : funcs){f();}}cout << "thread id " << pthread_self() << " end" << endl;
}int main()
{// 加载任务funcs.push_back(show);funcs.push_back(print);funcs.push_back([](){ cout << "你好世界" << endl; });pthread_cond_init(&cond, nullptr);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, waitCommand, nullptr);pthread_create(&t2, nullptr, waitCommand, nullptr);pthread_create(&t3, nullptr, waitCommand, nullptr);while (true){sleep(1);pthread_cond_signal(&cond); //一个一个的老}pthread_cond_broadcast(&cond);pthread_cond_destroy(&cond);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xi9idJsz-1679066099985)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221228_182606.gif)]
我们来谈一个从来没有说过的概念,这是我们计算机的一种编程模型.那么他究竟是什么意思呢?我想用生活中例子和大家说一下.在生活中,我们都知道有超市这个东西,假设我们都是消费者,也就是我们是超时买东西的,那么我们关心这个产品是从哪个工厂生产出来的吗(这里不提品牌)?是的,我们不关心,只需要知道我们可以拿到自己的想要的就可以的了.同理,作为供应商他也不关心自己的产品买个谁了,只需要知道自己的产品卖出去了就行了.我们想这个超市作为一种什么角色存在?此时他就是我们之前说的缓冲区,
在早些的时候,我们想一想是不是自己去工厂拿产品,例如我门要一包方便面,你说工厂会为这一包方便面开动机器吗?开什么玩笑?作为工厂,他想的是一次性产出多些产品,然后放在超里面就可以了,这不就是我们的缓冲器区的作用!!同理,我们说消费者不关心,需要就去超市那就可以了,这就是提高了效率.在我们休息的使用工厂会不会生产呢?会的,在周末的时候,工厂停工了,我们会不会去买东西呢?会的,这就是供应商可以随时随地供应,消费者随时随时消费,不再像我买一包你生产一包这种强制性的关系,超市让他们两个直接关系变得解耦合了.
这个时候,在计算机中超市就是内存中的一段空间,他有自己的组织方式,同理消费者和生产者都是会访问这个超市的,消费者买东西,生产者放东西,这不就是访问临界资源吗?超市就是一个临界资源,那么既让是临界资源,我们就要保护起来.生产者和消费者都可能有多个,我们要保护临界资源,就要分析出这些角色关系.对于众多的消费者,我们是竞争关系,例如我们现在竞争药品,那么他们就是互斥关系.对于供应商而言,他们一定是竞争关系,争夺的超市的展架,注意这些都是一些普遍的关系,细节我们不做雕琢.那么生产者和消费者是什么关系?假设生产者正在放东西,我们消费者要拿,那么此时所谓的正在放东西究竟是放了还是没有放,此时也就是在这个展架上我们只能存在一个角色,这个就是互斥关系,例如我们想买一个手机,每一天都去超市问问有没有手机,超市的工作人员都烦了,由于我们是互斥关系,超市的工作人员在同一时间只能服务一个角色,我们每一次问,浪费我的时间,此时工作人员说我加一下你的联系方式,到货了你来拿就可以了.同理,工作人员也会拿到供应商的联系方式,有人要东西了并且货卖完了工作人员就联系他.此时我们知道了,生产满了就消费,消费完了就生产,这是有一定的顺序的,也就是同步.
那么我们如何让多个消费者线程进行等待呢?又如何唤醒消费者线程呢?如何让生产者者经行等待呢?又如何判断唤醒生产者呢?我们又是如何衡量消费者和生产者的条件就绪了呢?此时我们心中有一定的想法,我们应该使用条件变量,而且条件变量就是配合着锁使用的?就是这么巧.我们要给大家写两份代码给大家演示这些原理.
我们要设计一个阻塞队列,这个不难.那么我们要设计的是什么的阻塞队列呢?这个队列有下面的一些特性.
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞),这个东西何和我们的管道很像.
此时我们上面阻塞队列的为满为空就是我们条件不满足,为了方面我们设计,我们按照单生产者单消费者.这里借助一下STL里面的queue容器.先把框架拉出来,我们想作为一个阻塞队列我们肯定需要一个队列,同时由于多个线程访问这个队列,我们需要加锁.还有一个来记录队列的容量.按理说这些我们就够了,但是我们还是知道的,对于生产者而言,他们拉了货已经到超市了,我们不能由于超市满了就让他走,此时我们需要让他等会,等到超市空出拉一部分位置就让他把或放进去,这个时候我们需要线程等待,需要条件变量.同理对于消费者而言,在没有货的时候也等会就可以了,总之我们还需要两个条件变量,由于条件变量是需要配合锁使用的,我们已经有把锁了,我们只能允许 一个角色来进行访问临界资源.注意,这个代码我感觉还是没有体现出来同步,等会下个代码给大家演示出这样的效果.
const uint32_t gDefaultCap = 5; // 队列的容量
// 设计一个阻塞队列
template
class BlockqQueue
{public:BlockqQueue(uint32_t cap = gDefaultCap): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_conCond, nullptr);pthread_cond_init(&_proCond, nullptr);}~BlockqQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_conCond);pthread_cond_destroy(&_proCond);}public:// 生产接口void push(const T &in){}// 消费接口// void pop(T *out)T pop(){}private:uint32_t _cap; // 阻塞队列的容量queue _bq; // 阻塞队列pthread_mutex_t _mutex; // 一把锁pthread_cond_t _conCond; // 消费者等待的条件变量pthread_cond_t _proCond; // 生产者等待的条件变量
};
此时我们对外提供两个接口,一个是生产者的push接口,一个是消费者的pop接口.我们现在想一想这两个接口是如何实现的.
对于push接口,也就是生产者,我们知道生产者是带着数据来的的,首先我们需要访问临界资源,那么就是需要加锁,出去这个函数我门需要解锁,这是大框架,那么加锁后呢?我们应该判断是不是应该这个队列是不是满了?满了我们应该让线程在自己的条件变量下等待,不满直接放数据就可以了.这就是我们的流程.
void push(const T &in)
{// 加锁// 判断 是不是适合生产 是否满了// 满了(不生产,顺便解锁,)-- 产生饥饿问题 这里要阻塞时锁都被扔掉了) 唤醒消费者// 不满(生产) -- 程序员视角的条件// 生产// 解锁lockQueue();if (isFull()) // 小bug{// 阻塞等待,等待被唤醒proBlockWait();}// 条件满足可以生产pushCore(in);unlockQueue();// 等会在谈
}void proBlockWait()
{// 让生产者去等待 _proCond 生产者的条件变量pthread_cond_wait(&_proCond, &_mutex);
}
bool isFull()
{return _bq.size() == _cap;
}void lockQueue()
{pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{pthread_mutex_unlock(&_mutex);
}
void pushCore(const T &in)
{_bq.push(in);
}
这里还有一个问题,假设生产者线程拿到了锁,此时他进入push函数内部,假设此时我们队列已经满了,那么线程是不是应该等待,是的,我想问的是如果我们线程拿着锁等待了,这个时候其他线程会争到锁吗?不会的,我们线程在等待的时候需要把锁给放开,那么需要我们手动放吗?不是的,wait函数在线程进行等待的时候会自动把锁给放开,那么线程放开哪个锁呢?就是我们wait函数传入的锁,这个就是我们为何同步要配合锁使用.我们把pop函数也和大家写一下.
const T pop()
{// 加锁// 消费 -> 阻塞队列是否为空// 空 -- 等待(休眠)// 不空 -- 消费 接着唤醒生产者// 解锁lockQueue();if (isEmpty()){conBlockWait(); // 消费者阻塞等待}T ret = popCore();unlockQueue();return ret;
}void wakePro()
{pthread_cond_signal(&_proCond);
}T popCore()
{T ret = _bq.front();_bq.pop();return ret;
}void conBlockWait()
{pthread_cond_wait(&_conCond, &_mutex);
}bool isEmpty()
{return _bq.empty();
}
这里还是有一点问题的,我们线程在push完数据之后,或者这样说我们是如何唤醒一个pop一个空的队列线程,不就是入数据之后告诉一下线程有数据了,也就是给pop线程发送一下条件满足了,这个时候我们就知道了push数据后要通知有数据了,pop之后说数据被拿走了,空间有了.
void push(const T &in)
{//...// 唤醒消费者wakeCon();
} const T pop()
{//...// 唤醒生产者wakePro();return ret;
}
此时我们已经把所有的代码写完了,我们先来运行一下,我们可以根据消费者和生产者产生和处理任务的速度不同来控制我们的现象.这里面还有几个问题需要和大家分析一下.
void *consumer(void *args)
{BlockqQueue *p = (BlockqQueue *)args;while (true){int ret = p->pop();cout << "我消费了一个数据 " << ret << endl;//sleep(1);}
}void *producer(void *args)
{BlockqQueue *p = (BlockqQueue *)args;while (true){int ret = rand() % 10;p->push(ret);cout << "我生产了一个数据 " << ret << endl;sleep(1);}
}
int main()
{srand((unsigned long)time(nullptr));BlockqQueue bq;pthread_t c, q;pthread_create(&c, nullptr, consumer, &bq);pthread_create(&q, nullptr, producer, &bq);pthread_join(c, nullptr);pthread_join(q, nullptr);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UdXV6XgC-1679066099985)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221230_101734.gif)]
void *consumer(void *args)
{BlockqQueue *p = (BlockqQueue *)args;while (true){int ret = p->pop();cout << "我消费了一个数据 " << ret << endl;sleep(1);}
}void *producer(void *args)
{BlockqQueue *p = (BlockqQueue *)args;while (true){int ret = rand() % 10;p->push(ret);cout << "我生产了一个数据 " << ret << endl;//sleep(1);}
}
int main()
{srand((unsigned long)time(nullptr));BlockqQueue bq;pthread_t c, q;pthread_create(&c, nullptr, consumer, &bq);pthread_create(&q, nullptr, producer, &bq);pthread_join(c, nullptr);pthread_join(q, nullptr);return 0;
}
为何使用条件变量的第二层函数,消费者饥饿问题,即使数据满了,生产者也能够快速拿到锁.–有点牵强附会.
我们知道到了wait函数在进行线程阻塞的时候是会放开锁的,那么线程唤醒呢?这个时候要知道线程是在那里被唤醒的,它是在临界区,也就是我们还是需要对线程加锁,那么我们为何上面没有做?这是由于wait函数在线程阻塞的时候放开锁,线程被唤醒就拿到锁,这是wait函数已经帮助我们实现好的,这个就可以解释我们在认识条件变量的时候,为何在退出的时候我们唤醒所有线程,只有一个线程推出了,这是由于我们只有一把锁,我们把三个线程唤醒后,这三个线程开始抢这一把锁,此时只有一个线程抢到,另外两个线程被放在锁的阻塞队列中,由于拿到锁的线程退出了,那么这个时候由于我们退出时没有释放锁,锁也跟着退出了,另外两个线程变成了死锁,这也是我为何在线程退出时释放锁可以解决问题的原因.
还有一个问题要和大家分析,线程被唤醒就等于线程条件满足吗?不一定,上面我们写的代码是可以这么认为的,但是总会有下面的情况,由于OS的一些原因,导致我们线程被伪唤醒,此时就会出错,我们需要处理这个情况,改成while判断就可以了,这就是我们上面说的小bug.
void push(const T &in)
{lockQueue();while (isFull()) // 小bug{// 阻塞等待,等待被唤醒proBlockWait();}// 条件满足可以生产pushCore(in);unlockQueue();// 等会在谈
}
const T pop()
{lockQueue();while (isEmpty()){conBlockWait(); // 消费者阻塞等待}T ret = popCore();unlockQueue();return ret;
}
我们来总结一下生产者消费者模型的额优点.
这里重点说一下并发,我们上面好像没有看到并发啊,都是一个一个线程在执行,此时我们需要好好理解一下这个并发.我们先来设计一个任务,让生产者制作任务,消费者处理认任务.
#include
class Task
{
public:Task(/* args */): _oneElem(0), _twoElem(0), _operator('0'){}Task(int one, int two, char op): _oneElem(one), _twoElem(two), _operator(op){}int run(){int result = 0;switch (_operator){case '+':result = _oneElem + _twoElem;break;case '-':result = _oneElem - _twoElem;break;case '*':result = _oneElem * _twoElem;break;case '/':{if (_twoElem == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = _oneElem / _twoElem;}}break;case '%':{if (_twoElem == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = _oneElem % _twoElem;}}break;default:break;}return result;}int operator()(){return run();}private:int _oneElem;int _twoElem;char _operator;
};
我们让生产者和消费者共同处理这个任务.
#include "BlockqQueue.hpp"
#include
#include
#include #include "Task.hpp"
std::string str = "+-*/%";
using namespace std;
void *consumer(void *args)
{BlockqQueue *p = (BlockqQueue *)args;while (true){//sleep(1);Task t = p->pop();int one = 0;int two = 0;char op = 0;t.get(&one, &two, &op);int result = t();cout << "我 [ " << pthread_self() << " ] 消费了一个任务 "<< one << " " << op << " " << two << "=" << result << endl;}
}void *producer(void *args)
{BlockqQueue *p = (BlockqQueue *)args;while (true){int one = rand() % 10;int two = rand() % 10;char op = str[rand() % str.size()];Task t(one, two, op);p->push(t);cout << "我 [ " << pthread_self() << " ] 生产了一个任务 "<< one << " " << op << " " << two << "=?" << endl;sleep(1);}
}int main()
{srand((unsigned long)time(nullptr));BlockqQueue bq;pthread_t c, q;pthread_create(&c, nullptr, consumer, &bq);pthread_create(&q, nullptr, producer, &bq);pthread_join(c, nullptr);pthread_join(q, nullptr);return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ceDEQdhu-1679066099986)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221230_103226.gif)]
我们上面已经做了,可是你还是没有和我说并发是怎么回事,此时我们不应该把视角单单放在线程上,我们想制作任务需要花费时间吗?处理任务呢?这些都是需要花费时间的,我们在制作任务的时候,或许有的线程正处理任务,这就是我们所谓的并发,(这里是蛋哥谈的,我感觉有一点问题,既然在制作任务了,不是拿到锁了吗?这里暂时这样理解,等我看会书在来仔细的想一想,蛋哥说了他们是互斥,是).这里我看到了不是在临界区的并发,放在缓冲区不时间,生产前(发在阻塞队列前)和消费后(拿出来后),从网络,磁盘,用户慢,有一些余量,还在消费.这是并发.
我们在之前说过信号量是是什么,他就是一个计数器,我们也知道他的是原子性的,那里我们只谈到这些就没有往后面谈,主要是没有应用场景,这里我们需要在往下分析一点.我们知道在一个电影院我们可以把票看作一个信号量,我们拿到票,票数就–,看完电影票数就++,不就是我们谈的PV操作吗.这个时候我们联系一下现实生活,在现实中,我们是和很多人一起看到电影,并且我们每一个人都有一张电影票,我们是不是可以这么说拿到信号量的线程是可以在同一时间访问这个临界资源的.在计算机我们是可以做到的.而且计算机都帮助我们封装好了接口,我们认识一下.
初始化和销毁信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
// 销毁信号量
int sem_destroy(sem_t *sem);
等待信号量,也就是P操作
int sem_wait(sem_t *sem); //P()
发布信号量,也就是V操作
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
下面我们将根据信号量给大家设计一个环形队列.
在数据结构那里我们是说过环形队列的,那个时候注意的是如何判满判空,那时我们说多留一个空间,今天不需要了.这里我们想的是如何让多个线程共同访问这个环形队列.此时我们这么分析,我们来两个线程,其中一个负责生产数据,一个负责消费数据,如果有数据,消费者怎么都可以,如果空间没有满,生产者怎么也都可以,此时我们就疑惑当消费者和生产者碰到一起了,那么此时我们该如何办?
我们先来分析一下消费者和生产者什么时候会碰到一起,此时就是为满或者为空的时候,我们应该这么解决,当为空的时候生产者先生产,为满的时候消费者先消费.那么我门是如何实现呢?注意消费者关心的是数据,生产者关心的是空间,此时我们就可以使用两个信号量来完成,我们假设环形队列的空间是N,那么生产者会在[N,0]这个范围内变化,消费者会在[0,N]这个区间变化,我们此时就明白了.
sem_t roomSem; // 空间信号量
sem_t dataSem; // 数据信号量
我们开始设计环形队列,其中我们知道了,生产者和消费者都一个信号量,这个是为了保持当为空为满时生产者和消费者该如何做,也就是上面我们说了有一定的顺序,这里很好的体现了两个角色同步,由于环形队列是临界资源,此时我们知道生产者和消费者是可以同时访问这个资源的,也就是我们需要两把锁,为了保证多个消费者之间的互斥和多个生产者之间的互斥.下面就很好处理了.
const int gCap = 5;template
class RingQueue
{
public:RingQueue(){}~RingQueue(){}private:vector _ringQueue; // 环形队列sem_t _roomSem; // 空间计数器sem_t _dataSem; // 数据计数器uint32_t _pIndex; // 生产者位置uint32_t _cIndex; // 消费者位置pthread_mutex_t _pmutex; // 生产者的锁pthread_mutex_t _cmutex; // 消费者的锁
};
既然我们成员函数是存在锁和信号量的,这里我们直接在对象的构造函数进行初始化,析构的时候释放,顺便我们也确定自己的环形队列的空间大小.
const int gCap = 5;template
class RingQueue
{
public:RingQueue(int cap = gCap): _ringQueue(cap){// 生产者使用 _ringQueue.size()sem_init(&_roomSem, 0, _ringQueue.size());// 消费者使用 0sem_init(&_dataSem, 0, 0);_pIndex = 0;_cIndex = 0;pthread_mutex_init(&_pmutex, nullptr); // this指针跑哪了pthread_mutex_init(&_cmutex, nullptr);}~RingQueue(){sem_destroy(&_roomSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}private:vector _ringQueue; // 环形队列sem_t _roomSem; // 空间计数器sem_t _dataSem; // 数据计数器uint32_t _pIndex; // 生产者位置uint32_t _cIndex; // 消费者位置pthread_mutex_t _pmutex; // 生产者的锁pthread_mutex_t _cmutex; // 消费者的锁
};
这里我就不和大家封装了,直接使用原始的接口,这个我们直接开始下push函数.
// 生产者调用
void push(const T &in)
{// 申请信号量 --拿到信号量再继续sem_wait(&_roomSem); // P 操作 空间--// 生产pthread_mutex_lock(&_pmutex);_ringQueue[_pIndex] = in;_pIndex++;_pIndex %= _ringQueue.size();pthread_mutex_unlock(&_pmutex);// 数据那里变化sem_post(&_dataSem); // V操作 数据++
}
所有的线程在进行访问临界资源的时候必须先申请信号量,只有拿到信号量的线程才有资格访问临界资源,此时我们有点疑惑,上面的的加锁位置是不是有点不对,不应该先加锁吗?就像下面的一样.
// 生产者调用
void push(const T &in)
{pthread_mutex_lock(&_pmutex);// 申请信号量 --拿到信号量再继续sem_wait(&_roomSem); // P 操作// 生产_ringQueue[_pIndex] = in;_pIndex++;_pIndex %= _ringQueue.size();pthread_mutex_unlock(&_pmutex);// 数据那里变化sem_post(&_dataSem); // V操作
}
首先我先确定的和大家说一下,上面的两种方法都是可以的,第二种方法更加符合我们的想法,只有拿到锁的线程才能申请信号量,此时有点小小思路上的问题,我们如果拿到锁了,信号量没有了就很尴尬,毕竟我们现在还是不清楚wait函数在拿不到信号量会不会释放锁,这里应该是会释放的.我们推荐第一种写法**,对于线程而言,只有拿到信号量的线程才有资格进行对锁的申请,关键的是信号量的申请是原子的,即使申请到信号量的线程被切走了,我们也不用担心,毕竟他手中有信号量,也就有资格争抢锁.**
在写一下pop函数,这个是为了消费者准备的.
// 消费者来拿
T pop()
{sem_wait(&_dataSem); // P操纵 数据--pthread_mutex_lock(&_cmutex);T ret = _ringQueue[_cIndex];_cIndex++;_cIndex %= _ringQueue.size();pthread_mutex_unlock(&_cmutex);sem_post(&_roomSem); // V操作 空间++return ret;
}
下面我们直接开始测试,使用多个消费者和生产者.
using namespace std;
void *productor(void *args)
{RingQueue *p = (RingQueue *)args;while (true){ int data = rand() % 10;p->push(data);cout << "pthread[ " << pthread_self() << " ] 生产了一个数据 " << data << endl;}
}void *consumer(void *args)
{RingQueue *p = (RingQueue *)args;while (true){sleep(10);int data = p->pop();cout << "pthread[ " << pthread_self() << " ] 消费了一个数据 " << data << endl;}
}int main()
{srand((unsigned long)time(nullptr) ^ getpid());RingQueue rq;pthread_t c1;pthread_t c2;pthread_t c3;pthread_t c4;pthread_t p1;pthread_t p2;pthread_t p3;pthread_t p4;pthread_create(&p1, nullptr, productor, &rq);pthread_create(&p2, nullptr, productor, &rq);pthread_create(&p3, nullptr, productor, &rq);pthread_create(&p4, nullptr, productor, &rq);pthread_create(&c1, nullptr, consumer, &rq);pthread_create(&c2, nullptr, consumer, &rq);pthread_create(&c3, nullptr, consumer, &rq);pthread_create(&c4, nullptr, consumer, &rq);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);pthread_join(p4, nullptr);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(c4, nullptr);// cout << "hello bit" << endl;return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8H2D5CkN-1679066099986)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221230_112335.gif)]
我们在进程的时候和大家说过池化的概念,想一想我们只有当任务到来的时候才会创建新的线程进行处理,总感觉新线程创建是在是有点慢,我们是不是可以让多个线程先创建好,当任务到来的时候,我们从这里多个线程选择一个进行处理不就可以了,这就是我们池化技术的理念,我们在进程池中也说过.我们想一想我们线程池里面的应该有什么接口,首先我们要知道有一个变量控制线程的多少,有一个队列保存任务,既然队列出现了,我们应该给他加锁.,我们让线程先创建出来,此时应该有一个条件变量把线程给阻塞住,等到有任务了要被处理就唤醒线程.
const int gThreadNum = 5;
template
class ThreadPool
{
public:ThreadPool(int num = gThreadNum): _threadNum(num), _isStart(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:bool _isStart; // 标记线程池是否被创建int _threadNum;queue _taskQueue; // 任务队列pthread_mutex_t _mutex;pthread_cond_t _cond; // 条件变量
};
我们应该设计出一个接口让线程池先跑起来,让然后让线程被条件变量阻塞住.
static void *threadRoutine(void *args) // 主要是 this指针
{pthread_detach(pthread_self()); // 线程分离ThreadPool *tp = (ThreadPool *)args;while (true){tp->lockQueue();while (!tp->haveTask()){// 进来是没有任务tp->waitForTask();}// 有任务了T t = tp->pop();tp->unlockQueue();int one = 0;int two = 0;char op = 0;t.get(&one, &two, &op);int result = t.run(); // 规定所有的任务必须有一个run方法Log() << "新线程处理任务 " << one << op << two << "=" << result << std::endl;}
}
void start()
{assert(!_isStart);for (size_t i = 0; i < _threadNum; i++){pthread_t temp;pthread_create(&temp, nullptr, threadRoutine, this);}_isStart = true;
}void waitForTask()
{pthread_cond_wait(&_cond, &_mutex);
}
bool haveTask()
{return !_taskQueue.empty();
}
T pop()
{T ret = _taskQueue.front();_taskQueue.pop();return ret;
}
void lockQueue()
{pthread_mutex_lock(&_mutex);
}
void unlockQueue()
{pthread_mutex_unlock(&_mutex);
}
先来解释两个东西,第一个为何我们线程的控制函数是start的,这是由于在类内的成员函数默认参数是第一个this指针,我们是无法传参的,只能使用static.再来说一下Log是啥,这是我们写的日志打印函数.
std::ostream &Log()
{std::cout << "For Debug | "<< " timestamp " << (uint32_t)time(nullptr)<< "| Thread [" << pthread_self() << "] | ";return std::cout;
}
我门只需要想外部暴露两个接口就可以了,这是由于我们用户只关心线程池创建何办任务被传入线程池,任务处理是线程池自己做的我们开始写吧.
// 放任务
void push(const T &in)
{lockQueue();_taskQueue.push(in);// 选择一个线程进行唤醒choiceThreadForHandler();// 这里为何不会死锁unlockQueue();
}
void choiceThreadForHandler()
{pthread_cond_signal(&_cond);
}
我们测试一下.
int main()
{const string str = "+-*/%";// unique_ptr> tp(new ThreadPool());unique_ptr> tp(new ThreadPool());tp->start();srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());sleep(1);while (1){int one = rand() % 50;int two = rand() % 10;char op = str[rand() % str.size()];Task t(one, two, op);Log() << "主线程派发任务 " << one << op << two << "=?" << endl;tp->push(t);sleep(1);}// cout << "hello bit "<< endl;return 0;
}
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vBxW5OLz-1679066099987)(https://qkj0302.oss-cn-beijing.aliyuncs.com/20221230_185902.gif)]
我们说下流程,主要是加锁我害怕给大家造成勿扰.线程先起来,所有的线程都会在条件变量下等待,此时我们push任务,拿到了锁,我门选择一个线程进行唤醒,他把锁抢了回去,处理任务时把任务从队列中拿出来,此时队列就相当于一个中转站,顺便把任务拿出来把锁给解除了,由于处理任务是不涉及临界资源的,这个线程独自处理就可以了.此时push函数继续获得锁,让后把锁给解除.注意我说的流程是一个很理想的过程,实际情况还要看我们任务产生的速度,具体看OS是如何切换线程的.
关于什么是单例模式我们这里不谈,毕竟C++已经说过了.我们设计成懒汉模式.这个是很好设计的,我们及不谈了.
const int gThreadNum = 5;
template
class ThreadPool
{public:~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}public:static ThreadPool *getInstance(){static Mutex m;if (instance == nullptr){Lock_GUARD l(&m);// 有线程安全问题 -- 可以加锁解决if (instance == nullptr) // 单例不存在{instance = new ThreadPool();}// 出了代码块就是自动解锁}return instance;}// 每一个线程都会来到这里static void *threadRoutine(void *args) // 主要是 this指针{// 线程分离pthread_detach(pthread_self());// this指针ThreadPool *tp = (ThreadPool *)args;while (true){// sleep(1);// std::cout << "thread [" << pthread_self() << "] running..." << std::endl;tp->lockQueue();while (!tp->haveTask()){// 进来是没有任务,都给我去等着tp->waitForTask();}//此时 有任务了T t = tp->pop();tp->unlockQueue();// 这里应该封装成一个函数,处理任务的的函数,我们暂时这里保存一下,后面网络的时候要用int one = 0;int two = 0;char op = 0;t.get(&one, &two, &op);int result = t.run(); // 规定所有的任务必须有一个run方法Log() << "新线程处理任务 " << one << op << two << "=" << result << std::endl;}}public:void start(){assert(!_isStart);for (size_t i = 0; i < _threadNum; i++){pthread_t temp;// 创建线程pthread_create(&temp, nullptr, threadRoutine, this);}_isStart = true;}// 放任务void push(const T &in){lockQueue();_taskQueue.push(in);// 选择线程持处理任务choiceThreadForHandler();// 这里为何不会死锁,很简单,我们所有的线程在阻塞等待的时候,等待的时候锁已经被放走了,我们一旦拿到任务,首先拿到锁,继续执行,说实话后面的解锁感觉没有必要,这里保存下来也是不错的.unlockQueue();}private:ThreadPool(int num = gThreadNum): _threadNum(num), _isStart(false){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool &t) = delete;ThreadPool &operator=(const ThreadPool &t) = delete;// 告知一个线程来处理任务了void choiceThreadForHandler(){pthread_cond_signal(&_cond);}// 没有任务,都在这里等这void waitForTask(){pthread_cond_wait(&_cond, &_mutex);}// 是否有任务bool haveTask(){return !_taskQueue.empty();}T pop(){T ret = _taskQueue.front();_taskQueue.pop();return ret;}void lockQueue(){pthread_mutex_lock(&_mutex);}void unlockQueue(){pthread_mutex_unlock(&_mutex);}private:bool _isStart;int _threadNum;queue _taskQueue;pthread_mutex_t _mutex;pthread_cond_t _cond;static ThreadPool *instance;
};template
ThreadPool *ThreadPool::instance = nullptr;
int main()
{const string str = "+-*/%";unique_ptr> tp(ThreadPool::getInstance());tp->start(); // 开启线程池srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());sleep(1);while (1){int one = rand() % 50;int two = rand() % 10;char op = str[rand() % str.size()];Task t(one, two, op);Log() << "主线程派发任务 " << one << op << two << "=?" << endl;// 田间任务tp->push(t);sleep(1);}// cout << "hello bit "<< endl;return 0;
}
注意,我们知道getInstance函数在判断指针是不是为空的时候会出现线程不安全,此时我们同过加锁和多次判断来处理这一点,我们在C++中也是谈过的.
我们在产看线程的运行状况时看线程都是名字都是和可执行程序一样,感觉很不舒服,我们此时可以通过接口修改线程的名字.
[bit@Qkj threadpool]$ man 2 prctl
int main()
{prctl(PR_SET_NAME, "master");//..return 0;
}
static void *threadRoutine(void *args) // 主要是 this指针
{prctl(PR_SET_NAME, "follower");//...
}
我们顺便还是可以一个grep的指令.
[bit@Qkj threadpool]$ ps -aL | grep -E 'master|follower'
在线程的互斥的那里我们只是谈了一点皮毛,这里我们稍微加深一点,毕竟我们常用的就是互斥锁,但是这些概念我们还是需要了解的.
我们前面学的都是悲观锁,此时这里谈一下乐观锁,注意这个锁不是系统界别的,是程序员已经帮助我我们实现好了,乐观锁认为我们别人该我们的数据的概率非常低,所以我们不加锁.主要就是依据两种方式:版本号机制和CAS操作。我们谈一下版本号机制,首先我们是读写分离的,这里给大家举一个例子,我们假设一个软件要升级,我们是版本1,此时我们如果想要升级到版本2,很容易,我们拿到版本1,先写出一个版本,此时写完了,我们和版本1这个标志位做对比,如果标志位还是版本1,可以新版本覆盖老版本,并且把标志位改成版本2,如果变了,此时我们就不能覆盖,这就是版本号机制,其中我们写新版的操作我们是CAS操作.
我们都是挂起等待锁.我们在加锁和解锁之间我们称呼临界区,我都是把没有拿到锁的线程挂起.假设你去找张三一起去食堂吃饭,张三说自己有事,还要四十分钟,那么我就会去上会网,上网后到时间的我再叫张三一起吃饭.如果明天我又去找张三,他说3分钟立马下来,此时几分钟过去了,张三还没有下来,我们就在给张三打电话,还不下来我们再打,此时就是轮询查询.那么是什么决定我们以何种方式等待张三,是等待时间,如果等待时间太长,我们就挂起,时间短就是轮询检测,也就是自旋锁.
如何判断时间长短,这是我们工作的经验,其中自旋锁我们不谈的原因是因为他们的接口都差不多,关于如何自选,程序员已经帮助我们设计好了.
我们再来谈线程最后一个问题,读者写者问题.在现实中,我们有过画黑板报的经历,其中我们知道我们画黑板报的人很少,但是看着人很多,我们也是可以明白的,我们画黑板报的时候,如果我们还没有画好,此时读者就有很大可能性看到的随之产生的联想和我们认为的不同,此时我们又得到另外一个模型,对于画黑板报的人,也就是写者,他们之间是互斥的,写者与读者直加是互斥的,同时也是同步的,那么我想问问读者和读者之间有什么关系?实际上他们没有任何关系,这是由于读者只是看数据,他不会修改我们的数据.程序员针对这种情况给我们设计了不同的锁,也就是读写锁,拿到读锁的线程是读者,拿到写锁的是写者.
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t*restrict attr); //初始化
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); //销毁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 加读锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 加写锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 解锁
我们也写一个小代码,我先演示一下现象.
using namespace std;
int board = 0;
pthread_rwlock_t rw;
void *reader(void *args)
{const char *name = (char *)args;while (1){pthread_rwlock_rdlock(&rw);cout << "reader read: " << board << endl;sleep(3);pthread_rwlock_unlock(&rw);sleep(2);}
}void *writer(void *args)
{sleep(1);const char *name = (char *)args;while (1){// 加写锁pthread_rwlock_wrlock(&rw);board++;pthread_rwlock_unlock(&rw);sleep(1);}
}int main()
{pthread_rwlock_init(&rw, nullptr);pthread_t w;pthread_t r1;pthread_t r2;pthread_t r3;pthread_t r4;pthread_create(&r1, nullptr, reader, (void *)"reader");pthread_create(&r2, nullptr, reader, (void *)"reader");pthread_create(&r3, nullptr, reader, (void *)"reader");pthread_create(&r4, nullptr, reader, (void *)"reader");pthread_create(&w, nullptr, writer, (void *)"reader");pthread_join(w, nullptr);pthread_join(r1, nullptr);pthread_join(r2, nullptr);pthread_join(r3, nullptr);pthread_join(r4, nullptr);pthread_rwlock_destroy(&rw);return 0;
}
读者为何可以一下子出现很多个,不是只有一个锁吗?这是由于底层实现的原因.这个是如何实现的,我们这个伪代码,他不严谨.
读者写者进行操作的时候,可以看到读者非常多,写者非常少,这个时候就会出现一个问题,读者和写者没有很强的同步关系,我读完了不代表写者可以写,还有其他的读者,这个总是又会面对一个问题,读写同时出现,那么写者该如何?还有一个问题,读者一直有,还一直来,那么读者的总是存在,这就造成写者饥饿,这个锁本来设计的就是写者优先,所以这个饥饿问题,我们很少考虑,但是他是存在的,但我们不得不面对读者和写者同时存在,我们有两个概念来缓解这个问题,读者优先和写者优先,就是读者和写者是谁先拿到锁.我们如何做到写者优先,这个很难实现,我们通过策略当写者来的时候我们不让读者来了,已经存在的读者等着这些读者都走了,我们在让写者进行写. 我一关门,不让进了,等到读者走完了,我先写,在开门.