Table of Contents
A simple C thread pool implementation
Currently, the implementation:
The API contains additional unused 'flags' parameters that would allow some additional options:
https://github.com/mbrossard/threadpool
上图来自:(https://mp.weixin.qq.com/s/O5Ubr9nyUm7os4M6BJGYDA)
相关技术文章
https://blog.csdn.net/qq_36359022/article/details/78796784
https://mp.weixin.qq.com/s/O5Ubr9nyUm7os4M6BJGYDA
/*
* Copyright (c) 2016, Mathias Brossard <[email protected]>.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#ifdef __cplusplus
extern "C" {
#endif
/**
* @file threadpool.h
* @brief Threadpool Header File
*/
/**
* Upper bounds accepted by threadpool_create(): a thread_count above
* MAX_THREADS or a queue_size above MAX_QUEUE makes it return NULL.
*
* Increase these constants at your own risk;
* large values might slow down your system.
*/
#define MAX_THREADS 64
#define MAX_QUEUE 65536
/* Opaque pool handle; the structure itself is defined in threadpool.c. */
typedef struct threadpool_t threadpool_t;
/**
* Negative error codes returned by threadpool_add() and
* threadpool_destroy(); 0 means success.
*/
typedef enum {
/* A required pointer argument (pool, or the task function) was NULL. */
threadpool_invalid = -1,
/* A mutex lock/unlock or a condition-variable signal/broadcast failed. */
threadpool_lock_failure = -2,
/* threadpool_add(): the task queue already holds queue_size tasks. */
threadpool_queue_full = -3,
/* The pool is already shutting down. */
threadpool_shutdown = -4,
/* threadpool_destroy(): joining a worker thread failed. */
threadpool_thread_failure = -5
} threadpool_error_t;
/**
* Flag bits for the 'flags' argument of threadpool_destroy().
*/
typedef enum {
/* Let the workers finish every queued task before they exit. */
threadpool_graceful = 1
} threadpool_destroy_flags_t;
/**
* @function threadpool_create
* @brief Creates a threadpool_t object and starts its worker threads.
* @param thread_count Number of worker threads (1 to MAX_THREADS).
* @param queue_size Size of the queue (1 to MAX_QUEUE).
* @param flags Unused parameter.
* @return a newly created thread pool, or NULL if an argument is out of
* range or the pool could not be set up.
*/
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);
/**
* @function threadpool_add
* @brief Adds a new task to the queue of a thread pool.
* @param pool Thread pool to which the task is added.
* @param routine Pointer to the function that will perform the task.
* @param arg Argument to be passed to the function.
* @param flags Unused parameter.
* @return 0 if all goes well, negative values in case of error (@see
* threadpool_error_t for codes).
*/
int threadpool_add(threadpool_t *pool, void (*routine)(void *),
void *arg, int flags);
/**
* @function threadpool_destroy
* @brief Stops and destroys a thread pool.
* @param pool Thread pool to destroy.
* @param flags Flags for shutdown
* @return 0 on success, a negative threadpool_error_t value otherwise.
* When an error is returned the pool is not deallocated.
*
* Known values for flags are 0 (default) and threadpool_graceful in
* which case the thread pool doesn't accept any new tasks but
* processes all pending tasks before shutdown.
*/
int threadpool_destroy(threadpool_t *pool, int flags);
#ifdef __cplusplus
}
#endif
#endif /* _THREADPOOL_H_ */
/*
* Copyright (c) 2016, Mathias Brossard <[email protected]>.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @file threadpool.c
* @brief Threadpool implementation file
*/
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "threadpool.h"
/**
* Values stored in threadpool_t.shutdown once threadpool_destroy() has
* been called (0 means the pool is running).
*/
typedef enum {
/* Workers exit as soon as they observe the flag, even with tasks queued. */
immediate_shutdown = 1,
/* Workers keep taking tasks and exit only once the queue is empty. */
graceful_shutdown = 2
} threadpool_shutdown_t;
/**
* @struct threadpool_task_t
* @brief One queued unit of work.
*
* @var function Pointer to the function that will perform the task.
* @var argument Argument to be passed to the function.
*/
typedef struct {
void (*function)(void *);
void *argument;
} threadpool_task_t;
/**
* @struct threadpool_t
* @brief The threadpool struct
*
* @var lock Mutex taken around every access to the queue and to the
* shutdown flag.
* @var notify Condition variable to notify worker threads.
* @var threads Array containing worker threads ID.
* @var queue Array containing the task queue (used as a ring buffer).
* @var thread_count Number of threads successfully created; this many
* threads are joined by threadpool_destroy().
* @var queue_size Size of the task queue.
* @var head Index of the next task a worker will take.
* @var tail Index of the slot where the next task is stored.
* @var count Number of pending tasks
* @var shutdown 0 while running, otherwise a threadpool_shutdown_t value.
* @var started Number of worker threads that have been started and have
* not exited yet; threadpool_free() refuses to free while > 0.
*/
struct threadpool_t {
pthread_mutex_t lock;
pthread_cond_t notify;
pthread_t *threads;
threadpool_task_t *queue;
int thread_count;
int queue_size;
int head;
int tail;
int count;
int shutdown;
int started;
};
/**
* @function void *threadpool_thread(void *threadpool)
* @brief Entry point of each worker thread.
* @param threadpool the pool which owns the thread (a threadpool_t *)
*/
static void *threadpool_thread(void *threadpool);
/**
* @function int threadpool_free(threadpool_t *pool)
* @brief Releases the memory and synchronisation objects of a pool.
* @param pool the pool to release
* @return 0 on success, -1 if pool is NULL or workers are still started.
*
* Not declared in threadpool.h, although it has external linkage.
*/
int threadpool_free(threadpool_t *pool);
/**
 * Create a thread pool and start its worker threads.
 *
 * @param thread_count Number of worker threads, 1..MAX_THREADS.
 * @param queue_size   Capacity of the task queue, 1..MAX_QUEUE.
 * @param flags        Unused.
 * @return the new pool, or NULL when an argument is out of range, an
 *         allocation fails, the mutex/condition variable cannot be
 *         initialised, or a worker thread cannot be created.
 *
 * Every failure before the first thread is started releases exactly the
 * resources acquired so far, so no uninitialised mutex or condition
 * variable is ever locked or destroyed and nothing is leaked.
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
    threadpool_t *pool;
    int i;
    (void) flags;

    if(thread_count <= 0 || thread_count > MAX_THREADS ||
       queue_size <= 0 || queue_size > MAX_QUEUE) {
        return NULL;
    }

    if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
        return NULL;
    }

    /* Initialize */
    pool->thread_count = 0;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;

    /* Allocate thread and task queue */
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
    pool->queue = (threadpool_task_t *)malloc
        (sizeof(threadpool_task_t) * queue_size);
    if(pool->threads == NULL || pool->queue == NULL) {
        goto err_alloc;
    }

    /* Initialize the synchronisation objects one at a time so that the
       error path knows exactly which of them exist. */
    if(pthread_mutex_init(&(pool->lock), NULL) != 0) {
        goto err_alloc;
    }
    if(pthread_cond_init(&(pool->notify), NULL) != 0) {
        goto err_mutex;
    }

    /* Start worker threads */
    for(i = 0; i < thread_count; i++) {
        if(pthread_create(&(pool->threads[i]), NULL,
                          threadpool_thread, (void*)pool) != 0) {
            /* Stops and joins the threads started so far, then frees. */
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }

    return pool;

 err_mutex:
    pthread_mutex_destroy(&(pool->lock));
 err_alloc:
    /* free(NULL) is a no-op, so a partial allocation is handled too. */
    free(pool->threads);
    free(pool->queue);
    free(pool);
    return NULL;
}
/**
 * Queue one task on a pool and wake a worker for it.
 *
 * @param pool     Pool receiving the task.
 * @param function Routine a worker will call.
 * @param argument Value handed to the routine.
 * @param flags    Unused.
 * @return 0 on success; threadpool_invalid for a NULL pool or routine,
 *         threadpool_queue_full when the queue has no free slot,
 *         threadpool_shutdown when the pool is shutting down, or
 *         threadpool_lock_failure when a mutex or condition-variable
 *         call fails.
 */
