Workflow Control Plane
"LangChain runs the workflow. AxonFlow decides when it's allowed to move forward."
The Workflow Control Plane provides governance gates for external orchestrators like LangChain, LangGraph, and CrewAI. Instead of modifying your orchestrator's code, you add checkpoint calls to AxonFlow before each step executes.
Overview
External orchestrators (LangChain, LangGraph, CrewAI) are great at workflow execution, but enterprises need governance controls. The Workflow Control Plane solves this by providing:
- Step Gates - Policy checkpoints before each workflow step
- Decision Types - Allow, block, or require approval
- Policy Integration - Reuses AxonFlow's policy engine
- Audit Trail - Every step decision is recorded
How It Works
Key Point: Your orchestrator runs the workflow. AxonFlow provides governance gates at each step transition.
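The pattern can be sketched as a plain loop: before each step, ask AxonFlow for a gate decision, run the step only on `allow`, and stop otherwise. The `check_gate` and `execute` callables below are placeholders for your orchestrator's own logic, not AxonFlow SDK functions.

```python
def run_governed_workflow(steps, check_gate, execute):
    """Run each step only if its gate decision is 'allow'.

    check_gate(step) -> "allow" | "block" | "require_approval"
    execute(step)    -> step result (your orchestrator's logic)
    Both callables are illustrative placeholders.
    """
    results = {}
    for step in steps:
        decision = check_gate(step)
        if decision != "allow":
            # Blocked (or awaiting approval): stop and surface the decision.
            return results, decision
        results[step] = execute(step)
    return results, "completed"
```

For example, with a stub gate that blocks `deploy`, only the earlier steps run and the caller sees the `block` decision.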
Quick Start
1. Start AxonFlow
docker compose up -d
2. Create a Workflow
curl -X POST http://localhost:8080/api/v1/workflows \
-H "Content-Type: application/json" \
-d '{
"workflow_name": "code-review-pipeline",
"source": "langgraph"
}'
Note: `total_steps` is optional. Omit it; the server automatically sets it to the actual step count when the workflow reaches a terminal state (completed, aborted, or failed). This supports dynamic workflows such as LangGraph, where the number of steps is not known upfront.
Response:
{
"workflow_id": "wf_abc123",
"workflow_name": "code-review-pipeline",
"status": "in_progress"
}
3. Check Step Gate
Before executing each step, check if it's allowed:
curl -X POST http://localhost:8080/api/v1/workflows/wf_abc123/steps/step-1/gate \
-H "Content-Type: application/json" \
-d '{
"step_name": "Generate Code",
"step_type": "llm_call",
"model": "gpt-4",
"provider": "openai"
}'
Response (allowed):
{
"decision": "allow",
"step_id": "step-1"
}
Response (blocked):
{
"decision": "block",
"step_id": "step-1",
"reason": "GPT-4 not allowed in production",
"policy_ids": ["policy_gpt4_block"]
}
4. Complete Workflow
curl -X POST http://localhost:8080/api/v1/workflows/wf_abc123/complete
SDK Integration
Python
from axonflow import AxonFlow
from axonflow.workflow import (
CreateWorkflowRequest,
StepGateRequest,
MarkStepCompletedRequest,
WorkflowSource,
StepType,
)
async with AxonFlow(
endpoint="http://localhost:8080",
client_id="my-workflow-app",
client_secret="your-secret",
) as client:
# Create workflow (total_steps is optional — auto-finalized at terminal state)
workflow = await client.create_workflow(
CreateWorkflowRequest(
workflow_name="code-review-pipeline",
source=WorkflowSource.EXTERNAL,
)
)
# Check gate before each step
gate = await client.step_gate(
workflow_id=workflow.workflow_id,
step_id="step-1",
request=StepGateRequest(
step_name="Generate Code",
step_type=StepType.LLM_CALL,
model="gpt-4",
provider="openai",
),
)
if gate.is_allowed():
# Execute your step, then mark completed
result = execute_step()
await client.mark_step_completed(
workflow_id=workflow.workflow_id,
step_id="step-1",
request=MarkStepCompletedRequest(output={"code": result}),
)
elif gate.is_blocked():
print(f"Blocked: {gate.reason}")
await client.abort_workflow(workflow.workflow_id, gate.reason)
# Complete workflow
await client.complete_workflow(workflow.workflow_id)
LangGraph Adapter
For LangGraph workflows, use the specialized adapter with automatic cleanup:
from axonflow import AxonFlow
from axonflow.adapters import AxonFlowLangGraphAdapter, WorkflowBlockedError
from axonflow.workflow import WorkflowSource
async with AxonFlow(
endpoint="http://localhost:8080",
client_id="my-langgraph-app",
client_secret="your-secret",
) as client:
# Create adapter with auto_block=True (raises exception on block)
adapter = AxonFlowLangGraphAdapter(
client=client,
workflow_name="my-langgraph-workflow",
source=WorkflowSource.LANGGRAPH,
auto_block=True, # Raises WorkflowBlockedError on block
)
try:
# Use context manager for automatic cleanup
async with adapter:
await adapter.start_workflow()
# Before each LangGraph node
if await adapter.check_gate(
step_name="generate_code",
step_type="llm_call",
model="gpt-4",
provider="openai",
):
result = await generate_code(state)
await adapter.step_completed(step_name="generate_code", output=result)
# Workflow completes automatically when context manager exits
except WorkflowBlockedError as e:
print(f"Blocked: {e.reason}")
# Workflow is automatically aborted by context manager
Go
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "http://localhost:8080",
ClientID: "my-workflow-app",
ClientSecret: "your-secret",
})
// Create workflow
workflow, err := client.CreateWorkflow(axonflow.CreateWorkflowRequest{
WorkflowName: "code-review-pipeline",
Source: axonflow.WorkflowSourceExternal,
TotalSteps: 3,
})
if err != nil {
log.Fatal(err)
}
// Check gate
gate, err := client.StepGate(workflow.WorkflowID, "step-1", axonflow.StepGateRequest{
StepName: "Generate Code",
StepType: axonflow.StepTypeLLMCall,
Model: "gpt-4",
Provider: "openai",
})
if err != nil {
log.Fatal(err)
}
if gate.IsAllowed() {
// Execute step, then mark completed
result := executeStep()
client.MarkStepCompleted(workflow.WorkflowID, "step-1", &axonflow.MarkStepCompletedRequest{
Output: map[string]interface{}{"code": result},
})
} else if gate.IsBlocked() {
client.AbortWorkflow(workflow.WorkflowID, gate.Reason)
}
client.CompleteWorkflow(workflow.WorkflowID)
TypeScript
import { AxonFlow } from "@axonflow/sdk";
const axonflow = new AxonFlow({
endpoint: "http://localhost:8080",
clientId: "my-workflow-app",
clientSecret: "your-secret",
});
// Create workflow
const workflow = await axonflow.createWorkflow({
workflowName: "code-review-pipeline",
source: "external",
totalSteps: 3,
});
// Check gate
const gate = await axonflow.stepGate(workflow.workflowId, "step-1", {
stepName: "Generate Code",
stepType: "llm_call",
model: "gpt-4",
provider: "openai",
});
if (gate.decision === "allow") {
// Execute step, then mark completed
const result = await executeStep();
await axonflow.markStepCompleted(workflow.workflowId, "step-1", {
output: { code: result },
});
} else if (gate.decision === "block") {
await axonflow.abortWorkflow(workflow.workflowId, gate.reason);
}
await axonflow.completeWorkflow(workflow.workflowId);
Java
AxonFlow client = AxonFlow.create(AxonFlowConfig.builder()
.endpoint("http://localhost:8080")
.clientId("my-workflow-app")
.clientSecret("your-secret")
.build());
// Create workflow
CreateWorkflowResponse workflow = client.createWorkflow(
CreateWorkflowRequest.builder()
.workflowName("code-review-pipeline")
.source(WorkflowSource.EXTERNAL)
.totalSteps(3)
.build()
);
// Check gate
StepGateResponse gate = client.stepGate(
workflow.getWorkflowId(),
"step-1",
StepGateRequest.builder()
.stepName("Generate Code")
.stepType(StepType.LLM_CALL)
.model("gpt-4")
.provider("openai")
.build()
);
if (gate.isAllowed()) {
// Execute step, then mark completed
Object result = executeStep();
client.markStepCompleted(
workflow.getWorkflowId(),
"step-1",
MarkStepCompletedRequest.builder()
.output(Map.of("code", result))
.build()
);
} else if (gate.isBlocked()) {
client.abortWorkflow(workflow.getWorkflowId(), gate.getReason());
}
client.completeWorkflow(workflow.getWorkflowId());
Gate Decisions
| Decision | Description | Action |
|---|---|---|
| `allow` | Step is allowed to proceed | Execute the step |
| `block` | Step is blocked by policy | Skip or abort workflow |
| `require_approval` | Human approval required | Wait for approval (Enterprise) |
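A small dispatcher makes the three decisions explicit. `handle_decision` is an illustrative helper, not an SDK function; it maps each gate decision to the caller's next action:

```python
def handle_decision(decision: str) -> str:
    """Map a gate decision to the caller's next action (illustrative helper)."""
    actions = {
        "allow": "execute_step",      # run the step, then mark it completed
        "block": "abort_workflow",    # or skip the step, per your own policy
        "require_approval": "wait",   # Enterprise: resume after approval
    }
    try:
        return actions[decision]
    except KeyError:
        raise ValueError(f"unknown gate decision: {decision!r}")
```

Raising on unknown decisions is deliberate: failing loudly is safer than silently treating an unrecognized decision as an allow.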
Step Types
| Type | Description | Example |
|---|---|---|
| `llm_call` | LLM API call | OpenAI, Anthropic, Bedrock |
| `tool_call` | Tool/function execution | Code execution, file operations |
| `connector_call` | MCP connector call | Database, API integrations |
| `human_task` | Human-in-the-loop task | Manual review, approval |
Workflow Sources
| Source | Description |
|---|---|
| `langgraph` | LangGraph workflow |
| `langchain` | LangChain workflow |
| `crewai` | CrewAI workflow |
| `external` | Other external orchestrator |
Policy Configuration
Create policies with `"scope": "workflow"` to control step execution:
Block Specific Models
{
"name": "block-gpt4-in-workflows",
"scope": "workflow",
"conditions": {
"step_type": "llm_call",
"model": "gpt-4"
},
"action": "block",
"reason": "GPT-4 not allowed in production workflows"
}
Require Approval for Deployments
{
"name": "require-approval-for-deploy",
"scope": "workflow",
"conditions": {
"step_type": "connector_call",
"step_name": "deploy"
},
"action": "require_approval",
"reason": "Deployment steps require human approval"
}
Block PII in Step Inputs
{
"name": "block-pii-in-workflow-inputs",
"scope": "workflow",
"conditions": {
"step_input.contains_pii": true
},
"action": "block",
"reason": "PII detected in workflow step input"
}
API Reference
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/workflows | Create workflow |
| GET | /api/v1/workflows/{id} | Get workflow status |
| POST | /api/v1/workflows/{id}/steps/{step_id}/gate | Check step gate |
| POST | /api/v1/workflows/{id}/steps/{step_id}/complete | Mark step completed (optional output + usage metrics) |
| POST | /api/v1/workflows/{id}/complete | Complete workflow |
| POST | /api/v1/workflows/{id}/abort | Abort workflow |
| POST | /api/v1/workflows/{id}/fail | Fail workflow |
| POST | /api/v1/workflows/{id}/resume | Resume workflow |
| GET | /api/v1/workflows | List workflows |
Step Completion Payload
POST /api/v1/workflows/{id}/steps/{step_id}/complete accepts an optional JSON body. You can send:
- `output` - structured step output object
- `tokens_in` - actual input tokens consumed
- `tokens_out` - actual output tokens produced
- `cost_usd` - actual cost for the step
If omitted, the endpoint still succeeds and marks the step complete. The response is 204 No Content.
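Because every field is optional, a helper that drops unset fields keeps the request body minimal. `build_completion_payload` is a hypothetical helper for illustration, not part of the SDK:

```python
def build_completion_payload(output=None, tokens_in=None,
                             tokens_out=None, cost_usd=None):
    """Build the optional JSON body for .../steps/{step_id}/complete.

    All four fields are optional; omit anything you did not measure.
    (Hypothetical helper; not part of the SDK.)
    """
    fields = {
        "output": output,
        "tokens_in": tokens_in,
        "tokens_out": tokens_out,
        "cost_usd": cost_usd,
    }
    # Keep only the fields that were actually provided.
    return {k: v for k, v in fields.items() if v is not None}
```

Calling it with no arguments yields `{}`, matching the endpoint's behavior of succeeding even when the body is omitted.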
Audit Logging
Every workflow operation is automatically logged to the audit trail for compliance and debugging. All operations are recorded in the audit_logs table with the workflow ID as the request_id.
Operations Logged
| Operation | Request Type | Description |
|---|---|---|
| Create Workflow | workflow_created | Workflow registration with name, source, and total steps |
| Step Gate Check | workflow_step_gate | Policy evaluation with decision, evaluated policies, and matched policies |
| Mark Step Completed | workflow_step_completed | Step completion with optional output and post-execution metrics |
| Complete Workflow | workflow_completed | Workflow finishes successfully |
| Abort Workflow | workflow_aborted | Workflow cancelled with reason |
Querying Audit Logs
Use the SDK audit search methods to query workflow logs:
# Python SDK
from axonflow.types import AuditSearchRequest
from datetime import datetime, timedelta, timezone
# Get audit logs for a specific workflow
response = await client.search_audit_logs(
AuditSearchRequest(
start_time=datetime.now(timezone.utc) - timedelta(hours=1),
limit=100,
)
)
# Filter by workflow ID
for entry in response.entries:
if entry.request_id == workflow.workflow_id:
print(f"[{entry.timestamp}] {entry.request_type}")
// Go SDK
auditLogs, _ := client.SearchAuditLogs(ctx, &axonflow.AuditSearchRequest{
StartTime: &startTime,
Limit: 100,
})
for _, entry := range auditLogs.Entries {
if entry.RequestID == workflowID {
fmt.Printf("[%s] %s\n", entry.Timestamp, entry.RequestType)
}
}
Verifying Audit Logs
See the workflow-policy examples for complete working examples that demonstrate audit log verification after workflow operations.
For more details on audit logging, see Audit Logging.
Best Practices
1. Use Descriptive Step Names
# Good
await adapter.check_gate("generate_code", "llm_call")
await adapter.check_gate("review_code", "tool_call")
await adapter.check_gate("deploy_to_staging", "connector_call")
# Bad
await adapter.check_gate("step1", "llm_call")
await adapter.check_gate("step2", "tool_call")
2. Always Handle Block Decisions
gate = await client.step_gate(...)
if gate.is_blocked():
# Log the reason
logger.warning(f"Step blocked: {gate.reason}")
# Abort the workflow
await client.abort_workflow(workflow_id, gate.reason)
return
3. Use Context Manager for Cleanup
adapter = AxonFlowLangGraphAdapter(
client=client,
workflow_name="my-workflow",
source=WorkflowSource.LANGGRAPH,
)
async with adapter:
await adapter.start_workflow()
# If exception occurs, workflow is automatically aborted
# If successful, workflow is automatically completed
4. Include Relevant Metadata
workflow = await client.create_workflow(
CreateWorkflowRequest(
workflow_name="code-review-pipeline",
metadata={
"environment": "production",
"team": "engineering",
"triggered_by": "github-action"
}
)
)
Tracing Integration
The trace_id field lets you correlate AxonFlow workflows with traces from external observability tools like Langsmith, Datadog, or OpenTelemetry.
Setting trace_id
Pass trace_id when creating a workflow:
curl -X POST http://localhost:8080/api/v1/workflows \
-H "Content-Type: application/json" \
-d '{
"workflow_name": "code-review-pipeline",
"source": "langgraph",
"trace_id": "langsmith-run-abc123"
}'
The trace_id is returned in the create response and preserved in all subsequent status queries.
SDK Usage
# Python
workflow = await client.create_workflow(
CreateWorkflowRequest(
workflow_name="research-agent",
source=WorkflowSource.LANGGRAPH,
trace_id="langsmith-run-abc123", # External trace correlation
)
)
# trace_id is available on the response
print(workflow.trace_id) # "langsmith-run-abc123"
# LangGraph adapter
async with adapter:
await adapter.start_workflow(trace_id="datadog-trace-xyz")
// Go
workflow, _ := client.CreateWorkflow(axonflow.CreateWorkflowRequest{
WorkflowName: "research-agent",
Source: axonflow.WorkflowSourceLangGraph,
TraceID: "otel-trace-456",
})
Filtering by trace_id
List workflows by trace_id to find all executions associated with a specific external trace:
curl "http://localhost:8080/api/v1/workflows?trace_id=langsmith-run-abc123"
workflows = await client.list_workflows(
ListWorkflowsOptions(trace_id="langsmith-run-abc123")
)
Per-Tool Governance
When a LangGraph tools node invokes multiple individual tools, each tool can be governed independently using ToolContext. This provides granular policy control — for example, allowing web_search but blocking code_executor within the same tools node.
How It Works
Instead of a single gate check for the entire tools node, you check each tool individually:
from axonflow.workflow import ToolContext, StepGateRequest, StepType
# Check gate with tool context
gate = await client.step_gate(
workflow_id=workflow.workflow_id,
step_id="step-tools-web_search",
request=StepGateRequest(
step_name="tools/web_search",
step_type=StepType.TOOL_CALL,
tool_context=ToolContext(
tool_name="web_search",
tool_type="function",
tool_input={"query": "latest AI research"},
),
),
)
The policy adapter propagates tool_name, tool_type, and tool_input.* keys into the policy evaluation context, enabling tool-aware rules.
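As an illustration of that flattening, a ToolContext-shaped dict would map to evaluation-context keys like these (a sketch of the documented behavior; the adapter's actual internals may differ):

```python
def flatten_tool_context(tool_context: dict) -> dict:
    """Flatten a ToolContext-like dict into policy-evaluation keys.

    Sketch of the documented behavior: tool_name and tool_type pass
    through unchanged, and each tool_input field becomes a dotted
    tool_input.* key that policies can match on.
    """
    ctx = {
        "tool_name": tool_context["tool_name"],
        "tool_type": tool_context["tool_type"],
    }
    for key, value in (tool_context.get("tool_input") or {}).items():
        ctx[f"tool_input.{key}"] = value
    return ctx
```

This is why a policy condition such as `"tool_input.query": { "contains": "DROP TABLE" }` can target a single field of a single tool's input.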
LangGraph Adapter (Recommended)
The Python LangGraph adapter provides convenience methods for per-tool governance:
from axonflow import AxonFlow
from axonflow.adapters import AxonFlowLangGraphAdapter
from axonflow.workflow import WorkflowSource
async with AxonFlow(endpoint="http://localhost:8080") as client:
adapter = AxonFlowLangGraphAdapter(
client=client,
workflow_name="research-agent",
source=WorkflowSource.LANGGRAPH,
)
async with adapter:
await adapter.start_workflow(trace_id="langsmith-run-abc123")
# Standard LLM node gate
if await adapter.check_gate("plan_research", "llm_call", model="gpt-4"):
result = await plan_research(state)
await adapter.step_completed("plan_research", output=result)
# Per-tool governance within a tools node
if await adapter.check_tool_gate("web_search", "function",
tool_input={"query": "latest news"}):
search_result = await web_search(query="latest news")
await adapter.tool_completed("web_search", output=search_result)
if await adapter.check_tool_gate("sql_query", "mcp",
tool_input={"query": "SELECT * FROM users LIMIT 10"}):
db_result = await sql_query("SELECT * FROM users LIMIT 10")
await adapter.tool_completed("sql_query", output=db_result)
Raw HTTP
curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/steps/step-tools-web_search/gate \
-H "Content-Type: application/json" \
-d '{
"step_name": "tools/web_search",
"step_type": "tool_call",
"tool_context": {
"tool_name": "web_search",
"tool_type": "function",
"tool_input": {"query": "latest news"}
}
}'
Policy Examples
With ToolContext, you can write policies that target specific tools:
{
"name": "block-code-executor",
"scope": "workflow",
"conditions": {
"tool_name": "code_executor"
},
"action": "block",
"reason": "Code execution not allowed in production"
}
{
"name": "block-dangerous-sql",
"scope": "workflow",
"conditions": {
"tool_name": "sql_query",
"tool_input.query": { "contains": "DROP TABLE" }
},
"action": "block",
"reason": "Destructive SQL operations blocked"
}
Phase 1 Scope
Per-tool governance is currently in Phase 1 (context enrichment). ToolContext is optional and fully backward compatible. Future Phase 2 will add dedicated tool_call_policy types with tool name/type matching, per-tool rate limits, and tool allowlists/blocklists.
External Orchestrator Integration
The Workflow Control Plane works with any external orchestration framework. Below are integration patterns for popular tools beyond LangChain and LangGraph.
Temporal
Add step gate checks inside your Temporal workflow activities:
from temporalio import activity
from temporalio.exceptions import ApplicationError
from axonflow import AxonFlow
from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType
@activity.defn
async def governed_llm_call(workflow_id: str, step_name: str, prompt: str) -> str:
async with AxonFlow(endpoint="http://localhost:8080") as client:
gate = await client.step_gate(
workflow_id=workflow_id,
step_id=step_name,
request=StepGateRequest(
step_name=step_name,
step_type=StepType.LLM_CALL,
model="gpt-4",
provider="openai",
),
)
if gate.is_blocked():
raise ApplicationError(f"Blocked by policy: {gate.reason}")
result = await call_llm(prompt)
await client.mark_step_completed(
workflow_id=workflow_id,
step_id=step_name,
request=MarkStepCompletedRequest(
output={"result": result},
tokens_in=150,
tokens_out=45,
cost_usd=0.0023,
),
)
return result
Apache Airflow
Use a custom Airflow operator or add gate checks in your PythonOperator callables:
from airflow.decorators import task
@task
def governed_step(workflow_id: str, step_name: str, **kwargs):
import asyncio
from axonflow import AxonFlow
from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType
async def run():
async with AxonFlow(endpoint="http://localhost:8080") as client:
gate = await client.step_gate(
workflow_id=workflow_id,
step_id=step_name,
request=StepGateRequest(step_name=step_name, step_type=StepType.LLM_CALL),
)
if gate.is_blocked():
raise Exception(f"Blocked: {gate.reason}")
# Execute your task logic here
result = do_work()
await client.mark_step_completed(
workflow_id=workflow_id,
step_id=step_name,
request=MarkStepCompletedRequest(
output={"result": result},
tokens_in=150,
tokens_out=45,
cost_usd=0.0023,
),
)
return result
return asyncio.run(run())
WCP API Endpoints
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/workflows | Create a new workflow session |
| GET | /api/v1/workflows/{id} | Get workflow status and metadata |
| POST | /api/v1/workflows/{id}/steps/{step_id}/gate | Check policy gate before step execution |
| POST | /api/v1/workflows/{id}/steps/{step_id}/complete | Mark a step as completed with optional output and usage metrics |
| POST | /api/v1/workflows/{id}/complete | Mark the entire workflow as completed |
| POST | /api/v1/workflows/{id}/abort | Abort the workflow with a reason |
| POST | /api/v1/workflows/{id}/fail | Fail the workflow with an optional reason |
| POST | /api/v1/workflows/{id}/resume | Resume a paused workflow (after approval) |
| GET | /api/v1/workflows | List workflows with optional filters |
Fail Workflow
failWorkflow() terminates a workflow as failed, with an optional reason. Unlike abortWorkflow(), which indicates deliberate cancellation (for example, due to a policy block), failWorkflow() indicates an error condition: the workflow encountered an unrecoverable problem during execution.
After a workflow is failed, its status becomes failed and it cannot be resumed.
API Endpoint
POST /api/v1/workflows/{id}/fail
Request Body:
{
"reason": "optional failure reason"
}
The reason field is optional. If omitted, the workflow is failed without a specific reason.
Response:
{
"workflow_id": "wf_abc123",
"status": "failed",
"reason": "optional failure reason"
}
SDK Examples
Go
err := client.FailWorkflow(workflowID, "pipeline error: step 3 timed out")
if err != nil {
log.Fatal(err)
}
Python
await client.fail_workflow(workflow_id, reason="pipeline error: step 3 timed out")
TypeScript
await client.failWorkflow(workflowId, "pipeline error: step 3 timed out");
Java
client.failWorkflow(workflowId, "pipeline error: step 3 timed out");
Community vs Enterprise
| Feature | Community | Enterprise |
|---|---|---|
| Step gates (allow/block) | Yes | Yes |
| Policy evaluation | Yes | Yes |
| SDK support (4 languages) | Yes | Yes |
| LangGraph adapter | Yes | Yes |
| `require_approval` action | Returns decision | Routes to Portal HITL |
| Org-level policies | No | Yes |
| Cross-workflow analytics | No | Yes |
Troubleshooting
Gate Returns "allow" When Expected to Block
- Check if the policy exists and is enabled
- Verify the policy scope is `workflow`
- Check if conditions match the step request
Workflow Stuck in "in_progress"
- Ensure you call `complete_workflow()` or `abort_workflow()`
- Check for unhandled exceptions in your code
- Use the context manager for automatic cleanup
Connection Refused
- Ensure the AxonFlow Agent is running: `docker compose ps`
- Check that the endpoint URL matches your configuration
- Verify network connectivity
Examples
See the complete examples in examples/workflow-control/:
- `http/workflow-control.sh` - HTTP/curl example
- `go/main.go` - Go SDK example
- `python/main.py` - Python SDK example
- `python/langgraph_example.py` - LangGraph adapter example
- `python/langgraph_tools_example.py` - Per-tool governance example
- `typescript/index.ts` - TypeScript SDK example
- `java/WorkflowControl.java` - Java SDK example
Related
- Choosing a Mode - Compare Proxy Mode, Gateway Mode, and Workflow Control Plane
- Community vs Enterprise - Feature availability by edition
- Multi-Agent Planning - AxonFlow's native orchestration layer