Skip to main content

External Orchestrator Integration

The Workflow Control Plane works with any external orchestration framework that can make HTTP calls. The integration pattern is the same regardless of whether you use Temporal, Apache Airflow, Prefect, Dagster, or a custom scheduler: register a workflow, check the gate before each step, mark steps as completed, and finalize the workflow when done.

Integration Pattern

Every WCP integration follows a four-phase lifecycle.

1. Register the workflow. At the start of your pipeline or DAG run, call POST /api/v1/workflows with a workflow name and optional metadata. AxonFlow returns a workflow_id that you pass to all subsequent calls. You can also include a trace_id to correlate with external observability systems like Langsmith, Datadog, or OpenTelemetry.

2. Check the gate before each step. Before executing any step (LLM call, tool invocation, connector call, or human task), call POST /api/v1/workflows/{id}/steps/{step_id}/gate with the step's metadata. AxonFlow evaluates all active workflow policies and returns one of three decisions: allow, block, or require_approval. Your orchestrator should only execute the step if the decision is allow.

3. Mark steps as completed. After a step executes successfully, call POST /api/v1/workflows/{id}/steps/{step_id}/complete. This endpoint accepts optional post-execution metrics (tokens consumed, cost, output data) that were not available at gate time. These metrics feed into cost tracking and audit logging.

4. Finalize the workflow. When the pipeline finishes, call complete, fail, or abort depending on the outcome. Completing a workflow with pending approvals is rejected by the server. AxonFlow automatically computes total_steps from the actual step count at finalization.

Choosing an Integration Method

AxonFlow provides three ways to integrate, depending on your orchestrator and language preference.

SDK client (Python, Go, TypeScript, Java) is the most common approach. The SDK handles authentication, serialization, error handling, and retry logic. Use the SDK when your orchestrator runs in one of the four supported languages. See SDK Integration for complete examples.

LangGraph adapter is a higher-level wrapper available in all four SDKs. It manages the full workflow lifecycle (registration, gate checks, completion) behind methods like check_gate() and step_completed(). The Python adapter also supports async with for automatic cleanup. Use the adapter when your orchestrator is LangGraph or follows a similar node-based execution pattern. See SDK Integration for adapter examples.

Raw HTTP API works with any language or framework. Use it when no SDK exists for your language, when you need minimal dependencies, or when integrating with infrastructure tools that only support webhooks or HTTP callouts.

Temporal

Temporal workflows are composed of activities. The natural integration point is inside each activity function, where you check the gate before executing the activity's core logic. If the gate blocks the step, raise a Temporal ApplicationError so the workflow can handle it according to your retry and failure policies.

from temporalio import activity
from temporalio.exceptions import ApplicationError
from axonflow import AxonFlow
from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType

@activity.defn
async def governed_llm_call(workflow_id: str, step_name: str, prompt: str) -> str:
async with AxonFlow(endpoint="http://localhost:8080") as client:
gate = await client.step_gate(
workflow_id=workflow_id,
step_id=step_name,
request=StepGateRequest(
step_name=step_name,
step_type=StepType.LLM_CALL,
model="gpt-4",
provider="openai",
),
)
if gate.is_blocked():
raise ApplicationError(f"Blocked by policy: {gate.reason}")

result = await call_llm(prompt)
await client.mark_step_completed(
workflow_id=workflow_id,
step_id=step_name,
request=MarkStepCompletedRequest(
output={"result": result},
tokens_in=150,
tokens_out=45,
cost_usd=0.0023,
),
)
return result

The workflow itself handles registration and finalization. Create the AxonFlow workflow at the start of your Temporal workflow function, pass the workflow_id to each activity, and call complete_workflow or fail_workflow in the workflow's completion handler.

Apache Airflow

In Airflow, each task in a DAG corresponds to a WCP step. Since Airflow tasks run in separate processes, you need to pass the workflow_id through XComs or Airflow variables. The first task creates the AxonFlow workflow, and subsequent tasks check their gates using the shared workflow ID.

from airflow.decorators import task

@task
def governed_step(workflow_id: str, step_name: str, **kwargs):
import asyncio
from axonflow import AxonFlow
from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType

async def run():
async with AxonFlow(endpoint="http://localhost:8080") as client:
gate = await client.step_gate(
workflow_id=workflow_id,
step_id=step_name,
request=StepGateRequest(step_name=step_name, step_type=StepType.LLM_CALL),
)
if gate.is_blocked():
raise Exception(f"Blocked: {gate.reason}")
# Execute your task logic here
result = do_work()
await client.mark_step_completed(
workflow_id=workflow_id,
step_id=step_name,
request=MarkStepCompletedRequest(
output={"result": result},
tokens_in=150,
tokens_out=45,
cost_usd=0.0023,
),
)
return result

return asyncio.run(run())

For Airflow DAGs with conditional branching, check the gate at the start of each branch. If a branch is blocked, you can use Airflow's AirflowSkipException to skip downstream tasks in that branch rather than failing the entire DAG.

Custom Orchestrators

Any system that can make HTTP requests can integrate with WCP. The workflow below shows the complete HTTP lifecycle using curl. Replace these calls with your language's HTTP client.

Step 1: Register the Workflow

curl -X POST http://localhost:8080/api/v1/workflows \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'client_id:client_secret' | base64)" \
-d '{
"workflow_name": "nightly-data-pipeline",
"source": "external",
"trace_id": "datadog-trace-abc123",
"metadata": {"environment": "production", "team": "data-eng"}
}'

Step 2: Check the Gate

curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/steps/extract-data/gate \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'client_id:client_secret' | base64)" \
-d '{
"step_name": "extract-data",
"step_type": "connector_call",
"step_input": {"source": "production-db", "table": "customers"}
}'

Inspect the decision field in the response. If "allow", execute the step. If "block", read the reason and policy_ids fields to understand why. If "require_approval", poll the workflow status or use the approval_url to route to the approval UI.

Step 3: Mark Step Completed

curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/steps/extract-data/complete \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'client_id:client_secret' | base64)" \
-d '{
"output": {"rows_extracted": 15000},
"tokens_in": 0,
"tokens_out": 0,
"cost_usd": 0.0
}'

The request body is optional. You can send an empty POST if you do not have post-execution metrics.

Step 4: Finalize the Workflow

# On success
curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/complete \
-H "Authorization: Basic $(echo -n 'client_id:client_secret' | base64)" \

# On failure
curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/fail \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'client_id:client_secret' | base64)" \
-d '{"reason": "Step extract-data timed out after 300s"}'

# On manual cancellation
curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/abort \
-H "Content-Type: application/json" \
-H "Authorization: Basic $(echo -n 'client_id:client_secret' | base64)" \
-d '{"reason": "Cancelled by operator"}'

API Endpoint Reference

MethodEndpointDescription
POST/api/v1/workflowsRegister a new workflow. Returns workflow_id.
GET/api/v1/workflows/{id}Get workflow status, metadata, and step history.
GET/api/v1/workflowsList workflows. Supports status, source, trace_id, limit, and offset query parameters.
POST/api/v1/workflows/{id}/steps/{step_id}/gateCheck policy gate before step execution. Returns decision, reason, policy_ids, policies_evaluated, and policies_matched.
POST/api/v1/workflows/{id}/steps/{step_id}/completeMark a step as completed. Accepts optional output, tokens_in, tokens_out, and cost_usd.
POST/api/v1/workflows/{id}/completeMark the workflow as completed. Rejected if any step has a pending approval.
POST/api/v1/workflows/{id}/failMark the workflow as failed with an optional reason.
POST/api/v1/workflows/{id}/abortAbort the workflow with an optional reason.
POST/api/v1/workflows/{id}/resumeResume a workflow after an approval. Returns 409 if a step is still pending or was rejected.

Evaluation Tier and Enterprise Endpoints

These endpoints are available on the evaluation tier and above.

MethodEndpointDescription
POST/api/v1/workflows/{id}/steps/{step_id}/approveApprove a pending step. Requires a comment (minimum 10 characters).
POST/api/v1/workflows/{id}/steps/{step_id}/rejectReject a pending step. Requires a reason (minimum 10 characters). Automatically aborts the workflow.
GET/api/v1/workflows/approvals/pendingList pending approvals for the current tenant.

Error Handling

The WCP API uses standard HTTP status codes. Your integration should handle these cases:

  • 404 Not Found: The workflow or step ID does not exist. Verify you are using the correct workflow_id returned from the create call.
  • 409 Conflict: The workflow is in a terminal state (completed, aborted, or failed) and cannot accept new gate checks. Also returned when trying to complete a workflow with pending approvals, or when a step gate is called while a previous step awaits approval.
  • 429 Too Many Requests: The concurrent execution limit for your license tier has been reached.