Skip to content

Latest commit

 

History

History
606 lines (385 loc) · 74.6 KB

internals.ru.md

File metadata and controls

606 lines (385 loc) · 74.6 KB

StatsHouse

Система сбора статистики и мониторинга ВКонтакте. Старая версия системы также известна, как “Статлоги”.

Это БЕТА-версия документа от 24 ноября 2022.

Какие задачи мы решаем

  1. Сбор статистики с высоким временным разрешением (1 секунда) и маленькой задержкой (5 секунд), почти real-time графики.
  2. Нормальная работа системы при неожиданном резком росте объёма вставляемых данных, в случае ошибок в коде клиентов или каких-то событий.
  3. Защита одних пользователей от других и честность распределения общих ресурсов, для избежания трагедии общин в такой “полудикой” среде, как ВК.
  4. Инфраструктура мониторинга максимально отделена от инфраструктуры, которую мы мониторим. В идеале StatsHouse должен продолжать работать, если недоступно всё, кроме него.
  5. Простота интеграции (создания клиентских библиотек), минимально число зависимостей.
  6. Ограничение кардинальности тэгов, эффективность хранения и получения данных.

Архитектура системы

Компоненты

  • Агент - принимает статистику по UDP в форматах JSON, ProtoBuf, MessagePack, TL а также по протоколу RPC TL, валидирует и интерпретирует статистику, накапливает её в течение секунды, шардирует и отправляет в агрегаторы по протоколу RPC TL. В случае недоступности агрегаторов, хранит данные на локальном диске в пределах квоты и отправляет позже. В VK агентов около 15000.
  • Аггрегатор статистики - принимают данные от агентов, агрегирует статистику, соответствующую каждой секунде от всех агентов и вставляет в базу данных. Аггрегаторов столько, сколько шардов-реплик clickhouse используется, в VK 8 шардов по 3 реплики, то есть 24 агрегаторов. Каждый агрегатор вставляет данные только в свою локальную реплику базы, развёрнутую на той же машине.
  • База данных (clickhouse) - хранит статистику
  • Сервис доступ к данным - позволяет делать только эффективные запросы к базе данных с помощью достаточно узкого API, кэширует данные для минимизации нагрузки на базу данных. Мы максимально ограничиваем выборку данных напрямую из clickhouse, так как неэффективные запросы могут негативно повлиять на кластер.
  • UI - получает данные от statshouse-api и показывает статистику пользователям.
  • Ingress proxy - принимает данные от агентов извне защищённого периметра (извне дата центра) и направляет на агрегаторы.
  • Метадата - хранит список метрик и настройки для каждой метрики, а также поддерживает и защищает от перегрузок глобальный словарь отображения строковых меток в целые и обратно.

Модель данных

Метрики и агрегация

Единицей сбора, просмотра и настройки свойств статистики является метрика. При записи метрики каждое измерение дополняется набором меток (ключей, тэгов), а также временем (timestamp). Измерения с одинаковыми наборами меток агрегируются, как в пределах временного интервала, так и между машинами.

Предположим, что в гипотетическом продукте нам нужно знать количество принятых пакетов в секунду, причём пакеты могут иметь разный формат, а также быть корректными и некорретными, соответственно для каждого пакета мы хотим знать статус обработки - “ок”, либо одна из нескольких ошибок. Каждый раз, когда пользовательский код принимает пакет, он отправляет на вход системы (в агент) событие, например в формате JSON.

{"metrics":[ {"name": "toy_packets_count",
 "tags":{"format": "JSON", "status": "ok"},
 "counter": 1}] }

Формат и статус могут иметь несколько разных значений каждый

{"metrics":[ {"name": "toy_packets_count",
 "tags":{"format": "TL", "status": "error_too_short"},
 "counter": 1} ]}

Если представить событие в виде строчки в традиционной базе данных, то после агрегации в пределах секунды у нас может получиться что-то вроде такой таблички - для каждой комбинации меток, которая встретилась, нам понадобится строчка, чтобы подсчитать число событий с данной комбинацией меток.

timestamp metric format status counter
13:45:05 toy_packets_count JSON ok 100
13:45:05 toy_packets_count TL ok 200
13:45:05 toy_packets_count TL error_too_short 5

Количество необходимых строчек в такой табличке называется кардинальностью метрики, для этой секунды на этой машине кардинальность метрики 3. Обьём данных не зависит от того, сколько было самих событий, а зависит только от того, сколько было разных наборов меток. Строчки с одинаковым набором меток “схлопываются”, а счётчики суммируются между собой.

Сбор данных происходит одновременно на многих машинах-агентах. После того, как сбор и агрегация данных за секунду завершены, данные отправляются на машины-агрегаторы, которые агрегируют данные со всех агентов. Для нашей гипотетической метрики после агрегации между машинами для каждой секунды у нас получится что-то вроде

timestamp metric format status counter
13:45:05 toy_packets_count JSON ok 1100
13:45:05 toy_packets_count JSON error_too_short 40
13:45:05 toy_packets_count JSON error_too_long 20
13:45:05 toy_packets_count TL ok 30
13:45:05 toy_packets_count TL error_too_short 2400
13:45:05 toy_packets_count msgpack ok 1

После агрегации между машинами кардинальность может увеличиться, так как каждая отдельная машина не всегда встречает все возможные комбинации меток, в данном случае общая кардинальность для данной секунды 6.

