Avenue Code Snippets

[JAVA 21] Structured Concurrency: Powering Data Orchestration with Virtual Threads and Scopes

Written by Victor Hugo | 9/4/24 11:00 AM

Today we’re discussing Structured Concurrency in Java, a native solution introduced as a preview feature in JDK 21. We'll explore how it can assist in structuring and managing the complexity of concurrent work, as well as how it can expedite the development process and improve the reliability, maintainability, testability, and observability of our applications.

This article includes a sample code for a real-world scenario solved using Structured Concurrency. The source code is available in this GIT repository.

A practical approach

In this article, we will learn by solving a real-world problem using Structured Concurrency. Let's state the problem first, and in the following sections, we'll explore Java's new features and how to use them to solve our problem.

Imagine we have a BFF (Back-end for Front-end) application. This application communicates with various data providers that have information about the page, aggregates this data, and returns it to the client (FE application).

In a real-world scenario, this application would have additional responsibilities such as applying algorithms to modify/normalize the data, registering tracking information, and more. However, these are not the focus of this article. For simplicity, we are only focusing on returning the aggregated data.

The FE application represents an e-commerce page composed of the following components: Hero, Product Details, Recommendations, and Reviews. A prototype of this page would look like the following:

 

Each of these components is powered by a specific service that handles its data and business logic. To add more complexity to our example, let's assume that the Recommendations component may be powered by two independent services. The choice of which service to display data from is based on response time: the service that returns data first will be used.

By the end of this article, we’ll have built this application using the new features introduced in JDK 21.

Virtual Threads

In Java, as you may know, we use threads to run execution flows. When we need to perform asynchronous tasks, additional threads are required. Instead of managing threads directly, we should use an Executor or ExecutorService from the java.util.concurrent package. You might remember the ThreadPoolTaskExecutor class from the Spring Framework; this class is an example of an Executor.

We typically create the Executor during application startup, and it remains active for the entire duration of the system's execution. It maintains a thread pool that can be shared by all tasks. When we create a traditional thread instance, a thread is allocated at the Operating System level, which is why we refer to this type of thread as "Platform Threads." Since Platform Threads are resource-intensive, it is good practice to use a shared pool of them.

In JDK 21, Virtual Threads were introduced. They are essentially “lightweight threads” because they are not allocated at the Operating System level. Oracle provides a straightforward definition for Virtual Threads:

"Like a platform thread, a virtual thread is also an instance of java.lang.Thread. However, a virtual thread isn’t tied to a specific OS thread. A virtual thread still runs code on an OS thread. However, when code running in a virtual thread calls a blocking I/O operation, the Java runtime suspends the virtual thread until it can be resumed. The OS thread associated with the suspended virtual thread is now free to perform operations for other virtual threads."

Similar to Platform Threads, we should not interact with Virtual Threads directly. Instead, we should use an Executor. By calling the Executors#newVirtualThreadPerTaskExecutor method, we obtain an Executor that creates a new virtual thread for each task submitted to it.

Structured Concurrency

We’ve discussed Threads and Executors, which are useful for running concurrent tasks. However, as we start to build more complex workflows — even the simpler ones — we begin to see the gaps between our intentions as developers and what these tools provide.

Let’s imagine we have a method that executes two asynchronous tasks to build an object:

public MyRecord getRecord(String id) throws ExecutionException, InterruptedException {

 var recordMetaFuture = executorService.submit(() -> getRecordMetadata(id));

 var recordHistoryFuture = executorService.submit(() -> getRecordHistory(id));

 return new MyRecord(recordMetaFuture.get(), recordHistoryFuture.get());

}

This code is fairly common, and many developers have faced similar situations in their careers. Let’s break down some issues with this implementation:

  • If recordMetaFuture.get() fails and throws an exception, it won't stop the execution of recordHistoryFuture. The task will continue running in its own thread, leading to a potential thread leak that wastes resources and might cause side effects in the application;
  • If recordHistoryFuture fails but recordMetaFuture is still running, the exception from recordHistoryFuture will only be visible when recordMetaFuture finishes and recordHistoryFuture.get() is invoked. This can result in wasted time and resources;
  • If the thread that invoked the getRecord method is interrupted, the interruption will not be propagated to recordMetaFuture and recordHistoryFuture. These threads will continue executing their tasks until they complete or fail;

