Обработка файлов в потоке

468
29 января 2017, 14:06

Уже несколько часов ломаю голову, не могу придумать, как реализовать обработку файлов, выполняемую двумя потоками.

Создание потоков (2)

for (int i = 0; i < threadCount; i++) {
    threads.push_back(boost::thread(readFile));
}

Функция readFile

void readFile() {
    while (true) {
        fs::path filePath = getFilePath();
        fs::ifstream ifs(filePath);
        if (!ifs)
            break;
        if (!ifs.is_open()) {
            break;
        }
*обработка*
}

Функция getFilePath

fs::path getFilePath() {
    if (it != fs::directory_iterator() && it->path().extension() == ".txt") {
        fs::path itPath = it->path();
        ++it;
        return itPath;
    }
}

Т.е. сейчас я создаю поток, который сразу обращается к getFilePath и обрабатывает файл. Это повторяется до тех пор, пока в папке есть необработанные файлы. Если запустить один поток - то всё нормально, но при двух - всё плохо.

Дайте пару примеров работающего кода, или укажите на мои ошибки.

UPD:

Добился нормальной работы, изменив getFilePath

string getFilePath() {
    {
        lock_guard<mutex> lock(m);
        if (it != fs::directory_iterator() && it->path().extension() == ".txt") {
            string itPath = it->path().string();
            ++it;
            bHaveFiles = true;
            return itPath;
        } else {
            return "null";
        }
    }
}

Но вопрос не сильно изменился. Сейчас, если я создам условные 8 потоков, то из-за мьютекста только один поток получит файл на обработку, а остальные 7 потоков вынуждены ждать своей очереди. Реально как-либо организовать асинхронную работу?

Answer 1

Еще один ответ - поскольку он посвящен не советам, а экспериментальной их проверке.

Проведя некоторые эксперименты, должен признать, что мое предположение, что дисковые операции не дадут ускорения за счет многопоточности, не оправдались.

Вот эти эксперименты.

Для начала создаем 20000 файлов по 200K каждый -

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
int main()
{
    for(int i = 0; i < 20000; ++i)
    {
        char buf[200];
        sprintf(buf,"g:\\tmp\\test\\%06d.dat",i);
        FILE * f = fopen(buf,"wb");
        if (f)
        {
            for(int j = 0; j < 50000; ++j) fwrite(&i,sizeof(i),1,f);
            fwrite(buf,strlen(buf)+1,1,f);
            fclose(f);
        }
    }
}

После этого в дело вступает основная программа.

#include <vector>
#include <string>
#include <iostream>
#include <fstream>
#include <iomanip>
#include <thread>
#include <mutex>
#include <future>
#include <io.h>
#include <muTimer.hpp>
using namespace std;
mutex sumMtx;                // Мьютекс для защиты аккумулятора
unsigned long long sum = 0;  // Аккумулятор (сумма чисел, считанных из файла)
mutex filesMtx;              // Мьютекс для защиты списка файлов
vector<string> files;        // Список имен файлов
size_t filesIdx;             // Текущий индекс в файле

vector<string> getDirFiles(string dirName)   // Сбор всех имен файлов в каталоге
{
    vector<string> fileNames;
    _finddata_t info;
    intptr_t handle = _findfirst((dirName + "\\*.*").c_str(),&info);
    if (handle == -1) return fileNames;
    do {
        if (info.attrib & _A_SUBDIR) continue;
        fileNames.push_back(string(dirName) + "\\" + info.name);
    } while(_findnext(handle,&info) == 0);
    _findclose(handle);
    return fileNames;
}
void handleOneFile(const char * name)                    // Обработка одного файла
{
    fstream f(name,ios::binary|ios::in|ios::out);
    if (f)
    {
        unsigned int v = 0;                              // Чтение около 40К
        for(int j = 0; j < 10000; ++j) f.read((char*)&v,sizeof(v));
        time_t t;
        time(&t);
        f.seekp(f.tellg());
        f.write((char*)&t,sizeof(t));                    // Запись 4 байт
        lock_guard<mutex> lk(sumMtx);                    // Увеличение аккумулятора
        sum += v;
    }
}
void asyncHandle()                                       // Главная функция потока
{
    for(;;)
    {
        const char * name;                               // Чтение очередного 
        {                                                // файла из списка, 
            lock_guard<mutex> lk(filesMtx);              // если уже все считаны -
            if (filesIdx >= files.size()) break;         // выход
            name = files[filesIdx++].c_str();
        }
        handleOneFile(name);                             // Обработка файла
    }
}
int main(int argc, const char * argv[])
{
    files = getDirFiles("G:\\Tmp\\Test");                // Сбор списка файлов
    filesIdx = 0;                                        // Сброс и пару проходов
    asyncHandle();                                       // приведения кешей в
    filesIdx = 0;                                        // стабильное состояние 
    asyncHandle();
    for(int threadCount = 1; threadCount < 20; ++threadCount)  // Для разного числа потоков
    {
        sum = 0;                                               // Сброс аккумулятора и индекса
        filesIdx = 0;
        cout << "threadCount = " << setw(3) << threadCount << ":  ";
        muTimer mu;                                            // Мой таймер для хронометража
        vector<future<void>> tasks;
        for(int i = 0; i < threadCount; ++i)                   // Создаем threadCount потоков
        {
            tasks.push_back(async(asyncHandle));
        }
        for(int i = 0; i < threadCount; ++i)                   // Дожидаемся завершения
        {
            tasks[i].get();
        }
        cout << sum << "  ";                                   // Выводим накопленную сумму 
    }                                                          // и затраченное время
}

Результат оказался следующим (я старался максимально убрать все задачи на машине, чтоб никто в этот момент не полез к диску):

threadCount =   1:  199990000  46108 ms
threadCount =   2:  199990000  23587 ms
threadCount =   3:  199990000   9802 ms
threadCount =   4:  199990000   8409 ms
threadCount =   5:  199990000   8492 ms
threadCount =   6:  199990000   8575 ms
threadCount =   7:  199990000   8332 ms
threadCount =   8:  199990000   8507 ms
threadCount =   9:  199990000   8585 ms
threadCount =  10:  199990000   8254 ms
threadCount =  11:  199990000   8326 ms
threadCount =  12:  199990000   8218 ms
threadCount =  13:  199990000   8784 ms
threadCount =  14:  199990000   8359 ms
threadCount =  15:  199990000   8528 ms
threadCount =  16:  199990000   8481 ms
threadCount =  17:  199990000   8531 ms
threadCount =  18:  199990000   8772 ms
threadCount =  19:  199990000   8525 ms

Компилировал VC++ 2015 x86, машина под Windows 7 x64, с четырехъядерным процессором, так что совет запускать потоков не более, чем имеется ядер, в целом обоснован :)

