What I write, what I lose.

之前有點時間, 重新熟悉了Linux進程間通訊的東西.
於是想起之前項目中自己寫了個很簡單的線程池.
這次想重新寫下.
主要目的是用進程間或者線程間通信的阻塞/取消阻塞方法實現對線程池線程的等待作業和開始作業.
算是對這些代碼的一種實踐.
以上.
===================================================================
我對一個簡單線程池的一些理解.
1.創建大量的線程.
2.工作線程的執行體功能為:
while(1)
{
//按照一定條件(A)阻塞.

//按照任務的參數設置開始執行任務.
}
3.控制線程的功能為.
{
//接受新任務的參數, 一般為回調函數+參數. (為保持兼容, 我設置的格式為 void* (*thread_task)(void*) + void* , 跟線程創建的形式保持兼容.)
//按照一定規則查找空閑的線程.
//將接受的新任務參數賦給這條線程的數據體.
//解除這條線程的阻塞條件.
}

===================================================================
common-thread-pool.c      線程池主要實現 + 一個簡單的測試代碼. (接口沒有單獨拿出來.)
thread-control.h          提供線程池線程的等待作業和開始作業接口.
thread-control-xxxxx.c    thread-control.h的接口實現. 可以使用多種方式.

common-thread-pool.c
View Code
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>

#define DBGPRINTF_DEBUG printf
#define DBGPRINTF_ERROR printf
#define ASSERT assert

#include "thread-control.h"
/*
 * Signature of a task callback: it receives one opaque argument and returns
 * an opaque pointer, the same shape as the start routine given to
 * pthread_create().
 */
typedef void *(*thread_task_func)(void *arg);
/*
 * One unit of work stored in a pool thread's slot.
 */
struct _thread_task_t
{
    int taskid;                  /* Task id. */
    thread_task_func task_func;  /* Task callback ... */
    void *task_arg;              /* ... and the argument passed to it. */
};

typedef struct _thread_task_t thread_task_t;
/*
 * State of a pool thread slot.
 */
typedef enum
{
    ethread_status_unknown = 0,  /* Value written by thread_data_init(). */
    ethread_status_idle,         /* Slot may be handed a new task. */
    ethread_status_running,      /* Slot has a task assigned / in progress. */
    ethread_status_terminel,     /* Not referenced by the code in this file. */
    ethread_status_cannotuse,    /* Not referenced by the code in this file. */
} thread_status_e;
/*
 * Per-thread slot of the pool.
 */
struct _thread_data_t
{
    int thread_id;                 /* Index of the slot inside the pool. */
    pthread_t pid;                 /* Handle filled in by pthread_create(). */
    thread_status_e status;        /* Current state of the slot. */
    thread_task_t thread_task;     /* Task currently assigned to the slot. */
    THREAD_CONTROL thread_control; /* Handle used to block / start the thread. */
};

typedef struct _thread_data_t thread_data_t;
/*
 * Thread pool bookkeeping.
 */
struct _thread_pool_t
{
    thread_data_t *thread_data_set;    /* Array holding one slot per thread. */
    int num_thread;                    /* Number of slots in thread_data_set. */
    int taskid_base;                   /* Last task id that was handed out. */
    pthread_mutex_t thread_pool_lock;  /* Lock taken around status and task id updates. */
};

typedef struct _thread_pool_t thread_pool_t;

/* The single pool instance used by every function in this file. */
thread_pool_t g_thread_pool;
/*
 * Store a new status in a thread slot while holding the pool lock.
 *
 * thread_data: slot to update.
 * status:      value to store.
 * Returns 0.
 */
