Skip to main content

LangGraph Wrapper

wrap_langgraph() is the simplest way to add AxonFlow governance to a LangGraph StateGraph. It wraps your compiled graph so that every node transition passes through a step gate, every tool call is individually governed, and every LLM invocation is audited. No adapter boilerplate required.

from axonflow.adapters import wrap_langgraph

governed = wrap_langgraph(graph, client, workflow_name="research-agent")
result = await governed.ainvoke({"query": "Summarize recent earnings"})

Before / After

Before: Manual Adapter

With the manual AxonFlowLangGraphAdapter, you write gate checks and audit calls around every node:

from axonflow import AxonFlow
from axonflow.adapters import AxonFlowLangGraphAdapter
from axonflow.workflow import WorkflowSource

async with AxonFlow(endpoint="http://localhost:8080") as client:
adapter = AxonFlowLangGraphAdapter(
client=client,
workflow_name="research-agent",
source=WorkflowSource.LANGGRAPH,
)

async with adapter:
await adapter.start_workflow()

if await adapter.check_gate("retrieve", "llm_call", model="gpt-4"):
result = await retrieve_node(state)
await adapter.step_completed("retrieve", output=result)

if await adapter.check_gate("summarize", "llm_call", model="gpt-4"):
result = await summarize_node(state)
await adapter.step_completed("summarize", output=result)

await adapter.complete_workflow()

This works, but it means rewriting your graph execution loop. Every new node needs manual gate and audit calls.

After: wrap_langgraph

With wrap_langgraph(), your existing compiled graph is wrapped in a single call:

from axonflow import AxonFlow
from axonflow.adapters import wrap_langgraph

async with AxonFlow(endpoint="http://localhost:8080") as client:
governed = wrap_langgraph(graph, client, workflow_name="research-agent")
result = await governed.ainvoke({"query": "Summarize recent earnings"})

The wrapper automatically:

  • Creates a workflow at the start of each invocation
  • Checks the step gate before every node
  • Audits every node completion with output and timing
  • Marks the workflow as complete (or failed) when the graph finishes
  • Supports ainvoke(), invoke(), and astream()

API Reference

wrap_langgraph()

from axonflow.adapters import wrap_langgraph

governed = wrap_langgraph(
graph, # Compiled LangGraph StateGraph
client, # AxonFlow client instance
workflow_name="my-workflow", # Workflow name for tracking
source=WorkflowSource.LANGGRAPH, # Workflow source (default: LANGGRAPH)
node_config=None, # Optional per-node configuration dict
govern_tools=True, # Per-tool governance (default: True)
trace_id=None, # Optional trace ID for correlation
)

Parameters:

ParameterTypeRequiredDefaultDescription
graphCompiledStateGraphYes--A compiled LangGraph graph (the return value of graph.compile())
clientAxonFlowYes--An initialized AxonFlow client instance
workflow_namestrYes--Name used to identify this workflow in the AxonFlow dashboard and audit logs
sourceWorkflowSourceNoWorkflowSource.LANGGRAPHWorkflow source identifier
node_configdict[str, NodeConfig]NoNonePer-node configuration overrides. Keys are node names. See NodeConfig Options
govern_toolsboolNoTrueWhen True, individual tool calls within nodes are governed via check_tool_gate() / tool_completed(). Set to False to disable per-tool gates. See Per-Tool Governance
trace_idstr | NoneNoNoneCorrelation ID passed through to the workflow. Useful for linking to LangSmith traces

Returns: A GovernedGraph instance.


NodeConfig Options

NodeConfig lets you override behavior for specific nodes. Pass a dictionary mapping node names to NodeConfig instances via the node_config parameter.

from axonflow.adapters import wrap_langgraph, NodeConfig

governed = wrap_langgraph(
graph, client,
workflow_name="research-agent",
node_config={
"retrieve": NodeConfig(step_type="tool_call", model="gpt-4"),
"summarize": NodeConfig(step_type="llm_call", model="gpt-4.1-nano"),
"format_output": NodeConfig(skip=True),
},
)
OptionTypeDefaultDescription
step_typestr | NoneNone (defaults to "tool_call")The step type passed to the step gate. Common values: "llm_call", "tool_call", "connector_call"
modelstr | NoneNoneLLM model identifier included in gate checks and audit records
providerstr | NoneNoneLLM provider identifier included in gate checks and audit records
skipboolFalseSkip governance entirely for this node. No gate check, no completion tracking. The node executes as if unwrapped

