Python SDK - Getting Started
Add invisible AI governance to your Python applications in 3 lines of code. No UI changes. No user training. Just drop-in enterprise protection.
Installation
pip install axonflow
With LLM provider support:
pip install axonflow[openai] # OpenAI integration
pip install axonflow[anthropic] # Anthropic integration
pip install axonflow[all] # All integrations
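To confirm the installation, a quick import check is usually enough. This is a minimal sketch; it assumes the axonflow[openai] extra pulls in the openai package, which may differ in your environment:

import axonflow  # core SDK

try:
    import openai  # present when installed with axonflow[openai]
    print("OpenAI extra available")
except ImportError:
    print("OpenAI extra not installed")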
Quick Start
Async Usage (Recommended)
import asyncio

from axonflow import AxonFlow

async def main():
    async with AxonFlow(
        agent_url="https://your-agent.axonflow.com",
        client_id="your-client-id",
        client_secret="your-client-secret"
    ) as client:
        # Execute a governed query
        response = await client.execute_query(
            user_token="user-jwt-token",
            query="What is AI governance?",
            request_type="chat"
        )
        print(response.data)

asyncio.run(main())
Sync Usage
For applications that don't use async/await:
from axonflow import AxonFlow

with AxonFlow.sync(
    agent_url="https://your-agent.axonflow.com",
    client_id="your-client-id",
    client_secret="your-client-secret"
) as client:
    response = client.execute_query(
        user_token="user-jwt-token",
        query="What is AI governance?",
        request_type="chat"
    )
    print(response.data)
Gateway Mode (Lowest Latency)
Gateway Mode lets you make direct LLM calls while AxonFlow handles governance. Use this when you need the lowest possible latency or want full control over your LLM provider.
See Choosing a Mode for a detailed comparison with Proxy Mode.
Async Usage
import asyncio
import time

from axonflow import AxonFlow, TokenUsage
from openai import AsyncOpenAI

openai = AsyncOpenAI()

async def main():
    async with AxonFlow(
        agent_url="https://your-agent.axonflow.com",
        client_id="your-client-id",
        client_secret="your-client-secret"
    ) as client:
        # 1. Pre-check: Get policy approval
        ctx = await client.get_policy_approved_context(
            user_token="user-jwt",
            query="Find patient records",
            data_sources=["postgres"]
        )
        if not ctx.approved:
            raise Exception(f"Blocked: {ctx.block_reason}")

        # 2. Make LLM call directly (lowest latency)
        start = time.time()
        llm_response = await openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": str(ctx.approved_data)}]
        )
        latency_ms = int((time.time() - start) * 1000)

        # 3. Audit the call
        await client.audit_llm_call(
            context_id=ctx.context_id,
            response_summary=llm_response.choices[0].message.content[:100],
            provider="openai",
            model="gpt-4",
            token_usage=TokenUsage(
                prompt_tokens=llm_response.usage.prompt_tokens,
                completion_tokens=llm_response.usage.completion_tokens,
                total_tokens=llm_response.usage.total_tokens
            ),
            latency_ms=latency_ms
        )

asyncio.run(main())
Sync Usage
import time

from axonflow import AxonFlow, TokenUsage
from openai import OpenAI

openai_client = OpenAI()

with AxonFlow.sync(
    agent_url="https://your-agent.axonflow.com",
    client_id="your-client-id",
    client_secret="your-client-secret"
) as client:
    # 1. Pre-check
    ctx = client.get_policy_approved_context(
        user_token="user-jwt",
        query="Find patient records",
        data_sources=["postgres"]
    )
    if not ctx.approved:
        raise Exception(f"Blocked: {ctx.block_reason}")

    # 2. Direct LLM call
    start = time.time()
    llm_response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": str(ctx.approved_data)}]
    )
    latency_ms = int((time.time() - start) * 1000)

    # 3. Audit
    client.audit_llm_call(
        context_id=ctx.context_id,
        response_summary=llm_response.choices[0].message.content[:100],
        provider="openai",
        model="gpt-4",
        token_usage=TokenUsage(
            prompt_tokens=llm_response.usage.prompt_tokens,
            completion_tokens=llm_response.usage.completion_tokens,
            total_tokens=llm_response.usage.total_tokens
        ),
        latency_ms=latency_ms
    )
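If you call Gateway Mode from several places, the three steps can be wrapped in one reusable coroutine. The sketch below is illustrative rather than part of the SDK: the governed_chat name and its parameters are our own, and it only combines the get_policy_approved_context, chat.completions.create, and audit_llm_call calls already shown above.

import time

from axonflow import AxonFlow, TokenUsage
from openai import AsyncOpenAI

# Hypothetical helper: pre-check, direct LLM call, then audit, in one place.
async def governed_chat(client: AxonFlow, openai: AsyncOpenAI, user_token: str, query: str) -> str:
    ctx = await client.get_policy_approved_context(
        user_token=user_token,
        query=query,
        data_sources=["postgres"],
    )
    if not ctx.approved:
        raise Exception(f"Blocked: {ctx.block_reason}")

    start = time.time()
    llm_response = await openai.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": str(ctx.approved_data)}],
    )
    latency_ms = int((time.time() - start) * 1000)

    await client.audit_llm_call(
        context_id=ctx.context_id,
        response_summary=llm_response.choices[0].message.content[:100],
        provider="openai",
        model="gpt-4",
        token_usage=TokenUsage(
            prompt_tokens=llm_response.usage.prompt_tokens,
            completion_tokens=llm_response.usage.completion_tokens,
            total_tokens=llm_response.usage.total_tokens,
        ),
        latency_ms=latency_ms,
    )
    return llm_response.choices[0].message.content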
See the Gateway Mode Deep Dive for the full API reference, error handling, and framework integrations.
Framework Integration
FastAPI
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

axonflow: AxonFlow | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global axonflow
    axonflow = AxonFlow(
        agent_url=os.environ["AXONFLOW_AGENT_URL"],
        client_id=os.environ["AXONFLOW_CLIENT_ID"],
        client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
    )
    yield
    await axonflow.close()

app = FastAPI(lifespan=lifespan)

@app.post("/chat")
async def chat(prompt: str, user_token: str):
    try:
        response = await axonflow.execute_query(
            user_token=user_token,
            query=prompt,
            request_type="chat"
        )
        return {"success": True, "data": response.data}
    except PolicyViolationError as e:
        raise HTTPException(status_code=403, detail=e.block_reason)
Flask
import os

from flask import Flask, request, jsonify
from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

app = Flask(__name__)

client = AxonFlow.sync(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
)

@app.route("/chat", methods=["POST"])
def chat():
    data = request.json
    try:
        response = client.execute_query(
            user_token=data["user_token"],
            query=data["prompt"],
            request_type="chat"
        )
        return jsonify({"success": True, "data": response.data})
    except PolicyViolationError as e:
        return jsonify({"error": e.block_reason}), 403
Django
# views.py
import json
import os

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

client = AxonFlow.sync(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
)

@csrf_exempt
def chat_view(request):
    if request.method == "POST":
        data = json.loads(request.body)
        try:
            response = client.execute_query(
                user_token=data["user_token"],
                query=data["prompt"],
                request_type="chat"
            )
            return JsonResponse({"success": True, "data": response.data})
        except PolicyViolationError as e:
            return JsonResponse({"error": e.block_reason}, status=403)
OpenAI Integration
Transparent governance for existing OpenAI code:
from openai import OpenAI
from axonflow import AxonFlow
from axonflow.interceptors.openai import wrap_openai_client

openai = OpenAI()
axonflow = AxonFlow(...)

# Wrap the client - governance is now automatic
wrapped = wrap_openai_client(openai, axonflow, user_token="user-123")

# Use as normal - governance happens invisibly
response = wrapped.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)
Configuration
from axonflow import AxonFlow, Mode, RetryConfig

client = AxonFlow(
    agent_url="https://your-agent.axonflow.com",
    client_id="your-client-id",
    client_secret="your-client-secret",
    license_key="optional-license-key",  # For enterprise features
    mode=Mode.PRODUCTION,                # or Mode.SANDBOX
    debug=True,                          # Enable debug logging
    timeout=60.0,                        # Request timeout in seconds
    retry_config=RetryConfig(            # Retry configuration
        enabled=True,
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
    ),
    cache_enabled=True,                  # Enable response caching
    cache_ttl=60.0,                      # Cache TTL in seconds
)
VPC Private Endpoint (Low-Latency)
For customers running within an AWS VPC:
import os

client = AxonFlow(
    agent_url="https://YOUR_VPC_IP:8443",  # VPC private endpoint
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
    insecure_skip_verify=False,  # Set True only for self-signed certs in dev
)
Performance Comparison:
- Public endpoint: ~100ms (internet routing)
- VPC private endpoint: under 10ms P99 (intra-VPC routing)
Sandbox Mode
For testing without affecting production:
client = AxonFlow.sandbox("demo-key")

# Test with aggressive policies
response = await client.execute_query(
    user_token="test-user",
    query="My SSN is 123-45-6789",  # Will be blocked/redacted
    request_type="chat"
)
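Depending on how your sandbox policies are configured, a blocked query may raise PolicyViolationError instead of returning redacted output. A minimal sketch that handles both outcomes inside an async function; the exact policy behavior is an assumption about your configuration:

from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

async def try_sandbox_query() -> None:
    client = AxonFlow.sandbox("demo-key")
    try:
        # Query containing PII; sandbox policies may redact or block it
        response = await client.execute_query(
            user_token="test-user",
            query="My SSN is 123-45-6789",
            request_type="chat",
        )
        print("Returned (possibly redacted):", response.data)
    except PolicyViolationError as e:
        print("Blocked by policy:", e.block_reason)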
MCP Connector Integration
List Available Connectors
connectors = await client.list_connectors()

for conn in connectors:
    print(f"Connector: {conn.name} ({conn.type})")
    print(f"  Installed: {conn.installed}")
    print(f"  Capabilities: {', '.join(conn.capabilities)}")
Query a Connector
result = await client.query_connector(
    user_token="user-jwt",
    connector_name="postgres",
    operation="query",
    params={"sql": "SELECT * FROM users LIMIT 10"}
)

if result.success:
    print(f"Data: {result.data}")
else:
    print(f"Error: {result.error}")
Multi-Agent Planning (MAP)
Generate a Plan
plan = await client.generate_plan(
    query="Book a flight and hotel for my trip to Paris",
    domain="travel"
)

print(f"Plan {plan.plan_id} has {len(plan.steps)} steps")
for step in plan.steps:
    print(f"  - {step.name}: {step.description}")
Execute the Plan
result = await client.execute_plan(plan.plan_id)

if result.status == "completed":
    print(f"Result: {result.result}")
else:
    print(f"Error: {result.error}")
Error Handling
from axonflow.exceptions import (
    AxonFlowError,
    PolicyViolationError,
    AuthenticationError,
    RateLimitError,
    TimeoutError,
)

try:
    response = await client.execute_query(...)
except PolicyViolationError as e:
    print(f"Blocked by policy: {e.block_reason}")
except RateLimitError as e:
    print(f"Rate limited: {e.limit}/{e.remaining}, resets at {e.reset_at}")
except AuthenticationError:
    print("Invalid credentials")
except TimeoutError:
    print("Request timed out")
except AxonFlowError as e:
    print(f"AxonFlow error: {e.message}")
Type Hints
The SDK is fully typed with Pydantic v2 models:
from axonflow import (
    ClientResponse,
    PolicyApprovalResult,
    PlanResponse,
    ConnectorResponse,
    TokenUsage,
)

# Full autocomplete and type checking support
response: ClientResponse = await client.execute_query(...)
print(response.success)
print(response.data)
print(response.policy_info.policies_evaluated)
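Because results are typed models, your own helpers can carry precise annotations that mypy or pyright will verify. An illustrative sketch (the ask helper is ours, not part of the SDK):

from axonflow import AxonFlow, ClientResponse

async def ask(client: AxonFlow, user_token: str, question: str) -> ClientResponse:
    # Type checkers know the return value exposes .success, .data, .policy_info, etc.
    return await client.execute_query(
        user_token=user_token,
        query=question,
        request_type="chat",
    )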
Production Best Practices
1. Environment Variables
Never hardcode credentials:
import os

# Good
client = AxonFlow(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
)

# Bad - never do this!
client = AxonFlow(
    agent_url="https://...",
    client_id="hardcoded-id",
    client_secret="hardcoded-secret",
)
2. Use Context Managers
Always use context managers to ensure proper cleanup:
# Async
async with AxonFlow(...) as client:
    ...  # client is closed automatically on exit

# Sync
with AxonFlow.sync(...) as client:
    ...  # client is closed automatically on exit
3. Enable Caching
Reduce latency for repeated queries:
client = AxonFlow(
    ...,
    cache_enabled=True,
    cache_ttl=60.0,  # 1 minute
)
4. Enable Retry Logic
Handle transient failures automatically:
from axonflow import RetryConfig

client = AxonFlow(
    ...,
    retry_config=RetryConfig(
        enabled=True,
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
    ),
)
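5. Centralize Client Construction
Putting these practices together, a single factory function keeps production wiring in one place. This is an illustrative sketch of our own, assuming the constructor parameters shown in the Configuration section:

import os

from axonflow import AxonFlow, RetryConfig

def make_client() -> AxonFlow:
    # Credentials come from the environment; caching and retries are enabled.
    return AxonFlow(
        agent_url=os.environ["AXONFLOW_AGENT_URL"],
        client_id=os.environ["AXONFLOW_CLIENT_ID"],
        client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
        cache_enabled=True,
        cache_ttl=60.0,
        retry_config=RetryConfig(
            enabled=True,
            max_attempts=3,
            initial_delay=1.0,
            max_delay=30.0,
        ),
    )

Use it with a context manager (async with make_client() as client:) so the client is closed cleanly.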
Support & Resources
- Documentation: https://docs.getaxonflow.com
- PyPI Package: https://pypi.org/project/axonflow/
- GitHub: https://github.com/getaxonflow/axonflow-sdk-python
- Email: [email protected]
Next Steps
- Read the Authentication Guide for security best practices
- Learn about Gateway Mode for lowest-latency LLM calls
- Explore MCP Connectors for external data integration
- Check out the Go SDK or TypeScript SDK
License
MIT - See LICENSE