После агрегации данные записываются а базу данных в посекундную табличку. Обьём посекундных данных велик, поэтому происходит постоянная ротация и посекундные данные доступны за 2 суток.

Параллельно у каждого timestamp зануляются секунды и данные агрегируются в пределах минуты и записываются в минутную табличку. Аналогично в часовую. Поминутные данные хранятся месяц, почасовые не удаляются (удаляются вручную).

Общая часовая кардинальность метрики очень важна, так как именно она определяет, сколько рядов для данной метрики будет храниться длительное время в базе данных.

Также при выборке информации нам придётся пробежать все строчки за выбранный интервал времени, именно кардинальность определяет, сколько строк нам придётся в среднем пробежать и насколько быстро это удастся сделать.

Сэмплирование

У системы есть 2 узких места, первое - отправка данных с машин-агентов на машины-агрегаторы и обработка ими, второе - вставка агрегаторами в базу данных. Если каждую секунду будет отправляться больше данных, чем может обработать агрегатор или вставить база данных, они начнут копиться, и появится отставание, которое в принципе может расти бесконечно, а может и исчезнуть если объём данных уменьшится. Чтобы гарантировать минимальное отставание, применяется сэмлирование. Все “лишние” данные будут выброшены, а оставшиеся домножены на коэфициент таким образом, чтобы сохранить мат ожидания значений.

Предположим, что за секунду собраны 3 строки данных, причём все принадлежат одной метрике

timestamp metric format status counter
13:45:05 toy_packets_count JSON ok 100
13:45:05 toy_packets_count TL ok 200
13:45:05 toy_packets_count TL error_too_short 5

А ширина канала позволяет отправить на агрегатор только 2 ряда. Тогда будет выбран sampling factor 1.5, ряды случайно перемешаны, и отправлены только первые 2 ряда со счётчиком домноженным на 1.5. Таким образом могут быть отправлены следующие данные.

timestamp metric format status counter
13:45:05 toy_packets_count TL ok 300
13:45:05 toy_packets_count JSON ok 150

Или такие

timestamp metric format status counter
13:45:05 toy_packets_count TL ok 300
13:45:05 toy_packets_count TL error_too_short 7.5

Или такие

timestamp metric format status counter
13:45:05 toy_packets_count TL error_too_short 7.5
13:45:05 toy_packets_count JSON ok 150

Если каждый агент проведёт подобную операцию, хотя каждый агент выбросит часть рядов, после агрегации данных от многих агентов выяснится, что значения счётчиков каждого ряда окажутся близки к своим истинным значениям (без применения сэмплирования), тем ближе, чем больше агентов участвовали в агрегации.

То же самое произойдёт при агрегации рядов за 60 секунд в 1 минутный ряд, и далее в часовой. Либо при выборке данных, когда происходит агрегация по не интересующим нас тэгам.

Все средние значения агрегатов сохрантся, к ним только добавится высокочастстоный шум.

Чем выше кардинальность метрики - тем выше будут выбраны коэффициенты сэмплирования, и тем сильнее при тех же условиях будет шум на графике. Это стимулирует пользователей уменьшать кардинальность.

Одинаковый алгоритм применяется как перед отправкой агентами на агрегатор, так и перед вставкой агрегатором в базу данных.

Отметим, что нецелые факторы сэмплирования приводят к тому, что значения счётчиков перестают быть целыми. Поэтому в системе statshouse счётчики не ограничены целыми значениями, а изначально имеют тип с плавающей точкой. Впрочем, для конкретной метрики можно указать настройкой, что факторы сэмплирования нужно случайно округлять. Если желаемое значение фактора 1.1, то 9 из 10 раз оно округлится в 1, а 1 из 10 раз в 2.

Рассмотрим теперь, что происходит, если у нас более 1 метрики и ширину канала нужно распределить между ними.

Честное сэмплирование

Мы хотим, чтобы метрики минимально влияли друг на друга. Если одна метрики внезапно стала генерировать гораздо больше рядов, чем другая, желательно, чтобы именно для этой метрики был выбран больший коээфициент сэмплирования, чтобы остальные метрики не были затронуты.

Алгоритм работает таким образом:

  • Сначала все метрики сортируются по возрастанию количества занятых байтов, и берутся по-очереди.
  • Для каждой метрики
    • Вычисляется положенное ей число байтов - оставшийся бюджет делится на количество оставшихся метрик
    • Если метрика занимает меньше положенного, данные записываются целиком без сэмплирования.
    • Если же метрика занимает больше положенного, она сэмплируется таким образом, чтобы занять положеное количество байтов (если положено 500, а фактически собрали 2000, то будет выбран sampling factor 4).
    • Оставшийся бюджет уменьшается на число байтов, использованных метрикой.

Размер в байтах каждого ряда зависит от типа метрики и количества использованных меток, счётчики занимают меньше, чем значения, и т.д.

Также, поскольку в реальности есть более важные и менее важные метрики, предусмотрено задание вручную веса для каждой метрики. Метрике с весом 2 будет выделен канал в 2 раза более широкий, чем метрике с весом 1.

Сэмплирование также является важным стимулом для каждого пользователя системы не использовать больше, чем справедливый процент ресурсов системы, так как при росте объёма записи выше справедливого происходит ухудшение качества статистики.

Сэмплирование "китов"

