Project Reactor & WebFlux
reactor
| Aspect | generate (sync) | push (async, single-thread) | create (async, multi-thread) |
|---|---|---|---|
| Emission style | Pull (one item per request) | Push (producer pushes items) | Push (producer pushes items) |
| Concurrency | Always single-threaded, synchronous | Single-thread only | Multi-thread allowed |
| Backpressure | Native: generator runs only on demand | Needs overflow strategy if fast producer | Needs overflow strategy if fast producer |
| Use case | Generate values step by step, state machine | Event listener, single-thread producer | Multi-threaded sources, complex bridging |
Rule of Thumb
Use Flux.generate when you want a pull-based, synchronous generator (like a lazy sequence, state machine, or random number generator).
Use Flux.push/create when you want to adapt to an external asynchronous source (like callbacks, listeners, or multiple threads producing data).
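For example, a pull-based Fibonacci generator with Flux.generate (a minimal, self-contained sketch; the class name is illustrative):

```java
import reactor.core.publisher.Flux;

public class GenerateDemo {
    public static void main(String[] args) {
        // Pull-based: Reactor invokes the generator exactly once per requested element.
        Flux<Long> fibonacci = Flux.generate(
                () -> new long[]{0, 1},                           // initial state
                (state, sink) -> {
                    sink.next(state[0]);                          // emit exactly one value per invocation
                    return new long[]{state[1], state[0] + state[1]};
                });

        fibonacci.take(10).subscribe(System.out::println);        // runs synchronously on the caller thread
    }
}
```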
webflux
🟢 Cold, synchronous sources (run on caller thread, usually the subscriber’s thread)
These methods execute inline on the same thread that subscribes, unless moved:
Mono.just(…), Flux.just(…)
Mono.empty(), Flux.empty()
Mono.error(…), Flux.error(…)
Flux.range(…), Flux.fromIterable(…), Flux.fromArray(…)
Mono.fromSupplier(…) (evaluates lazily but still on caller thread)
Mono.fromCallable(…)
Flux.defer(…), Mono.defer(…)
👉 By default: No background thread, execution happens immediately when subscribed.
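A quick way to observe this (a small sketch; the printed thread name will typically be main):

```java
import reactor.core.publisher.Flux;

public class ColdSourceDemo {
    public static void main(String[] args) {
        Flux.range(1, 3)
            .map(i -> {
                // Prints "main": the pipeline executes inline on the subscribing thread.
                System.out.println("map " + i + " on " + Thread.currentThread().getName());
                return i * 10;
            })
            .subscribe(i -> System.out.println("got " + i + " on " + Thread.currentThread().getName()));
        // No background thread involved: subscribe() returns only after all values were consumed.
    }
}
```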
thread model
default WebFlux app on Netty:
- Boss threads: 1
- Worker (event loop) threads: 2 * CPU cores
- Reactor Schedulers available:
  - parallel: CPU cores
  - boundedElastic: 10 * CPU cores (thread cap); overflow tasks are queued (default cap 100,000 per thread)
  - timer: CPU cores (independent scheduler; never runs user code directly, wraps a ScheduledExecutorService)
  - single: 1
  - immediate: 0 (just the current thread)
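These defaults can be inspected at runtime via the public constants on reactor.core.scheduler.Schedulers (a small sketch; the field names assume Reactor 3):

```java
import reactor.core.scheduler.Schedulers;

public class SchedulerDefaults {
    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU cores                 = " + cpus);
        System.out.println("parallel threads          = " + Schedulers.DEFAULT_POOL_SIZE);              // cpus
        System.out.println("boundedElastic thread cap = " + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE);   // 10 * cpus
        System.out.println("boundedElastic queue cap  = " + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE); // per thread
    }
}
```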
Best Practices
Pure reactive stack (WebClient, R2DBC, reactive Mongo, etc.):
→ stay on the Netty event loop, no scheduler needed.
Blocking APIs (JDBC, legacy SDKs, file IO):
→ wrap with Mono.fromCallable(…).subscribeOn(Schedulers.boundedElastic()).
Heavy CPU tasks (compression, JSON parsing, crypto):
→ offload with publishOn(Schedulers.parallel()).
Global customization (rarely needed):
→ adjust Netty loop threads for connection-heavy apps, or override the boundedElastic size for blocking-heavy apps.
Why offload heavy CPU tasks but not I/O tasks? Because Netty was built for heavy socket workloads; its I/O is fully non-blocking.
So I/O tasks are not automatically blocking tasks on Netty; don't equate the two. Conversely, a task can block without doing any I/O at all, e.g. Thread.sleep(5000) is a blocking call and must never run on the event loop.
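A sketch of the two offloading patterns from the list above (blockingJdbcCall and heavyTransform are hypothetical placeholders):

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Blocking API (e.g. JDBC): isolate it on boundedElastic so the event loop is never blocked.
Mono<String> jdbcResult = Mono.fromCallable(() -> blockingJdbcCall())      // hypothetical blocking call
        .subscribeOn(Schedulers.boundedElastic());

// Heavy CPU step: shift downstream work off the event loop onto the parallel scheduler.
Mono<String> transformed = jdbcResult
        .publishOn(Schedulers.parallel())
        .map(row -> heavyTransform(row));                                  // hypothetical CPU-heavy function
```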
🟡 Asynchronous sources (use a Reactor scheduler by default)
These spawn work on an internal boundedElastic or parallel thread pool:
Mono.delay(Duration) → Schedulers.parallel()
Flux.interval(Duration) → Schedulers.parallel()
Mono.fromRunnable(…), Mono.fromFuture(…), Mono.fromCompletionStage(…)
If future already completed → current thread
If not → completion thread (depends on upstream executor/future, not Reactor itself)
👉 By default: these don't run on the subscriber thread; Reactor (or the foreign executor) chooses.
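For example, Mono.delay signals on a parallel-* thread even though the subscription starts on main (a small sketch):

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class DelayDemo {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("subscribing on " + Thread.currentThread().getName());     // main
        Mono.delay(Duration.ofMillis(100))
            .subscribe(tick -> System.out.println("signal on " + Thread.currentThread().getName())); // parallel-N
        Thread.sleep(300);   // keep the JVM alive long enough for the timer to fire
    }
}
```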
🔵 Blocking bridge operators (pair them with boundedElastic)
Reactor does not detect blocking code for you; when you bridge blocking I/O into a pipeline, run it on Schedulers.boundedElastic() so the event loop stays free:
Mono.fromCallable(blockingFn).subscribeOn(Schedulers.boundedElastic()) (best practice)
Mono.block() / Flux.blockFirst() / Flux.blockLast() (blocking the caller thread)
Operators like flatMap with blocking calls must be explicitly shifted to boundedElastic.
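For example, a blocking lookup inside flatMap (lookupBlocking is a hypothetical blocking function) can be isolated per element:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

Flux<String> results = Flux.just("id-1", "id-2", "id-3")
        .flatMap(id -> Mono.fromCallable(() -> lookupBlocking(id))        // wrap the blocking call
                           .subscribeOn(Schedulers.boundedElastic()));    // and shift it off the event loop
```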
🔴 Thread-affecting operators (don’t run anything themselves but change execution)
publishOn(Scheduler) → switches downstream execution thread
subscribeOn(Scheduler) → changes the context where subscription and upstream happen
⚪ Context-dependent sources
Some methods inherit threads from external APIs:
Flux.create(…) or Mono.create(…) → depends on how you emit (caller thread or async callback thread)
Flux.push(…) → same, depends on emitter thread
Flux.generate(…) → runs on subscriber thread unless you schedule
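For example, with Flux.create the emission thread is whatever thread calls sink.next; here a hypothetical single-thread executor stands in for an external callback source:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import reactor.core.publisher.Flux;

public class CreateDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService externalSource = Executors.newSingleThreadExecutor();   // stands in for a callback API

        Flux.<String>create(sink ->
                externalSource.submit(() -> {
                    // Downstream receives this signal on the external thread, not the subscriber's thread.
                    sink.next("event from " + Thread.currentThread().getName());
                    sink.complete();
                }))
            .subscribe(v -> System.out.println("got '" + v + "' on " + Thread.currentThread().getName()));

        Thread.sleep(200);
        externalSource.shutdown();
    }
}
```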
✅ Quick rule of thumb
“Pure data” operators (just, range, fromIterable) → run on subscriber thread.
“Timed” operators (delay, interval) → run on Reactor’s parallel scheduler.
“Bridges to blocking/async world” (fromCallable, fromFuture) → run on caller’s thread or boundedElastic/foreign executor, unless you move them.
Schedulers (publishOn, subscribeOn) are the only way to force a thread switch.
caller thread = the thread that initiates the subscription; subscriber thread = the thread that executes the signals.
Example: `flux.publishOn(Schedulers.parallel())`
- caller thread = main
- subscriber thread = parallel-1
- mapping still happens on parallel-1 because of publishOn
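A fuller runnable sketch of that behaviour (the println labels are illustrative):

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux.just("a", "b", "c")                                   // emitted on the subscribing thread (main)
            .doOnNext(v -> System.out.println("emit " + v + " on " + Thread.currentThread().getName()))
            .publishOn(Schedulers.parallel())                      // everything downstream moves to parallel-*
            .map(String::toUpperCase)
            .subscribe(v -> System.out.println("map/consume " + v + " on " + Thread.currentThread().getName()));

        Thread.sleep(500);                                          // keep the JVM alive for the async part
    }
}
```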
think about
🔹 Two philosophies
- Operators/methods decide their scheduler (local responsibility)
You sprinkle .subscribeOn(…) or .publishOn(…) inside every method that returns a Mono/Flux.
Pros:
The method guarantees safe execution (e.g., offloading a blocking call to boundedElastic).
Callers don’t need to know the implementation details (e.g., that it does blocking JDBC work).
Cons:
You lose flexibility: the caller cannot override your scheduler easily.
If multiple methods apply .subscribeOn(…), only the one closest to the source (the first in the chain) takes effect, because subscribeOn only affects the upstream subscription.
Can lead to confusion if one method hides threading decisions.
- Caller decides (central responsibility)
Your methods just describe the pipeline (Mono.just, map, flatMap, etc.) with no scheduler hints.
The caller (e.g., controller or service entry point) applies subscribeOn / publishOn.
Pros:
Clean separation of concerns: methods remain pure, deterministic pipelines.
Caller has full control of threading policy.
Easy to test synchronously (no schedulers in unit tests unless you want them).
Cons:
Risk: if a method does blocking work (e.g., JDBC, file IO, legacy API), caller must remember to schedule it properly, or it will block Netty/event loop.
🔹 Rule of Thumb
👉 Don’t sprinkle subscribeOn everywhere.
If your method is purely non-blocking/reactive (using Reactor operators, R2DBC, WebClient, etc.):
❌ Do not configure a scheduler. Let the caller decide.
If your method wraps blocking code (JDBC, file IO, legacy API, etc.):
✅ Apply subscribeOn(Schedulers.boundedElastic()) inside the method — because you must protect the Netty event loop.
This way:
Non-blocking pipelines remain transparent and flexible.
Blocking bridges are safe by default, without relying on the caller’s discipline.
✅ Non-blocking service, no scheduler:
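A minimal sketch, assuming a WebClient-backed service (UserService, userClient, and User are illustrative names):

```java
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Service
public class UserService {

    private final WebClient userClient;

    public UserService(WebClient userClient) {
        this.userClient = userClient;
    }

    // Pure reactive pipeline: no subscribeOn/publishOn, the caller stays in control of threading.
    public Mono<User> findUser(String id) {
        return userClient.get()
                .uri("/users/{id}", id)
                .retrieve()
                .bodyToMono(User.class);
    }
}
```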
Caller: a controller that doesn't care about threading for reactive code:
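A matching sketch (UserController is illustrative; it simply returns the Mono and lets WebFlux subscribe):

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class UserController {

    private final UserService userService;

    public UserController(UserService userService) {
        this.userService = userService;
    }

    // No scheduler here either: the whole chain stays on the Netty event loop.
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.findUser(id);
    }
}
```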
✅ Best practice summary
Non-blocking methods: don’t set subscribeOn.
Blocking methods: always set subscribeOn(Schedulers.boundedElastic()) inside the method.
At the top level (controllers, pipelines): only add publishOn/subscribeOn when you need explicit control over where downstream operators run.
define custom schedulers
Custom `Scheduler` instances (`reactor.core.scheduler.Scheduler`) are created through the `Schedulers` factory methods:
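A sketch of defining shared custom schedulers (the class name, pool names, and sizes are arbitrary):

```java
import java.util.concurrent.Executors;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class AppSchedulers {

    // Dedicated pool for blocking bridges, capped independently of the global boundedElastic.
    public static final Scheduler JDBC = Schedulers.newBoundedElastic(20, 1_000, "jdbc");

    // Fixed-size pool for CPU-heavy transforms.
    public static final Scheduler CRUNCH = Schedulers.newParallel("crunch", 4);

    // Wrap an existing executor when the pool is managed elsewhere.
    public static final Scheduler LEGACY = Schedulers.fromExecutorService(Executors.newFixedThreadPool(8));

    private AppSchedulers() { }
}
```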
method usage
defer & interval
defer is like a Provider in Spring: you inject a provider `() -> new Instance()` instead of the instance `new Instance()` itself.
It creates a new Mono for every subscription by invoking the supplier lazily.
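The difference shows up when the same Mono is subscribed more than once (the timestamp is just an illustration):

```java
import reactor.core.publisher.Mono;

Mono<Long> eager = Mono.just(System.currentTimeMillis());                    // value captured once, at assembly time
Mono<Long> lazy  = Mono.defer(() -> Mono.just(System.currentTimeMillis()));  // re-evaluated per subscription

eager.subscribe(t -> System.out.println("eager 1: " + t));
eager.subscribe(t -> System.out.println("eager 2: " + t));                   // same timestamp
lazy.subscribe(t -> System.out.println("lazy 1: " + t));
lazy.subscribe(t -> System.out.println("lazy 2: " + t));                     // usually a later timestamp
```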
Say we need a polling loop: a method like `pollUntilJson(String path)` that keeps calling an endpoint until the response looks like JSON (sketch below).
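One possible shape of that method, assuming a WebClient field named webClient and a naive "starts with {" JSON check:

```java
public Mono<String> pollUntilJson(String path) {
    return Mono.defer(() -> webClient.get()                  // defer: a fresh request per (re)subscription
                    .uri(path)
                    .retrieve()
                    .bodyToMono(String.class))
            .filter(body -> body.trim().startsWith("{"))                                  // hypothetical JSON check
            .repeatWhenEmpty(attempts -> attempts.delayElements(Duration.ofSeconds(1)));  // poll again after 1s
}
```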
An alternative to the delay-based loop is to poll on a fixed interval with `Flux.interval(Duration.ofSeconds(1))` (sketch below).
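A sketch under the same assumptions (webClient, path, and the JSON check are placeholders):

```java
Mono<String> firstJson = Flux.interval(Duration.ofSeconds(1))                 // tick every second on parallel
        .concatMap(tick -> webClient.get().uri(path).retrieve().bodyToMono(String.class))
        .filter(body -> body.trim().startsWith("{"))                          // hypothetical JSON check
        .next();                                                              // complete with the first JSON response
```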
Or use exponential backoff, again wrapping the call in `Mono.defer(() -> webClient.get().uri(path).retrieve().bodyToMono(String.class))` so each retry issues a fresh request (sketch below).
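A sketch using `reactor.util.retry.Retry`, treating "not JSON yet" as a retryable error (the exception type and limits are arbitrary):

```java
Mono<String> result = Mono.defer(() -> webClient.get().uri(path).retrieve().bodyToMono(String.class))
        .filter(body -> body.trim().startsWith("{"))                          // hypothetical JSON check
        .switchIfEmpty(Mono.error(new IllegalStateException("not JSON yet"))) // turn "empty" into a retryable error
        .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))                    // 1s, 2s, 4s, ... plus jitter
                        .maxBackoff(Duration.ofSeconds(30)));
```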