Как узнать дату последнего смещения тем в Kafka?

111
25 декабря 2019, 17:40

У меня есть Kafka множеством топиков. И есть Consumer Kafka для чтения записей из этих тем:

    public Set<ConsumerRecord> consumeKafka() {
        consumer.subscribe(topics);
        Set<ConsumerRecord> resultRecords = new HashSet<>();
        int i = 0;
        while (i++ < topicIteration) {
            ConsumerRecords<Object, Object> records = consumer.poll(100);
            System.out.println(records.partitions());
            for (ConsumerRecord consumerRecord : records){
                resultRecords.add(consumerRecord);
            }
        }
        return resultRecords;
    }
private Consumer consumerInit(String consumerId){
    props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    if (isActualizationTopics) {
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    } else {
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, topicLimit);
    }
    return new KafkaConsumer<>(props);
}

Чтобы не читать все топики подряд, мне нужно отсортировать список топиков, и получать данные из тех, из которых данные не читались дольше всего. Я так понимаю мне нужно получить дату последнего смещения по каждому топику и отсортировать? Как можно это сделать? Или возможно есть другие подходы к решению?

Answer 1

Такой возможности нет. Кафка работает только с оффсетами и не интересуется датами. Вы можете попробовать решить задачу, сохраняя эту информацию на стороне приложения в формате <топик, дата_последнего_чтения>.

READ ALSO
Android: su pm install -r permission denied в системном приложении

Android: su pm install -r permission denied в системном приложении

Не могу обновить приложение без спроса пользователя, приложение является системным и вшито в прошивку в system/priv-app/Как пробовал:

137
Можно ли получить число N используя только /*-+()?

Можно ли получить число N используя только /*-+()?

Написать алгоритм, который используя числа 1, 3, 4, 6, а также умножение, деление, сложение, вычитание и скобки, находит все комбинации для получения...

155
SharedPreferences настройки приложения

SharedPreferences настройки приложения

Как организовать в настройках приложения функцию скрыть/показать textviewПытался с помощью SharedPreferences, но так и понял

143
Почему приложение вылетает [закрыт]

Почему приложение вылетает [закрыт]

Хотите улучшить этот вопрос? Update the question so it's on-topic for Stack Overflow на русском

151