int thread_pool_setthreadstatus(thread_data_t *thread_data, thread_status_e status)
{
    pthread_mutex_t *pool_lock = &g_thread_pool.thread_pool_lock;

    pthread_mutex_lock(pool_lock);
    thread_data->status = status;
    pthread_mutex_unlock(pool_lock);

    return 0;
}
/*
線程池線程函數體.
*/
void
* thread_pool_func(
void
* arg)
{
sleep(
1
);
//
Wait pthread_t count.
thread_data_t* thread_data = (thread_data_t*)arg;
DBGPRINTF_DEBUG(
"
Thread start run. Thread_id = %d, pid = 0x%x . \n
"
,
thread_data->thread_id, (unsigned
int
)thread_data->pid);
/*
Continue to wait the task, then based on new task_func and task_arg to perform this task.
*/
while
(
1
)
{
thread_control_wait(thread_data->thread_control);
//
Need to lock? Yes.
thread_pool_setthreadstatus(thread_data, ethread_status_running);
DBGPRINTF_DEBUG(
"
Task start. taskid = %d .\n
"
, thread_data->thread_task.taskid);
thread_data->thread_task.task_func(thread_data->thread_task.task_arg);
DBGPRINTF_DEBUG(
"
Task end. taskid = %d .\n
"
, thread_data->thread_task.taskid);
//
Need to lock?Yes.
thread_pool_setthreadstatus(thread_data, ethread_status_idle);
}
DBGPRINTF_DEBUG(
"
Thread end run. Thread_id = %d, pid = 0x%x . \n
"
,
thread_data->thread_id, (unsigned
int
)thread_data->pid);
}
/*
 * Reset a task record to its "no task" state: id -1, no callback, no argument.
 * Returns 0.
 */
int thread_task_init(thread_task_t *thread_task)
{
    *thread_task = (thread_task_t){
        .taskid = -1,
        .task_func = NULL,
        .task_arg = NULL,
    };

    return 0;
}
/*
 * Put a thread slot into its initial state and create its control handle.
 *
 * thread_data: slot to initialise.
 * Returns the result of thread_control_init() (0 on success).
 */
int thread_data_init(thread_data_t *thread_data)
{
    thread_data->thread_id = -1;

    /* pthread_t is an opaque type and need not be an integer, so clear it
     * bytewise instead of assigning 0. */
    memset(&(thread_data->pid), 0, sizeof(thread_data->pid));

    thread_data->status = ethread_status_unknown;
    thread_task_init(&(thread_data->thread_task));

    return thread_control_init(&(thread_data->thread_control));
}
int
thread_pool_create(
int
num_thread)
{
ASSERT(num_thread >
0
&& num_thread <=
10
*
1024
);
thread_pool_t* thread_pool = &g_thread_pool;
int
i =
0
;
thread_pool->thread_data_set = (thread_data_t*)malloc(
sizeof
(thread_data_t) * num_thread);
ASSERT(thread_pool->thread_data_set != NULL);
thread_pool->num_thread = num_thread;
thread_pool->taskid_base = -
1
;
pthread_mutex_init(&(thread_pool->thread_pool_lock), NULL);
for
(i=
0
; i<num_thread; i++)
{
thread_data_t* thread_data = thread_pool->thread_data_set+i;
thread_data_init(thread_data);
thread_data->thread_id = i;
thread_data->status = ethread_status_idle;
/*
pthread_create set to detached.
*/
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
int
ret = pthread_create(&(thread_data->pid), &attr, thread_pool_func, thread_data);
if
(ret !=
0
)
{
DBGPRINTF_DEBUG(
"
pthread_create error[%d].\n
"
, i);
break
;
}
}
sleep(
2
);
return
0
;
}
/*
 * Sample task used by main(): sleeps for the number of seconds encoded in
 * the pointer value of arg, logging before and after.
 *
 * arg: an integer number of seconds carried in a pointer.
 * Returns NULL.
 */
void *test_func(void *arg)
{
    /* Go through intptr_t so the pointer-to-integer conversion is
     * well-defined when pointers are wider than int. */
    int t_sleep = (int)(intptr_t)arg;

    DBGPRINTF_DEBUG("Test func. Sleep %d .\n", t_sleep);
    sleep(t_sleep);
    DBGPRINTF_DEBUG("Test func finished. \n");

    return NULL;
}
/*
 * Look for a thread slot that can accept a task.
 *
 * thread_data_found: receives the first slot whose status is
 *                    ethread_status_idle, or NULL when there is none.
 * The scan runs with the pool lock held; the slot is not marked in any way.
 * Returns 0.
 */