Developers manage complexity by breaking tasks down into multiple subtasks, creating a task-subtask relationship. This relationship is naturally handled in a single-threaded environment, but in multi-threaded implementations, this relationship exists only in the developer’s mind.

Structured Concurrency was introduced to address this gap. Its goal is to simplify the concurrent programming model by establishing relationships between tasks and subtasks running in different threads, allowing us to treat them as a single unit of work.

StructuredTaskScope

JDK 21 introduces the StructuredTaskScope class, which is a fundamental building block for structured concurrency. This class represents a task to be executed and allows us to fork subtasks into it, which will be executed on virtual threads.

For each forked subtask, we get a Subtask instance, which allows us to check the status of an individual subtask, retrieve its result, or get a Throwable if an exception occurs. The following code snippet demonstrates a simple scope where tasks return integers. Note that subtask2 includes a Thread.sleep call to simulate a more expensive task:

public static void main(String[] args) {

   // creating the scope

   try (var scope = new StructuredTaskScope<Integer>()) {

       // forking subtasks

       StructuredTaskScope.Subtask<Integer> subtask1 = scope.fork(() -> 1);

       StructuredTaskScope.Subtask<Integer> subtask2 = scope.fork(() -> {

           Thread.sleep(1_000);

           return 2;

       });

       StructuredTaskScope.Subtask<Integer> subtask3 = scope.fork(() -> 3);

 

       // joining subtasks, waiting for them to finish or fail

       scope.join();

 

       // printing results

       System.out.println(printSubTask("subtask1", subtask1));

       System.out.println(printSubTask("subtask2", subtask2));

       System.out.println(printSubTask("subtask3", subtask3));

   } catch (Exception exception) {

       exception.printStackTrace();

   }

}

 

// console output:

// subtask1 state: SUCCESS value: 1

// subtask2 state: SUCCESS value: 2

// subtask3 state: SUCCESS value: 3

Now let’s see the task-subtask relationship in practice. Suppose that after 500ms, we are no longer interested in continuing with the task. We can call StructuredTaskScope#shutdown, which prevents new threads from starting and interrupts all unfinished threads.

public static void main(String[] args) {

   // creating the scope

   try (var scope = new StructuredTaskScope<Integer>()) {

       // forking subtasks

       StructuredTaskScope.Subtask<Integer> subtask1 = scope.fork(() -> 1);

       StructuredTaskScope.Subtask<Integer> subtask2 = scope.fork(() -> {

           System.out.println("starting subtask2");

           Thread.sleep(1_000);

           System.out.println("subtask2 completed");

           return 2;

       });

       StructuredTaskScope.Subtask<Integer> subtask3 = scope.fork(() -> 3);

 

       // waiting 500ms

       Thread.sleep(500);

       // shutting down the scope

       scope.shutdown();

       scope.join();

 

       // printing results

       System.out.println(printSubTask("subtask1", subtask1));

       System.out.println(printSubTask("subtask2", subtask2));

       System.out.println(printSubTask("subtask3", subtask3));

   } catch (Exception exception) {

       exception.printStackTrace();

   }

}

 

// starting subtask2

// subtask1 state: SUCCESS value: 1

// subtask2 state: UNAVAILABLE value: null

// subtask3 state: SUCCESS value: 3

We can observe that subtask2, the slower one, was canceled. The UNAVAILABLE state indicates that the subtask was forked but not completed. This state can also occur if a task is completed after the scope has been shut down or if it was forked after the task scope was shut down.

StructuredTaskScope has a useful specialization called ShutdownOnFailure, which simplifies error handling in concurrent code. With this scope, if a task fails and throws an exception, the scope is shut down, interrupting any other subtasks. In the following example, an exception is thrown from the third subtask, causing the other subtasks to be canceled:

