Java: согласование thread'ов

90
03 февраля 2022, 21:50

Друзья, добрый день! Взываю к помощи с многопоточностью в Java. Стоит задача реализовать простейший конвейер, на котором стоят всего два работника - Reader и Writer. Конвейер создает Manager, знакомя работников друг с другом, затем создавая отдельные потоки для каждого работника и запуская их (соответственно, в run() у Writer бесконечный цикл, который ждет данных от Reader, и завершается, когда Reader пришлет условный EOF).

Менеджер выглядит примерно так:

threads = new ArrayList<>();
Thread readthread = new Thread(reader); //создаем тред для ридера
threads.add(readthread);
reader.addConsumer(writer); //знакомим работников
writer.addProducer(reader);
Thread writethread = new Thread(writer); //тред для райтера
threads.add(writethread);
for (Thread t : threads) { //запускаем треды в порядке добавления в лист
     t.start();
}

Проблема состоит в том, что отлаживать такое многопоточное приложение не получается, и при запуске программа просто виснет (непонятно, почему).

Если решить проблему с отладкой запуском только одного треда, а для второго насильно сделать run() так:

writethread.start();
reader.run();

Или так:

readthread.start();
writer.run();

В отладке все работает, корректно завершается (из входного файла с текстом "abcdef" получаем ровно такой же выходной). Если же просто запустить этот код (который с одним-единственным тредом), он крашится ровно так же, как и с двумя потоками, причем из исходного файла с текстом "abcdef" получается "cdefef", после чего вечный цикл продолжается, не реагируя на присланный Reader'ом EOF.

Код ридера с комментариями тут.

Код райтера тут.

Код вспомогательной структуры Triplet здесь.

Проект полностью здесь.

Примечание: считывание делаем по две буквы (то есть файл "abcdef" будет прочитан за 3 считывания).

Собственно, вопрос в том, с чем проблема может быть связана и как грамотно ловить/отлаживать такие ошибки?

Использую Java 1.8.0, IDE Intellij Idea 14.1.7.

Спасибо!

Answer 1

Разбираться довольно тяжело в твоём коде, поскольку всё очень сильно переусложнено.

Почему не останавливается, я тебе могу сказать точно - флаг endofstreamflag не помечен volatile, при этом значение в него пишется из потока ридера (через вызов loadDataFrom), а читается из потока райтера.

Зачем их знакомить? Асинхронная обработка будет намного проще, если их не знакомить.

reader.addConsumer(writer); //знакомим работников
writer.addProducer(reader);

Например, пусть reader грузит данные в очередь, а writer пишет на диск.

final var queue = new ArrayBlockingQueue<byte[]>(1);
final var endoffile = "#EOF$".getBytes();
new Thread(() -> {
  try (var fis = new FileInputStream(filename)) {
    var buffer = new byte[readsize]; 
    while (fis.read((byte[])buffer) > 0) { // правильно работает только для файлов, чей размер кратен readsize
        queue.add(buffer);
        buffer = new byte[readsize]);
    }
    queue.add(endoffile);
  }
}).start();
new Thread(() -> {
  try (var fos = new FileOutputStream(outputfile)) {
    while(true) {
      var buffer = queue.get();
      if (Arrays.equals(buffer, endoffile)) break;
      fos.write(buffer);
    }
  }
}).start();
READ ALSO
Java не получается послать Intent

Java не получается послать Intent

Я пытаюсь передать Intent из сервиса в MainActivity в BroadcastReciver, но либо интент не передается, либо BroadcastReciver его не принимаетНиже код Activity где я должен...

101
Не загружается реклама Admob в Android Studio I/Ads: Ad failed to load : 0

Не загружается реклама Admob в Android Studio I/Ads: Ad failed to load : 0

Я пытаюсь вставить Admob рекламу в пустое приложение Android StudioЯ пытался как подключать проект к firebase самостоятельно, так и делать это через кнопку...

89
Ошибка NullPointerException в JavaFx

Ошибка NullPointerException в JavaFx

делаю чат на java, но столкнулся с ошибкой и не понимаю, как ее решитьОшибка:

96
IndexOutOfBoundsException

IndexOutOfBoundsException

Ошибка:

89