int thread_pool_queryfree(thread_data_t **thread_data_found)
{
    thread_pool_t *pool = &g_thread_pool;
    thread_data_t *idle_slot = NULL;
    int idx = 0;

    *thread_data_found = NULL;

    pthread_mutex_lock(&(pool->thread_pool_lock));
    while (idle_slot == NULL && idx < pool->num_thread)
    {
        thread_data_t *slot = &pool->thread_data_set[idx];

        if (slot->status == ethread_status_idle)
        {
            idle_slot = slot;
        }
        idx++;
    }
    if (idle_slot != NULL)
    {
        *thread_data_found = idle_slot;
    }
    pthread_mutex_unlock(&(pool->thread_pool_lock));

    return 0;
}
/*
分配taskid.
*/
int
thread_pool_gettaskid(
int
* taskid)
{
thread_pool_t* thread_pool = &g_thread_pool;
pthread_mutex_lock(&(thread_pool->thread_pool_lock));
thread_pool->taskid_base ++;
*taskid = thread_pool->taskid_base;
pthread_mutex_unlock(&(thread_pool->thread_pool_lock));
return
0
;
}
/*
 * Hand a task to an idle pool thread.
 *
 * task_func: callback to run; must not be NULL.
 * arg:       argument passed to task_func.
 *
 * Returns 0 when a thread was started on the task, -1 when task_func is NULL
 * or when no idle thread exists (the task is then not queued anywhere).
 */
int thread_pool_addtask(thread_task_func task_func, void *arg)
{
    thread_pool_t *thread_pool = &g_thread_pool;
    thread_data_t *thread_data_found = NULL;
    int taskid = -1;
    int i = 0;

    if (task_func == NULL)
    {
        DBGPRINTF_ERROR("Task function is NULL. Task not added.\n");
        return -1;
    }

    /*
     * Find an idle thread and claim it in one critical section. The search is
     * done inline rather than through thread_pool_queryfree() because the
     * lock has to stay held between "found idle" and "marked running";
     * otherwise two concurrent callers could pick the same thread.
     */
    pthread_mutex_lock(&(thread_pool->thread_pool_lock));
    for (i = 0; i < thread_pool->num_thread; i++)
    {
        thread_data_t *thread_data = thread_pool->thread_data_set + i;

        if (thread_data->status == ethread_status_idle)
        {
            /* Set task data. */
            thread_pool->taskid_base++;
            taskid = thread_pool->taskid_base;
            thread_data->thread_task.taskid = taskid;
            thread_data->thread_task.task_func = task_func;
            thread_data->thread_task.task_arg = arg;
            thread_data->status = ethread_status_running;
            thread_data_found = thread_data;
            break;
        }
    }
    pthread_mutex_unlock(&(thread_pool->thread_pool_lock));

    if (thread_data_found == NULL)
    {
        DBGPRINTF_ERROR("Thread pool full. Task not added.\n");
        return -1;
    }

    DBGPRINTF_DEBUG("Thread [%d] perferm this task.\n", thread_data_found->thread_id);

    /* Start the task. */
    thread_control_start(thread_data_found->thread_control);

    /* Print the local copy of the id: the slot now belongs to the worker. */
    DBGPRINTF_DEBUG("Thread [%d] Add task[%d] finished.\n",
                    thread_data_found->thread_id, taskid);

    return 0;
}
/*
 * Submit eight test_func tasks whose sleep times are 1, 2, 4, ... 128 seconds.
 */
static void submit_test_batch(void)
{
    int shift = 0;

    for (shift = 0; shift < 8; shift++)
    {
        /* The sleep time travels inside the pointer value; intptr_t keeps the
         * integer-to-pointer conversion well-defined on 64-bit targets. */
        thread_pool_addtask(test_func, (void *)(intptr_t)(1 << shift));
    }
}

/*
 * Small demo: create a pool of 10 threads, submit one batch of tasks, wait
 * six seconds, submit a second batch, then keep the process alive so the
 * detached workers can finish.
 */
int main(void)
{
    if (thread_pool_create(10) != 0)
    {
        DBGPRINTF_ERROR("thread_pool_create failed.\n");
        return 1;
    }

    submit_test_batch();
    sleep(6);
    submit_test_batch();

    sleep(100000);
    return 0;
}
thread-control.h
View Code
#ifndef THREAD_CONTROL_H
#define THREAD_CONTROL_H

/*
 * Opaque handle to one "wait for work / start work" control object.
 * A typedef (rather than a macro expanding to void*) so that declarations
 * such as `THREAD_CONTROL a, b;` give every declarator the pointer type.
 */
typedef void *THREAD_CONTROL;