public static void main(String[] args) {

   // creating the scope

   try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

       // forking subtasks

       StructuredTaskScope.Subtask<Integer> subtask1 = scope.fork(() -> {

           System.out.println("starting subtask1");

           Thread.sleep(500);

           System.out.println("subtask1 completed");

           return 2;

       });

       StructuredTaskScope.Subtask<Integer> subtask2 = scope.fork(() -> {

           System.out.println("starting subtask2");

           Thread.sleep(1_000);

           System.out.println("subtask2 completed");

           return 2;

       });

       StructuredTaskScope.Subtask<Integer> subtask3 = scope.fork(() -> {

           System.out.println("starting subtask3");

           System.out.println("throwing exception from subtask3");

           throw new RuntimeException("Runtime error in subtask3");

       });

 

       // joining subtasks, waiting for the fastest one to finish

       scope.join();

 

       // printing results

       System.out.println(printSubTask("subtask1", subtask1));

       System.out.println(printSubTask("subtask2", subtask2));

       System.out.println(printSubTask("subtask3", subtask3));

   } catch (Exception exception) {

       exception.printStackTrace();

   }

}

 

// console output

// starting subtask1

// starting subtask2

// starting subtask3

// throwing exception from subtask3

// subtask1 state: UNAVAILABLE value: null

// subtask2 state: UNAVAILABLE value: null

// subtask3 state: FAILED Exception: Runtime error in subtask3

In the following sections, we’ll explore another specialization of StructuredTaskScope and discuss how to create our implementation to handle specific scenarios.

Recommendations — First Responder Wins

We have stated that the Recommendations component is powered by two different services and that we’ll display data from the fastest one. This can be easily achieved using another specialization of StructuredTaskScope: the ShutdownOnSuccess class.

The code below demonstrates how to implement a solution for the Recommendations component using the ShutdownOnSuccess class. We fork two subtasks that call different data providers. Once we join the subtasks, the data from the fastest client will be available through the StructuredTaskScope#result method.

@RequiredArgsConstructor

public class RecommendationsService {

 

   private final RecommendationsClientA recommendationsClientA;

   private final RecommendationsClientB recommendationsClientB;

 

   public Recommendations getRecommendations(String productId) {

 

       try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Recommendations>()) {

           System.out.println("Recommendations SCOPE started...");

 

           scope.fork(() -> recommendationsClientA.getRecommendations(productId));

           scope.fork(() -> recommendationsClientB.getRecommendations(productId));

           scope.join();

 

           return scope.result();

       } catch (PageContextException | InterruptedException | ExecutionException ex) {

           System.out.println("Failed to get recommendations");

           ex.printStackTrace();

           return null;

       }

   }

}

In this scope, once a subtask finishes its work, the scope is completed, and all pending subtasks are interrupted. For the Recommendations use case, this is an ideal fit.

Building Our Scope

To address the problem, we outlined at the beginning of this article, we need to aggregate data from all components and pass it to the FE application. Let’s create a wrapper class for this purpose: PageContext.

public record PageContext(ReviewsData reviewsData, ProductDetails productDetails,

                         HeroData heroData, Recommendations recommendations) {

 

}

The idea is to create a custom scope where each forked subtask provides part of the data needed by the context. We’ll start by creating a marker interface called PageContextComponent and have all objects that compose the PageContext implement it. This approach will allow us to fork subtasks without worrying about typing.

Let’s get started. The first step is to create the PageContextScope class and extend StructuredTaskScope<PageContextComponent>. As class attributes, we’ll include all the data needed to build the PageContext and also add a list to keep track of exceptions that might be thrown by the forked subtasks. This will assist us in troubleshooting the application.

public class PageContextScope extends StructuredTaskScope<PageContextComponent> {

 

   private volatile HeroData heroData;

   private volatile ReviewsData reviewsData;

   private volatile ProductDetails productDetails;

   private volatile Recommendations recommendations;

   private final List<Throwable> exceptions = new CopyOnWriteArrayList<>();

}

Next, we need to override the handleComplete method. This method is invoked every time a subtask completes or fails. Based on the state of the subtask, we decide how to handle successes and failures. In our case, when a subtask fails, we capture the cause, but we could also choose to shut down the scope and throw an exception.

public class PageContextScope extends StructuredTaskScope<PageContextComponent> {

   // class attributes...

 

   @Override

   protected void handleComplete(Subtask<? extends PageContextComponent> subtask) {

       switch (subtask.state()) {

           case Subtask.State.SUCCESS -> onSuccess(subtask);

           case Subtask.State.FAILED -> exceptions.add(subtask.exception());

       }

 

       super.handleComplete(subtask);

   }

}

Pattern matching for switch expressions will be useful when handling subtasks that have successfully completed. We essentially retrieve the task result, cast it to the correct type, and store it in the scope.

