Declarative transaction management considerably simplifies development. Sometimes, though, you want a little more control over the transaction without switching to programmatic transaction management. I'm talking about cases where you want to execute some activities only after the transaction has completed successfully, for example: send an email with registration details, update a cache, send a message over the network, etc. The tricky part is that you want to trigger these activities from inside a transactional method (a method marked with @Transactional, which automatically starts and ends the transaction).
Now let's see how we can achieve this in Spring Framework. TransactionSynchronizationManager is the central helper class that manages resources and transaction synchronizations per thread. It lets you register transaction synchronizations for the current thread if synchronization is active. TransactionSynchronization represents the actual transaction synchronization callback, with methods like afterCommit(), afterCompletion(int status), beforeCommit(boolean readOnly), beforeCompletion(), etc. To make this easier to use, we will create an after-commit executor that runs the submitted code only after a successful commit.
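As background for the callbacks listed above, here is a small Spring-free model of the order in which they fire. ToySynchronization and ToyTransaction are hypothetical stand-ins written for this illustration (they are not Spring types, and the boolean parameter replaces Spring's int status); the sketch only mimics, as far as I understand it, the sequence Spring's transaction manager follows on commit and on rollback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring's TransactionSynchronization (illustration only).
interface ToySynchronization {
    default void beforeCommit() {}
    default void beforeCompletion() {}
    default void afterCommit() {}
    default void afterCompletion(boolean committed) {}
}

// Hypothetical driver that mimics the callback order of a transaction manager.
class ToyTransaction {
    private final List<ToySynchronization> synchronizations = new ArrayList<>();

    void register(ToySynchronization synchronization) {
        synchronizations.add(synchronization);
    }

    void commit() {
        for (ToySynchronization s : synchronizations) s.beforeCommit();
        for (ToySynchronization s : synchronizations) s.beforeCompletion();
        // ... the real commit would happen here ...
        for (ToySynchronization s : synchronizations) s.afterCommit();
        for (ToySynchronization s : synchronizations) s.afterCompletion(true);
    }

    void rollback() {
        // No beforeCommit/afterCommit on this path
        for (ToySynchronization s : synchronizations) s.beforeCompletion();
        // ... the real rollback would happen here ...
        for (ToySynchronization s : synchronizations) s.afterCompletion(false);
    }
}

class CallbackOrderDemo {
    // Records the names of the callbacks in the order they are invoked.
    static List<String> record(boolean commit) {
        List<String> calls = new ArrayList<>();
        ToyTransaction tx = new ToyTransaction();
        tx.register(new ToySynchronization() {
            @Override public void beforeCommit() { calls.add("beforeCommit"); }
            @Override public void beforeCompletion() { calls.add("beforeCompletion"); }
            @Override public void afterCommit() { calls.add("afterCommit"); }
            @Override public void afterCompletion(boolean committed) {
                calls.add("afterCompletion(" + (committed ? "COMMITTED" : "ROLLED_BACK") + ")");
            }
        });
        if (commit) {
            tx.commit();
        } else {
            tx.rollback();
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(record(true));
        System.out.println(record(false));
    }
}
```

The point to take away is that afterCommit() is skipped entirely on rollback, while afterCompletion() runs in both cases, which makes it the natural place for cleanup.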
Interface
import java.util.concurrent.Executor;
public interface AfterCommitExecutor extends Executor {}
Implementation
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
// Note: TransactionSynchronizationAdapter is deprecated as of Spring 5.3; on newer
// versions implement TransactionSynchronization directly (its methods have defaults).
@Component
public class AfterCommitExecutorImpl extends TransactionSynchronizationAdapter
        implements AfterCommitExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(
            AfterCommitExecutorImpl.class);
    // Runnables queued by the current thread for its current transaction
    private static final ThreadLocal<List<Runnable>>
            RUNNABLES = new ThreadLocal<List<Runnable>>();

    @Override
    public void execute(Runnable runnable) {
        LOGGER.info("New runnable {}", runnable);
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            LOGGER.info("Transaction is NOT ACTIVE. Executing right now {}",
                    runnable);
            runnable.run();
            return;
        }
        List<Runnable> threadRunnables = RUNNABLES.get();
        if (threadRunnables == null) {
            // First runnable in this transaction: register for its callbacks
            threadRunnables = new ArrayList<Runnable>();
            RUNNABLES.set(threadRunnables);
            TransactionSynchronizationManager.registerSynchronization(this);
        }
        threadRunnables.add(runnable);
    }

    @Override
    public void afterCommit() {
        List<Runnable> threadRunnables = RUNNABLES.get();
        LOGGER.info("Transaction successfully committed, executing {} runnables",
                threadRunnables.size());
        for (int i = 0; i < threadRunnables.size(); i++) {
            Runnable runnable = threadRunnables.get(i);
            LOGGER.info("Executing {}", runnable);
            try {
                runnable.run();
            } catch (RuntimeException e) {
                // A failing runnable must not prevent the remaining ones from running
                LOGGER.error("Failed to execute {}", runnable, e);
            }
        }
    }

    @Override
    public void afterCompletion(int status) {
        LOGGER.info("Transaction completed with status {}",
                status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
        RUNNABLES.remove();
    }
}
How it works
- Along with being an Executor, this class also extends TransactionSynchronizationAdapter, which is a simple TransactionSynchronization adapter containing empty method implementations, for easier overriding of single methods. Being a TransactionSynchronization allows us to register it for transaction synchronization in TransactionSynchronizationManager.
- When the execute(Runnable runnable) method is called, we check whether synchronization is active for the current thread. If not, the runnable is executed immediately. Otherwise we store the submitted runnable in a ThreadLocal variable. If this is the first runnable submitted by the current thread, we register ourselves for transaction synchronization for the current thread (you cannot register if synchronization is not active).
- Because we registered for transaction synchronization, the afterCommit() method is invoked when that transaction commits successfully. At this point we get all the runnables submitted by the thread that completed the transaction and execute them.
NOTE: The transaction will have been committed already, but the transactional resources might still be active and accessible. As a consequence, any data access code triggered at this point will still "participate" in the original transaction (with no commit following anymore!), unless it explicitly declares that it needs to run in a separate transaction. Hence: Use PROPAGATION_REQUIRES_NEW for any transactional operation that is called from your runnable. See the TransactionSynchronization.afterCommit() javadoc for details.
- Inside the afterCompletion(int status) method we clean up the ThreadLocal variable for the thread that just completed a transaction.
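The steps above can be tried out without a Spring context using the following minimal sketch. ToyAfterCommitExecutor is a hypothetical, simplified model: its begin(), commit() and rollback() methods are assumptions that stand in for what the transaction manager does around a @Transactional method, and unlike the real class it creates the per-thread list eagerly in begin(). It is meant to show the queueing behavior and why the list lives in a ThreadLocal, not to replace the implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Spring-free model of the executor; begin/commit/rollback are hypothetical stand-ins.
class ToyAfterCommitExecutor implements Executor {
    // null means "no transaction is active on this thread"
    private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<>();

    void begin() {
        RUNNABLES.set(new ArrayList<>());
    }

    @Override
    public void execute(Runnable runnable) {
        List<Runnable> queued = RUNNABLES.get();
        if (queued == null) {
            runnable.run();        // no transaction on this thread: run immediately
        } else {
            queued.add(runnable);  // defer until commit()
        }
    }

    void commit() {
        List<Runnable> queued = RUNNABLES.get();
        try {
            // Index loop, so runnables queued while we iterate are picked up too
            for (int i = 0; i < queued.size(); i++) {
                try {
                    queued.get(i).run();
                } catch (RuntimeException e) {
                    System.err.println("after-commit action failed: " + e);
                }
            }
        } finally {
            RUNNABLES.remove();    // plays the role of afterCompletion()
        }
    }

    void rollback() {
        RUNNABLES.remove();        // queued runnables are discarded, never run
    }
}

class ToyAfterCommitDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ToyAfterCommitExecutor executor = new ToyAfterCommitExecutor();

        executor.execute(() -> log.add("immediate"));

        executor.begin();
        executor.execute(() -> { throw new IllegalStateException("boom"); });
        executor.execute(() -> log.add("after-commit"));
        // A second thread has no transaction of its own, so its runnable runs at once
        Thread other = new Thread(() -> executor.execute(() -> log.add("other-thread")));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        log.add("work-done");
        executor.commit();

        executor.begin();
        executor.execute(() -> log.add("never-runs"));
        executor.rollback();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a plain shared List instead of the ThreadLocal, the runnable submitted by the second thread would have been queued into the first thread's transaction, which is exactly what the per-thread storage prevents.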
How to use it
afterCommitExecutor.execute(new Runnable() {
    @Override
    public void run() {
        // code that will run after commit goes here
    }
});
Even simpler: custom annotation and AOP
As you can see, the usage is straightforward, but we can make it even simpler by creating a custom annotation and an aspect.
Annotation
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AfterCommit {}
Aspect
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class AfterCommitAnnotationAspect {
    private static final Logger LOGGER = LoggerFactory.getLogger(
            AfterCommitAnnotationAspect.class);
    private final AfterCommitExecutor afterCommitExecutor;

    @Autowired
    public AfterCommitAnnotationAspect(AfterCommitExecutor afterCommitExecutor) {
        this.afterCommitExecutor = afterCommitExecutor;
    }

    @Around(value = "@annotation(org.zmeu.blog.spring.transaction.AfterCommit)",
            argNames = "pjp")
    public Object aroundAdvice(final ProceedingJoinPoint pjp) {
        // The annotated method is not invoked here; it is queued to run after commit
        afterCommitExecutor.execute(new PjpAfterCommitRunnable(pjp));
        // The real result is not available yet, so annotated methods should be void
        return null;
    }

    private static final class PjpAfterCommitRunnable implements Runnable {
        private final ProceedingJoinPoint pjp;

        public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
            this.pjp = pjp;
        }

        @Override
        public void run() {
            try {
                pjp.proceed();
            } catch (Throwable e) {
                LOGGER.error("Exception while invoking pjp.proceed()", e);
                throw new RuntimeException(e);
            }
        }

        @Override
        public String toString() {
            String typeName = pjp.getTarget().getClass().getSimpleName();
            String methodName = pjp.getSignature().getName();
            return "PjpAfterCommitRunnable[type=" + typeName
                    + ", method=" + methodName + "]";
        }
    }
}
The aspect defines an around advice with a pointcut that matches all methods annotated with our custom @AfterCommit annotation. Note the fully qualified annotation name org.zmeu.blog.spring.transaction.AfterCommit in the pointcut; fix the package name if your annotation resides in a different package. The aspect logic is very simple: it wraps the annotated method call inside a Runnable and submits it to our AfterCommitExecutor. Because the advice returns null without waiting for the method to run, @AfterCommit only makes sense on void methods.
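The same wrap-and-defer idea can be illustrated without AspectJ, using a JDK dynamic proxy. This is only a sketch of the principle, not how Spring AOP is wired: the @Deferred annotation, the Greeter interface and the defer() helper are all hypothetical names invented for this example, and a plain list plays the role of the after-commit queue.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the @AfterCommit annotation.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Deferred {}

// Hypothetical service interface used only for this illustration.
interface Greeter {
    @Deferred
    void greet(String name);  // deferred, so it has to be void

    String describe();        // not annotated: invoked directly
}

class DeferringProxyDemo {
    // Wraps target so that calls to @Deferred methods are handed to the executor
    // instead of running inline; all other calls go straight through.
    static <T> T defer(Class<T> type, T target, Executor executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(Deferred.class)) {
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause();
                }
            }
            executor.execute(() -> {
                try {
                    method.invoke(target, args);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
            });
            return null; // same limitation as the aspect: no result to return yet
        };
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Runnable> queue = new ArrayList<>(); // stands in for the after-commit queue
        Greeter real = new Greeter() {
            @Override public void greet(String name) { log.add("hello " + name); }
            @Override public String describe() { return "real greeter"; }
        };
        Greeter proxied = defer(Greeter.class, real, queue::add);

        log.add(proxied.describe());   // runs inline
        proxied.greet("world");        // only queued at this point
        log.add("queued=" + queue.size());
        queue.forEach(Runnable::run);  // "commit": run what was deferred
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the caller of greet() gets control back before the method body has run, which is the behavior to keep in mind when annotating real service methods with @AfterCommit.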
You can download the entire Maven project with all the sources (including examples).
Example
Suppose we have a system with the following services:
- EmailSender, which provides an API for sending emails.
- UserDao, which provides an API for persisting, updating and reading users to and from the DB.
- UserCache, which provides an API for caching users. Because cached data should be in sync with DB data, all its methods are annotated with @AfterCommit.
- UserService, which provides an API for managing users. A typical createUser() method may look like this:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zmeu.blog.spring.transaction.AfterCommitExecutor;
@Service
public class UserServiceImpl implements UserService {
    private static final Logger LOGGER = LoggerFactory.getLogger(
            UserServiceImpl.class);
    @Autowired
    private UserDao userDao;
    @Autowired
    private UserCache userCache;
    @Autowired
    private EmailSender emailSender;
    @Autowired
    private AfterCommitExecutor afterCommitExecutor;

    @Override
    @Transactional
    public User createUser(final String email, final String name) {
        LOGGER.info("Creating new user with email={} and name={}",
                email, name);
        User user = new User(email, name);
        // user will be added to cache after commit, annotation way
        userCache.add(user);
        // email will be sent after commit, programmatic way
        afterCommitExecutor.execute(new Runnable() {
            @Override
            public void run() {
                emailSender.sendEmail(email, "Welcome", "Welcome aboard");
            }
        });
        userDao.persist(user);
        return user;
    }
}
As you can see, the createUser() method is transactional. I moved the userDao.persist(user) line to the end in order to simulate a transaction failure (in real cases, when you use an ORM framework like Hibernate, the actual DB access usually happens after the transactional method finishes, when the session is flushed and the transaction completes, and that is when things may go wrong and the transaction may fail). So we have two after-commit actions that should be executed: one submitted through the @AfterCommit annotation and the other submitted programmatically.
Now it's time for a test:
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.zmeu.blog.spring.transaction.configuration.SpringConfiguration;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfiguration.class)
public class UserServiceImplTest {
    @Autowired
    private UserService userService;

    @BeforeClass
    public static void beforeClass() throws ClassNotFoundException {
        Class.forName("org.hsqldb.jdbcDriver");
    }

    @Test
    public void createUser() {
        userService.createUser("test_email", "test_name");
    }

    @Test
    public void createUserFailure() {
        userService.createUser(null, "test_name");
    }
}
Results of successful scenario:
UserServiceImpl - Creating new user with email=test_email and name=test_name
AfterCommitExecutorImpl - New runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
AfterCommitExecutorImpl - New runnable UserServiceImpl$1@687ea9
UserDaoImpl - Persisting User[email=test_email, name=test_name]
AfterCommitExecutorImpl - Transaction successfully committed, executing 2 runnables
AfterCommitExecutorImpl - Executing PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
UserCacheImpl - Adding User[email=test_email, name=test_name] to cache
AfterCommitExecutorImpl - Executing UserServiceImpl$1@418bbf55
EmailSenderImpl - Sending email to test_email with subject=Welcome and text=Welcome aboard
AfterCommitExecutorImpl - Transaction completed with status COMMITTED
Results of failure scenario:
UserServiceImpl - Creating new user with email=null and name=test_name
AfterCommitExecutorImpl - New runnable PjpAfterCommitRunnable[type=UserCacheImpl, method=add]
AfterCommitExecutorImpl - New runnable UserServiceImpl$1@687ea9
UserDaoImpl - Persisting User[email=null, name=test_name]
AfterCommitExecutorImpl - Transaction completed with status ROLLED_BACK
Happy synchronization!
You made my day today. Thank you very much. You don't know how much time I have wasted to figure out the solution.
Thank you very much!
You are always welcome!
Thank you for pointing me to the important note in the TransactionSynchronization.afterCommit() javadoc. Solved my problem.
Good!
I am using @Transactional in my service layer. Everything is working fine, but changes are not being reflected in the DB.
Check whether you have configured Spring properly in your project. Download the project sources and see org.zmeu.blog.spring.transaction.configuration.SpringConfiguration. There are 2 important things there:
1. @EnableTransactionManagement
2. PlatformTransactionManager bean defined in the context.
Excellent trick. Why is ThreadLocal needed here? Can it be just a List to hold the Runnables? Is ThreadLocal required?
If your application is single-threaded then yes, you can replace ThreadLocal<List<Runnable>> with List<Runnable>. But if you are going to use the AfterCommitExecutor from multiple threads, then ThreadLocal lets you keep, separately for each thread, the runnables that have to be executed after its successful transaction.
Nice post.
Is there a way to get more information about the transaction, i.e. what method was called and the object(s) that were persisted? For example, if a user was created, send an email to that user.
Very helpful, thanks! I'm using this approach to publish domain events via Spring Integration.
Very helpful article. Thanks very much.
Thanks! It works perfectly.
It's a very nice article. Can someone help me achieve the same using CDI transaction management, without Spring?
I have a question! My @Transactional function calls another function whenever there is a rollback. But the problem is that after the function is called and executed line by line, the changes are not getting saved (using the .save method) in the database.
Hi Andrei Zagorneanu,
Good article. Thanks for posting.
I have a situation where, let's say, I have two services:
AccountService
CustomerService
Both are annotated with @Transactional and have some createXXX() methods.
Now, from my AccountController, let's say I have a method where I first call createAccount()
and, once I get back the Account-ID from createAccount(), I then call createCustomer() (to create a Customer and also set the Account-ID from the Account on the Customer).
But if createCustomer() fails for some reason, I will be left with a polluted state where the Account has already been persisted but no Customer was created.
Is TransactionSynchronizationManager relevant here, or should I handle this differently?
Thanks,
Bala