/* Create a control object and store its handle in *thread_control. */
int thread_control_init(THREAD_CONTROL *thread_control);

/* Release the control object behind *thread_control. */
int thread_control_deinit(THREAD_CONTROL *thread_control);

/* Block the calling thread until the control object is started. */
int thread_control_wait(THREAD_CONTROL thread_control);

/* Release a thread blocked in thread_control_wait() on this control object. */
int thread_control_start(THREAD_CONTROL thread_control);

#endif /* THREAD_CONTROL_H */

thread-control.h的接口實現可以使用多種方式.
只要是進程間通信/線程間通信中帶有阻塞等待/解除阻塞等待的機制, 都可以拿來作實驗.
比如: 條件變量.
thread-control-condition.c
View Code
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <
string
.h>
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#define
DBGPRINTF_DEBUG printf
#define
DBGPRINTF_ERROR printf
#define
ASSERT assert
#include
"
thread-control.h
"
/*
 * Condition-variable implementation of the thread-control interface.
 *
 * pending_starts is the predicate guarded by lock: it counts start requests
 * that no waiter has consumed yet. Without it, a start issued while the
 * worker is not yet blocked in the wait would be lost, and a spurious
 * wake-up of pthread_cond_wait() would be taken for a start.
 */
struct _thread_control_cond_t
{
    pthread_mutex_t lock;         /* Protects pending_starts. */
    pthread_cond_t condition;     /* Signalled whenever pending_starts grows. */
    unsigned int pending_starts;  /* Starts issued but not yet consumed. */
};

typedef struct _thread_control_cond_t thread_control_cond_t;

/*
 * Allocate and initialise a control object.
 *
 * thread_control: receives the new handle.
 * Returns 0; allocation failure trips the assert.
 */
int thread_control_init(THREAD_CONTROL *thread_control)
{
    *thread_control = NULL;

    thread_control_cond_t *cond = (thread_control_cond_t *)malloc(sizeof(thread_control_cond_t));
    assert(cond != NULL);

    pthread_mutex_init(&(cond->lock), NULL);
    pthread_cond_init(&(cond->condition), NULL);
    cond->pending_starts = 0;

    *thread_control = cond;
    return 0;
}

/*
 * Destroy a control object and clear the caller's handle.
 *
 * thread_control: address of the handle; a NULL address or NULL handle is
 *                 accepted and ignored.
 * Returns 0.
 */
int thread_control_deinit(THREAD_CONTROL *thread_control)
{
    if (thread_control == NULL || *thread_control == NULL)
    {
        return 0;
    }

    thread_control_cond_t *cond = (thread_control_cond_t *)(*thread_control);

    pthread_cond_destroy(&(cond->condition));
    pthread_mutex_destroy(&(cond->lock));
    free(cond);

    *thread_control = NULL;
    return 0;
}

/*
 * Block until a start request is available, then consume it.
 * Returns 0.
 */
int thread_control_wait(THREAD_CONTROL thread_control)
{
    thread_control_cond_t *cond = (thread_control_cond_t *)(thread_control);

    pthread_mutex_lock(&(cond->lock));
    /* Re-check the predicate after every wake-up: only a real start leaves the loop. */
    while (cond->pending_starts == 0)
    {
        pthread_cond_wait(&(cond->condition), &(cond->lock));
    }
    cond->pending_starts--;
    pthread_mutex_unlock(&(cond->lock));

    return 0;
}

/*
 * Record one start request and wake a waiter.
 * Returns 0.
 */
int thread_control_start(THREAD_CONTROL thread_control)
{
    thread_control_cond_t *cond = (thread_control_cond_t *)(thread_control);

    pthread_mutex_lock(&(cond->lock));
    /* The request is latched first, so it is seen even if nobody is waiting yet. */
    cond->pending_starts++;
    pthread_cond_signal(&(cond->condition));
    pthread_mutex_unlock(&(cond->lock));

    return 0;
}
比如:有名管道.
thread-control-fifopipe.c
View Code
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <
string
.h>
#include <pthread.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#define
DBGPRINTF_DEBUG printf
#define
DBGPRINTF_ERROR printf
#define
ASSERT assert
#include
"
thread-control.h
"
/* Number of FIFO paths handed out so far; used to build a unique name per control object. */
static int path_index = 0;

