Дизайнер процессов
Транзакции и Задачи

Границы транзакций

Job Executor

Задача (job) - это явное представление задачи для запуска выполнения процесса. Задача (job) создается при приближении события таймера или задачи, помеченной для асинхронного выполнения (см. границы транзакций). Таким образом, обработку задач (job) можно разделить на три фазы:

  • Job Creation (задача создания)
  • Job Acquisition (задача доставки)
  • Job Execution (задача исполнения)

В то время пока задачи (job) создаются во время выполнения процесса, job executor выполняет роль доставки (Acquisition) и исполнителя (Execution) задач.

empty

Job Creation (задача создания)

Задачи (jobs) создаются для различных целей камундой. Существующие типы задачи (job):

  • Атрибуты асинхронности при выставлении границ транзакции в процессе.
  • Временные задачи (jobs) - событие таймер в BPMN.
  • Асинхронная обработка BPMN событий.

При создании задача (job) может получить приоритет для доставки и исполнения

Job Acquisition (задача доставки)

Задача доставки (job acquisition) - это процесс извлечения из базы данных задач, которые должны быть выполнены в следующий раз. Поэтому задачи должны быть сохранены в базе данных вместе со свойствами, определяющими возможность выполнения задания. Например, задача, созданное для события по таймеру, не может быть выполнено до истечения определенного промежутка времени.

Задачи находятся в БД в таблице ACT_RU_JOB. Таблица содержит следующие атрибуты:

Получение задачи

Задача может быть получена, если удовлетворяет следующим условиям:

  • подлежит выполнению, то есть значение в столбце DUEDATE_ находится в прошлом;
  • не заблокировано, то есть значение в столбце LOCK_EXP_TIME_ находится в прошлом;
  • повторные попытки не были исчерпаны, то есть значение в столбце RETRIES_ больше нуля.

Кроме того, в механизме процесса есть понятие приостановки задачи. Например, задача приостанавливается, когда приостанавливается принадлежащий ему экземпляр процесса. Задачу можно получить, только если она не приостановлено.

Правила для Job Execution (задача исполнения)

Пул потоков

Доставленные задачи выполняются пулом потоков.
Пул потоков выбирает задачи из очереди полученных задач. Очередь полученных задач - это очередь в памяти с фиксированной емкостью. Когда исполнитель (executor) начинает выполнять задачу, она сначала удаляется из очереди.

Ошибочная задача

В случае сбоя выполнения задачи, например, если вызов служебной задачи вызывает исключение, задача будет повторно выполнена несколько раз (по умолчанию 2, всего выполняется 3 раза). Повтор исполнения происходит не сразу, задача добавляется обратно в очередь на получение. При повторном исполнении значение столбца RETRIES_ уменьшится, а исполнитель разблокирует задание. Таким образом, механизм процесса выполняет учет невыполненных задач. Разблокировка также включает удаление времени LOCK_EXP_TIME_ и владельца блокировки LOCK_OWNER_ путем установки для обеих записей значения null. Впоследствии задача будет автоматически повторено выполнена, как только она будет разблокирована. Когда число повторных попыток исчерпано (значение столбца RETRIES_ равно 0), повторные попытки исполнения прекратятся и будет передан соответствующий сигнал процессу.

Несмотря на то, что все задачи с ошибками повторяются и счетчик повторов уменьшается, есть исключение когда этого не происходит - оптимистическая блокировка.
Оптимистическая блокировка - процесс разрешения конфликтующих обновлений ресурсов, например при параллельном исполнении шагов.

После исчерпания повторных попыток создается инцидент.

Одновременное выполнение задач (Concurrent Job Execution)

Задача исполнитель (Job Executor) следит за тем, чтобы задания от одного экземпляра процесса никогда не выполнялись одновременно.

empty

У нас есть параллельный шлюз, за которым следуют три служебных задачи, выполняющиеся асинхронно. В результате этого в базу данных добавляются три задачи. Как только задача появляется в базе данных, она может быть обработано исполнителем задач. Он получает задачи и делегирует их пулу потоков, которые собственно и обрабатывают задачи.