Update

При изменении в main() на такой код

    vector<thread> tasks;
    for(int i = 0; i < threadCount; ++i)
    {
        tasks.push_back(thread(asyncHandle));
    }
    for(int i = 0; i < threadCount; ++i)
    {
        tasks[i].join();
    }

результаты поменялись не особенно:

threadCount =   1:  199990000  41373 ms
threadCount =   2:  199990000  24758 ms
threadCount =   3:  199990000   9544 ms
threadCount =   4:  199990000   8165 ms
threadCount =   5:  199990000   7911 ms
threadCount =   6:  199990000   7970 ms
threadCount =   7:  199990000   7807 ms
threadCount =   8:  199990000   7942 ms
threadCount =   9:  199990000   8064 ms
threadCount =  10:  199990000   7858 ms
threadCount =  11:  199990000   8361 ms
threadCount =  12:  199990000   8157 ms
threadCount =  13:  199990000   8550 ms
threadCount =  14:  199990000   8001 ms
threadCount =  15:  199990000   8392 ms
threadCount =  16:  199990000   8346 ms
threadCount =  17:  199990000   8558 ms
threadCount =  18:  199990000   8410 ms
threadCount =  19:  199990000   8398 ms

А вот при изменении

void asyncHandle(int start, int stop)
{
    for(;start < stop; ++start)
    {
        handleOneFile(files[start].c_str());
    }
}

