This content originally appeared on DEV Community and was authored by Emily Fortuna
Morning! The compensating action pattern is a mouthful to say (and some call it the compensating transaction pattern), but it’s easy to digest. In distributed systems, we are often fighting against data consistency issues. Though microservices improve scalability, testability, and individual team code velocity, they introduce a new problem: basic instruction sequences now require communication between services and can leave the entire system in a bad state on failure. Much like the chaos of family members individually making cereal for breakfast, some operations that span services need to be completed as a single unit, similar to database transactions. Otherwise, one person may pour out cereal only to discover their kid used up the last of the milk. With multiple breakfast-makers (er, microservices), it is impossible to guarantee that all steps in a cross-service multi-step operation (cereal, then milk) will run without any hiccups.
The compensating action pattern provides transaction-like guarantees for a sequence of operations amid this distributed systems chaos. The pattern simulates a transaction by either letting the sequence of operations successfully run to completion or, in the face of failure, undoing state changes that the executed operations made, as if “it never happened.” This sequence of steps of forward progress or undoing is called a “long-running transaction”. It’s important to note that undoing the state changes is different from rolling the database back to a previous snapshotted state. This is because while executing the operations of my own long-running transaction, another service or transaction could also have successfully modified the database and this state would be lost in the face of a rollback. Deciding the way to compensate a given operation depends on the specific scenario. Handling a money transfer? Probably best to put the funds back where they started. Shipping issue? Maybe just unreserve the inventory. Because the manner of compensation depends on the particular needs of the application, you, the developer, need to define it in your program.
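To make the difference between compensating and rolling back to a snapshot concrete, here is a minimal sketch in plain Python. The in-memory `inventory` dict, the item name, and the `reserve`/`unreserve` helpers are hypothetical stand-ins for a shared database and are not part of any library.

```python
import copy


def reserve(inventory: dict, item: str, qty: int) -> None:
    """Forward operation: take `qty` units of `item` out of stock."""
    inventory[item] -= qty


def unreserve(inventory: dict, item: str, qty: int) -> None:
    """Compensation: semantically undo `reserve` by adding the units back."""
    inventory[item] += qty


def run_scenario(undo_with_snapshot: bool) -> dict:
    inventory = {"cereal": 10}            # hypothetical shared state
    snapshot = copy.deepcopy(inventory)   # taken before our transaction starts

    reserve(inventory, "cereal", 2)       # step 1 of our long-running transaction
    reserve(inventory, "cereal", 3)       # a different transaction succeeds meanwhile

    # Our transaction now fails at a later step and must be undone.
    if undo_with_snapshot:
        inventory = snapshot                # also erases the other transaction's work
    else:
        unreserve(inventory, "cereal", 2)   # undoes only our own change
    return inventory


print(run_scenario(undo_with_snapshot=True))   # {'cereal': 10}
print(run_scenario(undo_with_snapshot=False))  # {'cereal': 7}
```

Restoring the snapshot silently gives back the three units the other transaction reserved, while the compensation leaves that reservation intact.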
In summary, compensating actions (sometimes called compensating transactions or compensating activities in Temporal parlance) are a design pattern for handling failure amongst distributed, but related services. If you have:
- A series of operations that span more than one service and must be completed as a unit (“all or nothing”, or transaction-like semantics), and
- Some operations that must be undone if the entire transaction fails to complete
Then the compensating action pattern can ensure you maintain a consistent state in the event of failure!
Compensating actions are an important component of the famous saga design pattern. Compensating actions ensure that there is a way to “go backwards” (undo) and end up with a consistent state. Sagas, a design pattern for failure resilience, use compensations, but also ensure there is a way to “go forwards” (by retrying) and maintain that an entire sequence of operations acts like a single transaction.
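As a rough illustration of how “going forwards” and “going backwards” fit together, here is a small sketch in plain Python with no Temporal involved. The `run_saga` helper, the `max_attempts` parameter, and the breakfast functions are all hypothetical names invented for this example; a real system would delegate retries and durability to its workflow engine.

```python
from typing import Callable, List, Optional, Tuple

# Each step pairs a forward action with an optional compensation.
Step = Tuple[Callable[[], None], Optional[Callable[[], None]]]


def run_saga(steps: List[Step], max_attempts: int = 3) -> bool:
    """Run steps in order; return True on success, False if compensated."""
    compensations: List[Callable[[], None]] = []
    for action, compensation in steps:
        if compensation is not None:
            # Registered before the action runs, so a step that fails
            # partway through is still compensated.
            compensations.append(compensation)
        for attempt in range(1, max_attempts + 1):
            try:
                action()  # "go forwards"
                break
            except Exception:
                if attempt == max_attempts:
                    # Retries exhausted: "go backwards" in reverse order.
                    for undo in reversed(compensations):
                        undo()
                    return False
    return True


log: List[str] = []
cereal_attempts = 0


def get_bowl() -> None:
    log.append("get_bowl")


def put_bowl_away() -> None:
    log.append("put_bowl_away")


def add_cereal() -> None:
    global cereal_attempts
    cereal_attempts += 1
    if cereal_attempts == 1:
        raise RuntimeError("transient failure")  # succeeds on the retry
    log.append("add_cereal")


def put_cereal_back_in_box() -> None:
    log.append("put_cereal_back_in_box")


def add_milk() -> None:
    raise RuntimeError("out of milk")  # never succeeds


succeeded = run_saga([
    (get_bowl, put_bowl_away),
    (add_cereal, put_cereal_back_in_box),
    (add_milk, None),
])
print(succeeded, log)
```

Here `add_cereal` recovers through a retry, while `add_milk` exhausts its attempts, so the registered compensations run newest-first and the saga reports failure.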
Coding up Compensating Actions
A Quick Note on Ordering of Compensations
Suppose the steps and compensations for the cereal analogy looked like this:
getBowl()
addCereal()
addMilk()
A naive way of coding up the steps and compensations might look like this:
// DON'T DO THIS
try {
    getBowl()
    addCompensation(putBowlAway)
    addCereal()
    addCompensation(putCerealBackInBox)
    addMilk()
} catch (Exception e) {
    compensate()
    throw e
}
Suppose getBowl fails. If it fails because there are no bowls in the cabinet, the above code looks correct. However, suppose while “executing” getBowl you take a bowl out of the cabinet and then get distracted by what’s happening on Bluesky…for the entire morning. Your “breakfast workflow” times out (and therefore getBowl “fails”) because now it’s lunch. Well, now we have a problem because we jump to execute compensate and there are no compensations registered, but our bowl is out on the kitchen counter! This sort of scenario can happen if an Activity times out (in Temporal specifics, ScheduleToCloseTimeout, StartToCloseTimeout, and HeartbeatTimeout can set such limits) or perhaps crashes after the thing that you’d want to compensate for (like acquiring the bowl) has been executed. In Temporal scenarios, this can also happen if the user cancels the Activity after the thing we want to compensate for has already executed. All these issues* can be solved by simply reordering your steps in the following way:
try {
    addCompensation(putBowlAway)
    getBowl()
    addCompensation(putCerealBackInBox)
    addCereal()
    addMilk()
} catch (Exception e) {
    compensate()
    throw e
}
Easy-peasy.
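The distracted-by-Bluesky scenario can be simulated in a few lines of plain Python. This is only a sketch: the `Kitchen` class, the `register_first` flag, and the simulated `TimeoutError` are hypothetical, and unlike the snippets above, the handler swallows the error instead of rethrowing it so that both outcomes can be printed.

```python
from typing import Callable, List


class Kitchen:
    """Hypothetical stand-in for external state touched by an Activity."""

    def __init__(self) -> None:
        self.bowl_on_counter = False


def get_bowl(kitchen: Kitchen) -> None:
    kitchen.bowl_on_counter = True                # the side effect happens first...
    raise TimeoutError("distracted until lunch")  # ...and then the step "fails"


def put_bowl_away(kitchen: Kitchen) -> None:
    # Safe to call even if no bowl was ever taken out.
    kitchen.bowl_on_counter = False


def make_breakfast(register_first: bool) -> Kitchen:
    kitchen = Kitchen()
    compensations: List[Callable[[Kitchen], None]] = []
    try:
        if register_first:
            compensations.append(put_bowl_away)
            get_bowl(kitchen)
        else:
            get_bowl(kitchen)
            compensations.append(put_bowl_away)  # never reached
    except TimeoutError:
        for undo in reversed(compensations):
            undo(kitchen)
    return kitchen


print(make_breakfast(register_first=False).bowl_on_counter)  # True: bowl left out
print(make_breakfast(register_first=True).bowl_on_counter)   # False: bowl put away
```

One consequence of registering first, which is my own inference rather than something stated above: a compensation may run even when its step never took effect, so it should be safe to call in that case, as `put_bowl_away` is here.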
On to the Code!
If you’re writing Java, you can leverage Temporal’s Saga library to register functions you’d like to run as compensations, and Temporal will do the rest:
// Inside your Workflow class
// You can set parallel compensations if appropriate with the Builder
Saga saga = new Saga(new Saga.Options.Builder().build());
try {
    saga.addCompensation(breakfastActivity::putBowlAway);
    breakfastActivity.getBowl();
    saga.addCompensation(breakfastActivity::putCerealBackInBox);
    breakfastActivity.addCereal();
    breakfastActivity.addMilk();
} catch (ActivityFailure e) {
    saga.compensate();
    throw e;
}
In the event of a failure during addCereal() or addMilk(), the catch block calls saga.compensate(), and all registered compensation functions will execute (putBowlAway() and putCerealBackInBox()). The full repository is available on GitHub.
If you’re working with TypeScript, Python, or Go, you’ll need to keep track of the compensations yourself, but it’s not challenging. All you need to do is keep track of a list of compensating functions, and then execute them when there’s a failure.
Here’s the Python version:
import asyncio
import typing

from temporalio import workflow

# time_delta and common_retry_policy are assumed to be defined elsewhere in this module.


class Compensations:
    def __init__(self, parallel_compensations=False):
        self.parallel_compensations = parallel_compensations
        self.compensations = []

    def add(self, function: typing.Callable[..., typing.Awaitable[None]]):
        self.compensations.append(function)

    def __iadd__(self, function: typing.Callable[..., typing.Awaitable[None]]):
        # Enables the `compensations += some_activity` shorthand.
        self.add(function)
        return self

    async def compensate(self):
        async def run_compensation(
            compensation: typing.Callable[..., typing.Awaitable[None]]
        ) -> None:
            try:
                await workflow.execute_activity(
                    compensation,
                    start_to_close_timeout=time_delta,
                    retry_policy=common_retry_policy,
                )
            except:
                # Log and continue so one failed compensation doesn't block the rest.
                workflow.logger.exception("failed to compensate")

        if self.parallel_compensations:
            all_compensations = [run_compensation(c) for c in self.compensations]
            await asyncio.gather(*all_compensations)
        else:
            # Undo the most recently registered step first.
            for f in reversed(self.compensations):
                await run_compensation(f)
Then, registering your custom compensations will look very similar to the Java example:
@workflow.defn
class BreakfastWorkflow:
    @workflow.run
    async def run(self, parallel_compensations) -> None:
        compensations = Compensations(parallel_compensations=parallel_compensations)
        try:
            compensations += put_bowl_away
            await workflow.execute_activity(
                get_bowl,
                start_to_close_timeout=time_delta,
                retry_policy=common_retry_policy,
            )
            compensations += put_cereal_back_in_box
            await workflow.execute_activity(
                add_cereal,
                start_to_close_timeout=time_delta,
                retry_policy=common_retry_policy,
            )
            await workflow.execute_activity(
                add_milk,
                start_to_close_timeout=time_delta,
                retry_policy=common_retry_policy,
            )
        except Exception:
            task = asyncio.create_task(compensations.compensate())
            # Ensure the compensations run in the face of cancelation.
            await asyncio.shield(task)
            raise
The complete repository is up on GitHub.
Here’s the TypeScript version:
type Compensation = () => Promise<void>

async function compensate(compensations: Compensation[], compensateInParallel = false) {
  if (compensateInParallel) {
    // Start every compensation at once and wait for all of them to finish.
    await Promise.all(
      compensations.map(comp =>
        comp().catch(err => console.error(`failed to compensate: ${err}`))
      )
    )
    return
  }
  // Otherwise run them one at a time, in list order.
  for (const comp of compensations) {
    try {
      await comp()
    } catch (err) {
      console.error(`failed to compensate: ${err}`)
    }
  }
}

export async function breakfastWorkflow(compensateInParallel = false): Promise<void> {
  const compensations: Compensation[] = []
  try {
    // unshift puts the newest compensation first, so the list is already in undo order.
    compensations.unshift(putBowlAway)
    await getBowl()
    compensations.unshift(putCerealBackInBox)
    await addCereal()
    await addMilk()
  } catch (err) {
    await compensate(compensations, compensateInParallel)
    throw err
  }
}
Complete repository on GitHub.
In Go, we can make use of the defer keyword to check whether the function completed normally or whether compensations need to be run:
type Compensations []any

func (s *Compensations) AddCompensation(activity any) {
	*s = append(*s, activity)
}

func (s Compensations) Compensate(ctx workflow.Context, inParallel bool) {
	if !inParallel {
		for i := len(s) - 1; i >= 0; i-- {
			errCompensation := workflow.ExecuteActivity(ctx, s[i]).Get(ctx, nil)
			if errCompensation != nil {
				workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
			}
		}
	} else {
		selector := workflow.NewSelector(ctx)
		for i := 0; i < len(s); i++ {
			execution := workflow.ExecuteActivity(ctx, s[i])
			selector.AddFuture(execution, func(f workflow.Future) {
				if errCompensation := f.Get(ctx, nil); errCompensation != nil {
					workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
				}
			})
		}
		for range s {
			selector.Select(ctx)
		}
	}
}

func BreakfastWorkflow(ctx workflow.Context, parallelCompensations bool) (err error) {
	// Omitted for brevity: set activity options and retry policy here.
	var compensations Compensations
	defer func() {
		// Defer is at the top so that it is executed regardless of which step might fail.
		if err != nil {
			// activity failed, and workflow context is canceled
			disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
			compensations.Compensate(disconnectedCtx, parallelCompensations)
		}
	}()

	compensations.AddCompensation(PutBowlAway)
	err = workflow.ExecuteActivity(ctx, GetBowl).Get(ctx, nil)
	if err != nil {
		return err
	}

	compensations.AddCompensation(PutCerealBackInBox)
	err = workflow.ExecuteActivity(ctx, AddCereal).Get(ctx, nil)
	if err != nil {
		return err
	}

	err = workflow.ExecuteActivity(ctx, AddMilk).Get(ctx, nil)
	return err
}
The full repository for the Go code is available on GitHub.
*The Fine Print
There are a few Temporal-specific configurations or actions you should be aware of that might prevent your compensations from executing as you would like: if you set timeouts or retries on Workflows, your compensations might not get a chance to run before your Workflow times out (so, don’t set these limitations if you want to ensure your compensations run). Additionally, terminate and reset will not allow Workflow code to execute any finally or defer statements, so avoid these as well. If you avoid these scenarios, you can ensure your compensations properly run to completion.
Summary
Compensating actions (or compensating transactions) are a distributed systems design pattern for simulating atomic execution of operations distributed across multiple services. If one of the distributed operations fails, the effects of the operations that already ran are undone via compensating actions. Compensations are a component of the larger saga design pattern, about which I’ll go into more detail in my next post.
The complete project with all the code mentioned in this post is available on GitHub:
To see one of my colleagues, Dominik Tornow, give an intro to sagas that builds on these ideas, please check out our YouTube video.
Emily Fortuna | Sciencx (2023-05-04T00:28:24+00:00) Compensating Actions, Part of a Complete Breakfast with Sagas. Retrieved from https://www.scien.cx/2023/05/04/compensating-actions-part-of-a-complete-breakfast-with-sagas/