Python SDK - Getting Started

Add invisible AI governance to your Python applications in 3 lines of code. No UI changes. No user training. Just drop-in enterprise protection.

Installation

pip install axonflow

With LLM provider support:

pip install axonflow[openai]      # OpenAI integration
pip install axonflow[anthropic] # Anthropic integration
pip install axonflow[all] # All integrations

Quick Start

import asyncio
from axonflow import AxonFlow

async def main():
    async with AxonFlow(
        agent_url="https://your-agent.axonflow.com",
        client_id="your-client-id",
        client_secret="your-client-secret"
    ) as client:
        # Execute a governed query
        response = await client.execute_query(
            user_token="user-jwt-token",
            query="What is AI governance?",
            request_type="chat"
        )
        print(response.data)

asyncio.run(main())

Sync Usage

For applications that don't use async/await:

from axonflow import AxonFlow

with AxonFlow.sync(
    agent_url="https://your-agent.axonflow.com",
    client_id="your-client-id",
    client_secret="your-client-secret"
) as client:
    response = client.execute_query(
        user_token="user-jwt-token",
        query="What is AI governance?",
        request_type="chat"
    )
    print(response.data)

Gateway Mode (Lowest Latency)

Gateway Mode lets you make direct LLM calls while AxonFlow handles governance. Use this when you need the lowest possible latency or want full control over your LLM provider.

See Choosing a Mode for detailed comparison with Proxy Mode.

Async Usage

import asyncio
import time

from axonflow import AxonFlow, TokenUsage
from openai import AsyncOpenAI

openai = AsyncOpenAI()

async def main():
    async with AxonFlow(
        agent_url="https://your-agent.axonflow.com",
        client_id="your-client-id",
        client_secret="your-client-secret"
    ) as client:
        # 1. Pre-check: Get policy approval
        ctx = await client.get_policy_approved_context(
            user_token="user-jwt",
            query="Find patient records",
            data_sources=["postgres"]
        )

        if not ctx.approved:
            raise Exception(f"Blocked: {ctx.block_reason}")

        # 2. Make LLM call directly (lowest latency)
        start = time.time()
        llm_response = await openai.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": str(ctx.approved_data)}]
        )
        latency_ms = int((time.time() - start) * 1000)

        # 3. Audit the call
        await client.audit_llm_call(
            context_id=ctx.context_id,
            response_summary=llm_response.choices[0].message.content[:100],
            provider="openai",
            model="gpt-4",
            token_usage=TokenUsage(
                prompt_tokens=llm_response.usage.prompt_tokens,
                completion_tokens=llm_response.usage.completion_tokens,
                total_tokens=llm_response.usage.total_tokens
            ),
            latency_ms=latency_ms
        )

asyncio.run(main())

Sync Usage

from axonflow import AxonFlow, TokenUsage
from openai import OpenAI
import time

openai_client = OpenAI()

with AxonFlow.sync(
    agent_url="https://your-agent.axonflow.com",
    client_id="your-client-id",
    client_secret="your-client-secret"
) as client:
    # 1. Pre-check
    ctx = client.get_policy_approved_context(
        user_token="user-jwt",
        query="Find patient records",
        data_sources=["postgres"]
    )

    if not ctx.approved:
        raise Exception(f"Blocked: {ctx.block_reason}")

    # 2. Direct LLM call
    start = time.time()
    llm_response = openai_client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": str(ctx.approved_data)}]
    )
    latency_ms = int((time.time() - start) * 1000)

    # 3. Audit
    client.audit_llm_call(
        context_id=ctx.context_id,
        response_summary=llm_response.choices[0].message.content[:100],
        provider="openai",
        model="gpt-4",
        token_usage=TokenUsage(
            prompt_tokens=llm_response.usage.prompt_tokens,
            completion_tokens=llm_response.usage.completion_tokens,
            total_tokens=llm_response.usage.total_tokens
        ),
        latency_ms=latency_ms
    )

See Gateway Mode Deep Dive for more details on API reference, error handling, and framework integrations.

Framework Integration

FastAPI

import os
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

axonflow: AxonFlow | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global axonflow
    axonflow = AxonFlow(
        agent_url=os.environ["AXONFLOW_AGENT_URL"],
        client_id=os.environ["AXONFLOW_CLIENT_ID"],
        client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
    )
    yield
    await axonflow.close()

app = FastAPI(lifespan=lifespan)

@app.post("/chat")
async def chat(prompt: str, user_token: str):
    try:
        response = await axonflow.execute_query(
            user_token=user_token,
            query=prompt,
            request_type="chat"
        )
        return {"success": True, "data": response.data}
    except PolicyViolationError as e:
        raise HTTPException(status_code=403, detail=e.block_reason)

Flask

import os

from flask import Flask, request, jsonify
from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

app = Flask(__name__)

client = AxonFlow.sync(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
)

@app.route("/chat", methods=["POST"])
def chat():
    data = request.json
    try:
        response = client.execute_query(
            user_token=data["user_token"],
            query=data["prompt"],
            request_type="chat"
        )
        return jsonify({"success": True, "data": response.data})
    except PolicyViolationError as e:
        return jsonify({"error": e.block_reason}), 403

Django

# views.py
import json
import os

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

client = AxonFlow.sync(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
)

@csrf_exempt
def chat_view(request):
    if request.method == "POST":
        data = json.loads(request.body)
        try:
            response = client.execute_query(
                user_token=data["user_token"],
                query=data["prompt"],
                request_type="chat"
            )
            return JsonResponse({"success": True, "data": response.data})
        except PolicyViolationError as e:
            return JsonResponse({"error": e.block_reason}, status=403)

OpenAI Integration

Transparent governance for existing OpenAI code:

from openai import OpenAI
from axonflow import AxonFlow
from axonflow.interceptors.openai import wrap_openai_client

openai = OpenAI()
axonflow = AxonFlow(...)

# Wrap client - governance is now automatic
wrapped = wrap_openai_client(openai, axonflow, user_token="user-123")

# Use as normal - governance happens invisibly
response = wrapped.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello!"}]
)

Configuration

from axonflow import AxonFlow, Mode, RetryConfig

client = AxonFlow(
    agent_url="https://your-agent.axonflow.com",
    client_id="your-client-id",
    client_secret="your-client-secret",
    license_key="optional-license-key",  # For enterprise features
    mode=Mode.PRODUCTION,                # or Mode.SANDBOX
    debug=True,                          # Enable debug logging
    timeout=60.0,                        # Request timeout in seconds
    retry_config=RetryConfig(            # Retry configuration
        enabled=True,
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
    ),
    cache_enabled=True,                  # Enable response caching
    cache_ttl=60.0,                      # Cache TTL in seconds
)

VPC Private Endpoint (Low-Latency)

For customers running within AWS VPC:

client = AxonFlow(
    agent_url="https://YOUR_VPC_IP:8443",  # VPC private endpoint
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
    insecure_skip_verify=False,  # Set True only for self-signed certs in dev
)

Performance Comparison:

  • Public endpoint: ~100ms (internet routing)
  • VPC private endpoint: under 10ms P99 (intra-VPC routing)
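
Actual latency depends on your region and network path. To confirm what you see from inside the VPC, you can time the governance pre-check yourself. A minimal sketch, assuming the sync constructor accepts the same cache_enabled option shown in Configuration above; the sample count, query, and data source are illustrative:

import os
import time

from axonflow import AxonFlow

with AxonFlow.sync(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],  # point at the VPC private endpoint
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
    cache_enabled=False,  # avoid timing cache hits instead of network round trips
) as client:
    samples = []
    for _ in range(50):
        start = time.perf_counter()
        client.get_policy_approved_context(
            user_token="user-jwt",
            query="latency probe",      # illustrative query
            data_sources=["postgres"],  # illustrative data source
        )
        samples.append((time.perf_counter() - start) * 1000)

    samples.sort()
    print(f"p50={samples[len(samples) // 2]:.1f} ms  max={samples[-1]:.1f} ms")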

Sandbox Mode

For testing without affecting production:

client = AxonFlow.sandbox("demo-key")

# Test with aggressive policies
response = await client.execute_query(
    user_token="test-user",
    query="My SSN is 123-45-6789",  # Will be blocked/redacted
    request_type="chat"
)
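
Depending on the sandbox policy, the PII query is either redacted before reaching the LLM or rejected outright; a rejected query raises PolicyViolationError (see Error Handling below). A minimal test sketch, assuming pytest with pytest-asyncio and a policy that blocks rather than redacts:

import pytest

from axonflow import AxonFlow
from axonflow.exceptions import PolicyViolationError

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_pii_query_is_blocked():
    client = AxonFlow.sandbox("demo-key")
    # The sandbox's aggressive policies should refuse raw PII.
    with pytest.raises(PolicyViolationError):
        await client.execute_query(
            user_token="test-user",
            query="My SSN is 123-45-6789",
            request_type="chat",
        )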

MCP Connector Integration

List Available Connectors

connectors = await client.list_connectors()

for conn in connectors:
    print(f"Connector: {conn.name} ({conn.type})")
    print(f"  Installed: {conn.installed}")
    print(f"  Capabilities: {', '.join(conn.capabilities)}")

Query a Connector

result = await client.query_connector(
    user_token="user-jwt",
    connector_name="postgres",
    operation="query",
    params={"sql": "SELECT * FROM users LIMIT 10"}
)

if result.success:
    print(f"Data: {result.data}")
else:
    print(f"Error: {result.error}")
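
The two calls compose naturally: list the connectors once, then only query the ones that are actually installed. A minimal sketch (the SQL statement is illustrative):

connectors = await client.list_connectors()
postgres = next((c for c in connectors if c.name == "postgres" and c.installed), None)

if postgres is None:
    print("postgres connector not installed")
else:
    result = await client.query_connector(
        user_token="user-jwt",
        connector_name=postgres.name,
        operation="query",
        params={"sql": "SELECT count(*) FROM users"},  # illustrative query
    )
    print(result.data if result.success else result.error)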

Multi-Agent Planning (MAP)

Generate a Plan

plan = await client.generate_plan(
    query="Book a flight and hotel for my trip to Paris",
    domain="travel"
)

print(f"Plan {plan.plan_id} has {len(plan.steps)} steps")
for step in plan.steps:
    print(f"  - {step.name}: {step.description}")

Execute the Plan

result = await client.execute_plan(plan.plan_id)

if result.status == "completed":
    print(f"Result: {result.result}")
else:
    print(f"Error: {result.error}")

Error Handling

from axonflow.exceptions import (
    AxonFlowError,
    PolicyViolationError,
    AuthenticationError,
    RateLimitError,
    TimeoutError,
)

try:
    response = await client.execute_query(...)
except PolicyViolationError as e:
    print(f"Blocked by policy: {e.block_reason}")
except RateLimitError as e:
    print(f"Rate limited: {e.limit}/{e.remaining}, resets at {e.reset_at}")
except AuthenticationError:
    print("Invalid credentials")
except TimeoutError:
    print("Request timed out")
except AxonFlowError as e:
    print(f"AxonFlow error: {e.message}")

Type Hints

The SDK is fully typed with Pydantic v2 models:

from axonflow import (
    ClientResponse,
    PolicyApprovalResult,
    PlanResponse,
    ConnectorResponse,
    TokenUsage,
)

# Full autocomplete and type checking support
response: ClientResponse = await client.execute_query(...)
print(response.success)
print(response.data)
print(response.policy_info.policies_evaluated)
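
Because these are real classes, your own wrappers stay type-checked end to end. A small sketch of a typed helper (the function name is illustrative, not part of the SDK):

from axonflow import AxonFlow, ClientResponse

async def ask(client: AxonFlow, user_token: str, prompt: str) -> ClientResponse:
    """Typed wrapper around execute_query; mypy/pyright can check every caller."""
    return await client.execute_query(
        user_token=user_token,
        query=prompt,
        request_type="chat",
    )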

Production Best Practices

1. Environment Variables

Never hardcode credentials:

import os

# Good
client = AxonFlow(
    agent_url=os.environ["AXONFLOW_AGENT_URL"],
    client_id=os.environ["AXONFLOW_CLIENT_ID"],
    client_secret=os.environ["AXONFLOW_CLIENT_SECRET"],
)

# Bad - Never do this!
client = AxonFlow(
    agent_url="https://...",
    client_id="hardcoded-id",
    client_secret="hardcoded-secret",
)

2. Use Context Managers

Always use context managers to ensure proper cleanup:

# Async
async with AxonFlow(...) as client:
    ...  # client is closed automatically on exit

# Sync
with AxonFlow.sync(...) as client:
    ...  # client is closed automatically on exit
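
When the client must outlive a single block (for example, in a long-running web app like the FastAPI integration above), a context manager may not fit. In that case close the client explicitly on shutdown, mirroring the lifespan hook shown earlier. A minimal sketch:

client = AxonFlow(...)

try:
    ...  # application code shares `client` across requests
finally:
    await client.close()  # explicit cleanup when a context manager is not practical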

3. Enable Caching

Reduce latency for repeated queries:

client = AxonFlow(
    ...,
    cache_enabled=True,
    cache_ttl=60.0,  # 1 minute
)

4. Enable Retry Logic

Handle transient failures automatically:

from axonflow import RetryConfig

client = AxonFlow(
    ...,
    retry_config=RetryConfig(
        enabled=True,
        max_attempts=3,
        initial_delay=1.0,
        max_delay=30.0,
    ),
)

Support & Resources

Next Steps

License

MIT - See LICENSE