AspNetCore + apache Kafka помогите разобраться

212
05 декабря 2018, 07:40

С Kafka раньше не работал Здравствуйте, необходимо наладить коммуникацию с Java приложением, решил попробовать Kafka как посредника. Работаю под Win10. Клиент для kafka от confluence для AspNetCore. Изучил официальную документацию, почитал статьи и создал тестовое приложение на AspNetCore 2.0 Запускаю 1 экземпляр Kafka.

настройки продьюсера

    var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", brokerEndpoints },
            { "api.version.request", true },
            { "socket.blocking.max.ms", 1 },
            { "queue.buffering.max.ms", 5 },
            { "queue.buffering.max.kbytes", 10240 },
            {
                "default.topic.config",
                new Dictionary<string, object>
                    {
                        { "message.timeout.ms", 3000 },        //таймаут на подключение к брокеру (если таймаут вышел, то выставляется message.Error в ответе)
                        { "request.required.acks", -1 }        // гарантированная доставка сообщения до конкретного партишена. (самая высокая гарантия доставки в брокера)
                    }
            }
        };

настройки консьюмера

        {
            { "bootstrap.servers", brokerEndpoints },
            { "api.version.request", true },
            { "group.id", !string.IsNullOrEmpty(groupId) ? groupId : Guid.NewGuid().ToString() },
            { "socket.blocking.max.ms", 1 },
            { "enable.auto.commit", false },  //отключить автокоммит офсета после прочтения сообщения (ручной коммит через CommitAsync())
            { "fetch.wait.max.ms", 5 },
            { "fetch.error.backoff.ms", 5 },
            { "fetch.message.max.bytes", 10240 },
            { "queued.min.messages", 1000 }, //1000
            {
                "default.topic.config",
                new Dictionary<string, object>
                    {
                        { "auto.offset.reset", "beginning" }
                    }
            }
        };
  1. Как создавать дополнительные partition для топика и зачем они нужны при 1 экземпляре Kafka? Создаю topic и пишу в него.

    message = await _producer.ProduceAsync(topic, null, value, partition);

    Если не указывать partition, то все нормально, создается по умолчанию partition = 0 (файл 0) и пишется в него. Я думал можно вручную задать partition при записи, но это не так. указываю partition = 1 и тут же ошибка, что такого partition нет. Т.е. partition нужен только, когда работает кластер Kafka (несколько экземпляров брокера), для каждого экземпляра Kafka своя версия распределенного лога (partition)?

  2. Создал 100 параллельно записывающих продюсера в 1 топик (каждый продюсер пишет раз в 300мс). Это плохо? Снижается производительность?

  3. Создал консьюмера и каждый перезапуск консьюмера приводит к долгой перебалансировки группы, может 15 сек ждать (видно по лог kafka). Это нормально?

  4. Создал 1 консьюмера, он читает топик в который пишут 100 параллельно записывающих продюсера. Консьюмер не успевает за продюсерами, это видно. Консьюмеров должно быть много?

  5. Создал 100 параллельно записывающих продюсера в 100 топиков (у каждого свой топик). Создал 100 паралельно прослущивающих консьюмера для каждого топика. получил хорошую скорость получения данных, практичеки синхронно за продюсерами успевали консьюмеры. Так и надо работать?

  6. Создал 100 параллельно записывающих продюсера в 100 топиков (у каждого свой топик). 1 консьюмер подписан на все 100 топиков, тоже с производительностью плохо, видно что чтение идет блоками (задержка потом сразу много данных) Так тоже лучше не делать?

  7. При долгой работе иногда получаю ошибку в логе Kafka "Error to rename log ...". Также много сообщений "Removed 0 expired offset in 0 millisecodns ..." Что это такое?

  8. Тестировал сутки разные комбинации продюсеров и консьюмеров и 1 раз упала Kafka с ошибкой. Я так понимаю чтобы избежать траблов, нужно поднимать несколько экземпляров Kafka? Тогда как это все настраивать?

  9. Есть реальная задача в которой одно приложение имеет много параллельно работающих фоновых задачи, которые часто, раз в 100мс пишут результаты своей работы в Kafka. другая программа должна получать эти данные. Взаимодействие лучше организовать как в пункте "4." или же какой-то вариант распределенного лога со множеством partition?

  10. Посмотрите пожалуйста все настройки продюсера и консьюмера и распишите пожалуйста все настройки, какая за что отвечает? (настройки брал из примера на гит хабе, описание не нашел).

  11. Как настраивать Kafka, чтобы топики сильно не разрастались, например ограничить по размеру, и как будет себя вести программа, когда старый лог (например размером в 3Гб) удалится и создастся новый лог?

  12. Иногда, при запуске Kafka ругается на старые логи, приходится чистить всю папку "tmp". Это нормально?

    Заранее всем спасибо!!!

READ ALSO
Работа внутри группы LINQ

Работа внутри группы LINQ

столкнулся с проблемойНеобходимо сделать сортировку, подсчет и удаление повторяющихся элементов(только после подсчета) внутри группы

169
Как подключить файл для чтения через консоль?

Как подключить файл для чтения через консоль?

Как подключить файл для чтения через консоль, чтобы не в коде передавать адрес файла

172
API Яндекс.Касса: про создание платежа

API Яндекс.Касса: про создание платежа

Сайт на движке, но не магазин, поэтому готовый модуль использовать не получится, обратился к документации, API v3 (новые площадки подключаются...

134
Задача с ассоциативным массивом

Задача с ассоциативным массивом

В массиве есть авторы и книги, у каждой книги один автор

157