Как мы учили Redis геопоиску

В одном из прошлых проектов мы реализовывали систему «заражения» ближайших к источнику пользователей. Одно движение пальцем по экрану запускало процесс геопоиска, а затем сотни записей и удалений в базе. Но чем больше данных появлялось, тем хуже становилось. Итак, перед вами задача: достаточно быстрый и устойчивый поиск k-ближайших соседей на поверхности земного шара. Искать нужно не просто ближайших, а еще и удовлетворяющих трем условиям: еще не зараженных, еще не голосовавших и инфицируемых. Инфицируемость определяется эвристическим алгоритмом и ее определение мы опустим.

Берем PostgreSQL. Отличная тема, я ни раз говорил, что его и ядро Linux я считаю двумя лучшими opensource-проектами в истории. Для поиска ближайших берем расширение PostGIS с поддержкой геоиндексов, не так ли? Нет, не так. С 2011 года в основной состав PostgreSQL включены операторы <->, <#> и GiST-индексы, которые позволяют делать поиск ближайших без сторонних библиотек. Никакого мусора в проекте и «ну все на хабре так делают» — это залог успеха. Заявляемая производительность — «меньше 10 ms на 2 млн записей» нас устраивает. 2 млн активных пользователей — это очень большой проект, а 10 ms в распределенной СУБД — копейки, зачастую join дороже.

Получаем примерно такой запрос:

select users.*
from users
    left outer join votes on votes.user_id = users.id and votes.post_id = %(post_id)s
    left outer join infections on infections.user_id = users.id and infections.post_id = %(post_id)s
where votes.post_id is null and infections.post_id is null and users.is_infectable = true
order by users.coordinates <-> point(%(latitude)s, %(longitude)s)
limit %(limit)s

Новый пользователь получает возможность заразить своим постом только 4-х людей вокруг. С повышением его индекса влияния, если он ведет себя хорошо и не репостит всякую чушь, это количество растет. PostgreSQL отлично справляется с такими объемами данных и не сильно-то напрягается. 2000 записей в секунду? Ерунда, 10% CPU на db.m3.2xlarge инстансе AWS (примерный аналог небольшого 8-ядерного сервера с 30 Гб RAM).

Проект потихоньку запустился, про него написали все крупнейшие техническиеиздания, и вам наваливает пару сотен тысяч первых пользователей. И здесь начинаются проблемы. Дело в том, что скорости порядка 2000 IOPS (операций ввода/вывода в секунду) на амазоне стоят $200 в месяц, что примерно сопоставимо со стоимостью аренды самого db.m3.2xlarge сервера. Как только вы упираетесь в IOPS, это примерно как упереться в 100% CPU. Очень неприятно, да еще и проходит не моментально. Можно докупить еще тысячу, но это уже $300. А если нам нужно завтра вырасти в 50 раз? Ценник начинает кусаться, а ведь CPU занят всего процентов на 10.

Перед нами вживую пример того, что часто объявляют минусом реляционных СУБД — они ОЧЕНЬ не любят когда много пишут и удаляют, но мало хранят. И несмотря на старую шутку про стартаперов, которые всё делают на NoSQL, сейчас действительно пришло время выбирать дополнительное хранилище.

MongoDB — в задницу, просыпаться ночью от segmentation fault из-за очередного незакрытого бага, нет уж, увольте. Cassandra — круто, но Java, нужны будут люди чтобы это поддерживать. Aerospike — дорого. Tarantool — быстро, в памяти, со скиптами, но чето ссыкатно опыта работы и знакомых нет, куда в случае чего бежать за помощью не понятно. Да еще и однопоточный как Redis. Ну вот да, остается только Redis. Всё в памяти, то есть проблем с I/O не будет никогда. А еще у амазона есть платформа ElastiCache, в которой настроенный масштабируемый редис прям из коробки. Надо попробовать.

