02 June, 2013

Transaction synchronization callbacks in Spring Framework

Declarative transaction management considerably simplifies development. Sometimes, though, you want a little more control over a transaction without switching to programmatic transaction management. I'm talking about cases where you want to perform some action only after the transaction has committed successfully: for example, sending an email with registration details, updating a cache, or sending a message over the network. The tricky part is that you want to trigger these actions from inside a transactional method (a method marked with @Transactional, for which Spring starts and ends the transaction automatically).

Now let's see how we can achieve this in Spring Framework. TransactionSynchronizationManager is the central helper class that manages resources and transaction synchronizations per thread. It lets you register transaction synchronizations for the current thread, provided synchronization is active. TransactionSynchronization represents the actual transaction synchronization callback, with methods such as afterCommit(), afterCompletion(int status), beforeCommit(boolean readOnly) and beforeCompletion(). To make this easier to use, we will create an after-commit executor that runs the submitted code only after a successful commit.
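To make the callback order concrete, here is a minimal plain-Java model of it. This is not Spring code: the Callback interface and the complete() method are hypothetical stand-ins, and the order they encode is my understanding of how Spring fires the callbacks (the two status values match the STATUS_COMMITTED and STATUS_ROLLED_BACK constants on TransactionSynchronization). The point it illustrates is that afterCommit() fires only on the commit path, while afterCompletion(int status) fires on both paths, which is why the executor below runs code in the former and cleans up in the latter.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the callback order; a sketch, not Spring code. */
class SynchronizationOrderSketch {

    // Same values as the STATUS_* constants on Spring's TransactionSynchronization.
    static final int STATUS_COMMITTED = 0;
    static final int STATUS_ROLLED_BACK = 1;

    /** Hypothetical stand-in for the callback interface, reduced to four methods. */
    interface Callback {
        void beforeCommit(boolean readOnly);
        void beforeCompletion();
        void afterCommit();
        void afterCompletion(int status);
    }

    /** Mimics the order in which a transaction manager fires the callbacks. */
    static void complete(Callback callback, boolean commit) {
        if (commit) {
            callback.beforeCommit(false);
            callback.beforeCompletion();
            // ... the real commit would happen here ...
            callback.afterCommit();
            callback.afterCompletion(STATUS_COMMITTED);
        } else {
            callback.beforeCompletion();
            // ... the real rollback would happen here ...
            callback.afterCompletion(STATUS_ROLLED_BACK);
        }
    }

    /** Records the name of every callback that fires on the chosen path. */
    static List<String> trace(boolean commit) {
        final List<String> calls = new ArrayList<String>();
        complete(new Callback() {
            @Override public void beforeCommit(boolean readOnly) { calls.add("beforeCommit"); }
            @Override public void beforeCompletion() { calls.add("beforeCompletion"); }
            @Override public void afterCommit() { calls.add("afterCommit"); }
            @Override public void afterCompletion(int status) { calls.add("afterCompletion(" + status + ")"); }
        }, commit);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println("commit:   " + trace(true));
        System.out.println("rollback: " + trace(false));
    }
}
```

On the rollback path the trace contains no afterCommit entry, so anything scheduled there is simply never run.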

Interface

import java.util.concurrent.Executor;

public interface AfterCommitExecutor extends Executor {}

Implementation

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter implements AfterCommitExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitExecutorImpl.class);
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();

    @Override
    public void execute(Runnable runnable) {
        LOGGER.info("Submitting new runnable {} to run after commit", runnable);
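        // No synchronization on this thread (e.g. no transaction): nothing to wait for, run now.
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction synchronization is NOT ACTIVE. Executing right now runnable {}", runnable);
            runnable.run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES.get();
        // First submission from this thread: create its queue and register for callbacks.
        if (threadRunnables == null) {
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        // Queue the runnable; afterCommit() runs it, afterCompletion() discards the queue.
        threadRunnables.add(runnable);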
    }

    @Override
    public void afterCommit() {
        List<Runnable> threadRunnables = RUNNABLES.get();
        LOGGER.info("Transaction successfully committed, executing {} runnables", threadRunnables.size());
        for (int i = 0; i < threadRunnables.size(); i++) {
            Runnable runnable = threadRunnables.get(i);
            LOGGER.info("Executing runnable {}", runnable);
            try {
                runnable.run();
            } catch (RuntimeException e) {
                LOGGER.error("Failed to execute runnable " + runnable, e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {
        LOGGER.info("Transaction completed with status {}", status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
        RUNNABLES.remove();
    }

}

How it works

  1. Besides being an Executor, this class also extends TransactionSynchronizationAdapter, a simple TransactionSynchronization adapter with empty method implementations that makes it easy to override only the methods you need. Being a TransactionSynchronization allows us to register it with TransactionSynchronizationManager.
  2. When execute(Runnable runnable) is called, we check whether synchronization is active for the current thread. If it is not, the runnable is executed immediately. Otherwise we store the submitted runnable in a ThreadLocal variable. If this is the first runnable submitted by the current thread, we also register ourselves for transaction synchronization on that thread (you cannot register when synchronization is not active).
  3. Because we registered for transaction synchronization, afterCommit() is invoked when the transaction commits successfully. At that point we take all the runnables submitted by the thread whose transaction just committed and execute them.
    NOTE: The transaction will have been committed already, but the transactional resources might still be active and accessible. As a consequence, any data access code triggered at this point will still "participate" in the original transaction (with no commit following anymore!), unless it explicitly declares that it needs to run in a separate transaction. Hence: Use PROPAGATION_REQUIRES_NEW for any transactional operation that is called from your runnable. You can read more in the TransactionSynchronization.afterCommit() javadoc.
  4. Inside afterCompletion(int status) we clean up the ThreadLocal variable for the thread that just completed a transaction, whether it committed or rolled back.
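The per-thread bookkeeping described in the steps above can be sketched without Spring. In this simplified model the execute(), afterCommit() and afterCompletion() methods are static and are called by hand to simulate what the transaction manager would do; the class name and the demo() method are made up for illustration, and lambdas assume Java 8 or later. It shows two things: a thread only ever sees the runnables it queued itself, and a thread that "rolls back" (afterCompletion() without afterCommit()) discards its runnables without running them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java sketch of the per-thread queue; the callbacks are invoked by hand. */
class PerThreadQueueSketch {
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();

    /** Queues the runnable for the calling thread only. */
    static void execute(Runnable runnable) {
        List<Runnable> queue = RUNNABLES.get();
        if (queue == null) {
            queue = new ArrayList<Runnable>();
            RUNNABLES.set(queue);
        }
        queue.add(runnable);
    }

    /** Simulated afterCommit(): runs what the calling thread queued and returns the count. */
    static int afterCommit() {
        List<Runnable> queue = RUNNABLES.get();
        if (queue == null) {
            return 0;
        }
        for (int i = 0; i < queue.size(); i++) {
            queue.get(i).run();
        }
        return queue.size();
    }

    /** Simulated afterCompletion(): drops the calling thread's queue. */
    static void afterCompletion() {
        RUNNABLES.remove();
    }

    /** Thread A queues two runnables and rolls back; thread B queues one and commits. */
    static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        execute(() -> log.add("A1"));
        execute(() -> log.add("A2"));

        Thread b = new Thread(() -> {
            execute(() -> log.add("B1"));
            afterCommit();      // thread B "commits": only its own runnable runs
            afterCompletion();
        });
        b.start();
        try {
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        afterCompletion();      // thread A "rolls back": queue discarded, nothing runs
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a plain shared List instead of the ThreadLocal, thread B's commit in this sketch would also have run A1 and A2, even though thread A's transaction never committed.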

How to use it

afterCommitExecutor.execute(new Runnable() {
    @Override
    public void run() {
        // code that will run after commit goes here
    }
});
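On Java 8 or later the anonymous class can be replaced with a lambda. And because AfterCommitExecutor adds nothing to Executor, a unit test that runs without a transaction could swap in a stand-in that executes immediately. The sketch below illustrates both points under those assumptions; the Greeter class is hypothetical, and the nested AfterCommitExecutor only mirrors the interface defined earlier in this post so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class LambdaUsageSketch {

    /** Same shape as the AfterCommitExecutor interface defined earlier in this post. */
    interface AfterCommitExecutor extends Executor {}

    /** Hypothetical service that depends only on the interface. */
    static final class Greeter {
        private final AfterCommitExecutor afterCommitExecutor;
        final List<String> sent = new ArrayList<String>();

        Greeter(AfterCommitExecutor afterCommitExecutor) {
            this.afterCommitExecutor = afterCommitExecutor;
        }

        void greet(String name) {
            // A lambda instead of an anonymous Runnable.
            afterCommitExecutor.execute(() -> sent.add("Welcome " + name));
        }
    }

    public static void main(String[] args) {
        // Stand-in that runs every runnable right away, e.g. for a test with no transaction.
        AfterCommitExecutor immediate = Runnable::run;
        Greeter greeter = new Greeter(immediate);
        greeter.greet("test_name");
        System.out.println(greeter.sent);
    }
}
```

The stand-in behaves like the real executor does when no synchronization is active: the runnable runs as soon as it is submitted.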

Even simpler: a custom annotation and AOP

As you can see, the usage follows the standard Executor idiom, but we can make it even simpler by creating a custom annotation and an aspect.

Annotation

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AfterCommit {}

Aspect

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class AfterCommitAnnotationAspect {
    private static final Logger LOGGER = LoggerFactory.getLogger(AfterCommitAnnotationAspect.class);

    private final AfterCommitExecutor afterCommitExecutor;

    @Autowired
    public AfterCommitAnnotationAspect(AfterCommitExecutor afterCommitExecutor) {
        this.afterCommitExecutor = afterCommitExecutor;
    }

    @Around(value = "@annotation(org.zmeu.blog.spring.transaction.AfterCommit)", argNames = "pjp")
    public Object aroundAdvice(final ProceedingJoinPoint pjp) {
        afterCommitExecutor.execute(new PjpAfterCommitRunnable(pjp));
        return null;
    }

    private static final class PjpAfterCommitRunnable implements Runnable {
        private final ProceedingJoinPoint pjp;

        public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }

        @Override
        public void run() {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                LOGGER.error("Exception while invoking pjp.proceed()", e);
                throw new RuntimeException(e);
            }
        }

        @Override
        public String toString() {
            String typeName = pjp.getTarget().getClass().getSimpleName();
            String methodName = pjp.getSignature().getName();
            return "PjpAfterCommitRunnable[type=" + typeName + ", method=" + methodName + "]";
        }
    }

}

The aspect defines an around advice with a pointcut that matches all methods annotated with our custom @AfterCommit annotation. Note the fully qualified annotation name, org.zmeu.blog.spring.transaction.AfterCommit, in the pointcut expression; change the package name if your annotation resides in a different package. The aspect logic is very simple: it wraps the call to the annotated method in a Runnable and submits it to our AfterCommitExecutor. Because the advice returns null right away and the real method runs later, this approach only makes sense for void methods.

You can download the entire Maven project with all the sources (including examples).

Example

Suppose we have a system with the following services:

  • EmailSender, which provides an API for sending emails.
  • UserDao, which provides an API for persisting, updating and reading users in the DB.
  • UserCache, which provides an API for caching users. Because cached data should stay in sync with the DB data, all of its methods are annotated with @AfterCommit.
  • UserService, which provides an API for managing users. A typical createUser() method may look like this:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    import org.zmeu.blog.spring.transaction.AfterCommitExecutor;
    
    @Service
    public class UserServiceImpl implements UserService {
        private static final Logger LOGGER = LoggerFactory.getLogger(UserServiceImpl.class);
    
        @Autowired
        private UserDao userDao;
        @Autowired
        private UserCache userCache;
        @Autowired
        private EmailSender emailSender;
        @Autowired
        private AfterCommitExecutor afterCommitExecutor;
    
        @Override
        @Transactional
        public User createUser(final String email, final String name) {
            LOGGER.info("Creating new user with email={} and name={}", email, name);
    
            User user = new User(email, name);
    
            // user will be added to cache after commit, annotation way
            userCache.add(user);
    
            // email will be sent after commit, programmatic way
            afterCommitExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    emailSender.sendEmail(email, "Welcome", "Welcome aboard dear " + name);
                }
            });
    
            userDao.persist(user);
    
            return user;
        }
    
    }
    

As you can see, the createUser() method is transactional (note the @Transactional annotation). I moved the userDao.persist(user) call to the end in order to simulate a transaction failure. (In real applications that use an ORM framework such as Hibernate, the actual DB access usually happens after the transactional method returns, when the session is flushed and the transaction completes; that is when things may go wrong and the transaction may fail.) So we have two after-commit actions to execute: one submitted through the @AfterCommit annotation (the userCache.add(user) call) and one submitted programmatically through afterCommitExecutor.execute(...).

Now it's time for a test:

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.zmeu.blog.spring.transaction.configuration.SpringConfiguration;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfiguration.class)
public class UserServiceImplTest {

    @Autowired
    private UserService userService;

    @BeforeClass
    public static void beforeClass() throws ClassNotFoundException {
        Class.forName("org.hsqldb.jdbcDriver");
    }

    @Test
    public void createUser() {
        userService.createUser("test_email", "test_name");
    }

    @Test
    public void createUserFailure() {
        userService.createUser(null, "test_name");
    }

}

Results of successful scenario:

[main] INFO UserServiceImpl - Creating new user with email=test_email and name=test_name
[main] INFO AfterCommitExecutorImpl - Submitting new runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add] to run after commit
[main] INFO AfterCommitExecutorImpl - Submitting new runnable org.zmeu.blog.spring.transaction.example.UserServiceImpl$1@418bbf55 to run after commit
[main] INFO UserDaoImpl - Persisting User[email=test_email, name=test_name]
[main] INFO AfterCommitExecutorImpl - Transaction successfully committed, executing 2 runnables
[main] INFO AfterCommitExecutorImpl - Executing runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
[main] INFO UserCacheImpl - Adding User[email=test_email, name=test_name] to cache
[main] INFO AfterCommitExecutorImpl - Executing runnable org.zmeu.blog.spring.transaction.example.UserServiceImpl$1@418bbf55
[main] INFO EmailSenderImpl - Sending email to test_email with subject=Welcome and text=Welcome aboard dear test_name
[main] INFO AfterCommitExecutorImpl - Transaction completed with status COMMITTED

Results of failure scenario:

[main] INFO UserServiceImpl - Creating new user with email=null and name=test_name
[main] INFO AfterCommitExecutorImpl - Submitting new runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add] to run after commit
[main] INFO AfterCommitExecutorImpl - Submitting new runnable org.zmeu.blog.spring.transaction.example.UserServiceImpl$1@687ea9 to run after commit
[main] INFO UserDaoImpl - Persisting User[email=null, name=test_name]
[main] INFO AfterCommitExecutorImpl - Transaction completed with status ROLLED_BACK

Happy synchronization!

15 comments:

  1. You made my day today. Thank you very much. You don't know how much time I have wasted to figure out the solution.

    Thank you very much !

  2. Thank you for pointing me to the important note in the TransactionSynchronization.afterCommit() javadoc. Solved my problem.

  3. I am using @Transactional in my service layer. Everything is working fine, but changes are not being reflected in the DB.

    Replies
    1. Check whether you have configured Spring properly in your project. Download the project sources and see org.zmeu.blog.spring.transaction.configuration.SpringConfiguration. There are 2 important things there:
      1. @EnableTransactionManagement
      2. PlatformTransactionManager bean defined in the context.

  4. Excellent trick. Why is ThreadLocal needed here? It can be just a List to hold Runnables? Is ThreadLocal required?

    Replies
    1. If your application is single-threaded then yes, you can replace ThreadLocal<List<Runnable>> with List<Runnable>. But if you are going to use the AfterCommitExecutor from multiple threads, then ThreadLocal lets you keep, for each thread separately, the runnables that have to be executed after that thread's transaction commits successfully.

  5. Nice post.

    Is there a way to get more information about the transaction, ie what method was called and the object(s) that were persisted. For example, if a user was created, send an email to that user.

  6. Very helpful, thanks! I'm using this approach to publish domain events via Spring Integration.

  7. Very helpful article. Thanks very much

  8. It's a very nice article. Can someone help me achieve the same using CDI transaction management, without Spring?
