Multi-Threading and Spring Transactions – DZone Java

As developers, we are used to applying the @Transactional annotation provided by the Spring framework and relying on the framework's mechanisms for transaction management. But is this enough?

Well, the answer is clear: No.

Spring takes care of all universal transaction management details and provides a consistent programming model for different transaction APIs, but how many people really understand how it behaves in a multi-threaded environment? Is it possible to open a transaction and write data from multiple threads? According to some forum statements, the answer is: yes, it is! However, according to the framework, it is not.

Let’s take a step back and think about the EntityManager. The EntityManager works with a session or a cache of the objects it manages. This means it has state, and sharing that state between several threads can lead to race conditions; so, rule number one is to use one EntityManager per thread.

As a matter of fact, Spring takes care of keeping the transactional context per thread. Suppose we want to process a list of objects in parallel and store them in the database. We want to group those objects into chunks and pass each chunk to a processing method in a separate thread. Then, the results processed in each thread should be collected and presented to the user.

I will start with the definition of a service interface that is responsible for the processing but knows nothing about the parallel processing we would like to implement:

import java.util.List;

/**
 * Service interface defining the contract for object identifiers processing
 */
public interface ProcessingService {

	/**
	 * Processes the list of objects identified by id and returns a list of
	 * identifiers of the successfully processed objects
	 * 
	 * @param objectIds List of object identifiers
	 * 
	 * @return identifiers list of the successfully processed objects
	 */
	List<Integer> processObjects(List<Integer> objectIds);
}

The default implementation of this service is based on database storage. However, the example is very simplistic:

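import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.stream.Collectors;

// Assumption: the logger is SLF4J, the default logging facade in Spring Boot
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Service implementation for database related ids processing
 */
@Service("ProcessingDBService")
public class ProcessingDBService implements ProcessingService {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Transactional
    @Override
    public List<Integer> processObjects(List<Integer> objectIds) {
        // Process and save to DB

        logger.info("Running in thread {} with object ids {}", Thread.currentThread().getName(), objectIds);

        // Return a copy of the ids as the "successfully processed" result
        return objectIds.stream().collect(Collectors.toList());
    }
}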

Now, we would like to have the option to run this processing in chunks and in parallel. In order to keep the code clean and decoupled from its runtime context, we will use the Decorator pattern as follows:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;

/**
 * Service implementation for parallel chunk processing
 */
@Service
@Primary
@ConditionalOnProperty(prefix = "service", name = "parallel", havingValue = "true")
public class ProcessingServiceParallelRunDecorator implements ProcessingService {

	private final ProcessingService delegate;
	
	public ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
		this.delegate = delegate;
	}

	/**
	 * In a real scenario it should be an external configuration
	 */
	private int batchSize = 10;

	@Override
	public List<Integer> processObjects(List<Integer> objectIds) {
		List<List<Integer>> chunks = getBatches(objectIds, batchSize);
		// Each chunk is handed to the delegate, possibly on a different thread
		List<List<Integer>> processedObjectIds = chunks.parallelStream().map(delegate::processObjects)
				.collect(Collectors.toList());

		return flatList(processedObjectIds);
	}

	// Splits the collection into consecutive sublists of at most batchSize elements (Java 9+)
	private List<List<Integer>> getBatches(List<Integer> collection, int batchSize) {
		return IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
				.mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
				.collect(Collectors.toList());
	}

	// Merges the per-chunk results into a single list
	private List<Integer> flatList(List<List<Integer>> listOfLists) {
		return listOfLists.stream().collect(ArrayList::new, List::addAll, List::addAll);
	}
}

The actual calls are delegated to a processing service implementation, but the Decorator takes care of distributing the work across threads and collecting the results. The Decorator pattern lets the client code stay unaware of the actual implementation: either the single-threaded or the multi-threaded version can be injected without any change to the client code.
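The chunking and flattening steps can be tried in isolation. The following is a minimal sketch in plain Java, without Spring. The class name ChunkingSketch and its main method are hypothetical, and flatList uses flatMap here instead of the three-argument collect from the Decorator. The three-argument IntStream.iterate requires Java 9 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical stand-alone class; only the two helpers mirror the Decorator.
final class ChunkingSketch {

    // Splits the list into consecutive sublists of at most batchSize elements.
    static List<List<Integer>> getBatches(List<Integer> ids, int batchSize) {
        return IntStream.iterate(0, i -> i < ids.size(), i -> i + batchSize)
                .mapToObj(i -> ids.subList(i, Math.min(i + batchSize, ids.size())))
                .collect(Collectors.toList());
    }

    // Flattens the per-chunk results back into one list, keeping encounter order.
    static List<Integer> flatList(List<List<Integer>> listOfLists) {
        return listOfLists.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        List<List<Integer>> batches = getBatches(ids, 10);
        System.out.println(batches);           // [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], [11, 12]]
        System.out.println(flatList(batches)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    }
}
```

With 12 ids and a batch size of 10, the split yields two chunks, which is why only two delegate calls are made in the test that follows.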

To see what client code might look like, let’s take a look at a simple test:

@RunWith( SpringJUnit4ClassRunner.class )
@SpringBootTest(properties = { "service.parallel=false" })
public class ProcessingServiceTest {
	
	@Autowired
	ProcessingService processingService;
	
	ProcessingService processingServiceDecorator;
	
	@Test
	public void shouldRunParallelProcessingUsingDecorator() {
		processingServiceDecorator = new ProcessingServiceParallelRunDecorator(processingService);
		List<Integer> objectIds = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
		
		List<Integer> resultList = processingServiceDecorator.processObjects(objectIds);
		
		Assert.assertEquals(objectIds, resultList);
	}
	
}

The code passes a list of objectIds and runs the Decorator service explicitly created in the test. Because the chunk size is internally configured as 10, the 12 ids are split into two chunks, and two threads are expected to process the data. By checking the logs we can see the following rows:

ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ProcessingDBService: Running in thread main with object ids [11, 12]

Exactly two threads are involved: the parallel stream processes one chunk on the calling (main) thread and hands the other chunk to a worker thread of the common ForkJoinPool.
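To observe which threads a parallel stream picks, the thread name can be recorded for each chunk. This is a rough sketch in plain Java; the class and method names are hypothetical. The actual thread names depend on the JVM, the machine, and the current load, so no fixed output is shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for inspecting parallel stream scheduling.
final class ParallelStreamThreadsSketch {

    // Returns, for each chunk, the name of the thread that processed it.
    static List<String> threadNamesFor(List<List<Integer>> chunks) {
        return chunks.parallelStream()
                .map(chunk -> Thread.currentThread().getName())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = Arrays.asList(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
                Arrays.asList(11, 12));
        // Names vary between runs; typically the calling thread and/or
        // ForkJoinPool.commonPool workers appear here.
        System.out.println(threadNamesFor(chunks));
    }
}
```

The result list has one entry per chunk, in chunk order, even though the chunks may have been processed on different threads.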

One important aspect of this processing is transaction handling. The first 10 elements were processed in one transaction while the last 2 were processed in another one. If you take a look at the ProcessingDBService, you will see that the public method is annotated with @Transactional. This is how Spring is expected to work: it holds the transactional context per thread in dedicated ThreadLocal objects and does not support running multiple threads in one transaction.
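The effect of keeping context in a ThreadLocal can be illustrated without Spring. In the sketch below, CONTEXT is a hypothetical stand-in for a thread-bound transactional context; it is not Spring's own code. A value bound on the calling thread is simply not visible from another thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of thread-bound context; not Spring's implementation.
final class ThreadLocalContextSketch {

    // Stand-in for a per-thread transactional context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Binds a value on the calling thread and returns what a new thread sees.
    static String valueSeenByOtherThread(String value) {
        CONTEXT.set(value);
        try {
            AtomicReference<String> seen = new AtomicReference<>("unset");
            Thread worker = new Thread(() -> seen.set(CONTEXT.get()));
            worker.start();
            worker.join();
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        } finally {
            // Always unbind, as thread-bound resources must be cleaned up.
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(valueSeenByOtherThread("tx-1")); // null
    }
}
```

The worker thread reads null because each thread has its own slot in a ThreadLocal. In the same way, a method called on another thread does not join the caller's transaction; with @Transactional on the delegate, each chunk got a transaction of its own.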

Error handling was not covered in this article but will be targeted in a subsequent one. One more note: the Decorator class relies on constructor injection. For it to be managed by a Spring container, you might need to add the @Qualifier annotation to the constructor parameter.

The code is part of a small project posted on GitHub.