Но как организовать геопоиск на Redis, который умеет хранить, умеет работать со множествами, но совершенно не умеет считать что-то сложное? Быстрое гугление показывает экстеншен redis-geo, который обещает путем установки кастомной сборки редиса некоторые фичи географического поиска со скоростью около 100000 операций в секунду. Поставим, проверим.

Поиск по 5000 точкам в радиусе 3000 км на макбуке про с i7 занимает… 3 секунды. Что за фигня, где обещанные микросекунды? Оказывается он выбирает все точки через geohash, а потом для каждой считает расстояние. На расстоянии 20 км и паре сотен точек это действительно быстро, но в масштабах всей земли и миллионов точек — лажа. Редис однопоточный, потому это блокировка всего сервера на 3 секунды. Даже целым редис-кластером нужной производительность не добиться. Лажа.

Но использование алгоритма geohash — интересная идея, нужно развивать. Для тех кто не в курсе, GeoHash — это система кодирования пары latitude/longitude в одно число, или же при переведении в 36-ричную систему счисления — обычную строку (поиграть с демо можно здесь).

Например: координатам г. Бобруйск (53.13810821063661, 29.217967987060547) соответствует хеш u9ky2c7zg.

Однако если мы переведем u9ky2c7zg обратно в координаты, мы получим (53.138101, 29.217947). Точность при кодировании была отброшена, но шестой знак после запятой — это ошибка порядка 5 метров, более чем достаточно. Однако это кодирование не простое. Если отбросить последний символ полученного хеша (u9ky2c7z), то получаем координаты (53.13804, 29.21797). А если еще 4 символа (u9ky2) — (53.17, 29.36). Этого до сих пор вполне достаточно, чтобы определить, что мы в Бобруйске. Оставив всего 2 символа (u9) мы можем сказать лишь то, что координаты где-то в Беларуси (или кусочке Литвы).

Третьей важной особенностью метода является то, что для любого geohash можно с помощью простых математических операций получить 8 его соседей. Имея эти знания можно попытаться что-то реализовать на Redis. Ясно, что искать перебором ближайших соседей на однопоточном редисе — плохая идея. При необходимой точности порядка 50 метров (это 8 символов геохеша) таких соседей будет 8^36 = 3 с 32 нулями после.

upd: Спустя год после выхода этого поста Redis имеет geohash прям из коробки: http://redis.io/commands/geohash

Но мы можем «зумить» путем отбрасывания последнего символа. У Redis есть операция KEYS, которая может искать по маске, например: «KEYS u9ky2*». Можем искать по различным маскам и за 8 операций найти всех до точности 5000 км, а дальше просто пробежать по соседям.

Используя set’ы вида geo:<geohash> = {user_id, user_id, …} реализуем поиск с помощью KEYS и проверяем на 150 тысячах точек.

6 секунд. Говно.

Переписываем всё на хранимых в Redis процедурах LUA. Те же 6 секунд.

Говно.

Замена KEYS на SCAN тоже ничего не дает, операция выполняется так же за O(N), где N — количество ключей в базе, просто теперь не блокирует весь сервер. 6 секунд — по прежнему неприемлемо. Выход? Хранить ID каждого пользователя с 8 точностями. Если ID это integer, пользователей 100 млн, то это всего 8 bytes 8 10^8 = 5.6 Gb. Для 100-миллионного проекта найти 5.8 Gb RAM — копейки (hint: образованный читатель здесь заметит, что Redis в своих структурах хранит не чистые int’ы, а обернутые в собственные объекты, потому эти теоретические выкладки стоит увеличить в несколько раз).

Храним каждого пользователя сразу в 8 сетах вида geo:u, geo:u8, geo:u8k, и.т.д. Теперь выборка для любой точности может производиться за 1 команду. Кроме всего прочего нам необходимо выбирать не просто всех юзеров, а тех, кто заражаем и еще не голосовал и не заражен.