GovernedGraph Methods

The GovernedGraph returned by wrap_langgraph() provides three execution methods that mirror the standard LangGraph compiled graph interface.

ainvoke()

Asynchronous invocation. This is the recommended method for most use cases.

governed = wrap_langgraph(graph, client, workflow_name="research-agent")

result = await governed.ainvoke(
{"query": "What are the latest earnings?"},
config={"configurable": {"thread_id": "user-123"}},
)

Signature:

async def ainvoke(
self,
input: dict,
config: Optional[RunnableConfig] = None,
**kwargs,
) -> dict
ParameterTypeDescription
inputdictThe initial state passed to the graph
configRunnableConfig | NoneLangGraph runnable config (thread IDs, callbacks, etc.)
**kwargs--Additional keyword arguments forwarded to the underlying graph

Returns: The final graph state as a dictionary.

Raises:

  • WorkflowBlockedError if a step gate blocks a node and no fallback is configured
  • WorkflowApprovalRequiredError if a step gate returns a pending HITL approval

invoke()

Synchronous invocation. Use this in non-async contexts.

governed = wrap_langgraph(graph, client, workflow_name="research-agent")

result = governed.invoke({"query": "Summarize Q4 results"})

Signature:

def invoke(
self,
input: dict,
config: Optional[RunnableConfig] = None,
**kwargs,
) -> dict

Parameters and return value are the same as ainvoke().

astream()

Asynchronous streaming. Yields state updates as the graph executes, with governance applied at each node transition.

governed = wrap_langgraph(graph, client, workflow_name="research-agent")

async for event in governed.astream(
{"query": "Research AI governance trends"},
stream_mode="updates",
):
node_name = list(event.keys())[0]
print(f"Node {node_name} completed: {event[node_name]}")

Signature:

async def astream(
self,
input: dict,
config: Optional[RunnableConfig] = None,
*,
stream_mode: str = "values",
**kwargs,
) -> AsyncIterator[dict]
ParameterTypeDescription
inputdictThe initial state passed to the graph
configRunnableConfig | NoneLangGraph runnable config
stream_modestrLangGraph stream mode: "values", "updates", or "debug"
**kwargs--Additional keyword arguments forwarded to the underlying graph

Yields: State dictionaries according to the chosen stream_mode.


Per-Tool Governance with govern_tools

When govern_tools=True, the wrapper automatically intercepts individual tool calls within tools nodes and checks each one against the step gate using check_tool_gate(). This means you get per-tool policy enforcement without writing any tool-level gate code.

governed = wrap_langgraph(
graph, client,
workflow_name="research-agent",
govern_tools=True,
)

# Each tool call within the graph's tools nodes will be individually
# gate-checked before execution. If a tool is blocked by policy,
# it is skipped and the block reason is recorded in the audit trail.
result = await governed.ainvoke({"query": "Search for earnings data"})

How It Works

  1. The wrapper detects tools nodes in the graph (nodes that invoke LangChain tools).
  2. Before each tool executes, the wrapper calls check_tool_gate() with the tool's name, type, and input.
  3. If the gate allows the tool, it executes normally and tool_completed() is called.
  4. If the gate blocks the tool, the tool is skipped. The block reason is recorded in the audit trail and the graph continues with the remaining tools.

Combining with NodeConfig

You can combine govern_tools=True with node_config for fine-grained control:

governed = wrap_langgraph(
graph, client,
workflow_name="research-agent",
govern_tools=True,
node_config={
"tools": NodeConfig(step_type="tool_call"),
"summarize": NodeConfig(step_type="llm_call", model="gpt-4"),
},
)

For more details on per-tool governance and the underlying ToolContext mechanism, see Per-Tool Governance.


Error Handling

The wrapper raises specific exceptions when governance blocks execution.

WorkflowBlockedError

Raised when a step gate blocks a node and no fallback is available.

from axonflow.adapters.langgraph import WorkflowBlockedError

try:
result = await governed.ainvoke({"query": "Access restricted data"})
except WorkflowBlockedError as e:
print(f"Blocked at step '{e.step_id}': {e.reason}")
print(f"Policies: {e.policy_ids}")
AttributeTypeDescription
step_idstr | NoneThe step ID that was blocked
reasonstr | NoneHuman-readable reason from the policy engine
policy_idslist[str]IDs of the policies that triggered the block