и

    vector<thread> tasks;
    int count = (files.size()+20)/threadCount;
    for(int i = 0; i < threadCount; ++i)
    {
        tasks.push_back(thread(asyncHandle,i*count,
                               std::min((size_t)(i+1)*count,files.size())));
    }
    for(int i = 0; i < threadCount; ++i)
    {
        tasks[i].join();
    }

(т.е. каждому потоку выдавался примерно равный кусок работы изначально) результаты получались

threadCount =   1:  199990000  57153 ms
threadCount =   2:  199990000  27474 ms
threadCount =   3:  199990000  16320 ms
threadCount =   4:  199990000  13041 ms
threadCount =   5:  199990000   8656 ms
threadCount =   6:  199990000   8811 ms
threadCount =   7:  199990000   8943 ms
threadCount =   8:  199990000  10088 ms
threadCount =   9:  199990000   9069 ms
threadCount =  10:  199990000   8360 ms
threadCount =  11:  199990000   8578 ms
threadCount =  12:  199990000   8839 ms
threadCount =  13:  199990000   8435 ms
threadCount =  14:  199990000   8957 ms
threadCount =  15:  199990000   8718 ms
threadCount =  16:  199990000  10704 ms
threadCount =  17:  199990000  10382 ms
threadCount =  18:  199990000  10500 ms
threadCount =  19:  199990000  11576 ms

Т.е. рост с увеличением числа потоков появился, но не такой уж большой, и при небольших количествах потоков результаты оказались явно хуже. Почему - особых идей нет, разве что первые потоки успевали до запуска следующих сделать кучу работы, так что заканчивались первыми и ожидали, пока последние доделают свою часть. Так что мне кажется, что более равномерная работа - при выборке файлов из коллекции по одному - более эффективный способ работы.

Ну и последнее - если перенести накопление после всей выполненной работы, а не для каждого файла:

int handleOneFile(const char * name)
{
    fstream f(name,ios::binary|ios::in|ios::out);
    if (f)
    {
        unsigned int v = 0;
        for(int j = 0; j < 10000; ++j) f.read((char*)&v,sizeof(v));
        time_t t;
        time(&t);
        f.seekp(f.tellg());
        f.write((char*)&t,sizeof(t));
        return v;
    }
    return 0;
}
void asyncHandle(int start, int stop)
{
    long long s = 0;
    for(;start < stop; ++start)
    {
        s += handleOneFile(files[start].c_str());
    }
    lock_guard<mutex> lk(sumMtx);
    sum += s;
}

получилось примерно так:

threadCount =   1:  199990000  38168 ms
threadCount =   2:  199990000  23800 ms
threadCount =   3:  199990000  10787 ms
threadCount =   4:  199990000   9065 ms
threadCount =   5:  199990000   8403 ms
threadCount =   6:  199990000   8077 ms
threadCount =   7:  199990000   8410 ms
threadCount =   8:  199990000   9656 ms
threadCount =   9:  199990000   8038 ms
threadCount =  10:  199990000   8209 ms
threadCount =  11:  199990000   8722 ms
threadCount =  12:  199990000   8561 ms
threadCount =  13:  199990000   8524 ms
threadCount =  14:  199990000   8524 ms
threadCount =  15:  199990000   8000 ms
threadCount =  16:  199990000   8511 ms
threadCount =  17:  199990000   8463 ms
threadCount =  18:  199990000   9431 ms
threadCount =  19:  199990000   8456 ms

Откровенно говоря, сделать какие-то однозначные (далеко ведущие :)) выводы и дать какие-то рекомендации несколько затрудняюсь... Мьютексы явно играют роль, хотя и не кардинальную, но вполне заметную. Сейчас сделаю еще один эксперимент с async и на этом закруглюсь.

