Workflow Control Plane

"LangChain runs the workflow. AxonFlow decides when it's allowed to move forward."

The Workflow Control Plane provides governance gates for external orchestrators like LangChain, LangGraph, and CrewAI. Instead of modifying your orchestrator's code, you simply add checkpoint calls to AxonFlow before each step executes.

Overview

External orchestrators (LangChain, LangGraph, CrewAI) are great at workflow execution, but enterprises need governance controls. The Workflow Control Plane solves this by providing:

  1. Step Gates - Policy checkpoints before each workflow step
  2. Decision Types - Allow, block, or require approval
  3. Policy Integration - Reuses AxonFlow's policy engine
  4. Audit Trail - Every step decision is recorded

How It Works

Key Point: Your orchestrator runs the workflow. AxonFlow provides governance gates at each step transition.
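The control flow can be sketched in a few lines of plain Python. This is a conceptual illustration, not the SDK API: `gate` stands in for AxonFlow's step-gate call and returns a decision string.

```python
# Conceptual sketch: the orchestrator owns the loop and executes steps;
# AxonFlow only answers "may this step run?" at each transition.
# `gate` stands in for the step-gate call and returns a decision string.
def run_workflow(steps, gate):
    executed = []
    for step in steps:
        decision = gate(step)          # policy checkpoint before the step
        if decision == "block":
            return executed, step      # abort: steps run so far, plus the blocked step
        executed.append(step)          # "allow": the orchestrator executes the step
    return executed, None              # every step passed its gate
```

In the real SDKs the same decision is surfaced via methods like `gate.is_allowed()` and `gate.is_blocked()`, as shown in the SDK Integration examples below.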

Quick Start

1. Start AxonFlow

docker compose up -d

2. Create a Workflow

curl -X POST http://localhost:8080/api/v1/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "workflow_name": "code-review-pipeline",
    "source": "langgraph"
  }'

Note: total_steps is optional. Omit it — the server automatically sets it to the actual step count when the workflow reaches a terminal state (completed, aborted, or failed). This supports dynamic workflows like LangGraph where the number of steps is not known upfront.

Response:

{
  "workflow_id": "wf_abc123",
  "workflow_name": "code-review-pipeline",
  "status": "in_progress"
}

3. Check Step Gate

Before executing each step, check if it's allowed:

curl -X POST http://localhost:8080/api/v1/workflows/wf_abc123/steps/step-1/gate \
  -H "Content-Type: application/json" \
  -d '{
    "step_name": "Generate Code",
    "step_type": "llm_call",
    "model": "gpt-4",
    "provider": "openai"
  }'

Response (allowed):

{
  "decision": "allow",
  "step_id": "step-1"
}

Response (blocked):

{
  "decision": "block",
  "step_id": "step-1",
  "reason": "GPT-4 not allowed in production",
  "policy_ids": ["policy_gpt4_block"]
}

4. Complete Workflow

curl -X POST http://localhost:8080/api/v1/workflows/wf_abc123/complete

SDK Integration

Python

from axonflow import AxonFlow
from axonflow.workflow import (
    CreateWorkflowRequest,
    StepGateRequest,
    MarkStepCompletedRequest,
    WorkflowSource,
    StepType,
)

async with AxonFlow(
    endpoint="http://localhost:8080",
    client_id="my-workflow-app",
    client_secret="your-secret",
) as client:
    # Create workflow (total_steps is optional — auto-finalized at terminal state)
    workflow = await client.create_workflow(
        CreateWorkflowRequest(
            workflow_name="code-review-pipeline",
            source=WorkflowSource.EXTERNAL,
        )
    )

    # Check gate before each step
    gate = await client.step_gate(
        workflow_id=workflow.workflow_id,
        step_id="step-1",
        request=StepGateRequest(
            step_name="Generate Code",
            step_type=StepType.LLM_CALL,
            model="gpt-4",
            provider="openai",
        ),
    )

    if gate.is_allowed():
        # Execute your step, then mark completed
        result = execute_step()
        await client.mark_step_completed(
            workflow_id=workflow.workflow_id,
            step_id="step-1",
            request=MarkStepCompletedRequest(output={"code": result}),
        )
    elif gate.is_blocked():
        print(f"Blocked: {gate.reason}")
        await client.abort_workflow(workflow.workflow_id, gate.reason)

    # Complete workflow
    await client.complete_workflow(workflow.workflow_id)

LangGraph Adapter

For LangGraph workflows, use the specialized adapter with automatic cleanup:

from axonflow import AxonFlow
from axonflow.adapters import AxonFlowLangGraphAdapter, WorkflowBlockedError
from axonflow.workflow import WorkflowSource

async with AxonFlow(
    endpoint="http://localhost:8080",
    client_id="my-langgraph-app",
    client_secret="your-secret",
) as client:
    # Create adapter with auto_block=True (raises exception on block)
    adapter = AxonFlowLangGraphAdapter(
        client=client,
        workflow_name="my-langgraph-workflow",
        source=WorkflowSource.LANGGRAPH,
        auto_block=True,  # Raises WorkflowBlockedError on block
    )

    try:
        # Use context manager for automatic cleanup
        async with adapter:
            await adapter.start_workflow()

            # Before each LangGraph node
            if await adapter.check_gate(
                step_name="generate_code",
                step_type="llm_call",
                model="gpt-4",
                provider="openai",
            ):
                result = await generate_code(state)
                await adapter.step_completed(step_name="generate_code", output=result)

            # Workflow completes automatically when context manager exits

    except WorkflowBlockedError as e:
        print(f"Blocked: {e.reason}")
        # Workflow is automatically aborted by context manager

Go

client := axonflow.NewClient(axonflow.AxonFlowConfig{
    Endpoint:     "http://localhost:8080",
    ClientID:     "my-workflow-app",
    ClientSecret: "your-secret",
})

// Create workflow
workflow, err := client.CreateWorkflow(axonflow.CreateWorkflowRequest{
    WorkflowName: "code-review-pipeline",
    Source:       axonflow.WorkflowSourceExternal,
    TotalSteps:   3,
})
if err != nil {
    log.Fatal(err)
}

// Check gate
gate, err := client.StepGate(workflow.WorkflowID, "step-1", axonflow.StepGateRequest{
    StepName: "Generate Code",
    StepType: axonflow.StepTypeLLMCall,
    Model:    "gpt-4",
    Provider: "openai",
})
if err != nil {
    log.Fatal(err)
}

if gate.IsAllowed() {
    // Execute step, then mark completed
    result := executeStep()
    client.MarkStepCompleted(workflow.WorkflowID, "step-1", &axonflow.MarkStepCompletedRequest{
        Output: map[string]interface{}{"code": result},
    })
} else if gate.IsBlocked() {
    client.AbortWorkflow(workflow.WorkflowID, gate.Reason)
}

client.CompleteWorkflow(workflow.WorkflowID)

TypeScript

import { AxonFlow } from "@axonflow/sdk";

const axonflow = new AxonFlow({
  endpoint: "http://localhost:8080",
  clientId: "my-workflow-app",
  clientSecret: "your-secret",
});

// Create workflow
const workflow = await axonflow.createWorkflow({
  workflowName: "code-review-pipeline",
  source: "external",
  totalSteps: 3,
});

// Check gate
const gate = await axonflow.stepGate(workflow.workflowId, "step-1", {
  stepName: "Generate Code",
  stepType: "llm_call",
  model: "gpt-4",
  provider: "openai",
});

if (gate.decision === "allow") {
  // Execute step, then mark completed
  const result = await executeStep();
  await axonflow.markStepCompleted(workflow.workflowId, "step-1", {
    output: { code: result },
  });
} else if (gate.decision === "block") {
  await axonflow.abortWorkflow(workflow.workflowId, gate.reason);
}

await axonflow.completeWorkflow(workflow.workflowId);

Java

AxonFlow client = AxonFlow.create(AxonFlowConfig.builder()
    .endpoint("http://localhost:8080")
    .clientId("my-workflow-app")
    .clientSecret("your-secret")
    .build());

// Create workflow
CreateWorkflowResponse workflow = client.createWorkflow(
    CreateWorkflowRequest.builder()
        .workflowName("code-review-pipeline")
        .source(WorkflowSource.EXTERNAL)
        .totalSteps(3)
        .build()
);

// Check gate
StepGateResponse gate = client.stepGate(
    workflow.getWorkflowId(),
    "step-1",
    StepGateRequest.builder()
        .stepName("Generate Code")
        .stepType(StepType.LLM_CALL)
        .model("gpt-4")
        .provider("openai")
        .build()
);

if (gate.isAllowed()) {
    // Execute step, then mark completed
    Object result = executeStep();
    client.markStepCompleted(
        workflow.getWorkflowId(),
        "step-1",
        MarkStepCompletedRequest.builder()
            .output(Map.of("code", result))
            .build()
    );
} else if (gate.isBlocked()) {
    client.abortWorkflow(workflow.getWorkflowId(), gate.getReason());
}

client.completeWorkflow(workflow.getWorkflowId());

Gate Decisions

| Decision | Description | Action |
| --- | --- | --- |
| allow | Step is allowed to proceed | Execute the step |
| block | Step is blocked by policy | Skip or abort workflow |
| require_approval | Human approval required | Wait for approval (Enterprise) |
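A gate response can therefore be handled with a three-way dispatch. The sketch below is illustrative only; the action names are placeholders, not SDK calls, and how you react to require_approval is up to your orchestrator (for example, pause and later hit the /resume endpoint).

```python
# Illustrative dispatch over the three gate decisions.
# The returned action names are placeholders, not SDK calls.
def next_action(decision: str) -> str:
    if decision == "allow":
        return "execute_step"
    if decision == "block":
        return "abort_workflow"
    if decision == "require_approval":
        return "pause_for_approval"  # e.g. wait, then call /resume
    raise ValueError(f"unknown gate decision: {decision}")
```

Raising on unknown decisions is deliberate: it keeps the workflow from silently executing a step whose governance status is unclear.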

Step Types

| Type | Description | Example |
| --- | --- | --- |
| llm_call | LLM API call | OpenAI, Anthropic, Bedrock |
| tool_call | Tool/function execution | Code execution, file operations |
| connector_call | MCP connector call | Database, API integrations |
| human_task | Human-in-the-loop task | Manual review, approval |

Workflow Sources

| Source | Description |
| --- | --- |
| langgraph | LangGraph workflow |
| langchain | LangChain workflow |
| crewai | CrewAI workflow |
| external | Other external orchestrator |

Policy Configuration

Create policies with scope: workflow to control step execution:

Block Specific Models

{
  "name": "block-gpt4-in-workflows",
  "scope": "workflow",
  "conditions": {
    "step_type": "llm_call",
    "model": "gpt-4"
  },
  "action": "block",
  "reason": "GPT-4 not allowed in production workflows"
}

Require Approval for Deployments

{
  "name": "require-approval-for-deploy",
  "scope": "workflow",
  "conditions": {
    "step_type": "connector_call",
    "step_name": "deploy"
  },
  "action": "require_approval",
  "reason": "Deployment steps require human approval"
}

Block PII in Step Inputs

{
  "name": "block-pii-in-workflow-inputs",
  "scope": "workflow",
  "conditions": {
    "step_input.contains_pii": true
  },
  "action": "block",
  "reason": "PII detected in workflow step input"
}

API Reference

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /api/v1/workflows | Create workflow |
| GET | /api/v1/workflows/{id} | Get workflow status |
| POST | /api/v1/workflows/{id}/steps/{step_id}/gate | Check step gate |
| POST | /api/v1/workflows/{id}/steps/{step_id}/complete | Mark step completed (optional output + usage metrics) |
| POST | /api/v1/workflows/{id}/complete | Complete workflow |
| POST | /api/v1/workflows/{id}/abort | Abort workflow |
| POST | /api/v1/workflows/{id}/fail | Fail workflow |
| POST | /api/v1/workflows/{id}/resume | Resume workflow |
| GET | /api/v1/workflows | List workflows |

Step Completion Payload

POST /api/v1/workflows/{id}/steps/{step_id}/complete accepts an optional JSON body. You can send:

  • output -- structured step output object
  • tokens_in -- actual input tokens consumed
  • tokens_out -- actual output tokens produced
  • cost_usd -- actual cost for the step

If omitted, the endpoint still succeeds and marks the step complete. The response is 204 No Content.
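For example, a completion body that reports both structured output and actual usage might look like this (the field names come from the list above; the values are illustrative):

```json
{
  "output": { "code": "def hello(): ..." },
  "tokens_in": 150,
  "tokens_out": 45,
  "cost_usd": 0.0023
}
```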

Audit Logging

Every workflow operation is automatically logged to the audit trail for compliance and debugging. All operations are recorded in the audit_logs table with the workflow ID as the request_id.

Operations Logged

| Operation | Request Type | Description |
| --- | --- | --- |
| Create Workflow | workflow_created | Workflow registration with name, source, and total steps |
| Step Gate Check | workflow_step_gate | Policy evaluation with decision, evaluated policies, and matched policies |
| Mark Step Completed | workflow_step_completed | Step completion with optional output and post-execution metrics |
| Complete Workflow | workflow_completed | Workflow finishes successfully |
| Abort Workflow | workflow_aborted | Workflow cancelled with reason |

Querying Audit Logs

Use the SDK audit search methods to query workflow logs:

# Python SDK
from axonflow.types import AuditSearchRequest
from datetime import datetime, timedelta, timezone

# Get audit logs for a specific workflow
response = await client.search_audit_logs(
    AuditSearchRequest(
        start_time=datetime.now(timezone.utc) - timedelta(hours=1),
        limit=100,
    )
)

# Filter by workflow ID
for entry in response.entries:
    if entry.request_id == workflow.workflow_id:
        print(f"[{entry.timestamp}] {entry.request_type}")

// Go SDK
auditLogs, _ := client.SearchAuditLogs(ctx, &axonflow.AuditSearchRequest{
    StartTime: &startTime,
    Limit:     100,
})
for _, entry := range auditLogs.Entries {
    if entry.RequestID == workflowID {
        fmt.Printf("[%s] %s\n", entry.Timestamp, entry.RequestType)
    }
}

Verifying Audit Logs

See the workflow-policy examples for complete, working demonstrations of verifying audit logs after workflow operations.

For more details on audit logging, see Audit Logging.

Best Practices

1. Use Descriptive Step Names

# Good
await adapter.check_gate("generate_code", "llm_call")
await adapter.check_gate("review_code", "tool_call")
await adapter.check_gate("deploy_to_staging", "connector_call")

# Bad
await adapter.check_gate("step1", "llm_call")
await adapter.check_gate("step2", "tool_call")

2. Always Handle Block Decisions

gate = await client.step_gate(...)

if gate.is_blocked():
    # Log the reason
    logger.warning(f"Step blocked: {gate.reason}")
    # Abort the workflow
    await client.abort_workflow(workflow_id, gate.reason)
    return

3. Use Context Manager for Cleanup

adapter = AxonFlowLangGraphAdapter(
    client=client,
    workflow_name="my-workflow",
    source=WorkflowSource.LANGGRAPH,
)

async with adapter:
    await adapter.start_workflow()
    # If exception occurs, workflow is automatically aborted
    # If successful, workflow is automatically completed

4. Include Relevant Metadata

workflow = await client.create_workflow(
    CreateWorkflowRequest(
        workflow_name="code-review-pipeline",
        metadata={
            "environment": "production",
            "team": "engineering",
            "triggered_by": "github-action",
        },
    )
)

Tracing Integration

The trace_id field lets you correlate AxonFlow workflows with traces from external observability tools like LangSmith, Datadog, or OpenTelemetry.

Setting trace_id

Pass trace_id when creating a workflow:

curl -X POST http://localhost:8080/api/v1/workflows \
  -H "Content-Type: application/json" \
  -d '{
    "workflow_name": "code-review-pipeline",
    "source": "langgraph",
    "trace_id": "langsmith-run-abc123"
  }'

The trace_id is returned in the create response and preserved in all subsequent status queries.

SDK Usage

# Python
workflow = await client.create_workflow(
    CreateWorkflowRequest(
        workflow_name="research-agent",
        source=WorkflowSource.LANGGRAPH,
        trace_id="langsmith-run-abc123",  # External trace correlation
    )
)
# trace_id is available on the response
print(workflow.trace_id)  # "langsmith-run-abc123"

# LangGraph adapter
async with adapter:
    await adapter.start_workflow(trace_id="datadog-trace-xyz")

// Go
workflow, _ := client.CreateWorkflow(axonflow.CreateWorkflowRequest{
    WorkflowName: "research-agent",
    Source:       axonflow.WorkflowSourceLangGraph,
    TraceID:      "otel-trace-456",
})

Filtering by trace_id

List workflows by trace_id to find all executions associated with a specific external trace:

curl "http://localhost:8080/api/v1/workflows?trace_id=langsmith-run-abc123"

workflows = await client.list_workflows(
    ListWorkflowsOptions(trace_id="langsmith-run-abc123")
)

Per-Tool Governance

When a LangGraph tools node invokes multiple individual tools, each tool can be governed independently using ToolContext. This provides granular policy control — for example, allowing web_search but blocking code_executor within the same tools node.

How It Works

Instead of a single gate check for the entire tools node, you check each tool individually:

from axonflow.workflow import ToolContext, StepGateRequest, StepType

# Check gate with tool context
gate = await client.step_gate(
    workflow_id=workflow.workflow_id,
    step_id="step-tools-web_search",
    request=StepGateRequest(
        step_name="tools/web_search",
        step_type=StepType.TOOL_CALL,
        tool_context=ToolContext(
            tool_name="web_search",
            tool_type="function",
            tool_input={"query": "latest AI research"},
        ),
    ),
)

The policy adapter propagates tool_name, tool_type, and tool_input.* keys into the policy evaluation context, enabling tool-aware rules.
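For the request above, the policy evaluation context would contain roughly the following keys. The exact flattening of nested tool_input fields into dotted keys is an assumption based on the tool_input.* notation used here; consult your policy reference for the authoritative shape.

```json
{
  "tool_name": "web_search",
  "tool_type": "function",
  "tool_input.query": "latest AI research"
}
```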

The Python LangGraph adapter provides convenience methods for per-tool governance:

from axonflow import AxonFlow
from axonflow.adapters import AxonFlowLangGraphAdapter
from axonflow.workflow import WorkflowSource

async with AxonFlow(endpoint="http://localhost:8080") as client:
    adapter = AxonFlowLangGraphAdapter(
        client=client,
        workflow_name="research-agent",
        source=WorkflowSource.LANGGRAPH,
    )

    async with adapter:
        await adapter.start_workflow(trace_id="langsmith-run-abc123")

        # Standard LLM node gate
        if await adapter.check_gate("plan_research", "llm_call", model="gpt-4"):
            result = await plan_research(state)
            await adapter.step_completed("plan_research", output=result)

        # Per-tool governance within a tools node
        if await adapter.check_tool_gate(
            "web_search", "function", tool_input={"query": "latest news"}
        ):
            search_result = await web_search(query="latest news")
            await adapter.tool_completed("web_search", output=search_result)

        if await adapter.check_tool_gate(
            "sql_query", "mcp", tool_input={"query": "SELECT * FROM users LIMIT 10"}
        ):
            db_result = await sql_query("SELECT * FROM users LIMIT 10")
            await adapter.tool_completed("sql_query", output=db_result)

Raw HTTP

curl -X POST http://localhost:8080/api/v1/workflows/$WF_ID/steps/step-tools-web_search/gate \
  -H "Content-Type: application/json" \
  -d '{
    "step_name": "tools/web_search",
    "step_type": "tool_call",
    "tool_context": {
      "tool_name": "web_search",
      "tool_type": "function",
      "tool_input": {"query": "latest news"}
    }
  }'

Policy Examples

With ToolContext, you can write policies that target specific tools:

{
  "name": "block-code-executor",
  "scope": "workflow",
  "conditions": {
    "tool_name": "code_executor"
  },
  "action": "block",
  "reason": "Code execution not allowed in production"
}

{
  "name": "block-dangerous-sql",
  "scope": "workflow",
  "conditions": {
    "tool_name": "sql_query",
    "tool_input.query": { "contains": "DROP TABLE" }
  },
  "action": "block",
  "reason": "Destructive SQL operations blocked"
}

Phase 1 Scope

Per-tool governance is currently in Phase 1 (context enrichment). ToolContext is optional and fully backward compatible. A planned Phase 2 will add dedicated tool_call_policy types with tool name/type matching, per-tool rate limits, and tool allowlists/blocklists.

External Orchestrator Integration

The Workflow Control Plane works with any external orchestration framework. Below are integration patterns for popular tools beyond LangChain and LangGraph.

Temporal

Add step gate checks inside your Temporal workflow activities:

from temporalio import activity
from temporalio.exceptions import ApplicationError
from axonflow import AxonFlow
from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType

@activity.defn
async def governed_llm_call(workflow_id: str, step_name: str, prompt: str) -> str:
    async with AxonFlow(endpoint="http://localhost:8080") as client:
        gate = await client.step_gate(
            workflow_id=workflow_id,
            step_id=step_name,
            request=StepGateRequest(
                step_name=step_name,
                step_type=StepType.LLM_CALL,
                model="gpt-4",
                provider="openai",
            ),
        )
        if gate.is_blocked():
            raise ApplicationError(f"Blocked by policy: {gate.reason}")

        result = await call_llm(prompt)
        await client.mark_step_completed(
            workflow_id=workflow_id,
            step_id=step_name,
            request=MarkStepCompletedRequest(
                output={"result": result},
                tokens_in=150,
                tokens_out=45,
                cost_usd=0.0023,
            ),
        )
        return result

Apache Airflow

Use a custom Airflow operator or add gate checks in your PythonOperator callables:

from airflow.decorators import task

@task
def governed_step(workflow_id: str, step_name: str, **kwargs):
    import asyncio
    from axonflow import AxonFlow
    from axonflow.workflow import MarkStepCompletedRequest, StepGateRequest, StepType

    async def run():
        async with AxonFlow(endpoint="http://localhost:8080") as client:
            gate = await client.step_gate(
                workflow_id=workflow_id,
                step_id=step_name,
                request=StepGateRequest(step_name=step_name, step_type=StepType.LLM_CALL),
            )
            if gate.is_blocked():
                raise Exception(f"Blocked: {gate.reason}")
            # Execute your task logic here
            result = do_work()
            await client.mark_step_completed(
                workflow_id=workflow_id,
                step_id=step_name,
                request=MarkStepCompletedRequest(
                    output={"result": result},
                    tokens_in=150,
                    tokens_out=45,
                    cost_usd=0.0023,
                ),
            )
            return result

    return asyncio.run(run())

WCP API Endpoints

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /api/v1/workflows | Create a new workflow session |
| GET | /api/v1/workflows/{id} | Get workflow status and metadata |
| POST | /api/v1/workflows/{id}/steps/{step_id}/gate | Check policy gate before step execution |
| POST | /api/v1/workflows/{id}/steps/{step_id}/complete | Mark a step as completed with optional output and usage metrics |
| POST | /api/v1/workflows/{id}/complete | Mark the entire workflow as completed |
| POST | /api/v1/workflows/{id}/abort | Abort the workflow with a reason |
| POST | /api/v1/workflows/{id}/fail | Fail the workflow with an optional reason |
| POST | /api/v1/workflows/{id}/resume | Resume a paused workflow (after approval) |
| GET | /api/v1/workflows | List workflows with optional filters |

Fail Workflow

failWorkflow() terminates a workflow as failed with an optional reason. Unlike abortWorkflow(), which indicates a manual cancellation (e.g., due to a policy block), failWorkflow() indicates an error condition -- the workflow encountered an unrecoverable problem during execution.

After a workflow is failed, its status becomes failed and it cannot be resumed.

API Endpoint

POST /api/v1/workflows/{id}/fail

Request Body:

{
  "reason": "optional failure reason"
}

The reason field is optional. If omitted, the workflow is failed without a specific reason.

Response:

{
  "workflow_id": "wf_abc123",
  "status": "failed",
  "reason": "optional failure reason"
}

SDK Examples

Go

err := client.FailWorkflow(workflowID, "pipeline error: step 3 timed out")
if err != nil {
    log.Fatal(err)
}

Python

await client.fail_workflow(workflow_id, reason="pipeline error: step 3 timed out")

TypeScript

await client.failWorkflow(workflowId, "pipeline error: step 3 timed out");

Java

client.failWorkflow(workflowId, "pipeline error: step 3 timed out");

Community vs Enterprise

| Feature | Community | Enterprise |
| --- | --- | --- |
| Step gates (allow/block) | Yes | Yes |
| Policy evaluation | Yes | Yes |
| SDK support (4 languages) | Yes | Yes |
| LangGraph adapter | Yes | Yes |
| require_approval action | Returns decision | Routes to Portal HITL |
| Org-level policies | No | Yes |
| Cross-workflow analytics | No | Yes |

Troubleshooting

Gate Returns "allow" When Expected to Block

  1. Check if the policy exists and is enabled
  2. Verify the policy scope is workflow
  3. Check if conditions match the step request

Workflow Stuck in "in_progress"

  1. Ensure you call complete_workflow() or abort_workflow()
  2. Check for unhandled exceptions in your code
  3. Use the context manager for automatic cleanup

Connection Refused

  1. Ensure AxonFlow Agent is running: docker compose ps
  2. Check the endpoint URL matches your configuration
  3. Verify network connectivity

Examples

See the complete examples in examples/workflow-control/:

  • http/workflow-control.sh - HTTP/curl example
  • go/main.go - Go SDK example
  • python/main.py - Python SDK example
  • python/langgraph_example.py - LangGraph adapter example
  • python/langgraph_tools_example.py - Per-tool governance example
  • typescript/index.ts - TypeScript SDK example
  • java/WorkflowControl.java - Java SDK example