Так как каждый geo:* — это множество, редис имеет операцию вычитания нескольких множеств SDIFF и очень быстрые операции объединения SUNION и пересечения SINTER. Чтобы реализовать задуманное нам нужен один union с множеством «заражаемые» и два diff с множествами «проголосовавшие» и «зараженные» для каждого такого geo-квадрата.

Двигаясь от 8-й точности можно за O(1) с помощью SCARD получить размер geo-квадрата (количество пользователей в нём), затем 8 его соседей, а затем, если это количество недостаточно, переключиться на меньшую точность и повторить.

def search(post_id, latitude, longitude, limit):
    precision = 8
    user_hash = geohash.encode(
        latitude=latitude,
        longitude=longitude,
        precision=precision
    )
    users = set()
    while precision > 0:
        user_square_hash = user_hash[:precision]
        users = users.union(users_in_squares(
            squares=[user_square_hash],
            post_id=post_id,
            limit=limit
        ))
        if len(users) >= limit:
            break
        neighbor_square_hashes = geohash.neighbors(square_hash)
        users = users.union(users_in_squares(
            squares=neighbor_square_hashes,
            post_id=post_id,
            limit=limit
        ))
        if len(users) >= limit:
            break
        precision -= 1
    return users
def users_in_squares(squares, post_id, limit):
    users = set()
    pipe = slave.pipeline(transaction=False)
    for square_hash in squares:
        pipe.scard("geo:%s" % square_hash)
    users_in_square = sum(pipe.execute())
    if users_in_square > limit:
        pipe = slave.pipeline(transaction=False)
        for square_hash in squares:
            pipe.sinterstore(
                "geo:%s" % square_hash,
                "infectable:users",
                "tmp:geo:infectable:%s" % square_hash
            )
            pipe.sdiff(
                "tmp:geo:infectable:%s" % square_hash,
                "post:%s:infected" % post_id,
                "post:%s:voted" % post_id
            )
            pipe.delete("tmp:geo:infectable:%s" % square_hash)
        users = users.union(*[u for u in pipe.execute() if isinstance(u, set)])
    return users

Только имея нужное количество мы можем производить операции SUNION и SDIFF. К сожалению, Redis не позволяет сделать SUNION и SDIFF сразу в одном запросе, потому нам нужно записать результаты SUNION во временное множество с помощью SUNIONSTORE, а затем его про-diff-ать с оставшимися.

Реализуем. 0.0001 секунда в лучшем (густонаселенная Германия), 0.01 секунда в худшем (Сибирь) случае, вместе с отправкой/приемом/парсингом данных между серверами. Терпимо. Реализуем все остальные алгоритмы, тщательно тестируем добавление/удаление пользователей в множества geo:*, начинаем копировать часть трафика на тестовые сервера чтобы протестировать под нагрузкой и…

Под нагрузкой имеем время обработки под 3 секунды и блокировку всего сервера. В качестве апп-фреймворка у нас асинхронный Tornado, либа редиса была специально выбрана официальная, а она блокирующая. Однако неблокирующая реализацияработает в десятки раз медленнее, из-за чего не имеет особого смысла. Но всё равно пробуем переписать всё на неблокирующем tornado-redis’е.

Под нагрузкой снова 2-3 секунды на поиск. Даже если запустить tornado в 16 процессов на 4 CPU с connection pool’ом на 20 соединений в каждом. Но почему?

Потому что сам Redis тоже однопоточный, какими бы неблокирующими библиотеками мы его не обкладывали, многопоточным он от этого не станет. При выполнении большого diff’а он начинает блокироваться до завершения операции. А еще в это время Redis бомбардируют сотни запросов на обновление местоположения. Diff выполняется быстро, несколько миллисекунд, потом еще несколько десятков мс на отправку, однако после его выполнения torando не возвращая управление в IOLoop вызывает остальные diff’ы. В сумме набегает кругленькая сумма миллисекунд. И несмотря на то, что в это время другие вполне бы могли выполнить множество операций чтения, Redis тщательно занят нашим diff’ом и не хочет ничего читать. Встретились два однопоточных одиночества, мать их. Нужен слейв.