Вот с async вновь. Отличие в том, что здесь накопление в глобальную переменную идет не после обработки файла, а по окончании всей работы; кроме того, чтобы еще уменьшить количество обращений к мьютексу, я брал из коллекции сразу по два имени файла.

Результат не сказать чтоб сильно отличающийся. Почему для малого количества потоков при этом больше, чем в самой первой версии - для меня загадка :(

threadCount =   1:  199990000  44203 ms
threadCount =   2:  199990000  39898 ms
threadCount =   3:  199990000  23217 ms
threadCount =   4:  199990000   7808 ms
threadCount =   5:  199990000   8043 ms
threadCount =   6:  199990000   7668 ms
threadCount =   7:  199990000   8168 ms
threadCount =   8:  199990000   7762 ms
threadCount =   9:  199990000   7675 ms
threadCount =  10:  199990000   8221 ms
threadCount =  11:  199990000   7910 ms
threadCount =  12:  199990000   8056 ms
threadCount =  13:  199990000   7888 ms
threadCount =  14:  199990000   8058 ms
threadCount =  15:  199990000   7741 ms
threadCount =  16:  199990000   8075 ms
threadCount =  17:  199990000   7812 ms
threadCount =  18:  199990000   8489 ms
threadCount =  19:  199990000   8346 ms
Answer 2

В комментарий просто не влез, так что простите, выскажу ответом несколько своих соображений.

Дисковые операции существенно более долгие, чем получение одного значения из контейнера с помощью итератора, так что ожидание мьютекса тормозить потоки не будет - это не главный тормоз.

Что касается мьютексов - то опять же вам нужно как-то реализовывать подсчет запущенных потоков, а для этого использовать тот же мьютекс (вот тут мне давали совет, как это реализовать).

Список файлов я бы собрал заранее, а потом просто выдавал потокам по одному - вплоть до собрать их все в каком-то векторе, и выдавать ссылку или указатель на имя, даже не копируя - вряд ли ожидание такой короткой операции будет длинным. А вот подгадить несинхронизированное обращение может сильно. Да и собирать список по ходу дела плохо по двум причинам: это тоже дисковая операция, и вот тут могут быть тормоза, это раз, и список файлов во время работы может меняться - это два...

Еще - я бы не использовал потоки. Все авторитеты в один голос говорят, что лучше использовать async - если они пишут правду, то его реализации работают с пулом потоков, а значит, это существенно быстрее, чем создавать-убивать потоки. С моей же точки зрения, даже важнее то, что async корректно обработает даже исключение в потоке, не уложив при этом программу.

Еще одно замечание - при большом количестве файлов и большом количестве потоков начнет работать против вас ограничение на количество одновременно открытых файлов.

А в общем - повторюсь: лично я не жду большого эффекта от распараллеливания дисковых операций. Если распараллеливать, скажем, вычисления и дисковые операции - то это разумно: медленная дисковая операция не заставляет в таком случае тормозиться вычислениям. Но когда несколько потоков одновременно требуют от диска что-то сделать - есть у меня подозрение, что операционка такие требования просто поставит в одну очередь, и вся многопоточность станет самообманом...

Вопрос вы задали интересный, надо бы посидеть и прохронометрировать...

READ ALSO
Класс для работы с RSA шифром

Класс для работы с RSA шифром

Существуют ли в Qt5 стандартные классы, при помощи которых можно с генерировать публичные и приватные ключи шифрования для шифра RSA и собственно...

484
Чтение двоичного файла

Чтение двоичного файла

есть файл fbx binarупытался считать с помощью ifstream но он считывает до определенного символа и дальше не читает файл(примерно 10 - 12 символ)

380
Qt. Реализация гаджета рабочего стола

Qt. Реализация гаджета рабочего стола

Возник вопрос, возможно ли реализовать такую программу что бы:

504
FBX binary как обработать? [требует правки]

FBX binary как обработать? [требует правки]

есть FBX файлхотел написать загрузчик но не знаю что делать

498