Многопоточная обработка блоков данных

91
26 августа 2021, 02:10

Прошу помощи в разборе проблемы. Я использую 2 экземпляра класса который реализует идею паттерна поставщик/потребитель.

1 экземпляр используется в функции чтения из файла данных, записывает их в очередь и извлекает эти данные для отправки в функцию сжатия которая обрабатывается в 8 потоках исполнения Threads.

2 экземпляр используется для записи в очередь уже сжатых блоков, а извлеченные данные отправляются в функцию записи в выходной файл.

Программа отрабатывает корректно в плане чтения файла, сжатия и записи в выходной файл.

Но после всех процессов зависает в бесконечном цикле. Фрагмент я отметил комментарием:

public sealed class QueueHandler : IDisposable
    {
        private object locker = new object();
        private Queue<Block> queue = new Queue<Block>();
        private int id = 0;
        private bool stop = false;
        public void StopProcess()
        {
            lock (locker)
            {
                stop = true;
                Monitor.PulseAll(locker);
            }
        }
        public void Enqueue(Block block)
        {
            lock (locker)
            {
                while (id != block.ID)
                    Monitor.Wait(locker);
                queue.Enqueue(block);
                id++;             
                Monitor.PulseAll(locker);
            }
        }
        public Block Dequeue()
        {
            while (true)
            {
                lock (locker)
                {
                    while (queue.Count == 0 && !stop)
                        Monitor.Wait(locker); // Здесь происходит зависание и ожидание данных хотя их в очереди уже нет
                    if (queue.Count == 0)
                        return null;
                    return queue.Dequeue();
                }
            }
        }
        public void Dispose()
        {
            Enqueue(null);
            queue = null;
        }
    }

Метод где вызываются все потоки исполнения:

public override void Run()
        {
            Console.WriteLine("Компрессия...\n");
            FileHandler reader = new FileHandlerCompressing(inFile, outFile, queueReader, queueWriter, true);
            for (int i = 0; i < threadsCount; i++)
            {
                workersThreds[i] = new Thread(new ThreadStart(Compress));
                workersThreds[i].Start();
            }
            FileHandler writer = new FileHandlerCompressing(inFile, outFile, queueReader, queueWriter, false);
            while (true)
            {
                cancelled = writer.GetStatusProcess();
                if (cancelled)
                {
                    Console.WriteLine("\nКомпрессия завершена.");
                    success = true;
                    break;
                }
            }         
        }

Компрессия

private void Compress()
        {
            try
            {
                byte[] dataCompress;
                while (true)
                {
                    Block block = queueReader.Dequeue();
                    if (block != null)
                    {
                        using (MemoryStream memoryStream = new MemoryStream())
                        {
                            using (GZipStream gz = new GZipStream(memoryStream, CompressionMode.Compress))
                            {
                                gz.Write(block.Buffer, 0, block.Buffer.Length);
                            }
                            dataCompress = memoryStream.ToArray();
                        }
                        Block outData = new Block(block.ID, dataCompress);
                        queueWriter.Enqueue(outData);
                    }
                    else
                    {
                        queueWriter.StopProcess();
                        break;
                    }
                }
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Компрессор: " + ex.Message);
            }
        }

Запись/чтение файла

public class FileHandlerCompressing : FileHandler
    {
        public FileHandlerCompressing(string inFile, string outFile, QueueHandler queueReader, QueueHandler queueWriter, bool rw) : 
            base(inFile, outFile, queueReader, queueWriter, rw)
        {
        }
        public override bool GetStatusProcess()
        {
            if (stopRead && stopWrite)
                return true;
            else return false;
        }
        public override void Read()
        {
            try
            {
                using (FileStream fileStream = new FileStream(fileNameIn, FileMode.Open))
                {
                    int bytesRead;
                    byte[] bufferRead;
                    Block block;
                    while ((fileStream.Position < fileStream.Length))
                    {
                        if (fileStream.Length - fileStream.Position <= blockSize)
                        {
                            bytesRead = (int)(fileStream.Length - fileStream.Position);
                        }
                        else
                        {
                            bytesRead = blockSize;
                        }
                        bufferRead = new byte[bytesRead];
                        fileStream.Read(bufferRead, 0, bytesRead);
                        fileStream.Flush();
                        block = new Block(counterOperation, bufferRead);
                        reader.Enqueue(block);
                        ++counterOperation;
                        ProgressOperation.ProgressBar(fileStream.Position, fileStream.Length);
                    }
                    stopRead = true;
                    reader.StopProcess();
                    return;
                }
            }
            catch (IOException err)
            {
                Console.WriteLine(err.Message);
            }
        }
        public override void Write()
        {
            try
            {
                Block block;
                using (FileStream fileStream = new FileStream(fileNameOut, FileMode.Create))
                {
                    while (true)
                    {
                        block = writer.Dequeue();
                        if (block != null)
                        {
                            BitConverter.GetBytes(block.Buffer.Length).CopyTo(block.Buffer, 4);
                            fileStream.Write(block.Buffer, 0, block.Buffer.Length);
                            fileStream.Flush();
                        }
                        else break;                     
                    }
                }
                stopWrite = true;
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Write: " + ex.Message);
            }
        }
    }
READ ALSO
Получение массива по запросу Mysql

Получение массива по запросу Mysql

Я выполняю запрос MySql, где получаю столбец Id с несколькими значениямиКак я могу получить все эти значения Int массивом?

101
Обновление данных с помощью оператора LOAD DATA INFILE

Обновление данных с помощью оператора LOAD DATA INFILE

Нужно реализовать добавление и если значения полей одинаковые, замену данных с помощью оператора LOAD DATA INFILEПробовал таким способом, но в итоге...

281
Ошибка при php авторизации

Ошибка при php авторизации

делаю авторизацию на сайте по урокам Хауди-хоВроде сделал всё правильно, но выдаёт ошибку

200