Concurrency

Joule provides structured concurrency primitives for safe parallel execution.

Async/Await

Functions that perform asynchronous operations are marked async:

async fn fetch_data(url: String) -> Result<String, Error> {
    let response = http::get(url).await?;
    Result::Ok(response.body())
}

The await keyword suspends execution until the asynchronous operation completes.

Async Energy Tracking

Async operations are fully energy-tracked. The compiler inserts timing wrappers around Spawn, TaskAwait, TaskGroupEnter, and TaskGroupExit operations to measure the energy consumed by asynchronous work:

#[energy_budget(max_joules = 0.005)]
async fn process_pipeline(urls: Vec<String>) -> Vec<Data> {
    let mut results: Vec<Data> = Vec::new();
    for url in urls {
        let data = fetch_data(url).await;    // energy tracked
        results.push(data);
    }
    results
}

Desugaring

Async functions are desugared to state machines backed by Task types. The await keyword becomes a yield point that checks for task completion and records energy consumed during the suspension.

Spawn

Launch a concurrent task:

use std::concurrency::spawn;

let handle = spawn(|| {
    heavy_computation()
});

let result = handle.join();

Task Pool

Under the hood, spawn submits work to a pthread-based task pool with 256 task slots. Tasks are distributed across worker threads and managed by the runtime:

  • Worker threads are pre-allocated (one per CPU core)
  • Tasks are stored in a fixed-size array (256 slots)
  • Task submission is lock-free on the fast path
  • Energy consumption is tracked per-task with thread-safe atomic counters

Channels

Send values between tasks using bounded channels:

use std::concurrency::{channel, Sender, Receiver};

let (tx, rx) = channel(capacity: 100);

spawn(|| {
    for i in 0..1000 {
        tx.send(i);    // blocks when buffer is full
    }
});

let value = rx.recv();  // blocks when buffer is empty

Bounded Channel Implementation

Channels are implemented as ring buffers protected by mutex/condvar pairs:

  • Capacity: Specified at creation time, provides backpressure
  • Blocking: send() blocks when the buffer is full; recv() blocks when empty
  • Thread Safety: Mutex protects the ring buffer; condvars signal producers and consumers
  • Energy: Channel operations are energy-tracked -- both send and receive costs are attributed to the calling task

Unbounded Channels

For cases where backpressure is not needed:

let (tx, rx) = channel();    // unbounded (grows as needed)

Task Groups

Structured concurrency with automatic cancellation:

use std::concurrency::TaskGroup;

let group = TaskGroup::new();

group.spawn(|| process_chunk_1());
group.spawn(|| process_chunk_2());
group.spawn(|| process_chunk_3());

let results = group.join_all();  // waits for all tasks

If any task panics, the group cancels all remaining tasks. Energy consumption is aggregated across all tasks in the group.

Parallel For

Parallel iteration distributes work across threads automatically:

let results = parallel for item in data {
    heavy_computation(item)
};

With explicit chunk size:

let processed = parallel(chunk_size: 1024) for row in matrix {
    transform(row)
};

The compiler sums energy consumption across all parallel branches for budget enforcement.

Mutex

Mutual exclusion for shared state:

use std::concurrency::Mutex;

let counter = Mutex::new(0);

// In a concurrent task:
let mut guard = counter.lock();
*guard = *guard + 1;
// guard dropped here, lock released

Atomic Types

Lock-free primitives for simple shared state:

use std::concurrency::AtomicI32;

let counter = AtomicI32::new(0);
counter.fetch_add(1);
let value = counter.load();

Supervisors

Supervisors manage task lifecycles with restart strategies:

use std::concurrency::Supervisor;

let sup = Supervisor::new(RestartStrategy::OneForOne);

sup.spawn("worker", || {
    process_queue()
});

sup.run();

Restart strategies:

StrategyBehavior
OneForOneOnly the failed task is restarted
OneForAllAll tasks are restarted when one fails
RestForOneThe failed task and all tasks started after it are restarted

Safety Guarantees

The ownership system prevents data races at compile time:

  • Shared state must be wrapped in Mutex, Atomic, or other synchronization primitives
  • The borrow checker ensures no mutable aliasing across tasks
  • Task groups provide structured lifetimes for spawned work
  • Channels provide safe, typed communication between tasks
  • Energy tracking is thread-safe using atomic counters