From Multi-Threaded applications to Multi-Time applications

At the DDD Europe conference..

Recently, I visited the DDD Europe conference, where Vaughn Vernon gave a talk on using Akka and the Actor model to implement applications based on the principles of Domain Driven Design. This talk, combined with some recent issues I've been facing at my customer's site, inspired me to think about the design issues you face when developing multi-time applications. But let's start with what Vernon was talking about in his talk.

Vernon's talk

His main premise: the Actor model is a simpler model for application development, while at the same time providing a thread-safe environment to fully make use of the ever-increasing number of cores processors have. While CPUs don't get any faster in terms of pure clock speed, they do get more cores, and we need to adapt our software to make use of these multiple cores. Traditional programming models don't scale nicely because IO waits cause the software to block while the CPU sits idle, which is one of the reasons behind the advent of Node.js-style applications.

When translating the Actor model to DDD, one gets a quite simple (oversimplified?) application model:

Every command is sent as a message to the aggregate to be processed. The processing of the command in the aggregate itself may result in domain events being published. As messages are published to the aggregate's mailbox, and processing of these messages runs on an isolated thread, there are no concurrency issues and no waits for the sender of the Command Messages.
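To make this a bit more concrete, here is a minimal sketch of what such an aggregate could look like, assuming Akka's classic Java API; the class and message names (OrderAggregate, CreateOrder, OrderCreated) are made up for illustration:

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class OrderAggregate extends AbstractActor {

    // Hypothetical command and domain event classes.
    public static final class CreateOrder {
        public final String orderId;
        public CreateOrder(String orderId) { this.orderId = orderId; }
    }

    public static final class OrderCreated {
        public final String orderId;
        public OrderCreated(String orderId) { this.orderId = orderId; }
    }

    private boolean created = false;

    public static Props props() {
        return Props.create(OrderAggregate.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                // Commands land in this actor's mailbox and are handled one at a time,
                // so the aggregate's state needs no locking.
                .match(CreateOrder.class, cmd -> {
                    if (!created) {
                        created = true;
                        // Handling the command results in a domain event being published.
                        getContext().getSystem().eventStream()
                                .publish(new OrderCreated(cmd.orderId));
                    }
                })
                .build();
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("sample");
        ActorRef aggregate = system.actorOf(OrderAggregate.props(), "order-1");
        // The sender does not block or wait: the command simply ends up in the mailbox.
        aggregate.tell(new CreateOrder("42"), ActorRef.noSender());
    }
}
```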

The scalability of this model, in terms of how fast command messages can be processed, is of course limited by the number of messages a single aggregate instance has to process, as the aggregate instance is the lowest level at which the mailbox can be defined. Given that the number of messages for a single instance will probably not get that high, this should be more than enough for now. So, in this setup, the Actor model solves the multi-threaded realisation of applications, as all state, including state in the database, is managed by the aggregate.

The main premise was that in this setup we can increase message throughput, and thus reduce the latency between sending the message or emitting the event and processing it, as the execution can run completely in parallel on a separate thread on a dedicated core.

Time travel in concurrent applications

Introducing.. relative time.... Recently, we had an issue while testing some new code on the application I'm working on. We had created an Eventbus (based on JMS) to process Domain Events which are emitted by our application. As the application runs on a clustered (distributed) setup, JMS events can be processed on any of the multiple nodes the application runs on. We had the following logic implemented (omitting all irrelevant details):
  1. A new aggregate is created and added to the Repository 
  2. An event is emitted to signal the occurrence of the new aggregate being created
  3. A listener for this event retrieves the created aggregate by Identity from the Repository, to do something with that aggregate.
While testing this, we ran into an issue that seemed strange at first: step 3 resulted in an exception, and the handling of the event was rolled back to the dead-letter queue of the JMS queue, because the new aggregate could not be found in the Repository. To add some technical details:
  • The repository is database-based, using an Oracle DB as its datastore
  • The database and JMS transactions are linked using XA, so both commit at the same time
  • In the XA configuration, the database transaction was given precedence over JMS, so even a slight delay in the transaction commit would still mean the database transaction was committed first.
Still, the aggregate could not be found... When checking the database, the aggregate was in fact there. After some digging, we found this only happened in a clustered setup where step 3 was executed on another node. Also, retrieval of the aggregate included a time component in the query (validityDate >= :myCurrentDate): our data has a validity component, which I think often occurs in financial applications or anything to do with actual humans (i.e. business software). And finally, the myCurrentDate parameter was determined using Java's System.currentTimeMillis().
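To give an idea of the shape of that lookup, here is a rough sketch; the entity and field names (AggregateEntity, validityDate) are placeholders for the real tables, but the use of the local System.currentTimeMillis() to fill the :myCurrentDate parameter matches the situation described:

```java
import java.util.Date;
import javax.persistence.Entity;
import javax.persistence.EntityManager;
import javax.persistence.Id;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.persistence.TypedQuery;

// Placeholder entity; the real tables have more columns, but the validityDate
// field is the one that matters for this story.
@Entity
class AggregateEntity {
    @Id
    String id;
    @Temporal(TemporalType.TIMESTAMP)
    Date validityDate;
}

public class AggregateRepository {

    private final EntityManager entityManager;

    public AggregateRepository(EntityManager entityManager) {
        this.entityManager = entityManager;
    }

    public AggregateEntity findValidById(String id) {
        // The current date is taken from the local server's clock; this is exactly
        // what goes wrong once the clocks of the two nodes differ.
        Date myCurrentDate = new Date(System.currentTimeMillis());
        TypedQuery<AggregateEntity> query = entityManager.createQuery(
                "select a from AggregateEntity a"
                        + " where a.id = :id and a.validityDate >= :myCurrentDate",
                AggregateEntity.class);
        query.setParameter("id", id);
        query.setParameter("myCurrentDate", myCurrentDate, TemporalType.TIMESTAMP);
        // Throws NoResultException when the aggregate is not (yet) visible.
        return query.getSingleResult();
    }
}
```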

Ah.... now we're getting somewhere... We found that, due to a configuration issue, the time between the two servers differed, and differed quite a bit: just under a second. Despite this being a configuration issue, it got us thinking about relative time on multiple servers. To make this a bit more graphical, here's the situation (in the picture things work out fine):
The event is emitted first on Server 2. It is then processed on Server 1, which retrieves the data, as indicated, at its own time, to do some additional processing on it. The two main parameters which control whether this error occurs are Δd and Δt, as indicated in the picture. As long as Δt >= Δd, this setup works:
  • Once the time difference (Δd) between the two servers becomes too large (as was our case), this will fail. This can be alleviated, down to a lower bound, using time synchronisation (e.g. NTP).
  • Once the processing latency (the time it takes for Server 1 to pick up the event after publication) becomes smaller and smaller, and drops below the lower bound guaranteed by NTP, this error will start to occur more and more often.
Given where we started off (the Actor model, massively parallel processors and the increasing need for WebScale applications running on clustered setups), these kinds of problems may occur more often.

Towards solutions

Of course, errors occurring more and more often is not an acceptable situation, so we thought about a few possible solutions:

Solution 1: Pause and retry..

The simplest one, albeit also the ugliest, is a plain retry mechanism: in case an error occurs, just wait for a bit and try again. However, this setup becomes ugly very fast. Pausing processing effectively means locking up a whole thread, reducing the throughput of the system. One might design a setup where the processing of an event is delayed asynchronously, but this gets very messy very quickly once you start to think about events which have an inherent order to them.
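To show why this feels so ugly, here is a naive sketch of the idea; everything in it (the Retry helper and its parameters) is illustrative rather than an actual implementation:

```java
import java.util.function.Supplier;

// A naive pause-and-retry wrapper around a lookup that may fail because the
// aggregate is not visible yet on this node.
public final class Retry {

    private Retry() {
    }

    public static <T> T withPause(Supplier<T> lookup, int maxAttempts, long pauseMillis)
            throws InterruptedException {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return lookup.get();
            } catch (RuntimeException notFoundYet) {
                last = notFoundYet;
                if (attempt < maxAttempts) {
                    // The ugly part: the handling thread is blocked while it waits,
                    // reducing throughput and complicating the ordering of later events.
                    Thread.sleep(pauseMillis);
                }
            }
        }
        // Give up and let the message roll back to the dead-letter queue.
        throw last;
    }
}
```

Usage would be something along the lines of Retry.withPause(() -> repository.findValidById(id), 5, 500), which makes the thread-blocking nature of the idea painfully visible.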

Solution 2: Use event.occuredAt

The other obvious solution is to look at the way the :myCurrentDate parameter is determined. This is currently done using System.currentTimeMillis(), but we could also pass, as part of the domain event, the date/time at which the event occurred on the originating server (Server 2 in this case). I.e. the originating server uses System.currentTimeMillis() to determine the current date, and packs that inside the event being sent.
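As a sketch of what that could look like (the event class is hypothetical; the idea is simply to carry the originating server's timestamp along):

```java
import java.io.Serializable;
import java.util.Date;

// Illustrative domain event: the originating server stamps the event with its own
// clock, so the receiving server can use that timestamp instead of its local clock.
public class AggregateCreatedEvent implements Serializable {

    private final String aggregateId;
    private final Date occuredAt;

    public AggregateCreatedEvent(String aggregateId) {
        this.aggregateId = aggregateId;
        // Determined on the originating server (Server 2 in the example above).
        this.occuredAt = new Date(System.currentTimeMillis());
    }

    public String getAggregateId() {
        return aggregateId;
    }

    public Date getOccuredAt() {
        return occuredAt;
    }
}
```

The event handler on Server 1 would then bind event.getOccuredAt() to the :myCurrentDate parameter instead of calling System.currentTimeMillis() itself.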

Let's explore what this means. Reading of data is handled correctly: we can guarantee the event date/time is equal to or larger than the validity date/time, so we can retrieve the data. Problem solved? Well, not quite so fast, as we also have to consider writing data afterwards.

Suppose we are writing data (which may also have a validity component in it); we should then probably use the same date as was passed inside the event. However, if we do, consider the following scenarios:
  • The logic we're executing may involve a call to another system. This takes time to execute, but if we still use the event's date to write data, we would effectively ignore the passage of time during our execution: all changes would appear to happen at exactly the same time. We could design our way out of this by introducing some relative clock (e.g. using the new Java 8 Clock; see the sketch after this list), but it already gets more complicated in this case.
  • Suppose the logic we're executing in the event handler causes new events to be triggered. What would be the date/time at which the new event occurred? 
    • At System.currentTimeMillis() of the running system? This may imply the new event happened before the original event causing it. Weird...
    • At originalEvent.occuredAt? This implies both events happened at the exact same time, weird again..
    • Using our relative Clock? Then the events would happen in the correct order relative to the originating event (and any other events originating from the same server), but not relative to events originating on our own server. Which is weird again...
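The relative clock mentioned above could, under these assumptions, look something like the following sketch using the Java 8 Clock API; the class name and the way the skew is derived are my own illustration:

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Shift the local clock by the difference between the event's occuredAt timestamp and
// the local time at which the event was received, so time keeps moving during
// processing but stays aligned with the originating server.
public final class RelativeClock {

    private RelativeClock() {
    }

    public static Clock forEvent(Instant occuredAt) {
        Instant receivedAt = Instant.now(); // local clock at the moment the event arrives
        Duration skew = Duration.between(receivedAt, occuredAt);
        return Clock.offset(Clock.systemUTC(), skew); // the local clock, shifted by the skew
    }

    public static void main(String[] args) {
        // Pretend the event was stamped 800 ms "ahead" of this server's clock.
        Instant occuredAt = Instant.now().plusMillis(800);
        Clock clock = RelativeClock.forEvent(occuredAt);
        // The event handler would read time from this clock instead of
        // System.currentTimeMillis(), for both the reads and the writes it performs.
        System.out.println("relative now = " + Instant.now(clock));
    }
}
```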

Solution 3: One time to rule them all?

A final solution would be to require the date/time of all events and commands to be assigned by a single server, which acts as the single source of truth for time. Event publication could then be viewed as a sort of Aggregate Actor itself, towards which Command messages are sent to publish an event to the rest of the system; this (single-threaded, single-machine) Actor is the source for date/time, and the whole rest of the system must adhere to that date/time.
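A very rough sketch of such a "time authority" actor, again assuming Akka's classic Java API and with made-up message classes:

```java
import java.util.Date;

import akka.actor.AbstractActor;
import akka.actor.Props;

// All events are published through this single actor, so every event carries a
// date/time taken from one machine's clock.
public class EventPublishingActor extends AbstractActor {

    public static final class PublishEvent {
        public final Object payload;
        public PublishEvent(Object payload) { this.payload = payload; }
    }

    public static final class StampedEvent {
        public final Object payload;
        public final Date occuredAt;
        public StampedEvent(Object payload, Date occuredAt) {
            this.payload = payload;
            this.occuredAt = occuredAt;
        }
    }

    public static Props props() {
        return Props.create(EventPublishingActor.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(PublishEvent.class, cmd -> {
                    // The single source of truth for time: this actor's own clock.
                    Date occuredAt = new Date(System.currentTimeMillis());
                    getContext().getSystem().eventStream()
                            .publish(new StampedEvent(cmd.payload, occuredAt));
                })
                .build();
    }
}
```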

Note: asking for the time from a single source of truth doesn't help, as this is effectively the same as what NTP does, and you'd be facing, and trying to solve, the same issues the NTP people already solved.

While this may work, I think there are a number of issues with this setup:
  • The Event/Command publishing Actor may become a single bottleneck in the system. Scaling it over multiple servers is rather pointless, as that reintroduces the same problem outlined above, thereby limiting the scalability of the system we tried to increase in the first place.
  • It feels kinda weird to have to go through all this hassle; the solution seems quite complex, which to me is often an indication that I'm not seeing the simpler solution yet.
  • Apart from that, I don't want to start thinking about all the Java frameworks that use System.currentTimeMillis() internally (think Log4j), which would need to be adapted to use the centrally determined time.

Closing words

In the end, it seems to me the ever-increasing parallelism in CPUs, and the changes we make in our programming model to utilise it, may introduce a fresh new set of problems when we transition from thinking about multi-threaded applications to multi-time applications.

I haven't found a solution to this problem yet (I don't think any of those outlined above is really elegant), which is what got me thinking when I heard Vernon talk about the Actor model and how it solves the multi-threading issue elegantly by providing a programming model in which there are no multiple threads (even though at runtime there are). I wonder if we can come up with a programming model incorporating time, which solves the concurrency problem elegantly by making us think there is no relative time.



Comments

  1. I wanted to quote a response my co-worker Danny van Heumen sent me via mail, together with my replies:

    > I am amazed that you wrote this text without using the term "distributed systems"! This is a typical distributed
    > systems problem and you have completely avoided the term. Leslie Lamport has, among other things, done
    > research on time in distributed systems. (You can find quite a lot if you search for "distributed time" and
    > "time in distributed systems".)

    True, indeed (lol), I've added the term somewhere in the text now. There is a mention of a clustered setup somewhere in the text, but indeed "distributed" would've been more correct. This has to do with the parallel I have in mind between multi-threaded and multi-time applications. We started off building multi-threaded applications by just using threads and synchronisation mechanisms like Dijkstra's semaphores. Now we're at a stage where, if you apply e.g. the Actor model, you can design away all the complexity of a multi-threaded setup. Frameworks like vert.x allow for a programming model where execution seems like a single-threaded application while it is multi-threaded in practice. This got me thinking, based on the case I faced, about how we could come up with a design that alleviates the time issue altogether.

    > Did you know System.currentTimeMillis() is not guaranteed to be monotonic? For monotonic time you use
    > System.nanoTime().

    No, I didn't know that; haven't read its javadoc for some time now ;-).
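    A small illustration of the difference, for anyone else who hasn't read that javadoc recently: System.nanoTime() only makes sense for measuring elapsed time within a single JVM, not as a wall-clock value.

```java
public class ElapsedTime {

    public static void main(String[] args) throws InterruptedException {
        // Monotonic: suitable for measuring durations.
        long start = System.nanoTime();
        Thread.sleep(100);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("elapsed ~" + elapsedMillis + " ms");

        // Wall clock: can jump backwards (e.g. when NTP adjusts it), so deltas
        // computed from it may even be negative.
        System.out.println("wall clock = " + System.currentTimeMillis());
    }
}
```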

    > Another solution is obviously to design so that you're not dependent on time. There are substitutes for time, such as an
    > artificial monotonically increasing index that you send along with messages.

    True, but to me this is the same as solution 3 described above: a centrally defined time over all systems in the distributed setup. Unless you are able to create such an index in a distributed way, but I cannot think of a way to do that. In that case, scalability again depends on that central system, which is what I was trying to avoid. Also, as for using a monotonic index: in practice most systems require the actual date/time to be stored at some point, due to the annoying fact that we have users. Most humans I know don't cope well with indexes for time ;-).

    > What is not completely obvious from the article is why you're so dependent on the time in that query.

    True, I didn't mention that. The situation I based this blog on comes from a real-life system I currently work on. The system's database is also tied to other systems (a shared-database setup, unfortunately), so I cannot easily change the tables. Those tables are based on date/time fields, not on indexes, and I can't change that. And that's apart from the fact that, in the end, my users need to see the actual date/time, not an index.