Алгоритм описанный выше хорошо работает, если у нас исходные значения счётчиков в рядах близки друг к другу. Однако очень часто в метрике есть один или несколько доминирующих рядов (мы называем их "киты"), например если каждую секунду у нас успешно обрабатывается 1000 пакетов, но происходит по 1 из 4 ошибок:

timestamp metric format status counter
13:45:05 toy_packets_count JSON ok 1000
13:45:05 toy_packets_count TL error_too_short 1
13:45:05 toy_packets_count TL error_too_long 1
13:45:05 toy_packets_count TL error_too_bad 1
13:45:05 toy_packets_count TL error_too_good 1

Если мы отобразим на графике сумму счётчиков пакетов, то получим гладкий график со значением 1004.

Представим теперь, что бюджет позволяет вставить только 4 ряда из 5. Тогда каждую секунду мы будем выбрасывать один ряд, и примерно 4 секунды из 5 значение будет 1003 * (5 / 4) ~ 1203, а одну секунду из 5 значение будет 4 * (5 / 4) = 5.

В среднем это правильно, если просуммируем за 5 секунд, получим 1004, однако визуально график будет проходить гораздо выше среднего значения, в районе 1200, и иметь глубокие провалы почти до 0.

Поэтому при сэмплировании мы первую половину бюджета (2 ряда в данном примере) заполняем рядами с максимальным значением счётчика без сэмплирования, а вторую половину бюджета заполняем случайными из оставшихся рядов, сэмплируя их пропорционально сильнее.

В данном примере ряд со значением 1000 будет всегда вставлен со значением счётчика 1000, так что снова получим гладкий график со значением в районе 1004.

Выбор временного разрешения

Для некоторых метрик секундное разрешение не так важно, как минимальный коэффициент сэмплирования. Поэтому StatsHouse позволяет установить для каждой метрике меньшее разрешение, например 5 секунд. Тогда данные будут отправляться в 5 раз реже, а число рядов, выделяемой метрике окажется примерно в 5 раз больше. При этом задержка увеличится примерно на 5+5 секунд - сначала 5 секунд данные будут собираться, затем будут шардированы (по меткам, как обычно) на 5 частей, и следующие 5 секунд будут отправляться, каждую секунду по одному шарду. Такой способ отправки гарантирует честное распределение канала между метриками с разным временным разрешением.

Разрешение может любым быть делителем числа 60, но мы рекомендуем использовать только значения, совпадающие с LOD UI - 1, 5, 15, 60. Так как при выборе, например, разрешения в 2 секунды в каждые 2 соседние точки 5 секундного LOD-а попадут 3 и 2 значения метрики, а значит на всём протяжении графика будет присутствовать пила с амплитудой +50%. .

Хранение меток - имена

Все метрики хранятся в одной таблице, где есть 15 колонок для меток, названных tag1..tag15. Когда в примере выше мы использовали метки с именем “format”, “status” на самом деле система выбирала одну из колонок на основании описания метрики, например можно отредактировать описание так, чтобы format направлялся в tag1, а статус - в tag2 или наоборот. Также можно использовать системные имена 1..15 напрямую для выбора нужной колонки, они не пересекаются с пользовательскими и всегда доступны для записи.

Дополнительная колонка tag0 имеет специальную интерпретацию, и служит для задания окружения (environment), в котором собирается статистика. Например production или staging. В принципе, могут использоваться любые значения. Например, если на подмножестве машин экспериментальная версия, может быть задана строчка для этого эксперимента. Задаётся в клиентских библиотеках один раз при инициализации. В остальном с точки зрения системы tag0 ничем не отличается от остальных колонок.

Хранение меток - значения

Так как значения меток постоянно повторяются, для компактности хранения и скорости записи и выборки tag0..tag15 имеют тип int32 и там хранятся не строчки, а значения, которые берутся из гигантского отображения строчек string ↔ int32. Отображение общее на все метрики, а его элементы никогда не удаляются, а чтобы предотвратить его бесконтрольный рост, на создание элементов отображения установлен бюджет с лимитом в 300, пополняющийся на 1 значения в час (настраивается). Когда бюджет исчерпывается и отображение создать не удаётся, в колонку записывается специальное служебное значение (mapping flood), чтобы не терять событие целиком.

Максимальная длина значения тэга 128 байтов, если больше - обрезается. Также делается нормализация - с обеих сторон делается TRIM, а все последовательности юникодных пробелов внутри заменяются на 1 ASCII пробел, а все непечатные символы заменяются на дорожный знак. Это делает метки более регулярными и уменьшает удивление при отображении в UI, копировании и вставке в чаты, и т.д.

Иногда значения меток уже имеют подходящий тип, а количество значений велико, например номера приложений или каких-нибудь других объектов. Тогда можно отредактировать описание метрики, указав, что определённые тэги являются “сырыми”, тогда для них вместо отображения строчки-значения, она будет просто парситься, как (u)int32 (принимаем значения в диапазоне -2^31..2^32-1) и вставляться в таблицу как есть. При отображении в UI можно попросить показывать такие значения в каком-то формате, например timestamp, беззнаковое целое, hex, ip адрес, и т.д.

Хранение времени и окно приёма

Обычно время события назначается системой автоматически по времени приёма события, однако если нужно писать старую статистику, можно указать конкретное значение времени события, но принимается только статистика за последние полтора часа, если более старая - время будет установлено в текущее время минус полтора часа. Это сделано потому, что система может работать только при эффективной агрегация между хостами, а для этого все хосты должны присылать данные за конкретную секунду максимально вместе и слажено. А также система хранения (сейчас это clickhouse) работает гораздо медленнее, если значение primary ключа оказывается в разных партах на диске. Для тех метрик, которые явно указывают время событий можно ожидать выбора системой более высоких факторов сэмплирования, так как ряды с разным временем не могут быть агрегированы между собой.