Обычно это хорошо. Однако это также несет в себе неотъемлемую проблему: согласованность. Рассмотрим параллельное соединение после задач. Когда выполнение задачи завершено, мы приходим к параллельному соединению и должны решить, ждать ли нам выполнения других задач или двигаться дальше. Это означает, что для каждой ветки, приходящей к параллельному соединению, мы должны принять решение, можем ли мы продолжать или нам нужно дождаться одного или нескольких других исполнений из других веток.

Это требует синхронизации между ветками выполнения. Эта проблема решается с помощью оптимистической блокировки. Всякий раз, когда принимается решение на основе данных, которые могут быть неактуальными обязательно увеличивается счетчик ревизии одной и той же строки базы данных в обеих транзакциях. Таким образом, та транзакция, которая зафиксируется первой (родительской), выиграет, а другие транзакции потерпят неудачу с исключением оптимистической блокировки. Задача, которая выполнится первой и придет к параллельному шлюзу выполнит фиксацию, остальные попадут под оптимистическую блокировку, а счетчик ревизии родительской задачи будет увеличен. Поскольку выполнение инициируется задачей, исполнитель задания (job executor) повторит проверку условия после определенного времени ожидания.

Хоть это и приемлемое решение с точки зрения сохранности и согласованности, такое поведение нежелательно на более высоком уровне, особенно если выполнение имеет нетранзакционные побочные эффекты, которые не будут отменены неудачной транзакцией. Например, если бронирование билетов на концерт принадлежит другому экземпляру, то можно забронировать несколько билетов при повторном выполнении задачи.

Понимание ACID транзакций камунды

При выполнении процесса движок Camunda будет продвигаться по шагам процесса пока не достигнет шага - точки ожидания. Такой шаг может быть один из:

empty

  1. Пользовательские задачи и задачи получения сообщения.
  2. Все промежуточные события "перехвата".
  3. Шлюз на основе событий, который предлагает возможность реагирования на одно из нескольких промежуточных событий улавливания.
  4. Некоторые дополнительные типы задач(шагов) (задачи: сервис/service, бизнес-правила/DMN, отправки сообщения/send task).
  5. Внешние задачи. События отправки сообщения тоже можно имплементировать как внешние задачи.

В состоянии ожидания любое дальнейшее выполнение процесса должно дождаться какого-то триггера. Поэтому состояния ожидания всегда сохраняются в базе данных. Однако можно контролировать границы транзакций, вводя дополнительные точки сохранения с помощью атрибутов async before и async after. Job executor будет следить за тем, чтобы процесс продолжался асинхронно.

Контроль границ транзакций

Дополнительные точки ожидания

Можно получить лучший контроль над границами транзакций, введя дополнительные точки сохранения в различных шагах процесса, используя атрибуты async before и async after. Состояние процесса будет сохраняться в таких местах, а задача исполнения (job executor) будет следить за тем, чтобы выполнение процесса продолжалось асинхронно. Рассмотрим на примере диаграммы ниже:

empty

  1. Пользовательская задача - имеет обязательную точку ожидания. После создания пользовательской задачи состояние процесса будет сохранено и зафиксировано в базе данных. Процесс будет ожидать взаимодействия с пользователем.
  2. Задача сервис (service task) по умолчанию выполняется синхронно. В этом случае, если сервис не сработает по какой-либо причине, то потребуется повторно выполнить пользовательскую задачу.
  3. Установив async before='true' задача сервис будет выполняться асинхронно. Это означает, что перед выполнением этой задачи в бд будет сохранено и зафиксировано состояние процесса.

Таким образом, если в транзакции произошел сбой, то исполнение процесса будет откатываться к последней известной точки сохранения. Повторное исполнение ошибочной транзакции будет произведено с последней точки сохранения (где был настроен async before='true'). Границы транзакции могут включать не один шаг процесса.

Каждую задачу-сервис делать асинхронной

Эмпирическое правило, особенно при большом количестве задача-сервис (service task), заключается в том, чтобы помечать каждую задачу как асинхронную.