/*
 * Size of the FIFO path buffer. The path has the form "./xxx<index>"; the
 * pool accepts up to 10*1024 threads, so the index can reach five digits
 * ("./xxx10240" needs 11 bytes with its terminator). 32 leaves room for any
 * int index.
 */
#define LEN_CMD_PATH 32

/*
 * Named-pipe implementation of a control object: it only remembers the path
 * of its FIFO.
 */
struct _fifopipe_control_t
{
    char fifopipe_cmd_path[LEN_CMD_PATH];
};

typedef struct _fifopipe_control_t fifopipe_control_t;
/*
 * Create a control object backed by a freshly created FIFO "./xxx<index>".
 *
 * thread_control: receives the new handle.
 * Returns 0; allocation failure, a path that does not fit the buffer, and
 * mkfifo() failure all trip an assert.
 */
int thread_control_init(THREAD_CONTROL *thread_control)
{
    *thread_control = NULL;

    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)malloc(sizeof(fifopipe_control_t));
    assert(fifopipe_control != NULL);

    path_index++;
    int len = snprintf(fifopipe_control->fifopipe_cmd_path, LEN_CMD_PATH, "./xxx%d", path_index);
    /* A truncated path would collide with the FIFO of another control object. */
    assert(len > 0 && len < LEN_CMD_PATH);
    (void)len;

    /*
     * A FIFO with this name may be left over from an earlier run, which would
     * make mkfifo() fail. Remove it, but only when it really is a FIFO so
     * that an unrelated file of the same name is never deleted.
     */
    struct stat st;
    if (stat(fifopipe_control->fifopipe_cmd_path, &st) == 0 && S_ISFIFO(st.st_mode))
    {
        unlink(fifopipe_control->fifopipe_cmd_path);
    }

    int ret = mkfifo(fifopipe_control->fifopipe_cmd_path, 0666);
    assert(ret == 0);
    (void)ret;

    *thread_control = fifopipe_control;
    return 0;
}
/*
 * Destroy a FIFO-backed control object: remove its FIFO from the file
 * system, free the object and clear the caller's handle.
 *
 * thread_control: address of the handle; a NULL address or NULL handle is
 *                 accepted and ignored.
 * Returns 0.
 */
int thread_control_deinit(THREAD_CONTROL *thread_control)
{
    if (thread_control == NULL || *thread_control == NULL)
    {
        return 0;
    }

    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)(*thread_control);

    /* The FIFO was created by thread_control_init(); leaving it behind would
     * make the next mkfifo() on the same path fail. */
    unlink(fifopipe_control->fifopipe_cmd_path);
    free(fifopipe_control);

    *thread_control = NULL;
    return 0;
}
/*
 * Block until one byte arrives on the control object's FIFO.
 *
 * Returns 0 when a start byte was read, -1 when read() returned no data or
 * failed. Failure to open the FIFO trips the assert.
 */
int thread_control_wait(THREAD_CONTROL thread_control)
{
    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)(thread_control);

    int fd = open(fifopipe_control->fifopipe_cmd_path, O_RDONLY);
    /* 0 is a valid descriptor; only negative values signal failure. */
    assert(fd >= 0);

    char tmp = 0;
    ssize_t got = read(fd, &tmp, 1);

    /* One descriptor is opened per wait, so it has to be closed per wait. */
    close(fd);

    return (got == 1) ? 0 : -1;
}
/*
 * Release the thread waiting on the control object by writing one byte to
 * its FIFO.
 *
 * Returns 0 when the byte was written, -1 when write() failed. Failure to
 * open the FIFO trips the assert.
 */
int thread_control_start(THREAD_CONTROL thread_control)
{
    fifopipe_control_t *fifopipe_control = (fifopipe_control_t *)(thread_control);

    int fd = open(fifopipe_control->fifopipe_cmd_path, O_WRONLY);
    /* 0 is a valid descriptor; only negative values signal failure. */
    assert(fd >= 0);

    char tmp = 0;
    ssize_t put = write(fd, &tmp, 1);

    /* One descriptor is opened per start, so it has to be closed per start. */
    close(fd);

    return (put == 1) ? 0 : -1;
}

比如: 管道, 消息隊列, socket, while(condition?){sleep} 等等.

以上代碼中, 注釋比較少.
差不多. 其實我都有點不知道自己在寫什麼.
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

