Мартин Фаулер, Рефакторинг

Кормен и другие, Алгоритмы: построение и анализ

Джоэл о программировании

Фредерик Брукс, Мифический человеко-месяц или как создаются программные системы



 

Асинхронная обработка задач

Яков Сироткин
yasha@telamon.ru
ноябрь 2009 года, обновлено 5 мая 2011 года
версия 1.2

Программисты любят действовать прямолинейно: получили запрос - сразу же его обработали. Если не заниматься нагрузочным тестированием и не подсовывать некорректные входные данные, то любой правильный алгоритм пройдёт QA и отправится на боевой сервер. К сожалению, прямой путь не всегда самый безопасный и самый быстрый. В живых системах регулярно случаются самые разные неприятности: возрастает нагрузка, пропадает связь с внешними ресурсами, смежники внезапно переходят на новый формат данных и так далее.

Самый лучший способ не ошибаться — это ничего не делать. Поэтому все входящие запросы мы будем сначала только записывать, а обработку отложим на потом. При этом для записи задачи обычно нужно гораздо меньше ресурсов, чем для её выполнения, и мы сможем принимать задачи гораздо быстрее, чем при синхронной обработке.

Архитектурные ограничения

Самым сердитым решением будет просто записывать лог входящих запросов, но с логами не очень удобно работать и мы будем предполагать, что у нас есть база данных, которая поддерживает SQL или какой-нибудь другой простой интерфейс.

Понятно, что никакая система не сможет обрабатывать транзакции бесконечно быстро, поэтому мы заранее ограничим сверху скорость, с которой будем обрабатывать входящие запросы. А записывать задачи будем используя все оставшиеся ресурсы.

Три сервера

Представим себе нашу систему в виде трёх серверов: сервер базы данных, сервер, записывающий задачи в базу и сервер, который занимается обработкой задач. Узким местом в нашей системе является база данных — о нагрузке на неё мы будем думать в первую очередь, остальные два сервера масштабируются простым клонированием. На практике запись и обработка часто осуществляются в рамках одного серверного приложения, запущенного на кластере из достаточного количества серверов.

Таблица с задачами

В данной статье я пытаюсь только раскрыть суть применяемого подхода, но не привожу образцы готового кода. Поэтому все упоминаемые сущности описаны максимально схематично, а в своей реализации вам нужно будет учитывать специфику конкретного проекта и следовать принятым в нём стандартам кодирования.

В типичном случае задачи хранятся в специальной таблице, по одной строке на задачу, у которой должны быть четыре колонки:

  1. ID — уникальный идентификатор задачи, PRIMARY KEY.
  2. TASK — параметры задачи, обычно это идентификаторы необходимых объектов в других таблицах в простейшем формате.
  3. PROCESS_TIME — время, после которого можно начать очередную попытку выполнить задачу. Значение по умолчанию — текущее время.
  4. ATTEMPT — количество уже сделанных попыток. Значение по умолчанию — 0.

Вокруг этой таблицы строится вся система асинхронной обработки задач, поэтому она должна быть сделана максимально надёжно и просто. В частности, не стоит делать колонку TASK блобом и хранить в ней все данные по задаче. Работоспособность системы при пиковых нагрузках будет определяться скоростью добавления строк в эту таблицу.

Обычно сначала система делается для какого-то конкретного процесса, но потом добавляются всё новые и новые задачи, поэтому становится целесообразным добавить колонку с типом задачи, чтобы выбирать по ней нужный обработчик.

Замечание о времени

Обычно у нас есть база данных и несколько серверов, которые к ней обращаются. У всех серверов может быть своё представление о текущем времени, поэтому их мнение по этому вопросу мы будем игнорировать, а пользоваться будем часами внутри базы данных. Для корректной работы нашей системы желательно, чтобы эти часы выдавали что-то похожее на время, а не случайные числа. Если база данных развёрнута на кластере, то внутри кластера время на разных нодах следует синхронизировать.

Если время в системе будет переведено назад, например, из-за перехода на летнее-зимнее время, то мы можем внезапно начать обрабатывать задачи, старт которых был запланирован на более поздний момент. Это достаточно сложный вопрос, который требует по крайней мере значительных усилий при тестировании. Мы оставим эту тему за рамками данной статьи, благо сейчас эта проблема потеряла свою актуальность.

