02 June, 2013

Transaction synchronization callbacks in Spring Framework

Declarative transaction management considerably simplifies development. Sometimes, though, you want a little more control over a transaction without switching to programmatic transaction management. I'm talking about cases where you want to execute some action only after the transaction has committed successfully: sending an email with registration details, updating a cache, sending a message over the network, and so on. The tricky part is that you want to trigger these actions from inside a transactional method (a method marked with @Transactional, for which Spring starts and ends the transaction automatically).

Now let's see how we can achieve this in Spring Framework. TransactionSynchronizationManager is the central helper class that manages resources and transaction synchronizations per thread. It lets you register transaction synchronizations for the current thread if synchronization is active. TransactionSynchronization represents the actual transaction synchronization callback, with methods such as afterCommit(), afterCompletion(int status), beforeCommit(boolean readOnly) and beforeCompletion(). To make this easier to use, we will create an after-commit executor that runs submitted code only after a successful commit.
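To make the order of these callbacks concrete, here is a small plain-Java model that does not use any Spring classes. The Sync interface, RecordingSync and the complete() method are hypothetical stand-ins (the real callbacks take a readOnly flag and an int status); the sketch only mirrors the order in which a transaction manager is expected to drive the callbacks on commit and on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Spring's TransactionSynchronization.
interface Sync {
    void beforeCommit();
    void beforeCompletion();
    void afterCommit();
    void afterCompletion(boolean committed);
}

// Records every callback so the order can be inspected afterwards.
class RecordingSync implements Sync {
    final List<String> calls = new ArrayList<>();

    public void beforeCommit() { calls.add("beforeCommit"); }
    public void beforeCompletion() { calls.add("beforeCompletion"); }
    public void afterCommit() { calls.add("afterCommit"); }
    public void afterCompletion(boolean committed) {
        calls.add("afterCompletion(" + (committed ? "COMMITTED" : "ROLLED_BACK") + ")");
    }
}

// Hypothetical model of how a transaction manager drives the callbacks.
class CallbackOrderDemo {
    static void complete(Sync sync, boolean commit) {
        if (commit) {
            sync.beforeCommit();
            sync.beforeCompletion();
            // the real commit would happen here
            sync.afterCommit();
        } else {
            sync.beforeCompletion();
            // the real rollback would happen here; afterCommit is skipped
        }
        sync.afterCompletion(commit);
    }

    public static void main(String[] args) {
        RecordingSync committed = new RecordingSync();
        complete(committed, true);
        System.out.println(committed.calls);

        RecordingSync rolledBack = new RecordingSync();
        complete(rolledBack, false);
        System.out.println(rolledBack.calls);
    }
}
```

The point to take away is that afterCommit() is the only callback that is skipped on rollback, which is why it is the natural hook for "only on success" actions.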

Interface

import java.util.concurrent.Executor;

public interface AfterCommitExecutor extends Executor {}

Implementation

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter 
    implements AfterCommitExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(
        AfterCommitExecutorImpl.class);
    private static final ThreadLocal<List<Runnable>> 
        RUNNABLES = new ThreadLocal<List<Runnable>>();

    @Override
    public void execute(Runnable runnable) {
        LOGGER.info("New runnable {}", runnable);
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction is NOT ACTIVE. Executing right now {}",
                runnable);
            runnable.run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES.get();
        if (threadRunnables == null) {
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnables.add(runnable);
    }

    @Override
    public void afterCommit() {
        List<Runnable> threadRunnables = RUNNABLES.get();
        LOGGER.info("Transaction successfully committed, executing {} runnables",
            threadRunnables.size());
        for (int i = 0; i < threadRunnables.size(); i++) {
            Runnable runnable = threadRunnables.get(i);
            LOGGER.info("Executing {}", runnable);
            try {
                runnable.run();
            } catch (RuntimeException e) {
                LOGGER.error("Failed to execute {}", runnable, e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {
        LOGGER.info("Transaction completed with status {}",
            status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
        RUNNABLES.remove();
    }
}

How it works

  1. Besides being an Executor, this class also extends TransactionSynchronizationAdapter, a simple TransactionSynchronization adapter with empty method implementations that makes it easy to override single methods. Being a TransactionSynchronization allows us to register it for transaction synchronization with TransactionSynchronizationManager.
  2. When execute(Runnable runnable) is called, we check whether synchronization is active for the current thread. If it is not, the runnable is executed immediately. Otherwise we store the submitted runnable in a ThreadLocal variable. If this is the first runnable submitted by the current thread, we also register ourselves for transaction synchronization on that thread (you cannot register if synchronization is not active).
  3. Because we registered for transaction synchronization, afterCommit() is invoked when the current transaction commits successfully. At this point we take all the runnables submitted by the thread that completed the transaction and execute them.
    NOTE: The transaction will have been committed already, but the transactional resources might still be active and accessible. As a consequence, any data access code triggered at this point will still "participate" in the original transaction (with no commit following anymore!), unless it explicitly declares that it needs to run in a separate transaction. Hence: use PROPAGATION_REQUIRES_NEW for any transactional operation that is called from your runnable. See the TransactionSynchronization.afterCommit() javadoc for details.
  4. Inside afterCompletion(int status) we clean up the ThreadLocal variable for the thread that just completed a transaction, whether it was committed or rolled back.
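The steps above can be tried out without a Spring context using a simplified plain-Java model. This is only a sketch: begin(), commit() and rollback() are hypothetical methods that simulate the transaction boundaries, whereas in the real class Spring calls afterCommit() and afterCompletion() for us.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Simplified model of the executor above; not Spring API.
class DeferredExecutorModel implements Executor {
    // null means "no synchronization active" for the current thread
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<>();

    // Simulates the start of a transaction on the current thread.
    void begin() {
        RUNNABLES.set(new ArrayList<>());
    }

    @Override
    public void execute(Runnable runnable) {
        List<Runnable> pending = RUNNABLES.get();
        if (pending == null) {
            runnable.run();          // no transaction: run immediately
            return;
        }
        pending.add(runnable);       // inside a transaction: defer
    }

    // Simulates a successful commit: run deferred work, then clean up.
    void commit() {
        List<Runnable> pending = RUNNABLES.get();
        try {
            // index loop, as in the article, so a runnable may submit more work
            for (int i = 0; i < pending.size(); i++) {
                pending.get(i).run();
            }
        } finally {
            RUNNABLES.remove();
        }
    }

    // Simulates a rollback: deferred work is discarded.
    void rollback() {
        RUNNABLES.remove();
    }
}

class DeferredExecutorDemo {
    public static void main(String[] args) {
        DeferredExecutorModel executor = new DeferredExecutorModel();
        List<String> log = new ArrayList<>();

        executor.execute(() -> log.add("immediate"));     // no transaction yet

        executor.begin();
        executor.execute(() -> log.add("after commit"));  // deferred
        log.add("work");
        executor.commit();

        executor.begin();
        executor.execute(() -> log.add("never"));         // dropped on rollback
        executor.rollback();

        System.out.println(log); // prints [immediate, work, after commit]
    }
}
```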

How to use it

afterCommitExecutor.execute(new Runnable() {
    @Override
    public void run() {
        // code that will run after commit goes here
    }
});
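On Java 8 or later the anonymous class can be replaced by a lambda or a method reference, because AfterCommitExecutor inherits a single abstract method from Executor. The sketch below is self-contained and therefore uses a hypothetical stand-in executor (IMMEDIATE) that runs tasks right away, the same way execute() behaves when no synchronization is active; sendWelcomeEmail() is likewise just an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Same shape as the interface from the article.
interface AfterCommitExecutor extends Executor {}

class LambdaUsageDemo {
    // Hypothetical stand-in: runs the task immediately instead of after a commit.
    static final AfterCommitExecutor IMMEDIATE = Runnable::run;

    static final List<String> sent = new ArrayList<>();

    static void sendWelcomeEmail() {
        sent.add("welcome");
    }

    public static void main(String[] args) {
        // lambda instead of an anonymous Runnable
        IMMEDIATE.execute(() -> sent.add("inline lambda"));
        // method reference to an existing no-argument method
        IMMEDIATE.execute(LambdaUsageDemo::sendWelcomeEmail);
        System.out.println(sent); // prints [inline lambda, welcome]
    }
}
```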

Even simpler: custom annotation and AOP

As you can see, the usage is straightforward, but we can make it even simpler by creating a custom annotation and an aspect.

Annotation

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AfterCommit {}

Aspect

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class AfterCommitAnnotationAspect {
    private static final Logger LOGGER = LoggerFactory.getLogger(
        AfterCommitAnnotationAspect.class);

    private final AfterCommitExecutor afterCommitExecutor;

    @Autowired
    public AfterCommitAnnotationAspect(AfterCommitExecutor afterCommitExecutor) {
        this.afterCommitExecutor = afterCommitExecutor;
    }

    @Around(value = "@annotation(org.zmeu.blog.spring.transaction.AfterCommit)",
        argNames = "pjp")
    public Object aroundAdvice(final ProceedingJoinPoint pjp) {
        afterCommitExecutor.execute(new PjpAfterCommitRunnable(pjp));
        return null;
    }

    private static final class PjpAfterCommitRunnable implements Runnable {
        private final ProceedingJoinPoint pjp;

        public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }

        @Override
        public void run() {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                LOGGER.error("Exception while invoking pjp.proceed()", e);
                throw new RuntimeException(e);
            }
        }

        @Override
        public String toString() {
            String typeName = pjp.getTarget().getClass().getSimpleName();
            String methodName = pjp.getSignature().getName();
            return "PjpAfterCommitRunnable[type=" + typeName
                + ", method=" + methodName + "]";
        }
    }
}

The aspect defines an around advice with a pointcut that matches all methods annotated with our custom @AfterCommit annotation. Note the fully qualified name of the annotation, org.zmeu.blog.spring.transaction.AfterCommit; fix the package name if your annotation resides in a different package. The aspect logic is very simple: it wraps the annotated method call in a Runnable and submits it for execution to our AfterCommitExecutor. Because the advice returns null right away and the real call happens later, @AfterCommit is only suitable for methods that return void.
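The "wrap the call in a Runnable and submit it" idea can be illustrated without AspectJ using a JDK dynamic proxy. This is a rough model rather than the aspect itself: the @Deferred annotation, the Cache interface and the defer() helper are all hypothetical names, and a plain list stands in for the AfterCommitExecutor queue.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @AfterCommit.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Deferred {}

interface Cache {
    @Deferred
    void add(String user);

    int size();
}

class SimpleCache implements Cache {
    private final List<String> users = new ArrayList<>();

    public void add(String user) { users.add(user); }
    public int size() { return users.size(); }
}

class DeferringProxyDemo {
    // Wraps target so that calls to @Deferred methods are queued, not invoked.
    static Cache defer(Cache target, List<Runnable> queue) {
        return (Cache) Proxy.newProxyInstance(
            Cache.class.getClassLoader(),
            new Class<?>[] {Cache.class},
            (proxy, method, args) -> {
                if (method.isAnnotationPresent(Deferred.class)) {
                    queue.add(() -> {
                        try {
                            method.invoke(target, args);
                        } catch (ReflectiveOperationException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    return null; // the real call has not happened yet
                }
                return method.invoke(target, args); // everything else runs directly
            });
    }

    public static void main(String[] args) {
        List<Runnable> queue = new ArrayList<>();
        Cache cache = defer(new SimpleCache(), queue);

        cache.add("test_email");
        System.out.println(cache.size()); // prints 0, the add is still queued

        queue.forEach(Runnable::run);     // simulates afterCommit
        System.out.println(cache.size()); // prints 1
    }
}
```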

You can download the entire Maven project with all the sources (including examples).

Example

Suppose we have a system with the following services:

  • EmailSender, which provides an API for sending emails.
  • UserDao, which provides an API for persisting, updating and reading users in the DB.
  • UserCache, which provides an API for caching users. Because cached data should be in sync with DB data, all of its methods are annotated with @AfterCommit.
  • UserService, which provides an API for managing users. A typical createUser() method may look like this:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    import org.zmeu.blog.spring.transaction.AfterCommitExecutor;
    
    @Service
    public class UserServiceImpl implements UserService {
        private static final Logger LOGGER = LoggerFactory.getLogger(
            UserServiceImpl.class);
    
        @Autowired
        private UserDao userDao;
        @Autowired
        private UserCache userCache;
        @Autowired
        private EmailSender emailSender;
        @Autowired
        private AfterCommitExecutor afterCommitExecutor;
    
        @Override
        @Transactional
        public User createUser(final String email, final String name) {
            LOGGER.info("Creating new user with email={} and name={}",
                email, name);
    
            User user = new User(email, name);
    
            // user will be added to cache after commit, annotation way
            userCache.add(user);
    
            // email will be sent after commit, programmatic way
            afterCommitExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    emailSender.sendEmail(email, "Welcome", "Welcome aboard");
                }
            });
    
            userDao.persist(user);
    
            return user;
        }
    }
    

As you can see, the createUser() method is transactional. I moved the userDao.persist(user) call to the end in order to simulate a transaction failure (in real cases, when you use an ORM framework like Hibernate, the actual DB access usually happens after the transactional method finishes, when the session is flushed and the transaction completes, and that is when things may go wrong and the transaction may fail). So we have two after-commit actions to execute: one submitted via the @AfterCommit annotation and the other submitted programmatically.

Now it's time for a test:

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.zmeu.blog.spring.transaction.configuration.SpringConfiguration;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfiguration.class)
public class UserServiceImplTest {

    @Autowired
    private UserService userService;

    @BeforeClass
    public static void beforeClass() throws ClassNotFoundException {
        Class.forName("org.hsqldb.jdbcDriver");
    }

    @Test
    public void createUser() {
        userService.createUser("test_email", "test_name");
    }

    @Test
    public void createUserFailure() {
        userService.createUser(null, "test_name");
    }

}

Results of successful scenario:

UserServiceImpl - Creating new user with email=test_email and name=test_name
AfterCommitExecutorImpl - New runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
AfterCommitExecutorImpl - New runnable UserServiceImpl$1@687ea9
UserDaoImpl - Persisting User[email=test_email, name=test_name]
AfterCommitExecutorImpl - Transaction successfully committed, executing 2 runnables
AfterCommitExecutorImpl - Executing PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
UserCacheImpl - Adding User[email=test_email, name=test_name] to cache
AfterCommitExecutorImpl - Executing UserServiceImpl$1@418bbf55
EmailSenderImpl - Sending email to test_email with subject=Welcome and text=Welcome aboard
AfterCommitExecutorImpl - Transaction completed with status COMMITTED

Results of failure scenario:

UserServiceImpl - Creating new user with email=null and name=test_name
AfterCommitExecutorImpl - New runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
AfterCommitExecutorImpl - New runnable UserServiceImpl$1@687ea9
UserDaoImpl - Persisting User[email=null, name=test_name]
AfterCommitExecutorImpl - Transaction completed with status ROLLED_BACK

Happy synchronization!

15 comments:

  1. You made my day today. Thank you very much. You don't know how much time I have wasted to figure out the solution.

    Thank you very much !

  2. Thank you for pointing me to the important note in the TransactionSynchronization.afterCommit() javadoc. Solved my problem.

  3. I am using @Transactional in my service layer. Everything is working fine, but changes are not being reflected in the DB.

    Replies
    1. Check whether you have configured Spring properly in your project. Download the project sources and see org.zmeu.blog.spring.transaction.configuration.SpringConfiguration. There are 2 important things there:
      1. @EnableTransactionManagement
      2. PlatformTransactionManager bean defined in the context.

  4. Excellent trick. Why is ThreadLocal needed here? It can be just a List to hold Runnables? Is ThreadLocal required?

    Replies
    1. If your application is single-threaded then yes, you can replace ThreadLocal<List<Runnable>> with List<Runnable>. But if you are going to use the AfterCommitExecutor from multiple threads, then ThreadLocal lets you keep, for each thread, the runnables that have to be executed after a successful transaction.

  5. Nice post.

    Is there a way to get more information about the transaction, ie what method was called and the object(s) that were persisted. For example, if a user was created, send an email to that user.

  6. Very helpful, thanks! I'm using this approach to publish domain events via Spring Integration.

  7. Very helpful article. Thanks very much

  8. It's a very nice article. Can someone help me achieve the same using CDI transaction management, without Spring?

  9. I have a question! My @Transactional function calls another function whenever there is a rollback. The problem is that although that function is called and executes line by line, the changes are not saved in the database (using the .save method).

  10. Hi Andrei Zagorneanu,

    Good article. Thanks for posting.

    I have a situation where lets say I have two services
    AccountService
    CustomerService
    Both annotated with @Transactional with some createXXX() methods.
    Now from my AccountController lets say I have a method where I would first call createAccount()
    and once I get back the Account-ID from createAccount() I will then call createCustomer() (To create a Customer and also update Account-ID from Account on the Customer.)
    But for some reason if createCustomer() fails I will be left with a polluted state where Account has already been persisted but no Customer created.
    Is TransactionSynchronizationManager relevant here Or should I handle this differently?

    Thanks,
    Bala
