Kafka Engine. Недокументированные возможности.

Kafka Engine появился в ClickHouse сравнительно недавно и до его появления приходилось создавать специализированное приложение которое получало данные из Kafka и писало их в ClickHouse.

Как говорит нам документация Kafka Engine позволяет:

  • Публиковать/подписываться на потоки данных.
  • Организовать отказо-устойчивое хранилище.
  • Обрабатывать потоки по мере их появления.

Для получения данных достаточно создать таблицу с движком Kafka и указать параметры подключения.

Kafka(broker_list, topic_list, group_name, format[, schema])

После чего можно выполнять запросы к данной таблице.

SELECT * FROM kafka_engine_table LIMIT 5;

Однако данный функционал был бы относительно бесполезен, если бы не ещё одна возможность, а именно создание материализованных представлений на эту таблицу. Материализованное представление можно рассматривать как триггер который сработает когда данные будут доступны для получения и запрос который обработает эти данные и запишет в нужную нам таблицу.

CREATE MATERIALIZED VIEW consumer TO daily
    AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
    FROM queue GROUP BY day, level;

К сожалению, в документации, на данный момент, не описаны ряд настроек.

Первая недокументированная возможность это то, что можно указать количество подписчиков, например, если вы хотите создать несколько материализованных представлений. Для того чтобы указать их количество необходимо добавить параметр при создании таблицы.

Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])

Второй возможностью является указание любых доступных параметров конфигурации для подключения. Параметры задаются глобально через конфигурацию.

Как видно из приведённого ниже кода:

/// path - static const String CONFIG_PREFIX = "kafka";
static void
loadFromConfig(struct rd_kafka_conf_s * conf, const AbstractConfiguration & config, const std::string & path)
{
    AbstractConfiguration::Keys keys;
    std::vector<char> errstr(512);
    config.keys(path, keys);
    for (const auto & key : keys)
    {
        const String key_path = path + "." + key;
        const String key_name = boost::replace_all_copy(key, "_", ".");
        if (rd_kafka_conf_set(conf, key_name.c_str(), config.getString(key_path).c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
            throw Exception("Invalid Kafka setting " + key_path + " in config: " + String(errstr.data()), ErrorCodes::INVALID_CONFIG_PARAMETER);
    }
}

для этого необходимо добавить в файл конфигурации сервера новую секцию. При этом ключи будут являться названием настройки (обратите внимание что в имени ключа не может быть точек, поэтому все “_” в ключе заменяются на “.” для настройки).

Пример для задания настройки client.id:

<kafka>
    <client_id>ClickHouseKafkaEngine</client_id>
</kafka>