Bridging the Gap: CompletableFuture and Observable Conversion

- Published on
Bridging the Gap: CompletableFuture and Observable Conversion
The world of asynchronous programming in Java can often feel overwhelming. With various tools and libraries available, how can we effectively manage asynchronous tasks while ensuring code maintainability and readability? This post will discuss two powerful paradigms in Java: CompletableFuture
for asynchronous programming and Observable
from ReactiveX (RxJava), focusing on how to convert between them. By the end of this article, you'll have a fundamental understanding and practical examples that you can implement immediately.
What is CompletableFuture?
CompletableFuture
is a part of Java's java.util.concurrent
package introduced in Java 8. It allows you to write non-blocking asynchronous code that can improve the performance of applications by enabling concurrent execution.
Key Features of CompletableFuture:
- Asynchronous Execution: Enables non-blocking code execution.
- Combining Futures: You can compose multiple futures, allowing for more complex flows.
- Exception Handling: Provides built-in methods for handling errors in asynchronous computations.
- Convenient API: Comes with a fluent API that makes chaining operations straightforward.
Basic Example:
Let's look at a simple example of using CompletableFuture
.
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// Simulating a long-running task
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Hello from CompletableFuture!";
});
// This code will be executed while the future is being computed
future.thenAccept(result -> System.out.println(result));
// Ensure the main thread waits for the async task to complete
future.join();
}
}
In this example, we have a simple async computation that simulates a delay of one second. While the computation runs, we immediately proceed with other parts of our program without blocking. Finally, we wait for the result before terminating the application.
What is Observable?
Observable
is part of the ReactiveX library, specifically RxJava in the Java ecosystem. Reactive programming is a paradigm aimed at asynchronous data streams. An Observable
represents a producer that emits items (data) to observers.
Key Features of Observable:
- Stream-based: Works well with streams of data.
- Backpressure Management: Provides mechanisms to handle data at your own pace.
- Chaining Operators: Allows composition of multiple operations in a readable way.
- Error Handling: Built-in error recovery mechanisms.
Basic Example:
Here’s a simple illustration of Observable
.
import io.reactivex.rxjava3.core.Observable;
public class ObservableExample {
public static void main(String[] args) {
Observable<String> observable = Observable.create(emitter -> {
// Simulating data emission
emitter.onNext("Hello from Observable!");
emitter.onComplete(); // signal that the emission is complete
});
observable.subscribe(System.out::println);
}
}
In this example, we create an Observable
that emits a single string and then completes. The subscribed observer reacts to the emitted item by printing it to the console.
Converting CompletableFuture to Observable
Now that we understand both CompletableFuture
and Observable
, let's look at how to convert between them. This can be useful in situations where you need to integrate systems employing different paradigms.
Conversion Method
While there is no direct method provided out-of-the-box for conversion, we can write utility methods for converting CompletableFuture
to Observable
and vice versa.
CompletableFuture to Observable
To convert a CompletableFuture
to Observable
, we can create an observable that subscribes to the completion of the future and emits the result.
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.CompletableFuture;
public class Converter {
public static <T> Observable<T> completableFutureToObservable(CompletableFuture<T> future) {
return Observable.create(emitter -> {
future.whenComplete((result, throwable) -> {
if (throwable != null) {
emitter.onError(throwable);
} else {
emitter.onNext(result);
emitter.onComplete();
}
});
});
}
}
Example Usage:
public class ConversionExample {
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// Simulate a long-running task
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Converted Future Result!";
});
Observable<String> observable = Converter.completableFutureToObservable(future);
observable.subscribe(
System.out::println,
Throwable::printStackTrace
);
// Keep the program running long enough to see the output
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Observable to CompletableFuture
The reverse conversion requires wrapping the observable subscription in a CompletableFuture
. The future will complete when the observable emits an item.
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.CompletableFuture;
public class Converter {
public static <T> CompletableFuture<T> observableToCompletableFuture(Observable<T> observable) {
CompletableFuture<T> future = new CompletableFuture<>();
observable.subscribe(
future::complete,
future::completeExceptionally
);
return future;
}
}
Example Usage:
public class ConversionExample {
public static void main(String[] args) {
Observable<String> observable = Observable.create(emitter -> {
emitter.onNext("Hello from Observable to CompletableFuture!");
emitter.onComplete();
});
CompletableFuture<String> future = Converter.observableToCompletableFuture(observable);
future.whenComplete((result, throwable) -> {
if (throwable != null) {
System.err.println("Error: " + throwable.getMessage());
} else {
System.out.println(result);
}
});
// Ensure the application waits for the future result
future.join();
}
}
The Last Word
In this post, we've ventured into the landscapes of both CompletableFuture
and Observable
. We learned how to convert between the two paradigms, allowing for dynamic and robust applications capable of handling asynchronous tasks effectively.
By leveraging these tools and strategies, developers can write clear, efficient, and maintainable asynchronous code with confidence.
For more details about CompletableFuture
, feel free to check the Oracle documentation. Additionally, the RxJava documentation can provide further insights into the power of the Observable
pattern here.
With this foundation, you’re now equipped to seamlessly bridge the gap in your asynchronous Java applications—leveraging the speed and flexibility of both CompletableFuture
and Observable
. Happy coding!