Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Cookbook

Common patterns, each a few lines. The full rules behind them are on the #[workflow] and #[container] pages.

Sequential vs. parallel

Edges come from data, not statement order. Independent calls run in parallel; a shared input creates the dependency:

#[workflow]
fn pipeline() {
    let a = ingest("src".to_string());   // a and b have no relation:
    let b = probe();                     //   they run in parallel
    combine(a, b);                       // depends on BOTH -> joins them
}

Need a strict order without a data dependency? Make the dependency explicit by threading a return value through.

Sub-workflow that returns a value

A #[workflow] with a return type bubbles its terminal task’s output up like a container’s:

#[workflow]
fn sub(seed: String) -> String {
    let f = fetch(seed);
    transform(f, 7)                      // tail call == this workflow's return
}

#[workflow]
fn parent() {
    let r = sub("seed".to_string());
    publish(r);
}

Nested calls

foo(bar()) lowers bar to its own task and wires foo’s arg to its output — a shorthand for let x = bar(); foo(x);:

#[workflow]
fn pipeline() {
    publish(transform(fetch("u".to_string()), 7));
}

Recursive: foo(bar(baz())) works the same way.

Fan-out over a list

#[workflow]
fn batch() {
    let items = make_list();                       // -> Vec<String>
    let out = items.fan_out(|x| caps(x, "!".to_string()));  // Argo withParam
    summarize(out);                                // out: Vec<String>
}

caps runs once per element ({{item}}); out is the aggregated Vec, consumed like any output.

Conditionals

Real if / else / else if become when-gated wrappers; a value-if selects the taken branch:

#[workflow]
fn gated() {
    let n = decide("hello".to_string());
    let chosen = if n > 3 { left(n) } else { right(n) };  // value-if
    if n == 0 {
        note("zero".to_string());
    } else {
        note(chosen);
    }
}

Conditions are a closed grammar (== != < <= > >=, && || ! over bindings/inputs/a.field/literals/nested calls).

Sequential steps mode

The default workflow body is a DAG (edges from data deps). Add steps to emit an Argo steps: template — one statement per sequential group:

#[workflow(steps)]
fn pipeline() {
    let p = prepare("seed".to_string());
    finalize(p);
}

Same body, different emit shape.

Passing one field of a struct

a.field (or a.field.sub) wires only that field to the next task — lowered to a JSON path on the producer’s output:

#[derive(serde::Serialize, serde::Deserialize)]
struct Meta { id: String, n: i64 }

#[container] fn make_meta() -> Meta { Meta { id: "abc".into(), n: 7 } }
#[container] fn use_id(id: String) { println!("id={id}"); }

#[workflow]
fn pipeline() {
    let m = make_meta();
    use_id(m.id);                          // only `id` is wired through
}

Named fields only (no a.0 / a[i]). The ghost type-checks that the field exists and matches the consumer.

Decoupled artifacts (no DAG edge)

A producer and consumer that share only an S3 key — no ordering, no wiring:

#[container]
fn produce() { cargo_athena::save_artifact_str!("report", "hello"); }

#[container]
fn consume() {
    let r = cargo_athena::load_artifact_str!("report");
    println!("{r}");
}

Artifact chain (with a DAG dep)

The recipe above has no ordering. To chain artifact-producing containers — explicit dependency, guaranteed ordering — bridge them with a return value:

#[container]
fn produce() -> String {
    cargo_athena::save_artifact_str!("report", "hello");
    "ok".to_string()                       // return value creates the edge
}

#[container]
fn consume(seq: String) {
    let r = cargo_athena::load_artifact_str!("report");
    println!("seq={seq}: {r}");
}

#[workflow]
fn pipeline() {
    let token = produce();
    consume(token);                        // edge: produce must finish first
}

The artifact key is still a literal; the return-value just orders the two tasks.

Per-task hooks & resilience

.continue_on / .on_success / .on_failure / .on_error / .on_exit are per-task builders — they fire for that one task, always:

#[workflow]
fn resilient() {
    let raw = fetch("u".to_string()).continue_on(failed, error);
    transform(raw, 9)
        .on_failure(alarm)
        .on_exit(cleanup);     // runs when *this task* finishes
}

Retry with backoff

A flaky step retries itself:

#[container(retry(limit = 3, policy = "OnError", backoff = "30s"))]
fn fetch(url: String) -> String { /* … */ "ok".into() }

limit is required (unlimited for no cap); policyAlways|OnFailure|OnError|OnTransientError; backoff is an int (seconds) or a humantime string. Works on #[workflow] too.

Timeouts

Three knobs for three scopes — stack as many as you need:

#[container(
    timeout = "5m",                       // controller; counts Pending time
    pod_running_timeout = "2m",           // kubelet; only counts time Running
)]
fn long_step() { /* … */ }

#[workflow(active_deadline_if_root = "1h")]   // whole-workflow cap (root-only)
fn pipeline() { /* … */ }

Full distinctions: Timeouts.

Whole-workflow cleanup

on_exit_if_root is the workflow-level exit handler — distinct from the per-task .on_exit(t) above. It runs once, when the workflow finishes, but only for the workflow you actually submit:

#[workflow(on_exit_if_root = teardown)]
fn pipeline() { /* … */ }

Every #[workflow]/#[container] that sets it carries the hook on its own template, so argo submit --from workflowtemplate/pipeline runs teardown. When pipeline is instead a templateRef’d sub-step of a bigger run, its on_exit_if_root stays inert (Argo fires only the submitted workflow’s) — submit it directly to get it.

Pinning a pod (image / SA / node)

Static, or with a container argument spliced in (image / service_account / node_selector values accept "lit" + arg):

#[container(
    image           = "ghcr.io/acme/heavy:" + tag,          // arg injected
    service_account = "athena-" + tenant + "-runner",
    node_selector   = { "kubernetes.io/arch" = "amd64",
                        "disktype" = profile.disk },          // a struct field
)]
fn heavy(tag: String, tenant: String, profile: Profile) -> String { tag }

Operands are an argument or a named struct field of one, and must be String/&str/number. See #[container] → Parameter injection.

Shared pod resources via #[fragment]

A #[fragment] is a normal helper that runs inside the calling container. Every host! / artifact decl it makes is carried onto each container that transitively calls it — share pod resources without a registry:

#[fragment]
fn with_secrets() {
    let _ = cargo_athena::host!("/secrets/db");
    let _ = cargo_athena::host!("/secrets/api");
}

#[container]
fn step_a() { with_secrets(); /* both /secrets paths land on step_a */ }

#[container]
fn step_b() { with_secrets(); /* …and on step_b */ }

Resources travel with the function; the calling containers don’t have to know what’s inside.

Pitfalls

  • Fan-out a value to two consumers → .clone(). The body is faithful Rust; Argo copies the output into each consumer, so the explicit clone is correct, not boilerplate.
  • Workflow bodies are strict. Loops, match, and arbitrary expressions are compile errors — by design, so a step is never silently dropped. if/else/else if, nested calls, and the builder/fan_out chain are supported.
  • Parameter values are JSON. Every value athena emits is JSON, so any string is safe (t("no") works) and a String "7" stays a string, not a number.