Жизненный цикл задачи

Каждая задача сначала должна быть записана в таблицу задач. При этом в ID записывается уникальный идентификатор, а в TASK сохраняются данные, необходимые для выполнения задачи. Например, если нам нужно обработать какой-то входящий пакет, то мы можем сохранить его в отдельную таблицу, а в TASK записать идентификатор этого пакета.

В подавляющем большинстве случаев обработку можно начинать сразу и в PROCESS_TIME мы записываем текущее время. Количество попыток ATTEMPT изначально равно 0.

Если какой-то сервер приложений берёт задачу из очереди и начинает её выполнять, то нет никаких гарантий, что в этот момент он не будет уничтожен взрывом. Поэтому мы сразу запишем в PROCESS_TIME текущее время плюс 2 в степени ATTEMPT минут и увеличим ATTEMPT на 1.

Таким образом, задача может начать выполняться повторно через одну минуту. Это нештатная ситуация и защитой вполне может служить уникальный ключ, из-за которого будет вылетать что-нибудь вроде Unique constraint violated.

Если выполнение задачи вызывает ошибку, то вторая попытка произойдёт не раньше, чем через 1 минуту, после первой, третья — через 2 минуты после второй, четвёртая — через 4 минуты после третьей и так далее. Грубо говоря, если мы за уже прошедшее время не решили проблему, то вполне можем ещё столько же подождать. Для некоторых задач через определённое время имеет смысл выполнять аварийное завершение — при нашей схеме работы удобно делать это начиная с какого-то номера попытки. Если аварийное завершение отработало нормально, то задача считается выполненной.

Строка с выполненной задачей удаляется из таблицы задач. Стоит отметить, что просто установить флаг будет недостаточно: от большого количества строк скорость работы с таблицей может существенно упасть. Если требуется хранить лог выполненных задач — это можно делать в другом месте. Заметим, что задача считается выполненной, если прошли все входящие в неё транзакции. Удаление строки из таблицы задач - это ещё одна транзакция, не стоит включать её непосредственно в процесс выполнения задачи. Следовательно, мы должны корректно обрабатывать ситуацию, когда задача была полностью или частично выполнена во время предыдущей попытки.

Обработка задач

Обработка задач может осуществляться в несколько потоков и даже с нескольких серверов. Поэтому очень важно, чтобы конкретная задача не была взята на обработку в двух разных местах. Это наиболее технически сложное место в описываемом подходе, поэтому я позволю себе привести конкретный пример процедуры получения задачи для Oracle:

begin
    update async_task
    set
        process_time = sysdate + (power(2, attempt))/1440, --1440 - это 24 часа * 60 минут
        attempt = attempt + 1
    where
        process_time < sysdate and rownum = 1
    returning
        id, attempt, task_type, task_desc into ?, ?, ?, ?;
end;

Вариант для MS SQL предоставлен Иваном Кармановым:

    update top(1) async_task
    set
        process_time = dateadd(MINUTE, (power(2, attempt)), GETUTCDATE()),
        attempt = attempt + 1
    output
        DELETED.id, DELETED.attempt, DELETED.task_type, DELETED.task_desc
    where
        process_time < GETUTCDATE()

Если ваша база данных не поддерживает такие запросы, то придётся придумать что-то другое. Хочу сразу предупредить, что обычный select и потом update — это ошибка, потому что между ними ещё один select может запросто выбрать ту же задачу. Разумеется, такую функциональность необходимо протестировать. Например, можно создать 100 тысяч задач и в 100 потоков начать их получать-закрывать, записывая в лог номера. Если в логе окажется 100 тысяч уникальных записей — значит тест пройден. Я буду рад дополнить статью примерами кода для других баз данных, присылайте их, пожалуйста.