Метрики-значения

Кроме метрик-счётчиков, есть метрики-значения. Например вместо счётчика принятых пакетов мы бы могли захотеть записывать размер принятых пакетов.

{"metrics":[ {"name": "toy_packets_size",
 "tags":{"format": "JSON", "status": "ok"},
 "value": [150]} ]}

или

{"metrics":[ {"name": "toy_packets_size",
 "tags":{"format": "JSON", "status": "error_too_short"},
 "value": [0]} ]}

Значение является массивом, чтобы можно было отправлять сразу несколько значений пачкой, что более эффективно.

Тогда кроме счётчика будут вычисляться агрегаты значений - сумма, минимальное и максимальное значение.

timestamp metric format status counter sum min max
13:45:05 toy_packets_size JSON ok 100 13000 20 1200
13:45:05 toy_packets_size TL ok 200 7000 4 800
13:45:05 toy_packets_size TL error_too_short 5 10 0 8

Среднее вычисляется при выборке данных путём деления суммы на сумму счётчика.

Перцентили значений

Если в описании метрики установлена галочка “с перцентилями”, то кроме агрегатов значений система будет считать перцентили на агентах, пересылать на агрегаторы, агрегировать там и записывать в clickhouse. Объём данных для такой метрики значительно возрастет, поэтому система скорее всего выберет высокие факторы сэмплирования. В таком случае может помочь уменьшение кардинальности или временного разрешения.

Метрики-счётчики уникумов

Используются, чтобы оценить число разных уникальных целых значений (если значения не целые, а например строки, то можно взять hash строк) Предположим, что пакеты содержат id пользователя, отправляющего пакеты. Мы можем посчитать сколько разных пользователей отправляло пакеты.

{"metrics":[ {"name": "toy_packets_user",
 "tags":{"format": "JSON", "status": "ok"},
 "unique": [17]} ]}

Уникальное значение является массивом, чтобы можно было отправлять сразу несколько значений пачкой, что более эффективно.

Множества хранятся в сжатом виде и использованием функции, подобной Hyper Log Log, так что сами значения недоступны, можно узнать только оценку кадинальности множеств.

timestamp metric format status counter unique
13:45:05 toy_packets_user JSON ok 100 uniq(17, 19, 13, 15)
13:45:05 toy_packets_user TL ok 200 uniq(17, 19, 13, 15, 11)
13:45:05 toy_packets_user TL error_too_short 5 uniq(51)

Кроме того для уникумов хранятся обычные агрегаты, как для значений (минимальное, максимальное, среднее, стандартное отклонение), интерпретированных, как int64 и округлённых до float64. Знание диапазона значений часто помогает в отладке.

Топ строк

Иногда бывает ситуация, когда число разных значений метки огромно и неограниченно, например referrer или слово в поисковом запросе. Если использовать обычные метки, то новые значения очень быстро выберут бюджет на создание элементов отображения, да ещё и засорят его огромным количеством одноразовых значений. Для подобных случаев система поддерживает специальную метку с именем _s, что означает буквально строковый тэг. Буквально в дополнение к 16 целочисленным колонкам тэгов сделана дополнительная строковая колонка. Для каждой комбинации обычных тэгов на агенте создаётся специальная структура данных, которая хранит некоторое количество популярных за эту секунду значений строки (например, 100), когда структура наполняется, применяется вероятностное вытеснение. На агрегатор отправляется топ этих значений, например 10. При сэмплировании выбрасываются либо все строки из набора, либо ни одной, таким образом распределение самих наборов по пространству кардинальности обычных меток сохраняется, это важно для большинства пользователей. Аггрегатор собирает все строки для набора во всех агентов, и в свою очередь вставляет в базу данных топ этих строк (например, 20), а для “остальных” не попавших в топ, используется пустая строка. Таким образом обычные счётчики (и метрики других типов) являются на самом деле частным случаем топа строк, когда все строки пустые.

Метка машины-агента и механизм max_host

Большинство пользователей интуитивно хотят использовать имя машины-агента в качестве метки, чтобы иметь возможность просматривать статистику с каждой машины независимо. Однако, что может оказаться неожиданным, добавление такой метки прекращает агрегацию данных между машинами, а значит увеличивает кардинальности и объём данных в соответствующее число раз. Например, если машин-агентов 100, то объём данных увеличится в 100 раз, и системой могут быть выбраны гигантские коэффициенты сэмплирования, например 10, 50 или 100, при этом качество данных сильно ухудшается из-за шума.

Поэтому в StatsHouse для всех метрик включен очень дешёвый механизм, когда в специальную колонку max_host при агрегации данных записывается имя машины, ответственной за максимальное значение (либо внёсшей максимальный вклад в счётчик для счётчиков). Такая колонка увеличивает объём данных менее, чем на 10%, при этом позволяет ответить на вопросы “на каком хосте максимальный объём занятого дискового пространства” или “на каком хосте максимальное количество ошибок каждого типа”.

Например, после агрегации следующих строк от двух агентов

