Scraper Обновлено: 3 October, 2019

Используйте формат сообщений Apache Kafka для экономии места и пропускной способности

  Перевод   Ссылка на автора

Путешествие в группе может сократить расходы. Изображение из Unsplash

Одной из основных проблем, с которыми мы сталкиваемся в наши дни, является объем дискового пространства, используемого темами Apache Kafka. Наши сообщения сериализуются как JSON. Как вы знаете в JSON, каждое поле модели данных хранится в строке JSON, что приводит к тому, что в теме Kafka хранится много дублированных имен полей. Чтобы уменьшить использование дискового пространства, у нас есть несколько вариантов, и я хочу поделиться некоторыми из них здесь.

TL; DR

В этой статье я покажу вам, как мы можем уменьшить использование дискового пространства Kafka. Сначала я опишу, почему необходимо уменьшить дисковое пространство. Затем я опишу формат сообщений Kafka и то, как он помогает нам экономить дисковое пространство и пропускную способность. После этого, используя тесты, мы показываем, как длительное использование, сжатие и использование Avro могут помочь нам сэкономить дисковое пространство. И, наконец, в конце вы можете просмотреть таблицу, которая поможет вам решить, какой подход лучше для вашего варианта использования.

Почему мне нужно уменьшить размер сообщения?

Хотя вы можете утверждать, что хранилище сегодня дешевое, есть несколько ситуаций, в которых вы хотите уменьшить дисковое пространство и пропускную способность:

  • это не странно хранить все ваши данные в Кафке, Нью-Йорк Таймс, например, использует Кафку как источник правды! Можно сохранить записи всей темы навсегда, отключив сохранение в Kafka. При правильном сжатии вы можете значительно сократить использование дискового пространства. Если вы не уменьшите размер сообщения, возможно, вам не хватит места на диске в несколько раз быстрее, чем вы думаете!
  • Если вы можете уменьшить размер сообщений при отправке их брокеру, вы используете меньшую полосу пропускания и, следовательно, сможете увеличить общее количество сообщений, которые могут быть отправлены брокерам.
  • Облачные платформы предлагают цены это вычисляется количеством данных, которые записываются или считываются из кластера Kafka, и количеством данных, которые хранятся в кластере! Таким образом, снижение этих факторов может значительно снизить ваши затраты на применение.

Прежде чем показать вам, как мы можем уменьшить размер сообщения, я хочу описать формат сообщения Kafka.

Формат сообщения

Формат сообщения Kafka уже описан в его документация Поэтому я вкратце остановлюсь.

Для этой статьи достаточно определить запись, запись пакета и накладные расходы записи:

  • Запись: каждая запись - это наша знакомая пара ключ-значение (и некоторые небольшие дополнительные данные).
  • Пакетная запись: каждый производственный запрос (для тематического раздела), отправляемый брокеру Kafka, упаковывается в пакет. Каждая партия содержит от одной до нескольких записей и содержит раздел заголовка пакета.
  • Служебные данные пакета записей: каждый созданный пакет содержит метаданные о записях, такие как версия сообщения (магический байт), количество записей, алгоритм сжатия, транзакция и т. Д. Эти метаданные хранятся в служебной части пакета. Запись пакета служебных данных61 байт,

Объем накладных расходов на запись постоянен, и мы не можем уменьшить его размер. Но мы можем оптимизировать размер пакета записей тремя способами: задержаться, сжать и использовать схемы для значений и ключей наших записей.

затяжной

Как общественный транспорт помогает нам сократить трафик?

Изображение из https://www.reddit.com/r/pics/comments/clx25f/benefits_of_public_transport/

Как вы видите выше, если каждый человек использует свою машину для поездок по городу, мы можем столкнуться с интенсивным движением. Но, с другой стороны, использование автобусов для перевозки пассажиров к месту назначения может значительно сократить трафик. Это будет достигнуто, потому что мы перемещаем тех же людей, используя меньше места (автобусы).

Задержка в Кафке помогает нам сделать то же самое. Вместо того, чтобы отправлять пакет с одной записью, мы можем немного подождать, собрать больше записей за время производства и отправить их как пакет:

Используйте задержку для отправки нескольких записей в одном пакете.

