Storage and autoscale in Lerua
Last updated
Last updated
https://habr.com/ru/companies/leroy_merlin/articles/712946/
Всем привет! Меня зовут Александр Токарев, я технический архитектор домена «Управление данными» в Леруа Мерлен. Год назад мы уже делали обзор нашей Платформы данных, сейчас же я расскажу про её развитие за последний год и про задачи, которые нам удалось решить.
Мы столкнулись с необходимостью масштабировать наш подход, когда количество источников, интегрированных в платформу, стало больше 150. Всего же мы планируем интегрировать данные из более чем 800 систем. Однако ETL‑инструменты, которые мы использовали на первых этапах развития дата платформы, не позволяли добиться эффективного масштабирования. Кроме того, сам процесс интеграции источников был достаточно трудоемким. Поэтому возник запрос на рефакторинг архитектуры процесса поставки данных, который, с одной стороны, позволил бы эффективно горизонтально масштабироваться, а с другой стороны, упростил бы сам процесс интеграции. В результате мы пришли к следующей схеме процесса.
Вкратце напомню структуру изначального процесса, который мы выстроили в ходе построения дата‑платформы. Источники данных отправляют CDC (Change Data Capture) сообщения в Kafka‑топики, которые вычитываются ETL (Extract‑Transform‑Load) процессом на NiFi, складывающим преобразованные CDC в БД GreenPlum в сырой слой (Raw). Затем по расписанию запускается загрузчик в Airflow, вызывающий хранимую процедуру в самом GreenPlum, которая делает свёртку сырых сообщений и преобразует их в табличный вид, который у нас называется ODS (Operational Data Store) слоем. Далее на основе ODS‑слоя можно построить слой витрин данных (Marts), но это не всегда является обязательным.
Как было
Кроме того, в нашей компании при работе с данными мы основываемся на подходе Data Mesh. Его суть заключается в том, что за интеграцию данных в дата‑платформу и обеспечение качества интегрируемых данных отвечают команды, являющиеся владельцами систем‑источников данных. В свою очередь, команда дата‑платформы отвечает за предоставление инструментов для проведения интеграции и за обеспечение работоспособности этих инструментов в процессе эксплуатации.
Данный подход был вполне эффективен на первых этапах развития дата‑платформы. Однако со временем в результате роста количества подключенных систем и объёма загружаемых данных мы столкнулись с рядом проблем.
Технологическая нагрузка по расчету ODS‑слоя из сырых данных выполняется на тех же мощностях, что и запросы пользователей. Из‑за этого мы регулярно получали жалобы на то, что запросы выполняются медленно. В качестве частичного решения этой проблемы мы поставили выполнение всех технологических процессов на ночь, но, во‑первых, это уже затрагивало наших потребителей в дальних часовых поясах, во‑вторых, ограничивало скорость поступления данных в ODS‑слой до одного раза в сутки.
В продуктовых командах, владеющих системами, которые должны быть интегрированы в дата‑платформу, не всегда есть дата‑инженеры с нужными компетенциями. Для небольших команд это не всегда экономически оправдано. Кроме того, дата‑инженер должен владеть сопутствующими технологиями, такими как NiFi и Airflow.
Около 75% источников используют однотипные системы для хранения данных — например, БД PostgreSQL или MSSQL. Соответственно процессы интеграции данных из таких источников выглядят однотипно. Каждый дата‑инженер просто копировал интеграционный ETL‑процесс в NiFi с похожего источника и слегка его модифицировал. Поэтому возникла потребность в создании универсального процесса под стандартные типы источников данных, которые просто конфигурировались бы под каждую интеграцию.
Необходимо было обеспечить резервирование всех данных на случай отказа основного кластера GreenPlum и их быстрое восстановление.
Таким образом, мы приняли решение вынести всю технологическую нагрузку за пределы кластера GreenPlum. Все поступающие из Kafka‑топиков CDC‑сообщения сохраняются в персистентное хранилище на базе S3, с разделением по источникам, целевым таблицам и временным интервалам. Мы решили сделать единый процесс, читающий данные сразу из всех источников и обрабатывающий CDC‑сообщения в зависимости от их формата. Это позволило нам автоматически балансировать ресурсы между источниками в зависимости от нагрузки — вместо создания отдельных ETL‑процессов на NiFi под каждый источник. Также это дало возможность сделать универсальные конфигурируемые обработчики для часто встречающихся типов источников данных.
После сохранения CDC‑сообщений из Kafka на S3 мы запускаем процесс формирования ODS‑слоя, который также хранится на S3. Он представляет собой снимок таблицы (snapshot) на определенный момент времени и может служить для неё точкой восстановления. Таким образом, хранилище на S3 является первоисточником для данных ODS‑слоя, которые затем могут загружаться как в общий коммунальный кластер GreenPlum, где с ним могут работать все желающие, так и в другие специализированные кластеры и БД, которые работают с меньшим объёмом исходных данных для повышения производительности.
Ниже приведена логическая схема загрузки данных в дата‑платформу.
Логическая схема загрузки данных в дата-платформу
На данной схеме я показываю, что все CDC‑сообщения, приходящие из Kafka, сохраняются в неизменном виде на S3. При сохранении они разделяются по директориям в соответствии с названием таблицы и интервалом времени. Для определения интервала времени используется event‑time, содержащийся в самом сообщении. Это нужно для того, чтобы в случае так называемых late events мы могли доложить их в нужную директорию и пересчитать.
Далее на основе сохранённых CDC‑сообщений в Raw‑слое необходимо рассчитать приращение для основной таблицы — дельту. Дельта представляет собой последовательность операций, которые необходимо применить к предыдущему снимку таблицы для того, чтобы получить снимок таблицы на начало следующего периода. Таким образом, мы получаем серию снимков для таблицы с определённой периодичностью.
У данного подхода есть два недостатка. Первый заключается в том, что в случае большого объёма данных в снимке и малого — в дельте при расчёте следующего снимка будет расходоваться много ресурсов. Поэтому имеет смысл перестраивать снимки не после появления каждой новой дельты, а после появления нескольких дельт. Например, вместо S1 = S0 + d0 и S2 = S1 + d1 можно рассчитывать снимок через два интервала как S2 = S0 + d0 + d1 и т. д.
Второй недостаток заключается в том, что каждый раз загружать в GreenPlum полное состояние таблицы при появлении нового снимка также будет слишком затратным процессом. Поэтому мы разработали процедуру применения дельт к таблице непосредственно к данным, уже хранящимся в GreenPlum, что устраняет необходимость в постоянной перезагрузке данных. Таким образом, загрузка данных в GreenPlum из снимков таблицы необходима только в случае восстановления резервной копии, в штатном же режиме дельты применяются и к снимкам таблицы на S3, и к данным в самом GreenPlum. В случае штатной работы системы состояние таблицы после применения соответствующей дельты и состояние снимка на S3 должны быть эквивалентными.
С учётом всех изложенных выше вводных мы пришли к следующей архитектуре системы.
Текущая архитектура
Изображенные на схеме компоненты процесса интеграции источников в дата‑платформу взаимодействуют следующим образом. Точкой входа в контур дата‑платформы и механизмом передачи CDC‑сообщений от источников в дата‑платформу является Kafka. Как и в предыдущей реализации дата‑платформы, мы исходим из того, что данные от источников поступают в виде непрерывного потока CDC‑сообщений. Из неё данные вычитываются модулем CDC ingest, построенным на Apache Flink. Его основными задачами являются загрузка данных из Kafka‑топиков и сохранение их на S3 в партиционированном виде.
Кроме того, модуль CDC ingest определяет, завершилось ли заполнение директории или ещё в процессе. Для этого он использует оконные функции, входящие в состав Apache Flink. Для каждого интервала времени он собирает множество всех выходных директорий, в которые была запись за этот интервал. Затем при помощи скользящего окна мы сравниваем множества за последний и предыдущий интервалы, и если в предыдущем интервале есть пути, которых нет в последнем, то мы считаем, что запись в эти директории завершилась и их можно передавать на следующие шаги.
Для расчета приращений (дельт) ODS‑слоя служит модуль Raw to ODS, построенный на Apache Spark. Его задача заключается в свёртке CDC‑сообщений по натуральному ключу таким образом, чтобы в результате его работы по каждому ключу была только одна итоговая операция, которую нужно применить к предыдущему состоянию таблицы. Например, если по данному ключу была сначала операция delete, а затем insert, то он их преобразует в одну операцию update. Или наоборот, если по ключу было три операции, сначала insert, потом update, потом delete, то он просто исключит этот ключ из итоговой дельты, и т. д. Также этот модуль отвечает за построение снимков таблиц на определённые моменты времени.
За загрузку данных ODS‑слоя из S3 в GreenPlum отвечает модуль Upload coordinator. Сами данные на S3 подключаются как внешние таблицы по протоколу PXF и затем перегружаются во внутренние таблицы GreenPlum при помощи хранимой процедуры. Изначально планировалось делать его на Airflow, который просто бы вызывал хранимую процедуру, однако мы столкнулись со следующей проблемой. Нам было необходимо, чтобы от запуска к запуску у нас формировался разный DAG, структура которого зависела от состава и количества партиций, готовых к загрузке. Однако динамические DAGи на Airflow позволяют сформировать только одну форму DAGа, которая не меняется от запуска к запуску. Поэтому в качестве координатора загрузки был выбран Apache Spark, который может распараллелить задачи на загрузку. В отличие от предыдущего модуля, здесь Spark не занимается непосредственно обработкой больших объёмов данных, а просто вызывает хранимую процедуру с Executor`ов, поэтому здесь мы ему выделяем существенно меньше ресурсов, чем предыдущему модулю.
За координацию задач по расчёту и загрузке данных между тремя предыдущими модулями отвечает сервис Task Coordinator. После окончания записи сырых данных модуль CDC ingest делает в нём запись о том, что определённую директорию можно обрабатывать следующим модулем. В свою очередь, модуль Raw to ODS считывает из координатора список путей к директориям, готовым к обработке, и в результате создаёт следующий список уже из директорий ODS‑слоя, готовых к загрузке в GreenPlum. Аналогично работает и модуль загрузки данных ODS.
За управление параметрами конфигурации всех этапов процесса отвечает сервис Metadata. Ранее мы уже делали про него отдельную статью. Он позволяет, в частности, определить схему данных для каждой таблицы источника, указать, какие Kafka‑топики использовать для чтения CDC‑сообщений, настроить периодичность появления новых директорий на S3 и т. д.
Одним из преимуществ данного решения является то, что в целом процесс получился идемпотентным. На любом этапе в случае возникновения какой‑либо ошибки мы просто выполняем его заново, и это не приводит к дублированию или потере данных. Кроме того, идемпотентность процесса позволяет нам корректно работать в случае дублирования CDC‑событий или в случае запоздалых событий (late events). Из‑за того что мы используем для разделения на директории время самого события, они всегда попадут в одну директорию raw‑слоя и будут дедуплицированы и рассчитаны заново.
После запуска в эксплуатацию мы обнаружили следующий недостаток. Так как для обработки данных со всех источников использовался единый процесс, то в случае появления ошибок в структуре данных одного из источников он весь останавливался целиком. Кроме того, если по одному источнику начинают прогружать большой объём данных, то он влиял на скорость загрузки данных по другим источникам. В итоге было принято решение о разделении потоков по нескольким независимым экземплярам, или пулам. На каждый пул выделяется определенный объём ресурсов, прежде всего ядер процессора и памяти. В итоге мы сейчас работаем в 4 пулах: два из них выделены отдельно для систем с большим объёмом данных, один пул — для нестабильных источников (как правило, это такие источники, которые были недавно подключены) и ещё один — для всех остальных источников со стабильной структурой сообщений и небольшими объёмами данных.
Вся технологическая нагрузка была вынесена на внешний кластер k8s. Как следствие, была существенно повышена производительность кластера GreenPlum для пользовательских запросов. В частности, для загрузки и расчёта самого большого источника раньше в среднем расходовалось порядка 15% вычислительных мощностей, а после переключения — около 1%.
График нагрузки на GreenPlum при использовании старого процесса
График нагрузки на GreenPlum после переключения
В итоге внедрение новой реализации позволило нам достичь следующих результатов.
Упрощён процесс интеграции источников в дата‑платформу. Для стандартных источников больше нет необходимости в создании отдельного ETL‑процесса. Достаточно просто добавить соответствующую конфигурацию в сервис метаданных. Время интеграции стандартного источника уменьшилось в ~2 раза.
Благодаря снижению нагрузки на кластер GreenPlum в ~10 раз, освободившиеся ресурсы были выделены для полезной работы пользовательских процедур и запросов.
Рассчитанные данные ODS‑слоя теперь доступны как в GreenPlum, так и на S3. Это, во‑первых, обеспечивает резервирование данных, а во‑вторых, позволяет работать с ними, используя различные инструменты для распределённой обработки данных, такие как Apache Spark, минуя GreenPlum.