int threadpool_add(threadpool_t *pool, void (*function)(void *),
                   void *argument, int flags)
{
    int status = 0;
    (void) flags;

    if(pool == NULL || function == NULL) {
        return threadpool_invalid;
    }

    if(pthread_mutex_lock(&pool->lock) != 0) {
        return threadpool_lock_failure;
    }

    if(pool->count == pool->queue_size) {
        /* No free slot left. */
        status = threadpool_queue_full;
    } else if(pool->shutdown) {
        /* Pool no longer accepts work. */
        status = threadpool_shutdown;
    } else {
        /* Store the task at the tail of the ring buffer. */
        threadpool_task_t *slot = &pool->queue[pool->tail];

        slot->function = function;
        slot->argument = argument;
        pool->tail = (pool->tail + 1) % pool->queue_size;
        pool->count += 1;

        /* One new task: waking a single worker is enough. */
        if(pthread_cond_signal(&pool->notify) != 0) {
            status = threadpool_lock_failure;
        }
    }

    /* A failed unlock takes precedence over any earlier status. */
    if(pthread_mutex_unlock(&pool->lock) != 0) {
        status = threadpool_lock_failure;
    }

    return status;
}
/**
 * Stop a pool, join its worker threads and release it.
 *
 * @param pool  Pool to destroy.
 * @param flags 0 for immediate shutdown, threadpool_graceful to let the
 *              workers drain the queue first.
 * @return 0 on success; threadpool_invalid for a NULL pool,
 *         threadpool_shutdown when a shutdown is already in progress,
 *         threadpool_lock_failure when a mutex or condition-variable
 *         call fails, threadpool_thread_failure when a join fails.
 *         The pool is deallocated only when 0 is returned.
 *
 * The pool mutex is released on every return path; leaving it locked
 * would block the workers forever, since they need it to observe the
 * shutdown flag and exit.
 */
