下面的程序对每个GROUPSIZE文件使用一个工作线程来处理nbFiles文件。并行运行的工作线程不会超过MAXNBRTHREADS工作线程。使用watchDog()线程(线程0)来引导PTHREAD_CANCEL_DEFERRED相同的工作进程。如果任何一个工作线程失败,它就会在全局互斥锁mtx的保护下对watchDog执行pthread_cond_signal(&errCv)操作,通过errIndc谓词传递它的线程ID。然后,watchDog取消所有正在运行的线程(全局oldest保留仍处于活动状态的最旧线程的ID,以帮助它完成此操作),并退出程序。
// compile with: gcc -Wall -Wextra -Wconversion -pedantic -std=c99 -g -D_BSD_SOURCE -pthread -o pFiles pFiles.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <stdint.h>
#include "pthread.h"
#define INDIC_ALL_DONE_OK -1
typedef int_fast32_t int32;
typedef uint_fast32_t uint32;
uint32 MAXNBRTHREADS = 10; // no more than this amount of threads running in parallel
uint32 GROUPSIZE = 10; // how many files per thread
uint32 nbFiles, gThID; // total #files, group ID for a starting thread
int32 errIndc = 0; // global thread error indicator
pthread_t *thT; // pthread table
void **retVals; // thread ret. val. table, needed in stop_watchDog()
uint32 gThCnt; // calculated size of thT[]
uint32 thCnt, oldest; // running threads count (as they are created), oldest thread *alive*
pthread_cond_t errCv = PTHREAD_COND_INITIALIZER; // thread-originated error signal
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; // mutex to protect errIndc
// Worker thread
void *processFileGroup(void *arg) {
int32 err;
int last_state, last_type;
uint32 i, thId = (uint32)(intptr_t) arg;
fprintf(stderr, "th %ld started\n", thId);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &last_state);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, &last_type);
// Artificial error in thread 17
if(thId==17) {
pthread_mutex_lock(&mtx);
errIndc = (int32) thId;
pthread_cond_signal(&errCv);
pthread_mutex_unlock(&mtx);
pthread_exit((void *)(intptr_t)err); }
for(i = 0; i < GROUPSIZE ; i++) { // simulate processing GROUPSIZE files
pthread_testcancel();
err = 0;
if(usleep(10000)) { err = 1; break; }
}
//fprintf(stderr, " -- th %ld done with err = %ld\n", thId, err);
if(err!=0) { // Signal watch dog
pthread_mutex_lock(&mtx);
errIndc = (int32) thId;
pthread_cond_signal(&errCv);
pthread_mutex_unlock(&mtx);
pthread_exit((void *)(intptr_t) err);
}
pthread_exit((void *)(intptr_t) err);
}
// Mishap : cancel existing threads, exit program
int32 cancel_exit(int32 rc, int32 faultyThId, char *msg) {
uint32 j; int32 rval;
void *retVal;
if(rc==0) return 0;
if(msg!=NULL && msg[0]=='\0') fprintf(stderr, "\nError in thread %ld. Stoping..\n", faultyThId);
else fprintf(stderr, "\n%s %ld. Stop.\n\n", msg, faultyThId);
for(j = oldest; j < thCnt ; j++) pthread_cancel(thT[j]);
for(j = oldest; j < thCnt ; j++){
pthread_join(thT[j], &retVal); rval = (int)(intptr_t) retVal;
//if(retVal == PTHREAD_CANCELED || rval==115390242)
if(retVal == PTHREAD_CANCELED)
fprintf(stderr, " cexit: thread %ld canceled\n", j);
else fprintf(stderr, " cexit: thread %ld finished, rc = %ld\n", j, rval);
}
pthread_join(thT[4], &retVal); rval = (int)(intptr_t) retVal; fprintf(stderr, " cexit1: thread 4 finished, rc = %ld\n", rval);
fprintf(stderr, "Processing stopped\n\n");
exit(EXIT_FAILURE); return rc;
}
// Watch dog thread
// it fires on signal from one of the running threads about a mishap
void *watchDog(void *arg) {
int32 err;
pthread_mutex_lock(&mtx);
while (errIndc == 0) {
pthread_cond_wait(&errCv,&mtx);
if(errIndc == INDIC_ALL_DONE_OK){ // main() says we're done with no issues
pthread_mutex_unlock(&mtx);
err = 0; pthread_exit((void *)(intptr_t) err);
}
}
pthread_mutex_unlock(&mtx);
fprintf(stderr, "watch dog: stopping on error indication %ld\n", errIndc);
cancel_exit(1, errIndc, "");
exit(EXIT_FAILURE); return arg;// not reached
}
void stop_watchDog() {
pthread_mutex_lock(&mtx);
errIndc = INDIC_ALL_DONE_OK;
pthread_cond_signal(&errCv);
pthread_mutex_unlock(&mtx);
pthread_join(thT[0], &retVals[0]);
}
int main() {
uint32 i, k;
int32 rc;
nbFiles = 950;
gThCnt = 1+nbFiles/GROUPSIZE;
if(gThCnt > MAXNBRTHREADS)
fprintf(stderr, "running max %ld threads in parallel\n", MAXNBRTHREADS);
else fprintf(stderr, "using %ld worker thread(s)\n", gThCnt);
gThCnt++; // account for watchDog (thread 0)
thT = (pthread_t *) calloc(gThCnt, sizeof(pthread_t)); if(thT==NULL) { perror("calloc"); exit(EXIT_FAILURE); }
retVals = (void **) calloc( (nbFiles/GROUPSIZE), sizeof(void *)); if(retVals==NULL) { perror("calloc"); exit(EXIT_FAILURE); }
// Start watch dog
rc = pthread_create(&thT[0], NULL, watchDog, NULL);
if(rc != 0) { fprintf(stderr,"pthread_create() failed for thread 0\n"); exit(EXIT_FAILURE); }
thCnt = 1;
i = 0; oldest = 1;
while(thCnt<gThCnt) {
pthread_mutex_lock(&mtx);
if(errIndc != 0){ // watchDog is already tearing down the whole system, no point in creating more threads
pthread_join(thT[0], &retVals[0]); // wait on WatchDog thread, which never returns (it cancel_exists).
exit(EXIT_FAILURE); // not reached
}
pthread_mutex_unlock(&mtx);
gThID = thCnt;
rc = pthread_create(&thT[thCnt], NULL, processFileGroup, (void *)(intptr_t) gThID);
if(rc != 0) {
fprintf(stderr,"pthread_create() failed for thread %ld\n", thCnt);
stop_watchDog();
cancel_exit(1, (int32)thCnt, "Could not create thread");
}
thCnt++;
if(thCnt>MAXNBRTHREADS) { // wait for the oldest thread to finish
pthread_mutex_lock(&mtx);
if(errIndc != 0) { // watchDog is already tearing down the whole system, he'll report the rc of thread "oldest"
printf("[MAXNBRTHREADS] errIndc=%ld, joining watchDog\n", errIndc);
pthread_join(thT[0], &retVals[0]); // wait on WatchDog thread, which never returns (it cancel_exists).
exit(EXIT_FAILURE); // not reached
}
pthread_mutex_unlock(&mtx);
pthread_join(thT[oldest], &retVals[oldest]); rc = (int)(intptr_t) retVals[oldest];
fprintf(stderr, "[MAXNBRTHREADS] Thread %ld done with rc = %ld\n", oldest, rc);
oldest++;
}
}
k = oldest;
while(k<thCnt) {
pthread_mutex_lock(&mtx);
if(errIndc != 0){ // watchDog is already tearing down the whole system, he'll report the rc of thread k
pthread_join(thT[0], &retVals[0]); // wait on WatchDog thread, which never returns (it cancel_exists).
exit(EXIT_FAILURE); // not reached
}
pthread_mutex_unlock(&mtx);
pthread_join(thT[k], &retVals[k]); rc = (int)(intptr_t) retVals[k];
fprintf(stderr, "Thread %ld done with rc = %ld\n", k, rc);
oldest = ++k;
}
// Signal watch dog to quit
stop_watchDog();
exit(EXIT_SUCCESS);
}第82行导致此程序出现segfault。为什么?加入一个被取消的线程是非法的吗?
如果您注释第82行,就会出现其他问题。如果你运行该程序4次中的3次,你会看到这些病理结果之一:
线程11怎么会有两个不同的退出代码?
..
watch dog: stopping on error indication 17
Error in thread 17. Stoping..
th 19 started
cexit: thread 11 finished, rc = 115390242
[MAXNBRTHREADS] Thread 11 done with rc = -1有时程序会在MAXNBRTHREADS部分挂起:
...
[MAXNBRTHREADS] errIndc=17, joining watchDog显然,这一部分存在竞争条件;但我无法弄清楚。
感谢您的帮助。
发布于 2017-10-19 23:33:58
你会问:
第82行导致该程序分段错误。为什么?加入一个被取消的线程是非法的吗?
POSIX并没有用这么多的话来说明这一点,但它确实暗示了这一点。The specifications for pthread_join()说:
如果pthread_join()的
参数指定的值不引用可接合线程,则行为未定义。
后来,在理论基础上,
如果某个实现在其生命周期结束后检测到线程ID的使用,则建议该函数失败并报告
错误。
您观察到的段错误与理论基础中的(非标准化)建议不一致,但理论基础确实支持这样的命题,即线程在其生命周期结束后不再是“可接合线程”(例如,因为它已被取消),否则该建议将与函数的指定行为不一致。当然,已经连接的线程不再是可连接的,尽管使用“可连接的”而不是"live“或类似的原因可能更多的是分离线程的规定。
线程11怎么会有两个不同的退出代码?
它不能,而且您的输出也不能证明这一点。您将两次加入线程11,因此这些pthread_join()调用中至少有一个会失败。如果发生这种情况,您就不能依赖它可能存储的任何结果值(无论如何,不是基于POSIX )。您应该检查函数调用的返回值中是否有错误标志。
有时程序会在MAXNBRTHREADS部分挂起
是的,它似乎可以做到这一点。
这里的想法似乎是,在失败的情况下,主线程将调用stop_watchDog(),它将设置一个标志来通知监视线程它应该停止,然后向条件变量发出信号,使监视程序唤醒并注意到它。当它被唤醒时,看门狗线程必须重新获取互斥mtx,然后才能从pthread_cond_wait()返回。
从stop_watchDog()返回后,主线程锁定互斥mtx并尝试加入监视线程。但是发送简历信号是不同步的。因此,主线程有可能在看门狗线程重新获取互斥锁之前锁定它,在这种情况下,您将死锁:看门狗不能从pthread_cond_wait()返回并继续终止,直到它获取互斥锁,但主线程在看门狗终止之前不会解锁互斥锁。
我还没有对程序进行足够的分析,以确定主线程需要在那里保护什么状态,尽管它似乎至少包含了errIndc变量。然而,无论如何,它似乎不需要在尝试加入watchdog线程时锁定互斥锁。
https://stackoverflow.com/questions/46832246
复制相似问题