На изображении выше левый производитель немедленно отправляет каждую запись, а правый производитель ждет немного дольше, чтобы собрать больше записей и отправить их в виде пакета брокеру Kafka. Таким образом, у правильного производителя есть только одна служебная запись пакета и сэкономлено 122 байта.

Давайте посмотрим на это, запустив тест. Я произвел 100 записей в темеjson-simple, Каждая партия записей содержит ровно одну запись. Теперь мы можем просмотреть общий размер этих 100 записей, используяkafka-log-dirsкоманда:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-simpleQuerying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-simple-0","size":18034,"offsetLag":0,"isFuture":false}]}]}]}

Я выделил поле размера в результате. Как видите, эти 100 пакетов используют 18 034 байта нашего хранилища.

Теперь давайте используем задержку и создадим те же 100 записей, но внутри ровно одного пакета записей. Вы можете включить его, установивlinger.msвведите число миллисекунд, которое ваш продюсер должен ждать, чтобы собрать наши 100 записей. Я делаю эту рекордную партию внутриjson-lingeringтема:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-lingeringQuerying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-lingering-0","size":12031,"offsetLag":0,"isFuture":false}]}]}]}

Как вы видите выше, нам нужно 12 031 для хранения тех же данных и сэкономить 0,33% дискового пространства Кроме того, нам нужна меньшая пропускная способность сети, потому что количество байтов, отправляемых брокеру, меньше.

компрессия

Сжатие помогает нам сократить объем данных, которые мы хотим сохранить на диске или отправить по сети, за счет увеличения загрузки ЦП. Это компромисс между большим количеством операций ввода-вывода или большим использованием процессора. Большинство веб-приложений сегодня тратят время на ожидание ввода-вывода (сеть, запрос к базе данных и т. Д.), А загрузка ЦП может быть небольшой. Следовательно, имеет смысл использовать CPU для уменьшения ввода-вывода в этой ситуации.

В производителе Kafka перед отправкой пакета записи брокеру мы можем использовать сжатие, чтобы уменьшить размер пакета. Производитель сжимает только записи внутри пакета и не сжимает секцию заголовка пакета. Когда сжатие включено, для типа сжатия в заголовке пакета устанавливается бит флага. Этот флаг будет использоваться во время распаковки у потребителя.

Алгоритмы сжатия работают хорошо, когда в данных больше дублирования. Чем больше записей в пакете, тем выше степень сжатия, которую вы можете ожидать. Вот почему производитель сжимает все записи в пакете вместе (вместо того, чтобы сжимать каждую запись отдельно). Например, в JSON более вероятно иметь одинаковые имена полей в смежных записях.

Кафка поддерживает различные алгоритмы сжатия. Если вы хотите иметь более высокую степень сжатия, вы можете использовать gzip за счет увеличения загрузки ЦП. Но если вы хотите меньше использовать процессор и более быстрый алгоритм, вы можете выбрать Snappy. Вы можете установить алгоритм сжатия, используяcompression.typeконфиг в продюсере.

Давайте создадим те же 100 записей, используя задержку и сжатие gzip внутри темыjson-gzip:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-gzipQuerying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-gzip-0","size":3602,"offsetLag":0,"isFuture":false}]}]}]}

Как вы можете видеть выше, нам нужно только 3 602 байта для хранения наших исходных данных. Мы можем выбрать алгоритм сжатия Snappy:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list json-snappyQuerying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"json-snappy-0","size":5661,"offsetLag":0,"isFuture":false}]}]}]}

Как и ожидалось, используя алгоритм Snappy, нам нужно больше байтов (5661 байт) для хранения наших 100 записей.

схема

Использование схемы для хранения записей Kafka имеет несколько преимуществ, и одним из них является уменьшение размера записи. Поскольку в JSON необходимо хранить имя каждого поля с вашими данными в записи, общий размер записи значительно увеличивается. Но если вы решите хранить свои данные в формате Avro, вы можете сохранить схему один раз и создавать записи на основе этой схемы много раз. Таким образом, вы уменьшаете размер записи, удаляя схему из записи. Вам просто нужно сохранить схему значения записи (или ключа) в Реестр схемы слияния и сохраните идентификатор схемы в записи.

Хранение схемы в отдельном месте имеет смысл, потому что схема является одинаковой для большинства записей, и если у вас есть более новая версия схемы, вы можете сохранить новую в Рефлюэнтной схеме.

