I'm trying to configure a Saga that works in the following way:
But I'm facing two issues:
Relevant Code:
Bus configuration for saga
Bus = Configure.With(Activator)
    .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
    .Logging(l => l.ColoredConsole())
    .Routing(r => r.TypeBased().MapAssemblyOf<IEventContract>(publisherQueue))
    .Sagas(s =>
    {
        s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex");
        if (enforceExclusiveAccess)
        {
            s.EnforceExclusiveAccess();
        }
    })
    .Options(o =>
    {
        if (maxDegreeOfParallelism > 0)
        {
            o.SetMaxParallelism(maxDegreeOfParallelism);
        }
        if (maxNumberOfWorkers > 0)
        {
            o.SetNumberOfWorkers(maxNumberOfWorkers);
        }
    })
    .Timeouts(t => { t.StoreInSqlServer(dcMessengerConnectionString, "Timeouts"); })
    .Start();
SagaData class:
public class RouteListSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }

    private readonly IList<LisaShippingActivity> _shippingActivities = new List<LisaShippingActivity>();

    public long RoutePlanId { get; set; }
    public IEnumerable<LisaShippingActivity> ShippingActivities => _shippingActivities;
    public bool SentToLisa { get; set; }

    public void AddShippingActivity(LisaShippingActivity shippingActivity)
    {
        if (!_shippingActivities.Any(x => x.Equals(shippingActivity)))
        {
            _shippingActivities.Add(shippingActivity);
        }
    }

    public IEnumerable<LisaShippingActivity> GroupShippingActivitiesToLisaActivities() => LisaShippingActivity.GroupedByRouteIdAndAddress(ShippingActivities);
}
CorrelateMessages method
protected override void CorrelateMessages(ICorrelationConfig<RouteListSagaData> config)
{
    config.Correlate<ShippingOrder>(x => x.RoutePlanId, y => y.RoutePlanId);
    config.Correlate<VerifyRouteListIsComplete>(x => x.RoutePlanId, y => y.RoutePlanId);
}
Handler for the message that is supposed to initiate the saga and send the deferred message if the saga IsNew:
public async Task Handle(ShippingOrder message)
{
    try
    {
        var lisaActivity = message.AsLisaShippingActivity(_commissionerUserName);
        if (Data.ShippingActivities.Contains(lisaActivity))
            return;
        Data.RoutePlanId = message.RoutePlanId;
        Data.AddShippingActivity(lisaActivity);
        var delay = TimeSpan.FromSeconds(_lisaDelayedMessageTime != 0 ? _lisaDelayedMessageTime : 60);
        if (IsNew)
        {
            await _serviceBus.DeferLocal(delay, new VerifyRouteListIsComplete(message.RoutePlanId), _environment);
        }
    }
    catch (Exception err)
    {
        Serilog.Log.Logger.Error(err, "[{SagaName}] - Error while executing Route List Saga", nameof(RouteListSaga));
        throw;
    }
}
And, finally, the handler for the deferred message:
public Task Handle(VerifyRouteListIsComplete message)
{
    try
    {
        if (!Data.SentToLisa)
        {
            var lisaData = Data.GroupShippingActivitiesToLisaActivities();
            _lisaService.SyncRouteList(lisaData).Wait();
            Data.SentToLisa = true;
        }
        MarkAsComplete();
        return Task.CompletedTask;
    }
    catch (Exception err)
    {
        Serilog.Log.Error(err, "[{SagaName}] - Error sending message to LisaApp. RouteId: {RouteId}", nameof(RouteListSaga), message.RoutePlanId);
        _serviceBus.DeferLocal(TimeSpan.FromSeconds(5), message, _configuration.GetSection("AppSettings")["Environment"]).Wait();
        MarkAsUnchanged();
        return Task.CompletedTask;
    }
}
Any help is appreciated!
I am not sure I correctly understand the symptoms you're experiencing.
If I send two messages "at the same time" to the first handler, each one arrives, and even though their properties correlate them, the IsNew property does not change after the first message has been processed.
If EnforceExclusiveAccess is called, I would expect the messages to be handled in a serial fashion, the first one with IsNew == true and the second one with IsNew == false.
If not, I would expect both messages to be handled in parallel with IsNew == true, but then, when the saga data is inserted, I would expect one of them to succeed and the other one to fail with a ConcurrencyException.
After the ConcurrencyException, the message would be processed again, this time with IsNew == false.
Is that not what you're experiencing?
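The expected sequence without exclusive access can be illustrated without Rebus at all. What follows is a minimal in-memory sketch of that insert conflict and retry; `InMemorySagaStore`, `ConcurrencyDemo`, and this `ConcurrencyException` class are hypothetical stand-ins written for this illustration and are not Rebus's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the exception a saga storage throws on a conflict.
public class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical in-memory store keyed by the correlation property (RoutePlanId).
public class InMemorySagaStore
{
    private readonly Dictionary<long, int> _revisions = new Dictionary<long, int>();
    private readonly object _lock = new object();

    // True if saga data already exists for the given key.
    public bool Exists(long routePlanId)
    {
        lock (_lock) return _revisions.ContainsKey(routePlanId);
    }

    public int GetRevision(long routePlanId)
    {
        lock (_lock) return _revisions[routePlanId];
    }

    // Only the first insert for a key succeeds; a second one conflicts.
    public void Insert(long routePlanId)
    {
        lock (_lock)
        {
            if (_revisions.ContainsKey(routePlanId))
                throw new ConcurrencyException($"Saga data for {routePlanId} already exists");
            _revisions[routePlanId] = 0;
        }
    }

    // An update succeeds only if the caller saw the latest revision.
    public void Update(long routePlanId, int expectedRevision)
    {
        lock (_lock)
        {
            if (_revisions[routePlanId] != expectedRevision)
                throw new ConcurrencyException($"Saga data for {routePlanId} was changed by someone else");
            _revisions[routePlanId] = expectedRevision + 1;
        }
    }
}

public static class ConcurrencyDemo
{
    // Simulates two handlers that both found no saga data before either one saved.
    public static string Run()
    {
        var store = new InMemorySagaStore();
        const long routePlanId = 42;

        bool firstIsNew = !store.Exists(routePlanId);  // no data yet
        bool secondIsNew = !store.Exists(routePlanId); // loaded in parallel, also no data yet

        store.Insert(routePlanId);                     // first handler wins the insert
        try
        {
            store.Insert(routePlanId);                 // second handler conflicts
        }
        catch (ConcurrencyException)
        {
            // The message is processed again; this time the saga data is found.
            secondIsNew = !store.Exists(routePlanId);
            store.Update(routePlanId, expectedRevision: 0);
        }

        return $"{firstIsNew},{secondIsNew},{store.GetRevision(routePlanId)}";
    }
}
```

In this sketch, `ConcurrencyDemo.Run()` returns "True,False,1": the first message is handled as new, the second one ends up being handled as not new after its retry, and the stored revision has moved on by one.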
In the second handler, I wish to access all the data related to that saga, but I can't, because the data seems to be the data as it was in the revision when the message was deferred.
Are you saying that the data in the saga data seems to be in the state that it was in when the VerifyRouteListIsComplete message was deferred?
That sounds really weird, and also pretty unlikely 🙂 Could you maybe try again and see if it really is so?
UPDATE: I have found out why you are experiencing this weird behavior: You have accidentally set up your saga handler instance to be re-used across messages.
You did it by registering it like this (WARNING: Don't do this!):
_sagaHandler = new ShippingOrderSagaHandler(_subscriber);
_subscriber.Subscribe<ShippingOrderMessage>(_sagaHandler);
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(_sagaHandler);
where the Subscribe method then makes this call on BuiltinHandlerActivator (WARNING: Don't do this!):
activator.Register(() => handlerInstance);
The reason why this is bad (especially for a saga handler) is that the handler instance itself is stateful: it has a Data property containing the current state of the process, and that also includes the IsNew property.
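To see why a shared instance causes trouble, consider the following simplified sketch. `FakeSagaHandler` and `SharedInstanceDemo` are hypothetical types made up for this illustration; they only mimic the idea that state is attached to a handler before it is invoked, and they do not reproduce how Rebus does it internally.

```csharp
using System;

// Hypothetical, simplified handler: like a saga handler, it carries per-message state.
public class FakeSagaHandler
{
    public bool IsNew { get; set; }
    public string Data { get; set; }

    public string Handle(string message) => $"{message}: IsNew={IsNew}, Data={Data}";
}

public static class SharedInstanceDemo
{
    // Obtains a handler per message, attaches state to it, and invokes it afterwards.
    public static string[] Run(Func<FakeSagaHandler> getHandler)
    {
        var handlerForA = getHandler();
        handlerForA.IsNew = true;
        handlerForA.Data = "state for A";

        // Message B arrives before message A has been handled.
        var handlerForB = getHandler();
        handlerForB.IsNew = false;
        handlerForB.Data = "state for B";

        return new[] { handlerForA.Handle("A"), handlerForB.Handle("B") };
    }
}
```

With one shared instance (`var shared = new FakeSagaHandler(); SharedInstanceDemo.Run(() => shared)`), message A is handled with "IsNew=False, Data=state for B", because the state attached for B overwrote the state attached for A. With a factory (`SharedInstanceDemo.Run(() => new FakeSagaHandler())`), message A is handled with "IsNew=True, Data=state for A".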
What you should ALWAYS DO, is to ensure that a new handler instance is created every time a message comes in – your code should be changed to something like this:
_subscriber.Subscribe<ShippingOrderMessage>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
_subscriber.Subscribe<VerifyRoutePlanIsComplete>(() => new ShippingOrderSagaHandler(_subscriber)).Wait();
which can be done if the implementation of Subscribe is changed into this:
public async Task Subscribe<T>(Func<IHandleMessages<T>> getHandler)
{
    _activator.Register((bus, context) => getHandler());
    await _activator.Bus.Subscribe<T>();
}
That will solve your exclusive access problem :)
There's another problem with your code: You have a potential race condition between registering your handler and starting the subscriber bus instance, because you could in theory be unfortunate and start receiving messages after the bus has been started but before your handler has been registered.
You should change your code to ensure that all handlers are registered before you start the bus (and thus start receiving messages).
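A rough sketch of that ordering could look like the fragment below. It assumes that `ShippingOrderSagaHandler`, `ShippingOrderMessage`, and `VerifyRoutePlanIsComplete` come from your own project, that the handler has a parameterless constructor (yours takes your subscriber wrapper, so adapt the factory accordingly), and that the connection strings and queue name are passed in; the `BusSetup` class and its `StartAsync` method are made-up names for illustration only.

```csharp
using System.Threading.Tasks;
using Rebus.Activation;
using Rebus.Config;

public static class BusSetup
{
    // Hypothetical helper: registers handlers first, then starts the bus, then subscribes.
    public static async Task<BuiltinHandlerActivator> StartAsync(
        string rabbitMqConnectionString, string inputQueueName, string connectionString)
    {
        var activator = new BuiltinHandlerActivator();

        // 1. Register a factory, so that a new handler instance is created for each message.
        //    Assumption: a parameterless constructor; replace with your own construction logic.
        activator.Register(() => new ShippingOrderSagaHandler());

        // 2. Only now start the bus, which is when messages can start arriving.
        Configure.With(activator)
            .Transport(t => t.UseRabbitMq(rabbitMqConnectionString, inputQueueName))
            .Sagas(s => s.StoreInSqlServer(connectionString, "Sagas", "SagaIndex"))
            .Start();

        // 3. Subscribe once the bus is running.
        await activator.Bus.Subscribe<ShippingOrderMessage>();
        await activator.Bus.Subscribe<VerifyRoutePlanIsComplete>();

        return activator;
    }
}
```

The point of the ordering is that nothing can be received before step 2, so by then every handler is already in place.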