Amazon ElastiCache позволяет поднять read-only slave сервер для существующего за пару кликов мышкой. Теперь у нас есть откуда читать, но что делать с постоянно занятым master’ом? Нужно отказываться от единственной операции записи, а именно от SUNIONSTORE. Если бы разработчики Redis позволили на read-only слейвах создавать временные переменные — всё было бы куда проще, а так придется снова поебаться.

Быть может эффективнее получать все данные из Redis и делать объединение множеств уже в Python? Тестим. Нет, результат ожидаемо в сотни раз медленнее. Вместо того, чтобы работать в памяти одного сервера, нам приходится гонять по сети сотни килобайт.

Единственный выход — хранить в geo:* уже результат union’а, то есть только тех пользователей, которые точно могут получать заражения.

А что если на 8 точности geo (20 метров) хранить всех, а на всех нижних — только заражаемых? Таким образом мы избавляемся от любых операций записи и получаем возможность масштабироваться на бесконечное число read-only серверов.

def users_in_squares(squares, post_id, limit):
    users = set()
    pipe = slave.pipeline(transaction=False)
    for square_hash in squares:
        pipe.scard("geo:infectable:%s" % square_hash)
    users_in_square = sum(pipe.execute())
    if users_in_square > limit:
        pipe = slave.pipeline(transaction=False)
        for square_hash in squares:
            pipe.sdiff(
                "geo:infectable:%s" % square_hash,
                "post:%s:infected" % post_id,
                "post:%s:voted" % post_id
            )
        users = users.union(*[u for u in pipe.execute()])
    return users

По графикам на балансировщике обработка запроса занимает от 10 до 60 мс под нагрузкой. Отлично, хотя можно еще оптимизировать.

Выводы:

  1. Если вам нужно много писать и удалять из таблицы, скорее всего вы быстро упретесь в I/O традиционных SQL СУБД.
  2. Одним серверов Redis даже не среднего размера проектах можно обойтись только если он используется как простой кеш. Если используется что-то сложнее операций GET/SET, скорее всего понадобится кластер. Встроенной в redis кластеризацией пока никто не пользуется, а вот twemproxy вполне годен.
  3. SDIFF на больших множествах — узкое место, вспомните курс алгебры множеств и перепишите на другие операции. SINTER самая быстрая, SUNIONна втором месте. Используйте сначала их чтобы уменьшить размерность множества.
  4. Pipeline ускоряет отправку данных в Redis в десятки раз, однако если отправить пару тысяч команд, да еще и сказать выполнять их атомарно — придется страдать.
  5. Так как мы работаем с NoSQL, да еще и храним все данные в памяти, нужно всегда быть готовым к тому, что всё сломается и из бекапа не восстановится. И иметь скрипт «судного дня», который за конечное время сможет восстановить работоспособность. Ну и конечно, все жизненно важные данные нужно дублировать в SQL.
  6. Операция KEYS действительно (как и написано в документации, но кто верит документации не проверив, хах) может использоваться только для отладки. Потому что она лочит всё и пробегает ВСЕ ключи в БД. Альтернатива ей — операция SCAN, хоть и кажется хорошим решением, на деле работает еще медленнее (просто не блокирует всю базу), так как по сути делает то же самое.
  7. Если оно работает медленно, скорее всего переписывание всего на хранимых процедурах LUA не поможет ничем. А тем более реализация всей логики на ней. Если хотите упороться LUA прям внутри БД, попробуйте tarantool, потом мне расскажете как оно.
  8. Экономия памяти становится главным приоритетом, а её утечки снова головной болью. Почти каждая операция записи должна сопровождаться установкой времени жизни EXPIRE/EXPIREAT, даже если это добавление в существующую структуру (дело в том, что при опустении сета редис его удаляет, соответственно при последующей записи это будет уже новый сет без expireat и опа — память потекла). Hash-таблицы эффективнее по памяти, чем просто куча ключей. Вот тут можно почитать подробнее.