public class PageContextScope extends StructuredTaskScope<PageContextComponent> {

   // class attributes...

 

   private void onSuccess(Subtask<? extends PageContextComponent> subtask) {

       switch (subtask.get()) {

           case ProductDetails productDetailsFromSubtask -> this.productDetails = productDetailsFromSubtask;

           case HeroData heroDataFromSubTask -> this.heroData = heroDataFromSubTask;

           case ReviewsData reviewsDataFromSubTask -> this.reviewsData = reviewsDataFromSubTask;

           case Recommendations recommendationsFromTask -> this.recommendations = recommendationsFromTask;

           default -> throw new IllegalStateException(STR."Invalid output for subtask \{subtask}");

       }

   }

 

}

Finally, let’s implement the method to retrieve the scope result, which I’ll call getContext. The caller should join the scope before attempting to get its result, so we validate this by calling super.ensureOwnerAndJoined(). Another good practice is to validate the scope before returning the resulting value. In our case, we create an instance of PageContextException to declare any missing components and exceptions thrown by the subtasks.

If everything is in order, we should create and return the PageContext instance.

public class PageContextScope extends StructuredTaskScope<PageContextComponent> {

   // class attributes...

 

   public PageContext getContext() {

       super.ensureOwnerAndJoined();

       validateScope();

       return new PageContext(reviewsData, productDetails, heroData, recommendations);

   }

 

   private void validateScope() {

       List<String> missingComponents = new ArrayList<>();

 

       Objects.requireNonNullElseGet(heroData, () -> missingComponents.add("heroData"));

       Objects.requireNonNullElseGet(reviewsData, () -> missingComponents.add("reviewsData"));

       Objects.requireNonNullElseGet(productDetails, () -> missingComponents.add("productDetails"));

       Objects.requireNonNullElseGet(recommendations, () -> missingComponents.add("recommendations"));

 

       if (!missingComponents.isEmpty()) {

           PageContextException pageContextException = new PageContextException(STR."""

                   Failed to build ProductDetailsContextScope, missing components: \{missingComponents}

                   """);

           exceptions.forEach(pageContextException::addSuppressed);

           throw pageContextException;

       }

   }

}

Using the new scope is quite straightforward. We just need to instantiate it and fork the subtasks.

@RequiredArgsConstructor

public class PageContextService {

 

   private final HeroClient heroClient;

   private final ProductDetailsClient productDetailsClient;

   private final ReviewsClient reviewsClient;

   private final RecommendationsService recommendationsService;

 

   public PageContext getPageContext(String productId) {

 

       try (var scope = new PageContextScope()) {

           System.out.println("PageContext SCOPE started...");

 

           scope.fork(() -> heroClient.getHeroData(productId));

           scope.fork(() -> productDetailsClient.getDetails(productId));

           scope.fork(() -> reviewsClient.getReviews(productId));

           scope.fork(() -> recommendationsService.getRecommendations(productId));

           scope.join();

 

           return scope.getContext();

       } catch (PageContextException | InterruptedException ex) {

           System.out.println("Failed to build context ");

           ex.printStackTrace();

           return null;

       }

   }

}

If we call PageContextService.getPageContext(“1”); we’ll have a context with the following content:

{

   "reviewsData": {

       "averageReviews": 5.0,

       "reviews": [

           {

               "value": 5.0,

               "comment": "Nice",

               "date": "2024-08-03T23:22:29.3327393",

               "authorName": "Victor"

           },

           {

               "value": 5.0,

               "comment": "Cool",

               "date": "2024-08-03T23:22:29.3327393",

               "authorName": "Anna"

           }

       ]

   },

   "productDetails": {

       "characteristics": {

           "Storage": "500GB"

       }

   },

   "heroData": {

       "title": "Playstation 5",

       "image": "https://www.my-image-host/1"

   },

   "recommendations": {

       "recommendations": [

           {

               "title": "Playstation 5 refurbished",

               "image": "https://www.my-image-host/recommendation/1",

               "price": "$1.00"

           }

       ]

   }

}

Conclusion

This concludes our article. We discussed Threads and Virtual Threads, explored JDK 21’s preview features for Structured Concurrency, and demonstrated their practical application by solving a real-world problem.

If you’re curious about the entire project and how I wrote unit tests for it, check it out on this GIT repository. I hope you find it as exciting as I do!