我有一个函数必须处理一组目录中的所有文件(5-300 个文件之间的任何文件)。要使用的并行线程数由用户指定(通常为 4)。这个想法是在 4 个独立的线程中启动该函数。当一个线程返回时,我必须开始处理下一个(第 5 个)文件,依此类推,直到所有文件完成。
在 Windows 上,WaitForMultipleObjects()
withbWaitAll=False
在这里帮助我。我有一个可以填充的结构,并将其填充到数组中
map<UINT, string>::iterator iter = m_FileList.begin();
string outputPath = GetOutputPath();
void ***threadArgs = (void***)malloc(sizeof(void**)*numThreads);
HANDLE *hdl = (HANDLE*)malloc(sizeof(HANDLE)*numThreads);
DWORD *thr = (DWORD*)malloc(sizeof(DWORD)*numThreads);
for (int t = 0; iter != m_FileList.end() && t < numThreads; t++, iter++)
{
threadArgs[t] = prepThreadData(t, iter->second, opPath);
printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
if (hdl[t] == NULL)
{
err = GetLastError();
printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
}
}
for (;iter != m_FileList.end(); iter++)
{
int t = (int)WaitForMultipleObjects(numThreads, hdl, FALSE, INFINITE);
if (t == WAIT_FAILED)
{
err = GetLastError();
printf("main: thread failed %x %x\n", t, err);
}
if (t - WAIT_OBJECT_0 >= 0 && t - WAIT_OBJECT_0 < numThreads)
{
free(threadArgs[t][1]);
free(threadArgs[t][2]);
free(threadArgs[t]);
threadArgs[t] = prepThreadData(t, iter->second, opPath);
printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
if (hdl[t] == NULL)
{
err = GetLastError();
printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
}
}
}
if (WAIT_FAILED == WaitForMultipleObjects(numThreads - 1, hdl, TRUE, INFINITE))
{
err = GetLastError();
printf("main: thread failed %x %x\n", err);
}
我现在的问题是使用 pthreads 获得类似的功能。我能想到的最好的方法是使用信号量,当其中一个可用时,生成一个新线程,而不是使用 threadArgs 数组,我将只使用一个为每个线程生成分配内存的指针。此外,为了便于内存管理,为 threadArgs[t] 分配的内存将由生成的线程拥有。
有更好的解决方案吗?或者有类似WaitForMutlipleObjects()
pthreads 的东西吗?更具体地说,如果我替换CreateThread()
为pthread_create()
,我应该替换为什么WaitForMultipleObjects()
呢?
答案1
听起来你想要一个工作队列。您可以使用需要处理的文件集合填充该队列,并使用一个函数将项目从队列中出列,该函数执行必要的锁定以防止线程之间的竞争。然后启动您想要的任意线程。每个线程将从队列中取出一个项目,对其进行处理,然后将下一个项目取出。当队列变空时,线程可以阻塞等待更多输入,或者如果您知道不会有更多输入,则线程可以终止。
这是一个简单的例子:
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>
template<typename T>
class ThreadSafeQueue {
public:
void enqueue(const T& element)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_queue.push(element);
}
bool dequeue(T& value)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_queue.empty()) {
return false;
}
value = m_queue.front();
m_queue.pop();
return true;
}
private:
std::mutex m_mutex;
std::queue<T> m_queue;
};
static void threadEntry(const int threadNumber, ThreadSafeQueue<std::string>* const queue)
{
std::string filename;
while (queue->dequeue(filename)) {
printf("Thread %d processing file '%s'\n", threadNumber, filename.c_str());
}
}
int main()
{
ThreadSafeQueue<std::string> queue;
// Populate queue
for (int i = 0; i < 100000; ++i) {
queue.enqueue("filename_" + std::to_string(i) + ".txt");
}
const size_t NUM_THREADS = 4;
// Spin up some threads
std::thread threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i) {
threads[i] = std::thread(threadEntry, i, &queue);
}
// Wait for threads to finish
for (int i = 0; i < NUM_THREADS; ++i) {
threads[i].join();
}
return 0;
}
编译:
$ g++ example.cpp -pthread
该程序定义了ThreadSafeQueue
一个具有内部锁定的队列,以允许多个线程同时访问它。
该main
函数首先填充队列。然后它启动 4 个线程。每个线程从队列中读取一个值并“处理”它(这里是通过将消息打印到标准输出)。当队列为空时,线程终止。该main
函数在返回之前等待线程终止。
请注意,此设计假设所有元素在线程启动之前都已填充到队列中。经过一些更改,它可以扩展为支持在线程运行时处理新工作。