Нужно помнить, что падение нашей системы случается тогда, когда мы перестаём записывать в базу приходящие к нам задачи. Поэтому мы должны гарантировать, что обработка задач создаёт конечную, контролируемую нагрузку. Если в какой-то момент мы не нашли в базе задач для выполнения, то нужно сделать паузу, а не устраивать непрерывный поток запросов. Если произошла ошибка, то пауза должна быть более существенной, чтобы снизить нагрузку. И даже если мы успешно выполнили задачу, всё равно нужно сделать небольшую задержку, чтобы не все ресурсы ушли на обработку задач.

Если выполнять все задачи из одного потока — это ошибка, потому что одна зависшая задача парализует всю систему. Если каждую задачу запускать в новом потоке — это тоже ошибка, потому что под нагрузкой мы можем получить множество потоков, которые будут отчаянно драться между собой за ресурсы, но производительность системы упадёт. Можно сделать несколько потоков, каждый из которых будет в бесконечном цикле обрабатывать задачи, делая паузы между всеми обращениями к базе. В Java для этого можно использовать стандартный ScheduledExecutorService. Рассмотрим теперь ситуацию, когда нагрузка упала до нуля и каждый поток раз в секунду проверяет, не появилась ли новая задача. 10 потоков будут генерировать 10 запросов в секунду, не многовато ли для простаивающей системы? Если в каждом потоке делать проверку раз в 10 секунд, то среднее время ожидания новой задачи составит 1 секунду. Разумеется, если поток-обработчик успешно выполнил задачу, то вполне хватит сделать паузу на 1 секунду, 10-секундная пауза нужна только при пустой очереди задач.

Количество потоков обработчика и длина пауз (после ошибки, после успешно выполненной задачи, после обнаружения пустой очереди) являются конфигурируемыми параметрами системы. Можно жёстко прописать их в коде, можно сделать их редактируемыми через веб-интерфейс без перезапуска сервера. В большинстве случаев вполне достаточно вынести их в конфигурационный файл.

Тестирование обработки ошибок

Понятно, что в тестовом окружении легко добиться ситуации, когда всё будет работать идеально. К сожалению, в реальной жизни регулярно случаются разные аварии. И, если вы не протестировали, как система ведёт себя при появлении ошибок, то скорее всего она упадёт. Как минимум, нужно проверить, что обработка задач восстанавливается после временного разрыва соединения с базой данных и сделать обработчик, который будет с некоторой вероятностью бросать исключение, например, делить на 0.

Работа в аварийной ситуации

Если в системе что-то сломается, то попытки выполнения некоторых задач будут приводить к ошибкам. Каждая следующая попытка будет откладываться на всё более долгий срок, поэтому сломавшиеся задачи не приведут к катастрофическому росту нагрузки и то, что не повреждено — будет работать как прежде. Когда поломки будут устранены, то накопившиеся задачи будут сделаны в обычном режиме и всё вернётся в нормальное русло само собой. Если авария длилась слишком долго и есть задачи, которые требуют срочного выполнения, то может потребоваться ручное изменение PROCESS_TIME.

Пример: отправка почты

Рассмотри систему, которая рассылает почту с трёх серверов. Для каждого исходящего письма создаётся отдельная задача, после 5 неудачных попыток мы перестаём отправлять письмо и ставим ему статус «Отправить не удалось».

Вечером админы поломали отправку почты на одном из серверов. Если за первые 5 попыток письмо попадало на работающий сервер, то оно успешно отправлялось и только в логе оставалось сообщение об ошибке. Однако, если письмо 5 раз попадало в один и тот же неисправный сервер, то случалась фатальная ошибка.

Пока с утра разбирались с проблемой, пользователи успели написать 300 писем. Из них одно не было отправлено, о чём автору сообщили особо. Для ещё одного письма починка системы случилась между 5-ой и 6-ой попытками, поэтому его судьбу тоже отследили вручную.

Катастрофический всплеск нагрузки

Периодически случаются сбои, когда на один сервис отправляется очень большой поток запросов, на несколько порядков больше нормального. Если использовать стандартную архитектуру, то в системе очень быстро упадёт скорость работы, начнутся ошибки и вы сразу это заметите. В нашем случае довольно долго всё будет выглядеть безоблачно, а потом вы вдруг обнаружите, что очередь задач у вас выросла на год вперёд. Поэтому размер очереди нужно обязательно мониторить в автоматическом режиме.

