LangChain + AxonFlow Integration
Overview
LangChain provides powerful orchestration primitives for building LLM applications. AxonFlow adds production-grade governance, audit trails, and multi-tenant isolation.
Together, they enable enterprises to deploy complex AI workflows with compliance and observability.
Why Use AxonFlow with LangChain?
LangChain Strengths
- Rich library of chains, agents, and tools
- Multi-step workflow orchestration
- Integration with 100+ data sources
- RAG (Retrieval Augmented Generation) support
- Active community and ecosystem
AxonFlow Strengths
- Policy enforcement (budget limits, PII filtering, content moderation)
- Audit trails (compliance requirements: HIPAA, GDPR, SOC 2)
- Multi-tenant isolation (customer data separation)
- Sub-10ms overhead (production performance)
- Multi-model routing (OpenAI, Anthropic, Bedrock, custom LLMs)
The Perfect Combination
LangChain handles: Complex orchestration, RAG, data integration
AxonFlow handles: Governance, compliance, audit, multi-tenancy
Integration Architecture
AxonFlow integrates with LangChain using Gateway Mode, which wraps LLM calls with policy pre-checks and audit logging:
[LangChain Chain/Agent]
|
v
[AxonFlow Pre-Check] --> Policy Evaluation
|
v (if approved)
[LLM Provider (OpenAI/Anthropic)]
|
v
[AxonFlow Audit] --> Compliance Logging
|
v
[Response to LangChain]
Note: AxonFlow uses its own API for governance, not an OpenAI-compatible endpoint. Integration requires wrapping your LLM calls with AxonFlow's pre-check and audit endpoints.
Quick Start
Prerequisites
- AxonFlow running locally or deployed (see Getting Started)
- Python 3.9+
- LangChain installed (`pip install langchain langchain-openai`)
AxonFlow API Overview
AxonFlow Gateway Mode uses two main endpoints:
| Endpoint | Purpose |
|---|---|
| `POST /api/policy/pre-check` | Policy evaluation before LLM call |
| `POST /api/audit/llm-call` | Audit logging after LLM call completes |
Required Headers:
- `Content-Type: application/json`
- `X-Client-Secret: your-client-secret`
- `X-License-Key: your-license-key` (optional, for enterprise features)
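For a quick sanity check before wiring up LangChain, you can call the pre-check endpoint directly. A minimal sketch using `requests` (host, credentials, and payload values are placeholders; the response fields shown are the ones used throughout this guide):

import requests

headers = {
    "Content-Type": "application/json",
    "X-Client-Secret": "your-client-secret",
}

resp = requests.post(
    "http://localhost:8080/api/policy/pre-check",
    json={
        "user_token": "user-123",
        "client_id": "langchain-app",
        "query": "What is the capital of France?",
        "data_sources": [],
    },
    headers=headers,
    timeout=30,
)
resp.raise_for_status()
result = resp.json()

# approved/block_reason drive the governance decision; context_id must be
# passed to POST /api/audit/llm-call after the LLM responds.
print(result["approved"], result.get("context_id"), result.get("block_reason"))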
Python Integration (Raw HTTP Reference)
Note: This section shows raw HTTP integration for developers working in languages without an SDK, or as a reference. For Python, we recommend the Python SDK integration below.
Create AxonFlow Governance Wrapper
import requests
import time
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
@dataclass
class AxonFlowConfig:
"""Configuration for AxonFlow governance."""
agent_url: str
client_id: str
client_secret: str
license_key: Optional[str] = None
class AxonFlowGovernance:
"""Gateway Mode wrapper for governed LLM calls with LangChain."""
def __init__(self, config: AxonFlowConfig):
self.config = config
def _get_headers(self) -> Dict[str, str]:
headers = {
"Content-Type": "application/json",
"X-Client-Secret": self.config.client_secret,
}
if self.config.license_key:
headers["X-License-Key"] = self.config.license_key
return headers
def pre_check(
self,
user_token: str,
query: str,
data_sources: List[str] = None,
context: Dict[str, Any] = None
) -> Dict[str, Any]:
"""
Policy pre-check before making LLM call.
Returns:
dict with context_id, approved, block_reason (if blocked)
"""
payload = {
"user_token": user_token,
"client_id": self.config.client_id,
"query": query,
"data_sources": data_sources or [],
}
if context:
payload["context"] = context
response = requests.post(
f"{self.config.agent_url}/api/policy/pre-check",
json=payload,
headers=self._get_headers(),
timeout=30,
)
response.raise_for_status()
return response.json()
def audit_llm_call(
self,
context_id: str,
response_summary: str,
provider: str,
model: str,
prompt_tokens: int,
completion_tokens: int,
latency_ms: int,
metadata: Dict[str, Any] = None
) -> Dict[str, Any]:
"""
Audit LLM call after completion.
Returns:
dict with success and audit_id
"""
payload = {
"context_id": context_id,
"client_id": self.config.client_id,
"response_summary": response_summary,
"provider": provider,
"model": model,
"token_usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens,
},
"latency_ms": latency_ms,
}
if metadata:
payload["metadata"] = metadata
response = requests.post(
f"{self.config.agent_url}/api/audit/llm-call",
json=payload,
headers=self._get_headers(),
timeout=30,
)
response.raise_for_status()
return response.json()
# Initialize governance
governance = AxonFlowGovernance(AxonFlowConfig(
agent_url="http://localhost:8080",
client_id="langchain-app",
client_secret="your-client-secret",
license_key="your-license-key", # Optional
))
Pattern 1: Governed LLM Wrapper
Create a governed LLM class that works with LangChain:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class GovernedChatOpenAI:
"""Governed ChatOpenAI wrapper using AxonFlow Gateway Mode."""
def __init__(
self,
governance: AxonFlowGovernance,
model: str = "gpt-4",
temperature: float = 0.7,
openai_api_key: str = None,
):
self.governance = governance
self.model = model
self.llm = ChatOpenAI(
model=model,
temperature=temperature,
openai_api_key=openai_api_key,
)
def invoke(
self,
user_token: str,
messages: List[Dict[str, str]],
context: Dict[str, Any] = None
) -> str:
"""Invoke LLM with AxonFlow governance."""
start_time = time.time()
# Extract query from last user message
query = ""
for msg in reversed(messages):
if msg.get("role") == "user":
query = msg.get("content", "")
break
# 1. Pre-check with AxonFlow
pre_check = self.governance.pre_check(
user_token=user_token,
query=query,
context=context or {"framework": "langchain"},
)
if not pre_check.get("approved"):
raise PermissionError(f"Request blocked: {pre_check.get('block_reason')}")
context_id = pre_check["context_id"]
try:
# 2. Convert to LangChain messages and call LLM
lc_messages = []
for msg in messages:
if msg["role"] == "system":
lc_messages.append(SystemMessage(content=msg["content"]))
elif msg["role"] == "user":
lc_messages.append(HumanMessage(content=msg["content"]))
response = self.llm.invoke(lc_messages)
content = response.content
latency_ms = int((time.time() - start_time) * 1000)
# 3. Audit the call
self.governance.audit_llm_call(
context_id=context_id,
response_summary=content[:200] if len(content) > 200 else content,
provider="openai",
model=self.model,
prompt_tokens=response.usage_metadata.get("input_tokens", 0) if response.usage_metadata else 100,
completion_tokens=response.usage_metadata.get("output_tokens", 0) if response.usage_metadata else 50,
latency_ms=latency_ms,
metadata=context,
)
return content
except Exception as e:
latency_ms = int((time.time() - start_time) * 1000)
self.governance.audit_llm_call(
context_id=context_id,
response_summary=f"Error: {str(e)}",
provider="openai",
model=self.model,
prompt_tokens=0,
completion_tokens=0,
latency_ms=latency_ms,
metadata={"error": str(e)},
)
raise
# Usage
governed_llm = GovernedChatOpenAI(
governance=governance,
model="gpt-4",
openai_api_key="your-openai-key",
)
response = governed_llm.invoke(
user_token="user-123",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"},
],
context={"department": "research"},
)
print(response)
Pattern 2: Governed LangChain Tools
Wrap LangChain tools with AxonFlow governance:
from langchain.agents import initialize_agent, AgentType, Tool
from langchain_openai import ChatOpenAI
def create_governed_tool(
governance: AxonFlowGovernance,
tool_func,
tool_name: str,
tool_description: str,
data_tier: str = "standard",
):
"""Create a governed tool that wraps an existing function."""
def governed_wrapper(input_str: str, user_token: str = "system") -> str:
start_time = time.time()
# Pre-check
pre_check = governance.pre_check(
user_token=user_token,
query=input_str,
context={
"tool_name": tool_name,
"data_tier": data_tier,
},
)
if not pre_check.get("approved"):
return f"Tool blocked by policy: {pre_check.get('block_reason')}"
try:
result = tool_func(input_str)
latency_ms = int((time.time() - start_time) * 1000)
# Audit tool execution
governance.audit_llm_call(
context_id=pre_check["context_id"],
response_summary=str(result)[:200],
provider="tool",
model=tool_name,
prompt_tokens=len(input_str.split()),
completion_tokens=len(str(result).split()),
latency_ms=latency_ms,
metadata={"tool_execution": True},
)
return result
except Exception as e:
latency_ms = int((time.time() - start_time) * 1000)
governance.audit_llm_call(
context_id=pre_check["context_id"],
response_summary=f"Tool error: {str(e)}",
provider="tool",
model=tool_name,
prompt_tokens=0,
completion_tokens=0,
latency_ms=latency_ms,
metadata={"error": str(e)},
)
raise
return Tool(
name=tool_name,
func=governed_wrapper,
description=tool_description,
)
# Example: Create governed database query tool
def query_database(query: str) -> str:
"""Simulate database query."""
return f"Results for: {query}"
governed_db_tool = create_governed_tool(
governance=governance,
tool_func=query_database,
tool_name="Database Query",
tool_description="Query the customer database for information",
data_tier="internal",
)
# Use in LangChain agent
llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = initialize_agent(
tools=[governed_db_tool],
llm=llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True,
)
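With the governed tool registered, the agent runs as usual; when a policy blocks a tool call, the agent receives the block message as the tool's output instead of an exception (see governed_wrapper above). A usage sketch:

# Tool calls made by the agent pass through AxonFlow pre-check and audit
result = agent.run("Look up recent orders for customer ACME Corp")
print(result)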
Pattern 3: Governed RAG Pipeline
Add governance to LangChain RAG workflows:
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains import ConversationalRetrievalChain
class GovernedRAG:
"""RAG pipeline with AxonFlow governance."""
def __init__(
self,
governance: AxonFlowGovernance,
vectorstore_path: str,
model: str = "gpt-4",
):
self.governance = governance
self.model = model
# Initialize LangChain components
self.embeddings = OpenAIEmbeddings()
self.vectorstore = Chroma(
persist_directory=vectorstore_path,
embedding_function=self.embeddings,
)
self.retriever = self.vectorstore.as_retriever()
self.llm = ChatOpenAI(model=model, temperature=0)
self.chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=self.retriever,
return_source_documents=True,
)
def query(
self,
user_token: str,
question: str,
chat_history: List = None,
context: Dict[str, Any] = None,
) -> Dict[str, Any]:
"""Execute governed RAG query."""
start_time = time.time()
chat_history = chat_history or []
# 1. Pre-check with AxonFlow
pre_check = self.governance.pre_check(
user_token=user_token,
query=question,
context={
**(context or {}),
"pipeline": "rag",
"retriever": "chroma",
},
)
if not pre_check.get("approved"):
return {
"answer": f"Query blocked: {pre_check.get('block_reason')}",
"sources": [],
"blocked": True,
}
context_id = pre_check["context_id"]
try:
# 2. Execute RAG chain
            result = self.chain.invoke({
"question": question,
"chat_history": chat_history,
})
answer = result["answer"]
sources = [doc.metadata for doc in result.get("source_documents", [])]
latency_ms = int((time.time() - start_time) * 1000)
# 3. Audit the call
self.governance.audit_llm_call(
context_id=context_id,
response_summary=answer[:200] if len(answer) > 200 else answer,
provider="openai",
model=self.model,
prompt_tokens=100, # Estimate
completion_tokens=len(answer.split()),
latency_ms=latency_ms,
metadata={
"pipeline": "rag",
"source_count": len(sources),
},
)
return {
"answer": answer,
"sources": sources,
"context_id": context_id,
}
except Exception as e:
latency_ms = int((time.time() - start_time) * 1000)
self.governance.audit_llm_call(
context_id=context_id,
response_summary=f"RAG error: {str(e)}",
provider="openai",
model=self.model,
prompt_tokens=0,
completion_tokens=0,
latency_ms=latency_ms,
metadata={"error": str(e)},
)
raise
# Usage
rag = GovernedRAG(
governance=governance,
vectorstore_path="./knowledge_base",
model="gpt-4",
)
result = rag.query(
user_token="doctor-token",
question="What are the treatment options for Type 2 diabetes?",
context={"department": "medical", "data_tier": "hipaa"},
)
print(result["answer"])
Python SDK Integration (Recommended)
The AxonFlow Python SDK provides a cleaner, type-safe way to integrate governance:
Install Dependencies
pip install axonflow langchain langchain-openai
Using the SDK
import asyncio
from axonflow import AxonFlow, TokenUsage
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
class GovernedLangChain:
"""LangChain integration using AxonFlow Python SDK."""
def __init__(
self,
axonflow_url: str,
client_id: str,
client_secret: str,
openai_api_key: str,
model: str = "gpt-4",
license_key: str = None,
):
self.axonflow = AxonFlow(
agent_url=axonflow_url,
client_id=client_id,
client_secret=client_secret,
license_key=license_key,
)
self.llm = ChatOpenAI(
model=model,
openai_api_key=openai_api_key,
)
self.model = model
async def invoke(
self,
user_token: str,
messages: list[dict],
context: dict = None,
) -> str:
"""Invoke LangChain LLM with AxonFlow governance."""
import time
start_time = time.time()
# Extract query from last user message
query = next(
(m["content"] for m in reversed(messages) if m["role"] == "user"),
""
)
async with self.axonflow:
# 1. Pre-check with AxonFlow
ctx = await self.axonflow.get_policy_approved_context(
user_token=user_token,
query=query,
context={**(context or {}), "framework": "langchain"},
)
if not ctx.approved:
raise PermissionError(f"Request blocked: {ctx.block_reason}")
try:
# 2. Convert and call LLM
lc_messages = [
SystemMessage(content=m["content"]) if m["role"] == "system"
else HumanMessage(content=m["content"])
for m in messages
]
response = self.llm.invoke(lc_messages)
content = response.content
latency_ms = int((time.time() - start_time) * 1000)
# 3. Audit the call
await self.axonflow.audit_llm_call(
context_id=ctx.context_id,
response_summary=content[:200],
provider="openai",
model=self.model,
token_usage=TokenUsage(
                        prompt_tokens=(response.usage_metadata or {}).get("input_tokens", 100),
                        completion_tokens=(response.usage_metadata or {}).get("output_tokens", 50),
                        total_tokens=(response.usage_metadata or {}).get("total_tokens", 150),
),
latency_ms=latency_ms,
metadata=context,
)
return content
except Exception as e:
latency_ms = int((time.time() - start_time) * 1000)
await self.axonflow.audit_llm_call(
context_id=ctx.context_id,
response_summary=f"Error: {str(e)}",
provider="openai",
model=self.model,
token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
latency_ms=latency_ms,
metadata={"error": str(e)},
)
raise
# Usage
async def main():
governed = GovernedLangChain(
axonflow_url="http://localhost:8080",
client_id="langchain-app",
client_secret="your-client-secret",
openai_api_key="your-openai-key",
)
response = await governed.invoke(
user_token="user-123",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"},
],
context={"department": "research"},
)
print(response)
asyncio.run(main())
Sync Usage
For synchronous code:
from axonflow import AxonFlow, TokenUsage
def governed_langchain_call(
user_token: str,
query: str,
llm,
context: dict = None,
) -> str:
"""Synchronous LangChain call with governance."""
import time
start_time = time.time()
with AxonFlow.sync(
agent_url="http://localhost:8080",
client_id="langchain-app",
client_secret="your-client-secret",
) as axonflow:
# 1. Pre-check
ctx = axonflow.get_policy_approved_context(
user_token=user_token,
query=query,
context={**(context or {}), "framework": "langchain"},
)
if not ctx.approved:
raise PermissionError(f"Blocked: {ctx.block_reason}")
# 2. Call LLM
response = llm.invoke(query)
latency_ms = int((time.time() - start_time) * 1000)
# 3. Audit
axonflow.audit_llm_call(
context_id=ctx.context_id,
response_summary=response.content[:200],
provider="openai",
model="gpt-4",
token_usage=TokenUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150),
latency_ms=latency_ms,
)
return response.content
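For example, the helper above can be called with a standard LangChain chat model (a sketch; substitute your own model and credentials):

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0, openai_api_key="your-openai-key")

answer = governed_langchain_call(
    user_token="user-123",
    query="Summarize our refund policy in two sentences.",
    llm=llm,
    context={"department": "support"},
)
print(answer)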
OpenAI Interceptor (Zero-Code Integration)
For minimal code changes, use the OpenAI interceptor:
from openai import OpenAI
from axonflow import AxonFlow
from axonflow.interceptors.openai import wrap_openai_client
# Initialize clients
openai_client = OpenAI()
axonflow = AxonFlow(
agent_url="http://localhost:8080",
client_id="langchain-app",
client_secret="your-client-secret",
)
# Wrap OpenAI client - governance is now automatic
governed_openai = wrap_openai_client(openai_client, axonflow, user_token="user-123")
# Use normally - all calls are now governed
response = governed_openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello!"}]
)
Go SDK Integration
For Go-based services that orchestrate LangChain workflows or need to integrate with Python LangChain applications:
Governed LLM Service in Go
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/getaxonflow/axonflow-go-sdk/axonflow"
"github.com/sashabaranov/go-openai"
)
// LangChainGovernanceService provides AxonFlow governance for LangChain workflows
type LangChainGovernanceService struct {
gateway *axonflow.GatewayClient
openai *openai.Client
}
// NewLangChainGovernanceService creates a new governance service
func NewLangChainGovernanceService(axonflowURL, clientSecret, openaiKey string) *LangChainGovernanceService {
return &LangChainGovernanceService{
gateway: axonflow.NewGatewayClient(axonflow.Config{
AgentURL: axonflowURL,
ClientID: "langchain-go-service",
ClientSecret: clientSecret,
}),
openai: openai.NewClient(openaiKey),
}
}
// GovernedChatRequest represents a governed chat request
type GovernedChatRequest struct {
UserToken string `json:"user_token"`
Messages []ChatMessage `json:"messages"`
Context map[string]interface{} `json:"context,omitempty"`
}
// ChatMessage represents a chat message
type ChatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
}
// GovernedChatResponse represents the response
type GovernedChatResponse struct {
Response string `json:"response"`
ContextID string `json:"context_id"`
Blocked bool `json:"blocked,omitempty"`
Reason string `json:"reason,omitempty"`
}
// HandleGovernedChat processes a governed chat request
func (s *LangChainGovernanceService) HandleGovernedChat(
ctx context.Context,
req GovernedChatRequest,
) (*GovernedChatResponse, error) {
startTime := time.Now()
// Extract query from last user message
var query string
for i := len(req.Messages) - 1; i >= 0; i-- {
if req.Messages[i].Role == "user" {
query = req.Messages[i].Content
break
}
}
// 1. Pre-check with AxonFlow
preCheck, err := s.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: req.UserToken,
Query: query,
Context: mergeContext(req.Context, map[string]interface{}{
"framework": "langchain",
"language": "go",
}),
})
if err != nil {
return nil, fmt.Errorf("pre-check failed: %w", err)
}
if !preCheck.Approved {
return &GovernedChatResponse{
Blocked: true,
Reason: preCheck.BlockReason,
ContextID: preCheck.ContextID,
}, nil
}
// 2. Make LLM call
messages := make([]openai.ChatCompletionMessage, len(req.Messages))
for i, msg := range req.Messages {
messages[i] = openai.ChatCompletionMessage{
Role: msg.Role,
Content: msg.Content,
}
}
resp, err := s.openai.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: openai.GPT4,
Messages: messages,
})
latencyMs := int(time.Since(startTime).Milliseconds())
if err != nil {
// Audit error
s.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: fmt.Sprintf("Error: %v", err),
Provider: "openai",
Model: "gpt-4",
TokenUsage: axonflow.TokenUsage{},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"error": true},
})
return nil, err
}
response := resp.Choices[0].Message.Content
// 3. Audit success
s.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: truncate(response, 200),
Provider: "openai",
Model: "gpt-4",
TokenUsage: axonflow.TokenUsage{
PromptTokens: resp.Usage.PromptTokens,
CompletionTokens: resp.Usage.CompletionTokens,
TotalTokens: resp.Usage.TotalTokens,
},
LatencyMs: latencyMs,
})
return &GovernedChatResponse{
Response: response,
ContextID: preCheck.ContextID,
}, nil
}
func mergeContext(a, b map[string]interface{}) map[string]interface{} {
result := make(map[string]interface{})
for k, v := range a {
result[k] = v
}
for k, v := range b {
result[k] = v
}
return result
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
// HTTP handler
func (s *LangChainGovernanceService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req GovernedChatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
resp, err := s.HandleGovernedChat(r.Context(), req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
func main() {
service := NewLangChainGovernanceService(
"http://localhost:8080",
"your-client-secret",
"your-openai-key",
)
http.Handle("/api/chat", service)
http.ListenAndServe(":9090", nil)
}
Calling Go Service from Python LangChain
import requests
from langchain.llms.base import LLM
from typing import Optional, List
class GovernedLLM(LLM):
"""LangChain LLM that routes through Go governance service."""
governance_url: str = "http://localhost:9090/api/chat"
user_token: str = "default"
@property
def _llm_type(self) -> str:
return "governed-go-service"
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
**kwargs,
) -> str:
response = requests.post(
self.governance_url,
json={
"user_token": self.user_token,
"messages": [{"role": "user", "content": prompt}],
"context": kwargs.get("context", {}),
},
            timeout=60,
        )
        response.raise_for_status()
        result = response.json()
if result.get("blocked"):
raise PermissionError(f"Request blocked: {result.get('reason')}")
return result["response"]
# Usage with LangChain
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
llm = GovernedLLM(
governance_url="http://localhost:9090/api/chat",
user_token="user-123",
)
prompt = PromptTemplate(
input_variables=["topic"],
template="Write a summary about {topic}",
)
chain = LLMChain(llm=llm, prompt=prompt)
result = chain.run(topic="AI governance")
AxonFlow Policy Configuration
Create policies that match your LangChain use cases:
{
"policies": [
{
"name": "langchain-rag-policy",
"description": "Policy for LangChain RAG pipelines",
"enabled": true,
"rules": [
{
"type": "pii_protection",
"config": {
"fields": ["email", "phone", "ssn"],
"action": "mask"
}
},
{
"type": "rate_limit",
"config": {
"requests_per_minute": 30,
"action": "throttle"
}
}
]
},
{
"name": "langchain-tool-policy",
"description": "Policy for LangChain tool executions",
"enabled": true,
"rules": [
{
"type": "content_filter",
"config": {
"blocked_patterns": ["DELETE FROM", "DROP TABLE"],
"action": "block"
}
}
]
}
]
}
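Once a policy such as langchain-tool-policy is active, a matching request is rejected at pre-check and never reaches the LLM or tool. A sketch of what that looks like from the application side, reusing the `governance` wrapper from the raw HTTP section (the exact block_reason text depends on your policy configuration):

pre_check = governance.pre_check(
    user_token="user-123",
    query="DROP TABLE customers",
    context={"tool_name": "Database Query", "data_tier": "internal"},
)

if not pre_check.get("approved"):
    # Blocked by the content_filter rule; no LLM or tool call is made
    print(f"Blocked: {pre_check.get('block_reason')}")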
Best Practices
1. Always Use Context IDs
The context_id from pre-check must be passed to audit for proper correlation:
pre_check = governance.pre_check(user_token, query)
context_id = pre_check["context_id"] # Store this immediately
# ... make LLM call ...
governance.audit_llm_call(context_id=context_id, ...) # Use same context_id
2. Handle Blocked Requests Gracefully
pre_check = governance.pre_check(user_token, query)
if not pre_check.get("approved"):
# Log the block reason
logger.warning(f"Request blocked: {pre_check.get('block_reason')}")
# Return user-friendly message
return "I'm unable to help with that request due to policy restrictions."
3. Always Audit, Even on Errors
try:
result = chain.invoke(query)
governance.audit_llm_call(context_id, result, ...)
except Exception as e:
governance.audit_llm_call(context_id, f"Error: {e}", ...)
raise
4. Use Meaningful Context
pre_check = governance.pre_check(
user_token=user_token,
query=query,
context={
"department": "medical",
"data_tier": "hipaa",
"chain_type": "conversational_retrieval",
"use_case": "diagnosis_support",
},
)
Troubleshooting
Common Issues
Issue: Pre-check returns 401 Unauthorized
- Verify the `X-Client-Secret` header is correct
- Check `X-License-Key` if using enterprise features
- Ensure the client_id is registered in AxonFlow
Issue: Audit calls failing
- Verify context_id is from a valid pre-check (not expired)
- Check that the AxonFlow agent is healthy (`/health` endpoint)
Issue: High latency with governance
- Measure LangChain and AxonFlow latency separately (see the timing sketch below)
- AxonFlow should add <10ms of overhead
- If AxonFlow adds more than 10ms, check network latency to the AxonFlow endpoint
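A simple way to separate the two, reusing the `governance` wrapper defined earlier (illustrative only; token counts are omitted):

import time
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4", temperature=0)

t0 = time.time()
pre_check = governance.pre_check(user_token="user-123", query="latency probe")
pre_check_ms = (time.time() - t0) * 1000

t1 = time.time()
response = llm.invoke("latency probe")  # plain LangChain call, no governance
llm_ms = (time.time() - t1) * 1000

t2 = time.time()
governance.audit_llm_call(
    context_id=pre_check["context_id"],
    response_summary=response.content[:200],
    provider="openai",
    model="gpt-4",
    prompt_tokens=0,
    completion_tokens=0,
    latency_ms=int(llm_ms),
)
audit_ms = (time.time() - t2) * 1000

print(f"pre-check: {pre_check_ms:.1f}ms, llm: {llm_ms:.1f}ms, audit: {audit_ms:.1f}ms")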
Issue: Policies not being applied
- Verify context fields match policy conditions
- Check policy is enabled in AxonFlow
- Review AxonFlow logs for policy evaluation details