Используя устаревшую схему и схему Avro, мы создали 100 одинаковых записей дляavro-lingeringтема:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list avro-lingeringQuerying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"avro-lingering-0","size":5559,"offsetLag":0,"isFuture":false}]}]}]}

Без сжатия нам нужно 5,559 байт для хранения наших 100 записей. Если я включу сжатие для той же партии, мы сможем уменьшить размер еще больше:

# kafka-log-dirs --bootstrap-server localhost:9092 --describe --topic-list avro-gzipQuerying brokers for log directories information
Received log directory information from brokers 1
{"version":1,"brokers":[{"broker":1,"logDirs":[{"logDir":"/var/lib/kafka/data","error":null,"partitions":[{"partition":"avro-gzip-0","size":3305,"offsetLag":0,"isFuture":false}]}]}]}

При использовании gzip нам нужно только 3 305 байт для хранения 100 записей, в то время как для тех же записей (в простейшем случае) нам понадобилось 18 034 байта. Используя этот подход, мы можем сохранить81 процентнашего места для хранения. Обратите внимание, что в наших образцах записей не было дублированных значений (кроме поля состояния), и если у вас есть дубликаты в ваших данных (что не редкость), уменьшение размера будет очень большим!

Интересным моментом при хранении значений в формате Avro является то, что если вы отключите задержку, ваша запись обычно меньше записи, хранящейся в JSON. Мы можем проверить это, просмотрев размер каждого значения записи, используяDumpLogSegmentsорудие труда. Давайте сначала посмотрим размер значения записи в формате JSON, который мы уже опубликовалиjson-lingering:

# kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --files /var/lib/kafka/data/json-lingering-0/00000000000000000000.logDumping /var/lib/kafka/data/json-lingering-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 99 count: 100 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1570112629952 size: 12031 magic: 2 compresscodec: NONE crc: 2574281250 isvalid: true
| offset: 0 CreateTime: 1570112629952 keysize: -1 valuesize: 107 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1570112629952 keysize: -1 valuesize: 107 sequence: -1 headerKeys: [].
. 98 other records goes here!
.

В приведенной выше команде мы можем просмотреть пакет из 100 записей, используя этуDumpLogSegments, Как вы видите, пакет содержит 100 записей, и я пропустил 98 записей для краткости. Важным моментом (выделенным жирным шрифтом) является то, что размер каждого значения записи составляет 107 байт. Если мы запустим ту же команду дляavro-lingering:

# kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --files /var/lib/kafka/data/avro-lingering-0
/00000000000000000000.logDumping /var/lib/kafka/data/avro-lingering-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 99 count: 100 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1570088225966 size: 5559 magic: 2 compresscodec: NONE crc: 82210245 isvalid: true
| offset: 0 CreateTime: 1570088225966 keysize: -1 valuesize: 46 sequence: -1 headerKeys: []
| offset: 1 CreateTime: 1570088225966 keysize: -1 valuesize: 46 sequence: -1 headerKeys: [].
. 98 other records goes here!
.

Мы видим, что с помощью Avro нам нужно всего 46 байтов для хранения значения каждой записи. Если количество записей внутри пакета мало, использование формата Avro может значительно уменьшить размер сообщения.

сравнение

Я суммирую сравнение, используя таблицу:

Как вы можете видеть, используя gzip и Avro формат дает лучший результат.

Вывод

Мы сравнили несколько подходов, которые можно использовать в Apache Kafka для уменьшения использования дискового пространства. Как видите, у каждого есть свой компромисс:

  • Задержка требует, чтобы вы подождали немного больше, чтобы собрать больше записей.
  • Сжатие использует больше ресурсов процессора, но уменьшает количество операций ввода-вывода.
  • Использование Avro налагает зависимость на клиентов (потребителей и производителей), чтобы иметь другое хранилище данных (Confluent Schema Registery) для хранения схемы записей.

В некоторых случаях эти компромиссы должны быть приемлемыми и сокращать использование ресурсов.

Если вы хотите запустить тесты, исходный код доступен на GitHub,

Ссылки

https: //kafka.apache орг / документация /
https://github.com/apache/kafka
https://www.confluent.io/blog/decoupling-systems-with-apache-kafka-schema-registry-and-avro/
https://medium.com/@stephane.maarek/introduction-to-schemas-in-apache-kafka-with-the-confluent-schema-registry-3bf55e401321