MassTransit, Saga и RabbitMQ для реализации диспетчера процессов
Однажды перед нами встала задача автоматизировать различные workflow в крупной компании. Для нас это значило соединить воедино на момент старта порядка 10 систем. Причем связать всё надо было асинхронно, масштабируемо, надежно. Упрощённо процесс можно описать как последовательность действий в разных системах, которую нельзя автоматизировать полностью, поскольку она требует человеческого участия. Например, для выбора определенных действий или элементарного согласования, которое необходимо для перехода на следующий этап процесса. Для решения этой задачи мы решили использовать архитектуру обмена сообщениями через шину данных, и нам отлично подошел MassTransit с его Saga в связке с RabbitMQ.

Что из себя представляет Saga?
Saga — это реализация шаблона "Диспетчер процессов" из книги «Шаблоны интеграции корпоративных приложений», который позволяет описать процесс в виде конечного автомата. На вход прилетает какое-то событие, Saga выполняет последовательность действий. При этом на любом из этапов Saga может потребоваться решение человека. Тогда она создает задачу в трекере и «засыпает» на неопределённое время, ожидая новых событий. Saga реализована на базе Automatonymous. Она декларативно описывается в классе, унаследованном от MassTransitStateMachine<>. Для Saga нужно описать все состояния, принимаемые события и совершаемые действия при наступлении определенных событий. Текущее состояние сохраняется в базе.
Для начала описываем все состояния и события Saga и даём им понятные имена. Выглядит это следующим образом:
public sealed partial class StateMachine { public State AwaitingTaskCreated { get; set; } public State AwaitingTaskTakedToWork { get; set; } public State AwaitingDecisionAboutTask { get; set; } public State Rejected { get; set; } public EventStartWorkflowCommandReceived { get; set; } public Event TaskCreated { get; set; } public Event TaskTakedToWork { get; set; } public Event TaskDeclined { get; set; } public Event TaskApproved { get; set; } private void BuildStateMachine() { InstanceState(x => x.CurrentState); Event(() => StartWorkflowCommandReceived, x => x.CorrelateById(ctx => ctx.Message.CorrelationId) .SelectId(context => context.Message.CorrelationId)); Event(() => TaskCreated, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskTakedToWork, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskDeclined, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); Event(() => TaskApproved, x => x.CorrelateById(ctx => ctx.Message.CorrelationId)); } }
public sealed partial class StateMachine : MassTransitStateMachine{ public StateMachine() { BuildStateMachine(); Initially(WhenStartWorkflowCommandReceived()); During(AwaitingTaskCreatedInPlanner, WhenTaskCreated()); During(AwaitingTaskTakedToWork, WhenTaskTakedToWork()); During(AwaitingDecisionAboutTask, WhenTaskApproved(), WhenTaskDeclined()); } private EventActivityBinder WhenStartWorkflowCommandReceived() { return When(StartWorkflowCommandReceived) .Then(ctx => ctx.Instance.SaveConfigurationRequestInfo(ctx.Data)) .Send(TaskManagerQueue, ctx => new CreateTaskCommand(ctx.Instance)) .TransitionTo(AwaitingTaskCreated); } private EventActivityBinder WhenTaskCreated() { return When(DPORPApproveTaskCreatedInPlanner) .Then(ctx => ctx.Instance.SaveCreatedTaskInfo(ctx.Data)) .Send(MailServiceQueue, ctx => new NotifyRequestAuthorThatWorkflowStarted(ctx.Instance)) .TransitionTo(AwaitingTaskTakedToWork); } private EventActivityBinder WhenTaskTakedToWork() { return When(TaskTakedToWork) .Then(ctx => ctx.Instance.MarkTaskAsTakedToWork(ctx.Data)) .TransitionTo(AwaitingDecisionAboutTask); } private EventActivityBinder WhenTaskApproved() { return When(TaskApproved) .Then(ctx => ctx.Instance.MarkTaskAsApproved(ctx.Data)) .Finalize(); } private EventActivityBinder WhenTaskDeclined() { return When(TaskDeclined) .Then(ctx => ctx.Instance.MarkTaskAsDeclined(ctx.Data)) .TransitionTo(Rejected); } }
public class WorkflowSaga : SagaStateMachineInstance , ISagaWithState , ICreatedOnOffset , IModifiedOnOffset , ICreatedBy, IModifiedBy { public Guid CorrelationId { get; set; } public string CurrentState { get; set; } public string InitialRequestViewUrl { get; set; } public string RequestNumber { get; set; } public string RequestAuthor { get; set; } public string RequestText { get; set; } public byte[] RowVersion { get; set; } public string CreatedBy { get; set; } public string ModifiedBy { get; set; } public DateTimeOffset CreatedOn { get; set; } public DateTimeOffset ModifiedOn { get; set; } public DateTimeOffset CompletedOn { get; set; } public virtual ICollection RelatedTasks { get; set; } public void SaveGabrielConfigurationRequestInfo( ICreateGabrielConfigurationRequestCommand command) { CorrelationId = command.CorrelationId; RequestNumber = command.RequestNumber; RequestAuthor = command.Author; RequestText = command.RequestText; InitialRequestViewUrl = command.InitialRequestViewUrl; CreatedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void SaveCreatedTaskInfo(ITaskCreationInfo taskCreationInfo) { RelatedPlannerTasks.Add(new RelatedPlannerTask(taskCreationInfo)); } public void MarkTaskAsTakedToWork(ITaskUpdatedInfo taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.TakedToWork); } public void MarkTaskAsApproved(TaskApprovedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Completed, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } public void MarkTaskAsDeclined(TaskDeclinedNotification taskInfo) { UpdateTaskInfo(taskInfo, TaskStatus.Declined, taskInfo.Comment); CompletedOn = RuntimeContext.Current.DateTimeOffset.Now; } private void UpdateTaskInfo(ITaskUpdatedInfo taskInfo, TaskStatus taskStatus, string comment = null) { var task = RelatedTasks.Single(t => t.Number == taskInfo.Number); task.ModifiedBy = taskInfo.TaskModifiedBy; task.Comment = comment; task.Status = taskStatus; } }
Обработка ошибок
Что происходит с Saga, если во время обработки сообщения возникает ошибка? Это важный вопрос, поскольку всем хочется, чтобы машина состояний всегда оставалась консистентной, даже если что-то пошло не так. И в Saga от MassTransit с этим всё хорошо.
Как вы уже успели заметить, в примерах выше нет ни одного try catch блока для обработки исключений. Причина простая: они там не нужны. Если во время обработки сообщения возникает исключение, то сообщение возвращается в очередь, а все изменения откатятся. Так как все манипуляции с данными мы делаем в той же транзакции, что и Saga, транзакция не будет закрыта.
Вообще, манипуляция чем-то кроме Saga в самой Saga — это bad practice. По книжке «Шаблоны интеграции корпоративных приложений», диспетчер процессов должен оставаться максимально «тонким и тупым»: просто раздавать команды системам и следить за состоянием, а сам он ничего делать не должен.
Конечно, есть и более сложные сценарии, когда нужно выполнить какие-то компенсирующие действия для обработки исключений. Тогда используется обработчик машины состояний “.Catch” для перехвата исключения определенного типа и дальнейшего выполнения компенсирующей логики.
А если вам нужно просто залогировать возникшее исключение, то лучше воспользоваться наблюдателем (Observer).
Теперь представим ситуацию, что мы уже выполнили команду Send во время обработки сообщения, после чего возникло исключение. Что же будет с отправленной на данном шаге командой? Ведь всё, что улетело, уже не вернёшь? Но и тут всё продумано.
При конфигурации шины можно включить опцию UseInMemoryOutbox. Эта опция позволяет не отправлять сообщения до того момента, пока текущий шаг не будет выполнен. Если возникнет исключение, то сообщения не отправятся вовсе. Вот выдержка из документации:
///
Тесты
На первый взгляд, тестирование асинхронной машины состояний — то ещё удовольствие. Но и здесь всё хорошо. MassTransit предоставляет неплохой фреймворк для написания тестов, который полностью удовлетворяет все наши нужды в тестировании машины состояний.
Фреймворк предоставляет InMemory реализацию шины данных (InMemoryTestHarness), которая позволяет отправлять и получать сообщения, минуя RabbitMQ или другую очередь.
Ну и как пример:
[TestFixture] public class SagaTests : TestFixtureBase { protected const string HostName = "HostName"; protected InMemoryTestHarness Harness; protected StateMachine StateMachine; protected StateMachineSagaTestHarnessSaga; [SetUp] public void SetUp() { StateMachine = (StateMachine)Kernel. Get >(); Harness = new InMemoryTestHarness(HostName); Saga = Harness .StateMachineSaga (StateMachine); } [TearDown] public async Task TearDown() { await Harness.Stop(); } protected async Task InitializeSaga() { await Harness.Start(); var command = new TestStartWorkflowCommand { CorrelationId = SagaId, Author = RequestAuthor, InitialRequestViewUrl = InitialRequestViewUrl, RequestText = RequestText, RequestNumber = RequestNumber, }; await Harness.InputQueueSendEndpoint .Send (command); // Эта строчка нам нужна, поскольку consume срабатывает не сразу и, // соответственно, и Saga не будет, пока не законсюмим Assert.IsTrue(Harness.Consumed .Select ().Any()); var currentSaga = Saga.Created.Contains(SagaId); currentSaga.RelatedPlannerTasks = new List (); return currentSaga; } [Test] public async Task CheckCurrntStateWhenStartWorkflowCommand() { var saga = await InitializeSaga(); Assert.IsNotNull(saga); Assert.AreEqual(StateMachine .AwaitingORDTApproveTaskCreatedInPlanner.Name, saga.CurrentState); } } public class WhenTaskCreated : SagaTestsBase { private async Task InitializeState() { var saga = await InitializeSaga(true); saga.CurrentState = StateMachine.AwaitingTaskCreated.Name; InitializeRelatedTask(saga); await SendTaskCreatedNotification(); Assert.IsTrue(Harness.Consumed .Select ().Any()); return saga; } [Test] public async Task SaveWorkflowDataWhenTaskCreated() { var saga = await InitializeState(); var taskInfo = saga.RelatedPlannerTasks .First(task => task.PlannerTaskType == PlannerTaskType.DPORPApprove); Assert.AreEqual(TaskNumber, taskInfo.Number); Assert.AreEqual(TaskUrl, taskInfo.TaskUrl); Assert.AreEqual(SagaId, taskInfo.SagaCorrelationId); Assert.AreEqual(TaskStatus.Created, taskInfo.Status); Assert.AreEqual(User, taskInfo.ModifiedBy); Assert.AreEqual(saga.CurrentState, StateMachine.AwaitingTaskTakedToWork.Name); } [Test] public async Task SendsMailWhenTaskCreated() { var mailConsumer = Harness .Consumer > (RabbitMqRouting.QueueNames .SendEmailsQueueName); await InitializeState(); Assert.IsTrue(mailConsumer.Consumed .Select ().Any()); } private async Task SendTaskCreatedNotification() { await Harness.InputQueueSendEndpoint .Send(new TaskCreatedNotification { TaskUrl = TaskUrl, Number = TaskNumber, TaskModifiedBy = User, CorrelationId = SagaId }); } }
Полезные советы
Оригинал опубликован на Habr.com