Недостатком является то, что такие задачи немного увеличивают общее потребление ресурсов. Но этот подход имеет ряд преимуществ:

  • процесс остановится на задаче, в которой возникла ошибка;
  • для каждой задачи-сервис можно настроить стратегию повторных попыток;
  • возможность использования различных фич к каждой задаче-сервис.

Правила и запреты установки точек сохранения

Помимо общей стратегии помечать задачи-сервис как точки сохранения (далее ТС) может потребоваться дополнительно настроить ТС.

Типичные места для настройки ТС после, атрибут async after="true":

  • Empty
    - позволяет пользователям завершить свои задачи, не дожидаясь дорогостоящих последующих шагов и не наблюдая неожиданного отката транзакции в состояние ожидания перед выполнением пользовательской задачи;
  • Empty
    (не идемпотентные действия) - гарантирует, что необратимые изменения, которые не должны происходить чаще одного раза, случайно не повторятся, потому что последующие шаги могут откатить транзакцию к ТС задолго до затронутого шага. Если процесс может быть вызван из других процессов, то для конечных событий также необходимо настроить ТС "после";
  • Empty
    (дорогостоящие операции) - гарантирует, что вычислительно затратный шаг не придется повторять только потому, что последующие шаги могут откатить транзакцию к ТС задолго до затрагиваемого шага. Если процесс может быть вызван из других процессов, то для конечных событий также необходимо настроить ТС "после";
  • Empty
    (перехват внешних событий) - гарантирует, что внешнее событие, в том числе получение сообщения, сохранится как можно скорее поскольку любые последующие шаги могут откатить транзакцию к ТС намного раньше - результат выполнения этого шага не должен быть потерян;

Типичные места для настройки ТС до, атрибут async before='true':

  • Empty
    (начальные события) - ТС позволяет откатить действия при старте процесса до того как что-либо будет выполнено в экземпляре процесса;
  • Empty
    (задачи-сервис вызывающие внешние системы) - ТС позволяет отделить шаг с потенциальной ошибкой от остальных шагов до него. В случае проявления ошибки экземпляр процесса будет ожидать ее обработки, которую можно произвести в мониторинге процессов;
  • Empty
    (параллельные соединения) - параллельные соединения синхронизируют отдельные ветки процессов. Если одна из веток пришла в параллельное соединение раньше всех, то выполнение этой ветки откатится назад с исключением оптимистической блокировки и будет выполнен повторно позже. Таким образом такая ТС гарантирует синхронизацию исполнения параллельных веток, которую выполняет задача исполнения (job executor) Camunda. Следует обратить внимание, что для действий в рамках нескольких экземпляров существует специальный флаг multi instance asynchronous after.

Нельзя настраивать ТС до в следующих случаях:

  • Empty
    - пользовательские задачи и другие шаги с состоянием ожидания;
  • Empty
    (а также шаги - внешние задачи) - такие ТС создают лишнюю нагрузку поскольку сами по себе завершают транзакцию;
  • Empty
    (всевозможные шлюзы) - для таких шагов нет необходимости использовать ТС, только если на шлюзе не настроен "слушатель"(listeners), который может дать сбой.

Откат транзакции без обработки ошибки

Каждое необработанное исключение, возникающее во время исполнения процесса откатывает транзакцию к ТС. На картинке приведен пример:

empty

  1. Происходит выполнение пользовательской задачи.
  2. После выполнения задачи продолжается исполнение процесса пока не достигнет шага с состоянием ожидания (ТС) - таймер.
  3. В случае, когда возникает необработанное исключение произойдет откат транзакции и выполненная пользовательская задача окажется незавершенной.

В данной ситуации процесс окажется в тупике поскольку выполнение пользовательской задачи окажется невозможным. Для решения проблемы в этом примере на задачу-сервис установить флаг asyncBefore='true'.

empty

В таком случае откат пользовательской задачи не произойдет, а job executor через некоторое время произведет повторно неудавшейся транзакции с шага "Generate invoice".

Иногда в процессе откат транзакции необходим. Пример:

empty

  1. Пользователь вводит данные на форме.
  2. При проверке введенных данных обнаруживается проблема и создается исключение, которое откатывает транзакцию и предоставляет возможность ввести данные повторно.