Bridging the Gap: CompletableFuture and Observable Conversion

Snippet of programming code in IDE
Published on

Bridging the Gap: CompletableFuture and Observable Conversion

The world of asynchronous programming in Java can often feel overwhelming. With various tools and libraries available, how can we effectively manage asynchronous tasks while ensuring code maintainability and readability? This post will discuss two powerful paradigms in Java: CompletableFuture for asynchronous programming and Observable from ReactiveX (RxJava), focusing on how to convert between them. By the end of this article, you'll have a fundamental understanding and practical examples that you can implement immediately.

What is CompletableFuture?

CompletableFuture is a part of Java's java.util.concurrent package introduced in Java 8. It allows you to write non-blocking asynchronous code that can improve the performance of applications by enabling concurrent execution.

Key Features of CompletableFuture:

  1. Asynchronous Execution: Enables non-blocking code execution.
  2. Combining Futures: You can compose multiple futures, allowing for more complex flows.
  3. Exception Handling: Provides built-in methods for handling errors in asynchronous computations.
  4. Convenient API: Comes with a fluent API that makes chaining operations straightforward.

Basic Example:

Let's look at a simple example of using CompletableFuture.

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // Simulating a long-running task
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Hello from CompletableFuture!";
        });

        // This code will be executed while the future is being computed
        future.thenAccept(result -> System.out.println(result));
        
        // Ensure the main thread waits for the async task to complete
        future.join();
    }
}

In this example, we have a simple async computation that simulates a delay of one second. While the computation runs, we immediately proceed with other parts of our program without blocking. Finally, we wait for the result before terminating the application.

What is Observable?

Observable is part of the ReactiveX library, specifically RxJava in the Java ecosystem. Reactive programming is a paradigm aimed at asynchronous data streams. An Observable represents a producer that emits items (data) to observers.

Key Features of Observable:

  1. Stream-based: Works well with streams of data.
  2. Backpressure Management: Provides mechanisms to handle data at your own pace.
  3. Chaining Operators: Allows composition of multiple operations in a readable way.
  4. Error Handling: Built-in error recovery mechanisms.

Basic Example:

Here’s a simple illustration of Observable.

import io.reactivex.rxjava3.core.Observable;

public class ObservableExample {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            // Simulating data emission
            emitter.onNext("Hello from Observable!");
            emitter.onComplete(); // signal that the emission is complete
        });

        observable.subscribe(System.out::println);
    }
}

In this example, we create an Observable that emits a single string and then completes. The subscribed observer reacts to the emitted item by printing it to the console.

Converting CompletableFuture to Observable

Now that we understand both CompletableFuture and Observable, let's look at how to convert between them. This can be useful in situations where you need to integrate systems employing different paradigms.

Conversion Method

While there is no direct method provided out-of-the-box for conversion, we can write utility methods for converting CompletableFuture to Observable and vice versa.

CompletableFuture to Observable

To convert a CompletableFuture to Observable, we can create an observable that subscribes to the completion of the future and emits the result.

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.CompletableFuture;

public class Converter {

    public static <T> Observable<T> completableFutureToObservable(CompletableFuture<T> future) {
        return Observable.create(emitter -> {
            future.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    emitter.onError(throwable);
                } else {
                    emitter.onNext(result);
                    emitter.onComplete();
                }
            });
        });
    }
}

Example Usage:

public class ConversionExample {
    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // Simulate a long-running task
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            return "Converted Future Result!";
        });

        Observable<String> observable = Converter.completableFutureToObservable(future);
        observable.subscribe(
            System.out::println,
            Throwable::printStackTrace
        );

        // Keep the program running long enough to see the output
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Observable to CompletableFuture

The reverse conversion requires wrapping the observable subscription in a CompletableFuture. The future will complete when the observable emits an item.

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.CompletableFuture;

public class Converter {

    public static <T> CompletableFuture<T> observableToCompletableFuture(Observable<T> observable) {
        CompletableFuture<T> future = new CompletableFuture<>();
        
        observable.subscribe(
            future::complete,
            future::completeExceptionally
        );

        return future;
    }
}

Example Usage:

public class ConversionExample {
    public static void main(String[] args) {
        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Hello from Observable to CompletableFuture!");
            emitter.onComplete();
        });

        CompletableFuture<String> future = Converter.observableToCompletableFuture(observable);
        
        future.whenComplete((result, throwable) -> {
            if (throwable != null) {
                System.err.println("Error: " + throwable.getMessage());
            } else {
                System.out.println(result);
            }
        });

        // Ensure the application waits for the future result
        future.join();
    }
}

The Last Word

In this post, we've ventured into the landscapes of both CompletableFuture and Observable. We learned how to convert between the two paradigms, allowing for dynamic and robust applications capable of handling asynchronous tasks effectively.

By leveraging these tools and strategies, developers can write clear, efficient, and maintainable asynchronous code with confidence.

For more details about CompletableFuture, feel free to check the Oracle documentation. Additionally, the RxJava documentation can provide further insights into the power of the Observable pattern here.

With this foundation, you’re now equipped to seamlessly bridge the gap in your asynchronous Java applications—leveraging the speed and flexibility of both CompletableFuture and Observable. Happy coding!