WorkflowApprovalRequiredError

Raised when a step gate returns a pending HITL approval. The workflow is paused and can be resumed after approval.

from axonflow.adapters.langgraph import WorkflowApprovalRequiredError

try:
result = await governed.ainvoke({"query": "Execute trade order"})
except WorkflowApprovalRequiredError as e:
print(f"Approval required at step '{e.step_id}': {e.reason}")
print(f"Approval URL: {e.approval_url}")
# The workflow is NOT aborted — it stays resumable.
# Approve via the HITL API, then retry the invocation.
AttributeTypeDescription
step_idstr | NoneThe step awaiting approval
approval_urlstr | NoneURL to the approval portal
reasonstr | NoneHuman-readable reason for the approval requirement

General Error Handling Pattern

from axonflow.adapters.langgraph import (
WorkflowBlockedError,
WorkflowApprovalRequiredError,
)

try:
result = await governed.ainvoke({"query": user_input})
except WorkflowBlockedError as e:
# Policy blocked a node — return a safe response to the user
return {"error": f"Request blocked: {e.reason}", "policies": e.policy_ids}
except WorkflowApprovalRequiredError as e:
# HITL approval needed — workflow stays resumable
return {"pending": True, "step_id": e.step_id, "approval_url": e.approval_url}
except Exception as e:
# AxonFlow service error or graph error
logger.error(f"Governance error: {e}")
raise

Migration from AxonFlowLangGraphAdapter

If you are currently using AxonFlowLangGraphAdapter with manual gate checks, migrating to wrap_langgraph() simplifies your code significantly.

Step 1: Remove adapter boilerplate

Before:

from axonflow.adapters import AxonFlowLangGraphAdapter
from axonflow.workflow import WorkflowSource

adapter = AxonFlowLangGraphAdapter(
client=client,
workflow_name="research-agent",
source=WorkflowSource.LANGGRAPH,
)

async with adapter:
await adapter.start_workflow()

# Manual gate checks for each node...
if await adapter.check_gate("retrieve", "tool_call"):
result = await retrieve(state)
await adapter.step_completed("retrieve", output=result)

if await adapter.check_gate("summarize", "llm_call", model="gpt-4"):
result = await summarize(state)
await adapter.step_completed("summarize", output=result)

await adapter.complete_workflow()

After:

from axonflow.adapters import wrap_langgraph, NodeConfig

governed = wrap_langgraph(
graph, client,
workflow_name="research-agent",
node_config={
"retrieve": NodeConfig(step_type="tool_call"),
"summarize": NodeConfig(step_type="llm_call", model="gpt-4"),
},
)

result = await governed.ainvoke({"query": "Summarize recent earnings"})

Step 2: Move per-tool governance

If you were using check_tool_gate() and tool_completed() manually, enable govern_tools=True instead:

Before:

if await adapter.check_tool_gate("web_search", "function",
tool_input={"query": "latest news"}):
search_result = await web_search(query="latest news")
await adapter.tool_completed("web_search", output=search_result)

After:

governed = wrap_langgraph(
graph, client,
workflow_name="research-agent",
govern_tools=True,
)
# Tool gates are checked automatically for every tool call
result = await governed.ainvoke({"query": "Search for latest news"})

Step 3: Update error handling

Replace manual blocked-response logic with exception handling:

Before:

gate = await adapter.check_gate("summarize", "llm_call")
if not gate:
return {"response": "Request was blocked by policy"}

After:

from axonflow.adapters.langgraph import WorkflowBlockedError

try:
result = await governed.ainvoke(state)
except WorkflowBlockedError as e:
return {"response": f"Request was blocked: {e.reason}"}

What stays the same

  • The AxonFlowLangGraphAdapter is not deprecated. It remains available for cases where you need full manual control over the governance lifecycle (for example, custom node ordering, conditional gate checks, or integration with non-standard graph patterns).
  • The underlying WCP API calls (step_gate, step_completed, start_workflow, complete_workflow) are identical. wrap_langgraph() is a convenience layer on top of the same adapter.
  • mcp_tool_interceptor() for MultiServerMCPClient continues to work independently and can be combined with wrap_langgraph().

Platform Version: v5.3.2 | SDKs: Python v5.2.0, TypeScript v4.3.0, Go/Java v4.2.0