Concurrency
Joule provides structured concurrency primitives for safe parallel execution.
Async/Await
Functions that perform asynchronous operations are marked async:
async fn fetch_data(url: String) -> Result<String, Error> {
let response = http::get(url).await?;
Result::Ok(response.body())
}
The await keyword suspends execution until the asynchronous operation completes.
Async Energy Tracking
Async operations are fully energy-tracked. The compiler inserts timing wrappers around Spawn, TaskAwait, TaskGroupEnter, and TaskGroupExit operations to measure the energy consumed by asynchronous work:
#[energy_budget(max_joules = 0.005)]
async fn process_pipeline(urls: Vec<String>) -> Vec<Data> {
let mut results: Vec<Data> = Vec::new();
for url in urls {
let data = fetch_data(url).await; // energy tracked
results.push(data);
}
results
}
Desugaring
Async functions are desugared to state machines backed by Task types. The await keyword becomes a yield point that checks for task completion and records energy consumed during the suspension.
Spawn
Launch a concurrent task:
use std::concurrency::spawn;
let handle = spawn(|| {
heavy_computation()
});
let result = handle.join();
Task Pool
Under the hood, spawn submits work to a pthread-based task pool with 256 task slots. Tasks are distributed across worker threads and managed by the runtime:
- Worker threads are pre-allocated (one per CPU core)
- Tasks are stored in a fixed-size array (256 slots)
- Task submission is lock-free on the fast path
- Energy consumption is tracked per-task with thread-safe atomic counters
Channels
Send values between tasks using bounded channels:
use std::concurrency::{channel, Sender, Receiver};
let (tx, rx) = channel(capacity: 100);
spawn(|| {
for i in 0..1000 {
tx.send(i); // blocks when buffer is full
}
});
let value = rx.recv(); // blocks when buffer is empty
Bounded Channel Implementation
Channels are implemented as ring buffers protected by mutex/condvar pairs:
- Capacity: Specified at creation time, provides backpressure
- Blocking:
send()blocks when the buffer is full;recv()blocks when empty - Thread Safety: Mutex protects the ring buffer; condvars signal producers and consumers
- Energy: Channel operations are energy-tracked -- both send and receive costs are attributed to the calling task
Unbounded Channels
For cases where backpressure is not needed:
let (tx, rx) = channel(); // unbounded (grows as needed)
Task Groups
Structured concurrency with automatic cancellation:
use std::concurrency::TaskGroup;
let group = TaskGroup::new();
group.spawn(|| process_chunk_1());
group.spawn(|| process_chunk_2());
group.spawn(|| process_chunk_3());
let results = group.join_all(); // waits for all tasks
If any task panics, the group cancels all remaining tasks. Energy consumption is aggregated across all tasks in the group.
Parallel For
Parallel iteration distributes work across threads automatically:
let results = parallel for item in data {
heavy_computation(item)
};
With explicit chunk size:
let processed = parallel(chunk_size: 1024) for row in matrix {
transform(row)
};
The compiler sums energy consumption across all parallel branches for budget enforcement.
Mutex
Mutual exclusion for shared state:
use std::concurrency::Mutex;
let counter = Mutex::new(0);
// In a concurrent task:
let mut guard = counter.lock();
*guard = *guard + 1;
// guard dropped here, lock released
Atomic Types
Lock-free primitives for simple shared state:
use std::concurrency::AtomicI32;
let counter = AtomicI32::new(0);
counter.fetch_add(1);
let value = counter.load();
Supervisors
Supervisors manage task lifecycles with restart strategies:
use std::concurrency::Supervisor;
let sup = Supervisor::new(RestartStrategy::OneForOne);
sup.spawn("worker", || {
process_queue()
});
sup.run();
Restart strategies:
| Strategy | Behavior |
|---|---|
OneForOne | Only the failed task is restarted |
OneForAll | All tasks are restarted when one fails |
RestForOne | The failed task and all tasks started after it are restarted |
Safety Guarantees
The ownership system prevents data races at compile time:
- Shared state must be wrapped in
Mutex,Atomic, or other synchronization primitives - The borrow checker ensures no mutable aliasing across tasks
- Task groups provide structured lifetimes for spawned work
- Channels provide safe, typed communication between tasks
- Energy tracking is thread-safe using atomic counters