External Orchestrator Integration

The Workflow Control Plane (WCP) works with any external orchestration framework that can call the AxonFlow SDK or the HTTP API. Below are integration patterns for popular tools beyond LangChain and LangGraph.

Temporal

Add step gate checks inside your Temporal workflow activities:

from temporalio import activity
from temporalio.exceptions import ApplicationError
from axonflow import AxonFlow
from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType

@activity.defn
async def governed_llm_call(workflow_id: str, step_name: str, prompt: str) -> str:
    async with AxonFlow(endpoint="http://localhost:8080") as client:
        # Ask the policy gate whether this step may run.
        gate = await client.step_gate(
            workflow_id=workflow_id,
            step_id=step_name,
            request=StepGateRequest(
                step_name=step_name,
                step_type=StepType.LLM_CALL,
                model="gpt-4",
                provider="openai",
            ),
        )
        if gate.is_blocked():
            raise ApplicationError(f"Blocked by policy: {gate.reason}")

        # call_llm is your own LLM client function; it is not defined here.
        result = await call_llm(prompt)
        # Report the outcome. The usage figures are example values;
        # replace them with the real numbers from your LLM response.
        await client.mark_step_completed(
            workflow_id=workflow_id,
            step_id=step_name,
            request=MarkStepCompletedRequest(
                output={"result": result},
                tokens_in=150,
                tokens_out=45,
                cost_usd=0.0023,
            ),
        )
        return result

Apache Airflow

Use a custom Airflow operator, or add gate checks inside your PythonOperator callables or TaskFlow @task functions:

from airflow.decorators import task

@task
def governed_step(workflow_id: str, step_name: str, **kwargs):
    # Import inside the task so the DAG file parses without these packages loaded.
    import asyncio
    from axonflow import AxonFlow
    from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType

    async def run():
        async with AxonFlow(endpoint="http://localhost:8080") as client:
            # Ask the policy gate whether this step may run.
            gate = await client.step_gate(
                workflow_id=workflow_id,
                step_id=step_name,
                request=StepGateRequest(step_name=step_name, step_type=StepType.LLM_CALL),
            )
            if gate.is_blocked():
                raise Exception(f"Blocked: {gate.reason}")
            # Execute your task logic here; do_work is a placeholder for it.
            result = do_work()
            # The usage figures are example values; replace them with real ones.
            await client.mark_step_completed(
                workflow_id=workflow_id,
                step_id=step_name,
                request=MarkStepCompletedRequest(
                    output={"result": result},
                    tokens_in=150,
                    tokens_out=45,
                    cost_usd=0.0023,
                ),
            )
            return result

    # The SDK client is async, so drive it from the synchronous Airflow task.
    return asyncio.run(run())
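
The Temporal and Airflow examples above share one sequence: check the gate, run the step, report completion. A minimal sketch of that sequence as a reusable helper follows. The names run_governed_step, StepBlockedError, and GateDecision are hypothetical and not part of the AxonFlow SDK; the only things assumed about the gate response are the is_blocked() method and reason attribute used in the examples above.

```python
import asyncio
from typing import Awaitable, Callable, Protocol, TypeVar

T = TypeVar("T")


class GateDecision(Protocol):
    """Minimal shape of a gate response, as used in the examples above."""

    reason: str

    def is_blocked(self) -> bool: ...


class StepBlockedError(RuntimeError):
    """Hypothetical error raised when the policy gate blocks a step."""


async def run_governed_step(
    check_gate: Callable[[], Awaitable[GateDecision]],
    do_work: Callable[[], Awaitable[T]],
    report_completion: Callable[[T], Awaitable[None]],
) -> T:
    """Run do_work only if the gate allows it, then report the result."""
    gate = await check_gate()
    if gate.is_blocked():
        # Stop before any work runs; the caller maps this to its own error type.
        raise StepBlockedError(f"Blocked by policy: {gate.reason}")
    result = await do_work()
    await report_completion(result)
    return result
```

Inside an orchestrator task, check_gate and report_completion would be small closures around client.step_gate(...) and client.mark_step_completed(...), and the caller would translate StepBlockedError into whatever failure type the orchestrator expects, for example ApplicationError in Temporal.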

WCP API Endpoints

For orchestrators without an AxonFlow SDK, use the HTTP API directly:

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /api/v1/workflows | Create a new workflow session |
| GET | /api/v1/workflows/{id} | Get workflow status and metadata |
| POST | /api/v1/workflows/{id}/steps/{step_id}/gate | Check policy gate before step execution |
| POST | /api/v1/workflows/{id}/steps/{step_id}/complete | Mark a step as completed with optional output and usage metrics |
| POST | /api/v1/workflows/{id}/complete | Mark the entire workflow as completed |
| POST | /api/v1/workflows/{id}/abort | Abort the workflow with a reason |
| POST | /api/v1/workflows/{id}/fail | Fail the workflow with an optional reason |
| POST | /api/v1/workflows/{id}/resume | Resume a paused workflow (after approval) |
| GET | /api/v1/workflows | List workflows with optional filters |
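
As an illustration of calling these endpoints without the SDK, here is a minimal sketch of a thin HTTP wrapper built on the Python standard library. The paths and methods come from the table above. Everything else is an assumption to check against the API reference: the class name WcpHttpClient, the JSON body field names (step_name, step_type, output, chosen to mirror the SDK request objects), and the shape of the responses. The transport is injectable so the URL construction can be exercised without a running server.

```python
import json
import urllib.request
from typing import Callable, Optional
from urllib.parse import quote

# A transport takes (method, url, json_body) and returns the decoded JSON reply.
Transport = Callable[[str, str, Optional[dict]], dict]


def urllib_transport(method: str, url: str, body: Optional[dict]) -> dict:
    """Send one JSON request using only the standard library."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    request = urllib.request.Request(
        url,
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )
    # urlopen raises urllib.error.HTTPError for 4xx and 5xx responses.
    with urllib.request.urlopen(request) as response:
        raw = response.read()
    return json.loads(raw) if raw else {}


class WcpHttpClient:
    """Hypothetical thin wrapper over the endpoints listed in the table."""

    def __init__(self, base_url: str, transport: Transport = urllib_transport):
        self.base_url = base_url.rstrip("/")
        self.transport = transport

    def _url(self, *parts: str) -> str:
        # Percent-encode each path segment so IDs cannot alter the route.
        encoded = [quote(part, safe="") for part in parts]
        return "/".join([self.base_url, "api/v1/workflows", *encoded])

    def get_workflow(self, workflow_id: str) -> dict:
        return self.transport("GET", self._url(workflow_id), None)

    def step_gate(
        self, workflow_id: str, step_id: str, step_name: str, step_type: str
    ) -> dict:
        # Body field names are assumptions mirroring the SDK's StepGateRequest.
        body = {"step_name": step_name, "step_type": step_type}
        url = self._url(workflow_id, "steps", step_id, "gate")
        return self.transport("POST", url, body)

    def complete_step(self, workflow_id: str, step_id: str, output: dict) -> dict:
        # "output" is an assumed field name mirroring MarkStepCompletedRequest.
        url = self._url(workflow_id, "steps", step_id, "complete")
        return self.transport("POST", url, {"output": output})
```

A caller would construct WcpHttpClient("http://localhost:8080"), call step_gate before running a step, and call complete_step afterwards. How a blocked decision appears in the gate response is not specified here, so inspect the returned dictionary against the API reference before branching on it.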