将队列中的作业调度到多个线程上

将队列中的作业调度到多个线程上

我有一个函数必须处理一组目录中的所有文件(5-300 个文件之间的任何文件)。要使用的并行线程数由用户指定(通常为 4)。这个想法是在 4 个独立的线程中启动该函数。当一个线程返回时,我必须开始处理下一个(第 5 个)文件,依此类推,直到所有文件完成。

在 Windows 上,WaitForMultipleObjects()withbWaitAll=False在这里帮助我。我有一个可以填充的结构,并将其填充到数组中

map<UINT, string>::iterator iter = m_FileList.begin();
string outputPath = GetOutputPath();
void ***threadArgs = (void***)malloc(sizeof(void**)*numThreads);
HANDLE *hdl = (HANDLE*)malloc(sizeof(HANDLE)*numThreads);
DWORD *thr = (DWORD*)malloc(sizeof(DWORD)*numThreads);

for (int t = 0; iter != m_FileList.end() && t < numThreads; t++, iter++)
{
    threadArgs[t] = prepThreadData(t, iter->second, opPath);
    printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
    hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
    if (hdl[t] == NULL)
    {
        err = GetLastError();
        printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
    }
}

for (;iter != m_FileList.end(); iter++)
{
    int t = (int)WaitForMultipleObjects(numThreads, hdl, FALSE, INFINITE);
    if (t == WAIT_FAILED)
    {
        err = GetLastError();
        printf("main: thread failed %x %x\n", t, err);
    }
    if (t - WAIT_OBJECT_0 >= 0 && t - WAIT_OBJECT_0 < numThreads)
    {
        free(threadArgs[t][1]);
        free(threadArgs[t][2]);
        free(threadArgs[t]);
        threadArgs[t] = prepThreadData(t, iter->second, opPath);
        printf("main: starting thread :%d %s outputPath: %s\n", t, iter->second.c_str(), threadArgs[t][2]);
        hdl[t] = CreateThread(NULL, 0, fileProc, (void*)threadArgs[t], 0, &thr[t]);
        if (hdl[t] == NULL)
        {
            err = GetLastError();
            printf("main: thread failed %x %x %s %s\n", err, iter->second.c_str(), threadArgs[t][2]);
        }
    }
}
if (WAIT_FAILED == WaitForMultipleObjects(numThreads - 1, hdl, TRUE, INFINITE))     
{
    err = GetLastError();
    printf("main: thread failed %x %x\n", err);
}

我现在的问题是使用 pthreads 获得类似的功能。我能想到的最好的方法是使用信号量,当其中一个可用时,生成一个新线程,而不是使用 threadArgs 数组,我将只使用一个为每个线程生成分配内存的指针。此外,为了便于内存管理,为 threadArgs[t] 分配的内存将由生成的线程拥有。

有更好的解决方案吗?或者有类似WaitForMutlipleObjects()pthreads 的东西吗?更具体地说,如果我替换CreateThread()pthread_create(),我应该替换为什么WaitForMultipleObjects()呢?

答案1

听起来你想要一个工作队列。您可以使用需要处理的文件集合填充该队列,并使用一个函数将项目从队列中出列,该函数执行必要的锁定以防止线程之间的竞争。然后启动您想要的任意线程。每个线程将从队列中取出一个项目,对其进行处理,然后将下一个项目取出。当队列变空时,线程可以阻塞等待更多输入,或者如果您知道不会有更多输入,则线程可以终止。

这是一个简单的例子:

#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

template<typename T>
class ThreadSafeQueue {
public:
    void enqueue(const T& element)
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_queue.push(element);
    }

    bool dequeue(T& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (m_queue.empty()) {
            return false;
        }

        value = m_queue.front();
        m_queue.pop();

        return true;
    }

private:
    std::mutex m_mutex;
    std::queue<T> m_queue;
};

static void threadEntry(const int threadNumber, ThreadSafeQueue<std::string>* const queue)
{
    std::string filename;

    while (queue->dequeue(filename)) {
        printf("Thread %d processing file '%s'\n", threadNumber, filename.c_str());
    }
}

int main()
{
    ThreadSafeQueue<std::string> queue;

    // Populate queue
    for (int i = 0; i < 100000; ++i) {
        queue.enqueue("filename_" + std::to_string(i) + ".txt");
    }

    const size_t NUM_THREADS = 4;

    // Spin up some threads
    std::thread threads[NUM_THREADS];
    for (int i = 0; i < NUM_THREADS; ++i) {
        threads[i] = std::thread(threadEntry, i, &queue);
    }

    // Wait for threads to finish
    for (int i = 0; i < NUM_THREADS; ++i) {
        threads[i].join();
    }

    return 0;
}

编译:

$ g++ example.cpp -pthread

该程序定义了ThreadSafeQueue一个具有内部锁定的队列,以允许多个线程同时访问它。

main函数首先填充队列。然后它启动 4 个线程。每个线程从队列中读取一个值并“处理”它(这里是通过将消息打印到标准输出)。当队列为空时,线程终止。该main函数在返回之前等待线程终止。

请注意,此设计假设所有元素在线程启动之前都已填充到队列中。经过一些更改,它可以扩展为支持在线程运行时处理新工作。

相关内容