timestamp metric format min max max_host
13:45:05 toy_latency JSON 200 1200 nginx001
timestamp metric format min max max_host
13:45:05 toy_latency JSON 4 80 nginx003

ясно, что максимальное значение latency 1200 было именно на хосте nginx001

timestamp metric format min max max_host
13:45:05 toy_latency JSON 4 1200 nginx001

Прежде чем добавлять метку с именем машины-агента мы рекомендуем попробовать посмотреть в UI функцию max_host для вашей метрики. Чаще всего, зная имя лишь одной проблемной машины, можно посмотреть логи и решить проблему.

Также советуем, где возможно, использовать разбивку не по отдельным машинам, а по их группам, используя метку environment или собственную метку. Например, запуская экспериментальную версию на одной или нескольких машинах, можно установить environment staging или dev для того, чтобы отделить статистику с этих машин от остальных.

Значение максимум в колонке max_host имеет типа Float32, а не Float64 для лучшего сжатия, так как высокая точность здесь не нужна. Имя хоста хранится в виде строкового отображения (Int32), как тэги.

Метаметрики

Для получения сведений о работе самой системы на разных этапах собираются и записываются метаметрики. Все они имеют префикс два подчерка. Самые главные из них вынесены в UI для каждой метрики - это ошибки приёма данных (например отрицательное значение счётчика или значение-NaN), факторы сэмплирования, выбранные агентом и агрегатором, а также оценка часовой кардинальности метрики и количество созданных элементов отображения.

Многие метаметрики по разным причинам не подчиняются общим правилам, например не сэмплируются. Некоторые метаметрики, например статус приёма данных и факторы сэмлирования агентом пересылаются в специальной компактной форме для экономии трафика.

Детали реализации

Метрики и метаинформация

Свойства каждой метрики хранятся в специальном сервисе метаданных, развёрнутом обычно на машинах первого шарда агрегаторов. Поскольку нагрузка на сервис данных невелика, нет смысла разворачивать этот сервис на отдельных машинах.

Для каждой метрики хранится её тип (для правильного отображения), имя метрики, имена и способ интерпретации тэгов.

Метрики создаются через UI, автоматического создания метрик не происходит. Это важно, так как все компоненты предполагают, что разных метрик немного (максимум десятки тысяч) и не имеют защиты от неконтролируемого роста числа разных метрик.

При миграции с существующего решения можно включить режим “автосоздания”, который создаст все реально используемые метрики, затем мы рекомендуем отключит автосоздание, так как иначе может произойти создание слишком большого числа метрик, например если кто-то запишет в имя метрики rand.

Аггрегаторы находятся в постоянном TL RPC long poll к сервису метаданных на изменение метаданных метрик, а агенты аналогично в long poll к агрегаторам, поэтому изменения свойств метрики уже через секунду отражаются на всех агентах.

Удалять метрики нельзя, так как нет способа сделать это эффективно в базе данных ClickHouse. Поэтому используется скрытие метрики установкой флага visible (это действие обратимо). При этом статистика по этой метрике перестаёт записываться в базу данных.

Отображение строковых значений

Сервис метаданных хранит взаимно однозначные отображение в своей базе.

'iphone' <=> 12
'null' <=> 26
...

Flood-лимиты на создание хранятся в той же базе данных.

Аггрегаторы работают с сервисом данныхнапрямую, и кэшируют отображения в памяти и файлах (на месяц), агенты работают с отображениями через агрегаторы, и также кэшируют отображения в памяти и файлах (тоже на месяц). Также агенты при старте с чистого листа используют специальный boostrap запрос из примерно 100000 самых распространённых отображений, это нужно так как при разворачивании на 10000 машинах пришлось бы скачать с агрегаторов примерно миллиард значений по-одному, что заняло бы долгое время, в течение которого метрики бы не могли писаться.

Приём данных

Все ошибки приёма пишутся во встроенную метрику __ingestion_status. Если имя метрики найдено, её ID будет записан в соответствующую колонку метрики, если нет - строчка имени будет записана в строковую колонку. То же самое произойдёт с не найденным именем тэга.

Счётчики и значения имеют тип float64, и при приёме их значения обрезаются диапазоном значений [-max(float32)..max(float32)]. Это сделано для того, чтобы при их суммировании и других операциях над ними, в том числе внутри базы данных, никогда не получать значений +-inf. Однако точность float64 сохраняется, в том числе если исходные значения целые, можно просуммировать довольно много их без потери точности.

Значения-уникумы имеют тип int64, который интерпретируется просто как 64 бита и выбран вместо uint64 потому, что в некоторых языках нет типа uint64. Эти значения считаются чем-то вроде хэшей и при вычислении кардинальности составленных из них множеств они просто проверяются на равенство и неравенство. При записи агрегатов (min, max, sum) эти значения сначала интерпретируются, как int64 и затем конвертируются во float64, так как пишутся в те же колонки, что агрегаты обычных значений.

Если счётчик указан сам по себе, без массива значений или уников, интерпретируется просто как счётчик.

Значения и уники вместе указывать нельзя, будет записана ошибка приёма.

Если указан массив значений или уников, а счётчик не указан, то число событий считается равным размеру массива.

Если счётчик указан вместе с массивом, то число событий берётся равным счётчику, а массив считается сэмплом настоящих значений. Так что

... "counter": 6, "values": [1, 2, 3] ...

означает, что число событий 6, а каждого значений по 2.

Приём данных по UDP