Если у вас уже скопилось очень много задач, которые нужно выполнить, то нужно запустить больше потоков для их обработки, может быть даже задействовать дополнительные сервера. Вполне может случиться так, что завалит вас не очень важными задачами одного типа, а задачи другого типа нужно выполнять срочно, но они слишком долго стоят в общей очереди. Тогда вам потребуется запустить обработчик, который будет заниматься только приоритетными задачами. Имеет смысл заранее предусмотреть такой механизм в вашей системе.

Заметим, что если таблица задач вырастет, то скорость работы с ней может сильно упасть, и, даже после её уменьшения обратно до штатного крошечного размера, база может продолжить работать с ней как с большой. В этом случае от администратора базы данных потребуется принять меры по исправлению ситуации.

Поток некорректных задач

Ещё одна возможная проблема — поток некорректных данных из внешних систем. Иногда ошибка носит технический характер (например, недопустимый символ в XML, который можно просто выкинуть) и задачи можно честно выполнить, вбив костыль в код обработчика. Если задачи по сути бессмысленны, то, возможно, будет иметь смысл просто удалить их. Другой возможный подход — удалить некорректные задачи и поставить вместо них новые, исправленные. Решение придётся принимать исходя из конкретной ситуации, универсального ответа здесь быть не может.

Для избежания подобных проблем может возникнуть желание тщательно проверять входящие задачи и сразу выдавать ошибки внешним системам. Эта идея довольно вредная, потому что она грозит нашей системе радикальной потерей производительности в штатном режиме, а если вдруг мы начнём отдавать ошибки наружу, то можно и чужие сервера поломать. Но мы должны автоматически мониторить наши логи на наличие ошибок и связываться с коллегами из других проектов в случае необходимости.

Лирическое отступление о валидации: пусть s — строка с входными данными, v(s) — функция, которая показывает, являются ли данные корректными, а f(s) — функция, которая показывает, будут ли данные успешно обработаны. Увы, v(s) и f(s) идеально совпадают на тестовых данных, но в случае аварии мы не можем предсказать их поведение.

Пример: интеграция с внешней системой

Предположим, что нам нужно обеспечить передачу данных из системы A в систему B. Чтобы можно было отследить, что и где поломалась, будем передавать данные в пакетах с уникальными идентификаторами. При приёме пакета система B записывает его содержимое в таблицу принятых пакетов и ставит задачу на обработку пакета с данным идентификатором. Если идентификатор пакета обнаружить не удалось, система B возвращает ошибку. В системе B этим занимается один сервис, который больше ничего не делает и очень редко перезапускается и модифицируется. Обработчики пакетов могут регулярно обновляться, их простой не критичен.

Если система A потеряет доступ к B, то пакет нужно будет отправить повторно, поэтому в A тоже есть асинхронная очередь исходящих пакетов. Также следует предусмотреть отправку исправленных пакетов, если первая версия содержала ошибки. Если в B пришёл исправленный пакет, то он должен заменить предыдущий пакет с тем же идентификатором и создать задачу на его обработку. Разумеется, B должна корректно работать с исправленными пакетами.

Для увеличения производительности можно передавать много пакетов в одном запросе от A к B, но если запрос приведёт к ошибке, то следует разбить его на запросы по одному пакету в каждом (более изощрённые схемы вряд ли имеют смысл).

Хронически высокая нагрузка

У подавляющего большинства систем бывают периоды повышенной нагрузки, например, днём пользователей много, а ночью и в выходные — на порядок меньше. Так как скорость обработки задач в нашей архитектуре ограничена, то при росте числа клиентов в пиковые часы очередь задач может начать расти, а потом потихоньку рассасываться. Если очередь начинает расти, то это должно настораживать, потому что фактически это означает, что система не справляется с нагрузкой в реальном времени, то есть если бы мы обрабатывали задачи синхронно, то у нас начались бы серьёзные проблемы. И если таблица задач станет слишком большой, то работа с ней может существенно замедлиться, а если мы вдруг за спокойный период перестанем успевать очищать очередь, то количество задач будет бесконечно расти, пока от нас не станут уходить клиенты.

