Package groovy.concurrent
Class Agent<T>
java.lang.Object
groovy.concurrent.Agent<T>
- Type Parameters:
T - the value type
A thread-safe mutable-value container inspired by Clojure's agents and GPars' Agent.
An Agent wraps a value that can be read by any thread but
modified only through serialised update functions. Updates are queued
and applied one at a time on a dedicated executor, guaranteeing that
the value is never corrupted by concurrent writes.
Reading the current value via get() is non-blocking and
returns a snapshot. Sending an update via send(Function) is
also non-blocking — the function is queued and applied asynchronously.
Use sendAndGet(Function) to obtain an Awaitable that
completes with the new value after the update is applied.
// Groovy:
def counter = Agent.create(0)
counter.send { it + 1 }
counter.send { it + 1 }
assert await(counter.getAsync()) == 2
// Java:
Agent<Integer> counter = Agent.create(0);
counter.send(n -> n + 1);
Awaitable<Integer> result = counter.sendAndGet(n -> n + 1);
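The serialisation guarantee described above can be illustrated with a minimal JDK-only sketch. `SerialisedAgentSketch` is a hypothetical stand-in written for this example, not the `groovy.concurrent.Agent` source: it assumes a single-thread executor as the update queue and uses `CompletableFuture` in place of `Awaitable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical stand-in built on the JDK only; NOT the groovy.concurrent.Agent source.
final class SerialisedAgentSketch<T> {
    private volatile T value;  // written only by the update thread, readable by any thread
    private final ExecutorService updates = Executors.newSingleThreadExecutor();

    SerialisedAgentSketch(T initial) { this.value = initial; }

    // Non-blocking snapshot read.
    T get() { return value; }

    // Queue an update; the future completes with the new value once it has been applied.
    CompletableFuture<T> sendAndGet(Function<T, T> updateFn) {
        return CompletableFuture.supplyAsync(() -> {
            T next = updateFn.apply(value);
            value = next;
            return next;
        }, updates);
    }

    void shutdown() { updates.shutdown(); }

    // Several threads queue increments concurrently; serialisation means none are lost.
    static int demo(int senders, int perSender) {
        SerialisedAgentSketch<Integer> counter = new SerialisedAgentSketch<>(0);
        CompletableFuture<?>[] tasks = new CompletableFuture<?>[senders];
        for (int i = 0; i < senders; i++) {
            tasks[i] = CompletableFuture.runAsync(() -> {
                for (int j = 0; j < perSender; j++) {
                    counter.sendAndGet(n -> n + 1);
                }
            });
        }
        CompletableFuture.allOf(tasks).join();           // every increment is now queued
        int result = counter.sendAndGet(n -> n).join();  // queued last, so it runs last
        counter.shutdown();
        return result;
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000));
    }
}
```

Because every update runs on the same thread in queue order, the read-modify-write inside `sendAndGet` needs no lock, and `demo` returns `senders * perSender`.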
- Since:
- 6.0.0
- See Also:
-
Field Summary
Fields
Modifier and Type | Field | Description
static final int | DEFAULT_CHANGES_BUFFER | Default per-subscriber buffer size for changes().
Method Summary
Modifier and Type | Method | Description
Flow.Publisher<T> | changes() | Returns a Flow.Publisher that emits the agent's value after every successful update.
static <T> Agent<T> | create(T initialValue) | Creates an agent with the given initial value, using a single-thread executor for serialised updates.
static <T> Agent<T> | create(T initialValue, pool) | Creates an agent backed by the given pool for update execution.
T | get() | Returns the current value.
Awaitable<T> | getAsync() | Returns the current value as an Awaitable.
void | send(Function<T, T> updateFn) | Queues an update function to be applied to the current value.
Awaitable<T> | sendAndGet(Function<T, T> updateFn) | Queues an update function and returns an Awaitable that completes with the new value after the update is applied.
void | shutdown() | Shuts down the agent's update executor.
String | toString() | Returns the current value in a diagnostic form.
-
Field Details
-
DEFAULT_CHANGES_BUFFER
public static final int DEFAULT_CHANGES_BUFFER
Default per-subscriber buffer size for changes().
- See Also:
-
-
Method Details
-
create
Creates an agent with the given initial value, using a single-thread executor for serialised updates.
- Type Parameters:
T - the value type
- Parameters:
initialValue - the starting value
- Returns:
- a new agent
-
create
Creates an agent backed by the given pool for update execution. Updates are still serialised (only one at a time), but they run on the pool's threads.
- Type Parameters:
T - the value type
- Parameters:
initialValue - the starting value
pool - the pool to use for updates
- Returns:
- a new agent
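One common way to keep updates serialised while borrowing threads from a shared pool is the serial-executor pattern. The sketch below is an assumption about how such behaviour can be achieved, not a description of this class's internals; `SerialOnPoolSketch` is a hypothetical name, and `java.util.concurrent.Executor` stands in for the pool type, which this page does not specify.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration: runs tasks one at a time, in order, on a shared pool.
final class SerialOnPoolSketch implements Executor {
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private final Executor pool;
    private Runnable active;  // the task currently handed to the pool, if any

    SerialOnPoolSketch(Executor pool) { this.pool = pool; }

    @Override
    public synchronized void execute(Runnable task) {
        // Wrap the task so that finishing it schedules the next queued one.
        pending.add(() -> {
            try { task.run(); } finally { scheduleNext(); }
        });
        if (active == null) scheduleNext();
    }

    private synchronized void scheduleNext() {
        active = pending.poll();
        if (active != null) pool.execute(active);
    }

    // Applies `updates` increments to an unsynchronised cell via a 4-thread pool.
    static int demo(int updates) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        SerialOnPoolSketch serial = new SerialOnPoolSketch(pool);
        int[] value = {0};  // plain storage: safe only because updates never overlap
        CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
        for (int i = 0; i < updates; i++) {
            last = CompletableFuture.runAsync(() -> value[0]++, serial);
        }
        last.join();        // tasks run in FIFO order, so the last one finishes last
        pool.shutdown();
        return value[0];
    }
}
```

Successive updates may land on different pool threads, but at most one is in flight at any moment, which is the property the description above promises.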
-
get
Returns the current value. This is a non-blocking snapshot read.
- Returns:
- the current value
-
getAsync
Returns the current value as an Awaitable. The awaitable completes after all previously queued updates have been applied, ensuring a consistent read.
- Returns:
- an awaitable holding the value after pending updates
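The difference between a snapshot read and a read that waits for pending updates can be shown with a small JDK-only sketch. It assumes the consistent read is modelled as a task queued behind the updates on the same single-thread executor; `QueuedReadSketch` and its `gate` are hypothetical names used only to make the timing deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not the Agent implementation.
final class QueuedReadSketch {
    // Returns {snapshot, valueAfterPendingUpdates}.
    static int[] demo() {
        ExecutorService updates = Executors.newSingleThreadExecutor();
        AtomicInteger value = new AtomicInteger(0);
        CompletableFuture<Void> gate = new CompletableFuture<>();

        // Two queued updates; the first is held at the gate so both stay pending.
        updates.execute(() -> { gate.join(); value.incrementAndGet(); });
        updates.execute(() -> value.incrementAndGet());

        // A read queued on the same executor runs only after both updates.
        CompletableFuture<Integer> queuedRead =
                CompletableFuture.supplyAsync(value::get, updates);

        int snapshot = value.get();             // immediate read: updates not applied yet
        gate.complete(null);                    // release the pending updates
        int afterPending = queuedRead.join();   // sees both increments
        updates.shutdown();
        return new int[] {snapshot, afterPending};
    }
}
```

Here the immediate read observes 0 while the queued read observes 2, mirroring the contrast between get() and getAsync().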
-
send
Queues an update function to be applied to the current value. The function receives the current value and returns the new value.
Updates are applied asynchronously and serialised: only one update runs at a time.
- Parameters:
updateFn - a function from current value to new value
-
sendAndGet
Queues an update function and returns an Awaitable that completes with the new value after the update is applied.
- Parameters:
updateFn - a function from current value to new value
- Returns:
- an awaitable holding the new value
-
shutdown
public void shutdown()
Shuts down the agent's update executor. No further updates will be accepted. Pending updates are executed before shutdown completes. The changes publisher (if any subscribers are attached) is closed after pending updates drain, signalling onComplete to all live subscribers. Calling shutdown() more than once is a no-op.
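The drain-then-stop behaviour resembles that of a plain `ExecutorService.shutdown()`, which the following sketch demonstrates. It is an analogy under that assumption: `ShutdownDrainSketch` is a hypothetical name, and how the real agent signals a rejected update after shutdown is not stated on this page.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration using only the JDK executor, not the Agent itself.
final class ShutdownDrainSketch {
    // Returns "<value after draining>:<was a late update rejected>".
    static String demo() {
        ExecutorService updates = Executors.newSingleThreadExecutor();
        AtomicInteger value = new AtomicInteger();
        CompletableFuture<Void> gate = new CompletableFuture<>();

        updates.execute(() -> gate.join());  // keeps the following updates pending
        CompletableFuture<Integer> last = null;
        for (int i = 0; i < 3; i++) {
            last = CompletableFuture.supplyAsync(value::incrementAndGet, updates);
        }

        updates.shutdown();
        updates.shutdown();                  // a second call has no additional effect

        boolean rejected = false;
        try {
            updates.execute(value::incrementAndGet);  // submitted after shutdown
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        gate.complete(null);                 // let the pending updates drain
        int drained = last.join();           // all three queued updates still ran
        return drained + ":" + rejected;
    }
}
```

The three updates queued before shutdown are all applied, while the one submitted afterwards is refused by the executor.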
changes
Returns a Flow.Publisher that emits the agent's value after every successful update. The publisher is hot and per-subscriber:
- Subscribers see only changes that occur after they subscribe; the current value at subscription time is not replayed.
- Each subscriber gets an independent buffer (default 256 items).
- When a slow subscriber's buffer is full, the newly offered value is dropped for that subscriber rather than blocking the agent's update thread. Values already buffered are delivered in order; only newly-offered values that cannot fit are discarded.
- Closes (signals onComplete) when shutdown() is called. If changes() is first called after shutdown(), the returned publisher is already closed and subscribers receive onComplete immediately.
Typical use:
for await (newValue in agent.changes()) { log.info "Agent value is now {}", newValue }
- Returns:
- a hot publisher of state transitions
- Since:
- 6.0.0
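The drop-on-overflow behaviour listed for changes() can be reproduced with the JDK's `SubmissionPublisher.offer`, using a drop handler that declines to retry. Whether the agent uses `SubmissionPublisher` internally is an assumption; `DroppingPublisherSketch` is a hypothetical name, and the subscriber deliberately requests nothing until all values have been offered, to simulate a stalled consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a hot publisher that drops instead of blocking.
final class DroppingPublisherSketch {
    // Returns {delivered, dropped} after offering `offered` values to a stalled subscriber.
    static int[] demo(int offered, int bufferSize) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        SubmissionPublisher<Integer> changes = new SubmissionPublisher<>(executor, bufferSize);
        List<Integer> delivered = new ArrayList<>();
        CompletableFuture<Flow.Subscription> subscribed = new CompletableFuture<>();
        CompletableFuture<Void> completed = new CompletableFuture<>();

        changes.subscribe(new Flow.Subscriber<Integer>() {
            // No request here: the subscriber stays stalled until told otherwise.
            @Override public void onSubscribe(Flow.Subscription s) { subscribed.complete(s); }
            @Override public void onNext(Integer item) { delivered.add(item); }
            @Override public void onError(Throwable t) { completed.completeExceptionally(t); }
            @Override public void onComplete() { completed.complete(null); }
        });

        AtomicInteger dropped = new AtomicInteger();
        for (int i = 1; i <= offered; i++) {
            // offer never blocks; returning false from the handler drops the new item.
            changes.offer(i, (subscriber, item) -> { dropped.incrementAndGet(); return false; });
        }

        subscribed.join().request(Long.MAX_VALUE);  // the subscriber finally catches up
        changes.close();                            // buffered items, then onComplete
        completed.join();
        executor.shutdown();
        return new int[] {delivered.size(), dropped.get()};
    }
}
```

Every offered value is either delivered from the buffer in order or counted as dropped, and the publishing thread is never blocked by the stalled subscriber.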
-
toString
Returns the current value in a diagnostic form.
-