Мы принимаем пакет в формате MessagePack, Protobuf, JSON, TL, все форматы семантически идентичны и происходит автоопределение формата по первым байтам пакета.

Пакет является объектом, содержащим массив метрик

{"metrics":[ ... ]}

Каждый элемент массива является объектом с полями:

{
 "name":"rpc_call_latency",  // имя метрики, обязательно
 "tags":{"protocol": "tcp"}, // тэги
 "ts": 1630000000,           // время событий, отсутствие или 0 означает 'сейчас'
 "counter": 6,               // счётчик события (событий)
 "value": [1, 2.0, -3.0],    // значения, если есть (нельзя вместо с уникумами)
 "unique": [15, 18, -60]     // уникумы, если есть (нельзя вместе со значениями)
}

Например, можно отправить вот такой пакет

{"metrics":[
{"name":"rpc_call_latency",
 "tags":{"protocol": "tcp"},
 "value": [15, 18, 60]},
{"name": "rpc_call_errors",
 "tags":{"protocol": "udp","error_code": "-3000"},
 "counter": 5}
]}
{"metrics":[
{"name": "external_landings",
 "tags":{"country": "ru","sex": "m","skey": "lenta.ru"},
 "counter": 1}
]}

Определения для Protobuf:

message Metric {
  string              name    = 1;
  map<string, string> tags    = 2;
  double              counter = 3;
  uint32              ts      = 4;  // UNIX seconds UTC
  repeated double     value   = 5;
  repeated int64      unique  = 6;
}

message MetricBatch {
  repeated Metric metrics = 13337;  // to autodetect packet format by first bytes
}

В случае TL, тело пакета должно быть Boxed-сериализацией объекта statshouse.addMetricsBatch (определение ниже)