Простейший шаг для повышения пропускной способности системы — увеличить количество потоков, обрабатывающих задачи. Если вычислительной мощности хватает, то это поможет, правда снизится допустимая пиковая нагрузка при приёме задач. Понятно, что с некоторого момента новые потоки перестанут приносить пользу, потому что мы упрёмся в производительность железа, а вот будем ли мы при этом успевать записывать новые задачи — это как повезёт.

Наш подход не помогает сэкономить на железе, мы только откладываем получение результатов вместо мгновенного отказа в обслуживании. Если вы хотите всегда быстро отвечать на запросы пользователей, то вам придётся поддерживать резервы вычислительной мощности.

Интерфейс администратора

Ранее я упоминал, что в реальной системе скорее всего будут задачи разных типов. Это не важно для обсуждения остальных вопросов, но, в случае возникновения каких-либо проблем, необходимо иметь простой способ посмотреть количество существующих задач каждого типа. Также нужен список задач конкретного типа с указанием количества попыток (отсортированный по убыванию количества попыток). Закладываться на прямой доступ к базе на боевом сервере нельзя, поэтому следует заранее сделать для этого простой веб-интерфейс.

Делать громоздкий интерфейс с сортировками, фильтрами, возможностью манипулирования временем исполнения, количеством попыток и удалением заданий — вредная растрата ресурсов. Проблема даже не в том, что вы потратите время на реализацию функциональности, которая никогда не понадобится, а в том, что если вы вдруг решите ей воспользоваться, то ситуация наверняка будет кризисной, и редко используемые куски кода могут подкинуть вам неприятный сюрприз. В крайних случаях требуется вмешательство человека с доступом к боевой базе.

Горизонтальная масштабируемость

Заметим, что данное решение великолепно масштабируется. Можно сделать несколько таблиц задач на разных серверах, для каждой из них сделать несколько своих серверов для обработки и постановки задач. Если сделать веб-интерфейс для постановки задач, то можно использовать стандартные средства балансировки нагрузки для веб-сайтов. На практике мне не приходилось сталкиваться с такими решениями, одной таблицы задач всегда хватало.

А не упорядочить ли нам задачи?

Иногда возникает идея, что сначала нужно выполнять те задачи, у которых время выполнения или создания раньше или ещё как-то сортировать их по какому-то признаку. В нормальной ситуации в очереди находится несколько задач, созданных совсем недавно и приоритезировать их нет никакого смысла. Если задач скапливается много, то дополнительное условие может серьёзно замедлить их выборку. Конечно, можно сделать индекс, но тогда замедлится вставка. По-моему, работать в этом направлении имеет смысл, только если этого требует особая специфика задачи.

А нельзя ли использовать готовое решение?

Из готовых решений, которые позволяли бы сделать очередь задач, мы пробовали Oracle AQ. Более того, Oracle AQ оказался существенно быстрее самописного кода. Однако, наше решение полностью устраивало нас по производительности и было абсолютно понятно с точки зрения администрирования, а вот внедрение Oracle AQ потребовало бы содержательных усилий со стороны админов и несло бы с собой дополнительные риски.

Понятно, что есть много разных способов сделать асинхронную обработку задач. Можно при взятии задачи выставлять ей статус «обрабатывается» и если в обработчике случится фатальная ошибка, то нужно будет вручную поменять статус. Или можно брать все задачи из одного потока и никаких проблем с одновременным доступом с разных серверов. Во многих случаях недостатки таких решений не критичны для бизнеса.

Я ожидаю, что после прочтения этой статьи квалифицированный разработчик сможет за два дня внедрить асинхронную обработку задач в свой проект (если, конечно, в нём есть задачи, которые можно так обрабатывать), через неделю — оттестировать, через две недели — начать эксплуатировать. Право, речь идёт о совсем небольшом объёме кода и я старался ответить на все возникающие вопросы. Но я не вижу смысла делать на этой основе общую библиотеку, каждому будет проще написать своё решение, чем разбираться с настройками незнакомой библиотеки и интегрироваться с ней.

Автор благодарен Филиппу Дельгядо за постановку задачи.



Хостится на .masterhost Яндекс цитирования