int threadpool_destroy(threadpool_t *pool, int flags)
{
    int i, err = 0;

    if(pool == NULL) {
        return threadpool_invalid;
    }

    if(pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    /* Already shutting down */
    if(pool->shutdown) {
        err = threadpool_shutdown;
        if(pthread_mutex_unlock(&(pool->lock)) != 0) {
            err = threadpool_lock_failure;
        }
        return err;
    }

    pool->shutdown = (flags & threadpool_graceful) ?
        graceful_shutdown : immediate_shutdown;

    /* Wake up all worker threads. The unlock is attempted even when the
       broadcast fails so the mutex is never left held. */
    if(pthread_cond_broadcast(&(pool->notify)) != 0) {
        err = threadpool_lock_failure;
    }
    if(pthread_mutex_unlock(&(pool->lock)) != 0) {
        err = threadpool_lock_failure;
    }
    if(err) {
        return err;
    }

    /* Join all worker thread */
    for(i = 0; i < pool->thread_count; i++) {
        if(pthread_join(pool->threads[i], NULL) != 0) {
            err = threadpool_thread_failure;
        }
    }

    /* Only if everything went well do we deallocate the pool */
    if(!err) {
        threadpool_free(pool);
    }
    return err;
}
/**
 * Release the memory and synchronisation objects owned by a pool.
 *
 * @param pool Pool to release.
 * @return 0 on success, -1 when pool is NULL or some worker threads are
 *         still accounted for in pool->started.
 */
int threadpool_free(threadpool_t *pool)
{
    if(pool == NULL || pool->started > 0) {
        return -1;
    }

    /* A non-NULL threads array is used as the sign that the mutex and
       condition variable exist.
       NOTE(review): this depends on the order of operations in
       threadpool_create -- confirm it still holds if that changes. */
    if(pool->threads) {
        /* Take and release the lock once so that any thread still
           inside a critical section has left it. The mutex must be
           unlocked again before pthread_mutex_destroy(): destroying a
           locked mutex is undefined behaviour. */
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_unlock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }

    /* free(NULL) is a no-op; releasing both arrays unconditionally
       avoids leaking the queue when only the threads array is NULL. */
    free(pool->threads);
    free(pool->queue);
    free(pool);
    return 0;
}
/**
 * Worker loop: repeatedly take the task at the head of the queue and
 * run it, until the pool's shutdown flag tells the thread to stop.
 *
 * @param threadpool The owning pool, passed as void * by pthread_create.
 * @return NULL (the thread ends through pthread_exit).
 */
static void *threadpool_thread(void *threadpool)
{
    threadpool_t *tp = (threadpool_t *)threadpool;
    threadpool_task_t job;

    /* The lock is held at the top of every iteration: it is required
       both for the condition wait and for touching the queue. */
    pthread_mutex_lock(&tp->lock);

    for(;;) {
        /* Sleep until there is work or a shutdown request; the loop
           also absorbs spurious wake-ups. */
        while(tp->count == 0 && tp->shutdown == 0) {
            pthread_cond_wait(&tp->notify, &tp->lock);
        }

        if(tp->shutdown == immediate_shutdown) {
            break;
        }
        if(tp->shutdown == graceful_shutdown && tp->count == 0) {
            break;
        }

        /* Dequeue the oldest task. */
        job = tp->queue[tp->head];
        tp->head = (tp->head + 1) % tp->queue_size;
        tp->count -= 1;

        /* Run the task without holding the lock. */
        pthread_mutex_unlock(&tp->lock);
        job.function(job.argument);
        pthread_mutex_lock(&tp->lock);
    }

    /* Still holding the lock here: record that this worker is gone. */
    tp->started--;
    pthread_mutex_unlock(&tp->lock);

    pthread_exit(NULL);
    return NULL;
}
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "threadpool.h"
/* Worker threads per pool. */
#define THREAD 4
/* Number of tasks submitted, and queue capacity of every pool. */
#define SIZE 8192
/* Number of pools each task is passed through. */
#define QUEUES 64
/*
* Warning: do not increase THREAD and QUEUES too much on 32-bit
* platforms: because each thread (and there will be THREAD *
* QUEUES of them) will allocate its own stack (8 MB is the default on
* Linux), you'll quickly run out of virtual space.
*/
/* The pools, created and destroyed by main(). */
threadpool_t *pool[QUEUES];
/* tasks[i]: per-task counter incremented by dummy_task() on every hop.
   left: tasks that have not finished their last hop yet (guarded by lock). */
int tasks[SIZE], left;
/* Protects 'left'. */
pthread_mutex_t lock;
/* Not referenced by the code visible in this test. */
int error;
/*
 * Task body: bump the task's counter and either forward the task to the
 * pool selected by the new counter value, or, once the counter reaches
 * QUEUES, record that the task is finished.
 *
 * arg points to the task's int counter (an element of tasks[]).
 */
void dummy_task(void *arg) {
    int *pi = (int *)arg;
    int rc;

    *pi += 1;
    if(*pi < QUEUES) {
        /* The call is kept outside assert() so that it still happens
           when the program is built with NDEBUG. */
        rc = threadpool_add(pool[*pi], &dummy_task, arg, 0);
        assert(rc == 0);
        (void) rc;
    } else {
        pthread_mutex_lock(&lock);
        left--;
        pthread_mutex_unlock(&lock);
    }
}
/*
 * Heavy test: create QUEUES pools, submit SIZE tasks to the first one
 * and wait until every task has hopped through all pools, then destroy
 * the pools.
 *
 * Calls with side effects are made outside assert() so that the test
 * still does its work when compiled with NDEBUG.
 */
int main(int argc, char **argv)
{
    int i, rc = 0, copy = 1;
    (void) argc;
    (void) argv;

    left = SIZE;
    pthread_mutex_init(&lock, NULL);

    for(i = 0; i < QUEUES; i++) {
        pool[i] = threadpool_create(THREAD, SIZE, 0);
        assert(pool[i] != NULL);
    }

    usleep(10);

    for(i = 0; i < SIZE; i++) {
        tasks[i] = 0;
        rc = threadpool_add(pool[0], &dummy_task, &(tasks[i]), 0);
        assert(rc == 0);
    }

    /* Poll 'left' under the lock until every task has completed. */
    while(copy > 0) {
        usleep(10);
        pthread_mutex_lock(&lock);
        copy = left;
        pthread_mutex_unlock(&lock);
    }

    for(i = 0; i < QUEUES; i++) {
        rc = threadpool_destroy(pool[i], 0);
        assert(rc == 0);
    }
    (void) rc;

    pthread_mutex_destroy(&lock);

    return 0;
}
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "threadpool.h"
/* Worker threads in the pool. */
#define THREAD 4
/* Number of tasks submitted per run, and queue capacity. */
#define SIZE 8192
/* The pool under test; recreated for each shutdown mode. */
threadpool_t *pool;
/* Tasks not yet executed (guarded by lock while workers run). */
int left;
/* Protects 'left'. */
pthread_mutex_t lock;
/* Not referenced by the code visible in this test. */
int error;
/*
 * Task body: sleep for 100 microseconds, then decrement the shared
 * 'left' counter under the mutex. The argument is not used.
 */
void dummy_task(void *arg)
{
    (void) arg;

    usleep(100);

    pthread_mutex_lock(&lock);
    left -= 1;
    pthread_mutex_unlock(&lock);
}
/*
 * Shutdown test. Fills the queue with SIZE tasks twice:
 *  - after an immediate shutdown some tasks must remain unexecuted
 *    (left > 0);
 *  - after a graceful shutdown every task must have run (left == 0).
 *
 * Calls with side effects are made outside assert() so that the test
 * still does its work when compiled with NDEBUG, and the result of
 * threadpool_create() is checked explicitly.
 */
int main(int argc, char **argv)
{
    int i, rc = 0;
    (void) argc;
    (void) argv;

    pthread_mutex_init(&lock, NULL);

    /* Testing immediate shutdown */
    left = SIZE;
    pool = threadpool_create(THREAD, SIZE, 0);
    assert(pool != NULL);
    for(i = 0; i < SIZE; i++) {
        rc = threadpool_add(pool, &dummy_task, NULL, 0);
        assert(rc == 0);
    }
    rc = threadpool_destroy(pool, 0);
    assert(rc == 0);
    /* The workers have been joined, so 'left' can be read directly. */
    assert(left > 0);

    /* Testing graceful shutdown */
    left = SIZE;
    pool = threadpool_create(THREAD, SIZE, 0);
    assert(pool != NULL);
    for(i = 0; i < SIZE; i++) {
        rc = threadpool_add(pool, &dummy_task, NULL, 0);
        assert(rc == 0);
    }
    rc = threadpool_destroy(pool, threadpool_graceful);
    assert(rc == 0);
    assert(left == 0);
    (void) rc;

    pthread_mutex_destroy(&lock);

    return 0;
}
/* Worker threads in the pool. */
#define THREAD 32
/* Capacity of the task queue. */
#define QUEUE 256
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "threadpool.h"
/* tasks: number of tasks accepted by the pool; done: number of tasks
   that have finished running (incremented by dummy_task under lock). */
int tasks = 0, done = 0;
/* Protects the counters above. */
pthread_mutex_t lock;
/*
 * Task body: sleep for 10 ms, then count one more finished task in
 * 'done' while holding the mutex. The argument is not used.
 */
void dummy_task(void *arg)
{
    (void) arg;

    usleep(10000);

    pthread_mutex_lock(&lock);
    done += 1;
    pthread_mutex_unlock(&lock);
}
/*
 * Basic test: keep adding tasks until threadpool_add() reports an
 * error, wait until at least half of the accepted tasks have run, then
 * destroy the pool immediately and report how many tasks completed.
 *
 * 'done' is written by the workers under the mutex, so it is also read
 * under the mutex while they are running. Calls with side effects are
 * kept outside assert() so that the test still works with NDEBUG.
 */
int main(int argc, char **argv)
{
    threadpool_t *pool;
    int rc, finished;
    (void) argc;
    (void) argv;

    pthread_mutex_init(&lock, NULL);

    pool = threadpool_create(THREAD, QUEUE, 0);
    assert(pool != NULL);
    fprintf(stderr, "Pool started with %d threads and "
            "queue size of %d\n", THREAD, QUEUE);

    while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) {
        pthread_mutex_lock(&lock);
        tasks++;
        pthread_mutex_unlock(&lock);
    }

    fprintf(stderr, "Added %d tasks\n", tasks);

    /* Only this thread writes 'tasks'; 'done' needs the lock. */
    for(;;) {
        pthread_mutex_lock(&lock);
        finished = done;
        pthread_mutex_unlock(&lock);
        if((tasks / 2) <= finished) {
            break;
        }
        usleep(10000);
    }

    rc = threadpool_destroy(pool, 0);
    assert(rc == 0);
    (void) rc;

    /* All workers have been joined: 'done' is stable now. */
    fprintf(stderr, "Did %d tasks\n", done);

    pthread_mutex_destroy(&lock);

    return 0;
}
文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态
文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境
文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn
文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker
文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机
文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk
文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入
文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。 Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。
文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动
文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计
文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;gt;Jni-&amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图
文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法