В случае JSON, первый символ должен быть скобкой {, иначе автоопределение не сработает.

В случае Protobuf, по той же причине нельзя добавлять в объект MetricBatch никаких дополнительных полей.

Приём данных по TL RPC

statshouse.metric#3325d884 fields_mask:#
  name:    string
  tags:    (dictionary string)
  counter: fields_mask.0?double
  ts:      fields_mask.4?#               // UNIX timestamp UTC
  value:   fields_mask.1?(vector double)
  unique:  fields_mask.2?(vector long)

= statshouse.Metric;

@write statshouse.addMetricsBatch#56580239 fields_mask:#
  metrics:(vector statshouse.metric)
= True;

Ошибки приёма возвращаются, как TL ошибки. Мы не специфицируем коды ошибок, так как не предполагаем никакой логики в клиентах при получении ошибок, только запись в лог и последующий анализ вручную.

Приём данных по unix datagram socket или TCP (TODO)

Если можно делать неблокирующую отправку в unix datagram socket, то можно использовать их вместо UDP для того, чтобы отследить потерю пакетов со стороны отправителя.

Либо клиенты, например PHP, могут с той же целью пользоваться non-blocking TCP/unix socket для того, чтобы не блокироваться, когда буфер сокета переполняется (хвостик пакета, не влезший целиком запоминается, и будет отправлен по мере освобождения буфера. Новые же пакеты будут выбрасываться целиком, а их счётчик будет увеличиваться).

Структура основных таблиц ClickHouse

Вся статистика всех типов для всех метрик исходно сохраняется в одну таблицу clickhouse, один ряд соответствует одному агрегату. Примерное определение таблицы такое.

CREATE TABLE statshouse2_value_1s (
    `time`           DateTime,
    `metric`         Int32,
    `tag0`           Int32,
    `tag1`           Int32,
...
    `tag15`          Int32,
    `stag`           String,
    `count`          SimpleAggregateFunction(sum, Float64),
    `min`            SimpleAggregateFunction(min, Float64),
    `max`            SimpleAggregateFunction(max, Float64),
    `sum`            SimpleAggregateFunction(sum, Float64),
    `max_host`       AggregateFunction(argMax, Int32, Float32), 
    `percentiles`    AggregateFunction(quantilesTDigest(0.5), Float32),
    `uniq_state`     AggregateFunction(uniq, Int64)
) ENGINE = *MergeTree
PARTITION BY toDate(time) ORDER BY (metric, time,tag0,tag1, ...,tag15, stag);

Если метрик не является перцентилем или счётчиком уникумов, то значение в соответствующей колонке будет пустым. Также, для обычного счётчика все колонки, кроме count будут нулевыми/пустыми. Аналогично колонка строкового тэга будет непустой только если использован топ строк.

Данные из этой таблицы агрегируются за некоторые временные интервалы (например, 60 секунд, 3600 секунд) и сохраняются в идентичные таблицы, но с другим именем, для поддержки быстрой выборки за временные интервалы, значительно превышающие секунду.

Данные шардируются между шардами clickhouse по хэшу от (metric, key0, … , key15), поэтому части данных метрики, с оответствующие, например, метке "protocol":"tcp" обычно оказываются на разных шардах из-за разных значений других тэгов (на всех шардах, если комбинаций тэгов много), и для получения полной картины нужно всегда делать Distributed Query ко всем шардам. Это так, потому что набор тэгов имеет ограниченную кардинальность, и при её достижении объём данных при агрегации перестаёт расти, поэтому если бы каждый шард хранил “сэмпл” всей статистики, то при достаточном объёме данных, фактически каждому шарду пришлось бы хранить число рядов равное кардинальности статистики, а не долю, пропорциональную числу шардов.

Буферные таблицы не используются, так как обычно каждый агрегатор делает 1 вставку в секунду. Вставка делается в incoming-таблицу, а копирование оттуда с помощью материализованного представления с фильтрацией значений по time в пределах окна приёма данных (двое суток), это защита от ошибочной вставки мусорных данных, которая приведёт к тому, что при запросах clickhouse придётся читать данные из всех партов, а не 1-2.

Количество реплик каждого шарда должно быть больше или ровно 3, первые три будут использоваться для вставки агрегаторами, остальные считаются readonly-репликами и могут разворачиваться для масштабирования нагрузки чтений. Количество шардов может быть каким угодно (мы используем 8, в планах увеличить до 16). Для предотвращения неправильной конфигурации агентов и разного шардирования, которое бы привело к взрывному росту объёма данных из-за плохой агрегации, агенты присылают номер шарда-реплики с каждым пакетом данных а агрегатор, и если агрегатор видит, что данные предназначаются не ему, отвечает ошибкой. Этот же номер шарда-реплики используется ingress proxy для того, чтобы направить данные на правильный агрегатор.

Ingress Proxy

Поскольку агенты и агрегаторы используют протокол TL RPC с ключом шифрования датацентра, агенты вне датацентра не могут напрямую присоединяться к агрегаторам, так как это бы потребовало копирования/раскрытия ключа датацентра на внешние площадки.

Поэтому ingress proxy имеет отдельный набор ключей шифрования для подключения извне. Любой ключ может отзываться простым удалением из конфигурации ingress proxy.

Ingress proxy не имеет состояния, и для уменьшения вероятности атаки проксирует только подмножество типов запросов TL RPC, используемых агрегаторами.

Ingress proxy должно быть ровно 3 штуки, каждая проксирует в соответсвующую реплику каждого шарда. Поэтому отказ или остановка для обслуживания одной из ingress proxy эквивалентен отказу одной из реплик каждого шарда, что не мешает нормальной работе системы.

Криптоключи.

Статсхаус использует VK RPC с (опциональным) шифрованием для общения всех компонентов. В VK RPC криптоключ является и паролем для входа, и секретом для вывода эфемерных ключей соединения.

Для того, чтобы установить связь, клиент должен использовать при установлении соединения один из ключей, известных серверу.

Центральным компонентам системы являются аггрегаторы, они при запуске получают единственный "главный" криптоключ датацентра.

Все агенты для подключения к аггрегаторам должны получить адреса первого шарда аггрегаторов в параметре -agg-addr=X, а также "главный" криптоключ датацентра в параметре -aes-pwd-file=X.

Однако это безопасно только внутри защищённого периметра. Если есть необходимость подключаться извне, используется ingress proxy, установленные как раз на границе периметра.

Ингресс прокси стоит на границе, и у неё есть 2 половинки - RPC сервер для того, чтобы агенты подключались снаружи, и RPC клиент для подключения самой прокси к аггрегаторам внутри.

Ингресс прокси обязаны знать свои внешние адреса, по которым к ним будут подключаться агенты, эти адреса даются в аргументе -ingress-external-addr. Для управления тем, на каких интерфейсах могут подключаться агенты используется аргумент (-ingress-addr=X, обычно :8128 что эквивалентно 0.0.0.0:8128, либо адрес подсети сетевого адаптера, чтобы ограничить подключение только через него). Порт в нём должен совпадать с портами в -ingress-external-addr.

По этим адресам внешняя половинка ингресс прокси должна быть доступна для агентов.

В настройках ingress proxy указывается как внутренний криптоключ для отправки данных аггрегаторам -aes-pwd-file=X, так и множество внешних ключей для агентов, установленных на удалённых площадках -ingress-pwd-dir=X, содержимое каждого файла это криптоключ, имя файла игнорируется, и считается комментарием. Длина ключей произвольна, но не меньше 4 байтов, первые 4 байта используется для идентификации ключей и не могут совпадать. При изменении содержимого папки нужно перезапустить ingress proxy, механизма слежения за папкой нет, так как набор ключей меняется крайне редко.

Соответственно каждый агент получает в -aes-pwd-file=X один из ключей указанных у прокси в папке -ingress-pwd-dir=X.

Прокси за прокси

Тройка Ingress proxy имитирует аггрегаторы, так что можно установить ещё один уровень ingress proxy, который будет в качестве аггрегаторов использовать предыдущую прокси.

Тогда для следующей ingress proxy в аргументе -agg-addr=X даётся тройка адресов из -ingress-external-addr=X предыдущей прокси, а в качестве криптоключа в -aes-pwd-file=X один из криптоключей в папке -ingress-pwd-dir=X предыдущей прокси.

Kubernetes и подобное

При шифровании в VK RPC для вывода эфемерных ключей используются remote- и local- IP-адреса соединений, как их видят клиент и сервер, так что трюки типа перекладывания пакетов между адаптерами с помощью средств firewall приведут к невозможности установления соединений. Если соединяются, например, kubernetes-подобные компоненты, то придётся создать виртуальные сетевые адаптеры и подключить их друг к другу средствами linux network namespaces.

Однако крайне не рекомендуется разворачивать агенты и ingress proxy внутри подов. Лучше устанавливать агенты на контейнеровозы, прокидывая в поды порт 13337.

Эта рекомендация потому, что статсхаусу не нравится, когда количество агентов флуктуирует. Агенты отчитываются на аггрегаторы строго каждую секунду, и неизменное число агентов являесят важным индикатором того, что все агенты имеют связь до аггрегаторов. Остановка подов не должна приводить к уменьшению этого числа, так как сработает главный алерт.

Детали приёма регулярных значений

Агенты финализирую секунду для отправки синхронно с календарной секундой. Поэтому если у клиента есть какое-то значение, которое должно фиксироваться каждую секунду (условный “уровень воды”), клиентские библиотеки стараются присылать это значение в районе середины календарной секунды. Таким образом увеличивается вероятность того, что каждая секунда будет содержать ровно 1 измерений, однако гарантий этого система не предоставляет. Клиенты, которым это важно могут явно указать timestamp.

Взаимодействие между агентом и агрегатором, детали

Аггрегатор имеет 2 логических точки входа для данных, одна для отправки “актуальных” данных, другая - для отправки “исторических” данных, то есть тех, которые не удалось отправить сразу после создания.

Аггерагтор всегда приоритизирует вставку актуальных данных, поэтому после сбоя, когда сначала данные долго не могли вставиться, но потом нормальный ход вставки возобновился, актуальные данные начинают вставляться немедленно, а вот накопившийся за время сбоя объём исторических данных будет вставлен по возможности, настолько быстро, насколько это не мешает вставке актуальных данных.

Аггрегатор позволяет вставлять актуальные данные за последние 5 секунд (короткое окно, настраивается), если агент не успел, ему отправляется ответ “присылай данные, как исторические”. Для каждой актуальной секунды агрегатор хранит контейнер со статистикой, куда и агрегируется данные клиентов, как только наступает следующая секунда, данные секунды, выходящей из короткого окна вставляются в clickhouse, а агенты получают ответ с результатом вставки. Также короткое окно распространяется на две секунды в будущее, чтобы нормально работали клиенты, у которых часы немного идут вперёд.

Аггрегатор позволяет вставлять исторические данные за последние 2 суток (длинное окно, настраивается). Если приходят более старые данные, пишется специальная метастатистика, данные выбрасываются, а на агент отправляется ответ ОК.

Поскольку агрегация данных между машинами очень важна, каждый агент делает запрос на вставку нескольких десятки исторических секунд, начиная от самой старой, агрегатор принимает все эти запросы, затем выбирает самую старую секунду, агрегирует, вставляет и присылает ответ, затем снова выбирает самую старую секунду, и так далее. Такой алгоритм приводит к тому, что отставшие сильнее агенты “догоняют” менее отставших, таким образом создаётся тенденция агрегировать и вставлять каждую историческую секунду максимально одновременно, один раз.

Поскольку агрегатор должен лимитировать объём вставленных в секунду данных, а часть данных может вставляться позже, как исторические, агрегатор учитывает, сколько агентов внесли вклад в секунду, и устанавливает лимит пропорционально (все агенты каждую секунду присылают метаданные, даже если никаких пользовательских событий не было). Поэтому, если актуальные данные за какую-то секунду прислали 80% агентов, то для их вставки будет использовано 80% канала, затем если исторические данные прислали ещё 15% агентов, для их вставки будет выделено ещё 15% канала, и т.д. Этот алгоритм приводил к слишком большому сэмплированию, когда агентов очень мало, поэтому в этом случае к лимиту делается небольшая поправка в сторону увеличения.

Каждый шард-реплика агента при старте выбирает поправку к часам от -1 до 0 секунд в зависимости от номера реплики, для того, чтобы агенты не присылали данные лавиной в момент переключения секунды, приводя к большому количеству потерянных пакетов.

Агент сохраняет данные на диск в случае получения ошибки от агрегатора или его недоступности и хранит там либо до достижения лимита в байтах, либо до истечении длинного окна в двое суток, когда известно, что эти данные агрегатор всё равно не примет.

В некоторых случаях, когда доступ к диску нежелаелен или невозможен, агенты может быть запущен с пустым аргументом --cache-dir, тогда диск не будет использоваться, а хранение исторических данных будет в памяти, и всего на несколько минут отсутствия подключения к агрегаторам.

Предотвращение двойной вставки и работа при отказе агрегатора

Если агент получает от агрегатора ошибку, и после этого отправляет те же самые данные другому агрегатору, находящемуся на другой машине, то для дедупликации нужна система консенсуса. Мы решили, что сложность такой системы слишком высока, поэтому у нас возможна ситуация, когда и основной и запасной агрегатор вставят данные от одного агента за одну секунду. Мы решили, что это происходит нечасто, и вместо того, чтобы предотвратить двойную вставку, мы контролируем её специальной метаметрикой “количество агентов, приславших данные в эту секунду”. Для того, чтобы эта метаметрика была стабильной при нормальной работе, агенты присылают данные каждую секунду, даже если никакой пользовательской статистики в эту секунду не было. Также, если агент обнаруживает, что часы сдвинулись вперёд больше, чем на секунду (например, машина затупила или уснула), он присылает разницу в специальном поле, чтобы агрегаторы могли учесть это в специальной метаметрике.

При отказе агрегатора, данные, предназначеныые ему, отправляются агентами на один из двух агрегаторов-реплик, распределяясь между ними по номеру секунды - чётные секунды идут на одну из оставшихся, нечётные на другую, так что в среднем нагрузка на каждый вырастает на 50%. Это одна из причин того, что мы поддерживаем запись строго на 3 реплики clickhouse.