CrewAI + AxonFlow Integration
Overview
CrewAI enables multi-agent collaboration where specialized AI agents work together on complex tasks. AxonFlow adds governance, compliance, and audit trails to ensure multi-agent systems operate within enterprise policies.
Together, they enable enterprises to deploy autonomous agent crews with full control and observability.
Why Use AxonFlow with CrewAI?
CrewAI Strengths
- Role-based agent design (Researcher, Writer, Analyst, etc.)
- Task delegation and collaboration between agents
- Process types: sequential and hierarchical (a consensual process is listed as planned in the CrewAI documentation)
- Tool integration for each agent
- Memory and context sharing
AxonFlow Strengths
- Per-agent policy enforcement (different rules for different roles)
- Cross-agent audit trails (track how agents interact)
- Data isolation (agents only access permitted data)
- Cost control (budget limits per agent or crew)
- PII protection (mask sensitive data in agent communications)
The Perfect Combination
CrewAI handles: Agent orchestration, task delegation, collaboration
AxonFlow handles: Governance, compliance, audit, access control
Integration Architecture
AxonFlow integrates with CrewAI using Gateway Mode, which wraps LLM calls with policy pre-checks and audit logging:
[CrewAI Agent]
|
v
[AxonFlow Pre-Check] --> Policy Evaluation
|
v (if approved)
[LLM Provider (OpenAI/Anthropic)]
|
v
[AxonFlow Audit] --> Compliance Logging
|
v
[Response to CrewAI]
Note: AxonFlow uses its own API for governance, not an OpenAI-compatible endpoint. Integration requires wrapping your LLM calls with AxonFlow's pre-check and audit endpoints.
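The flow in the diagram can be sketched without any SDK at all. The following is a minimal illustration, not AxonFlow's implementation: `PreCheckResult`, `governed_call`, and the three stub functions are hypothetical names, and the stubs stand in for the real pre-check, LLM, and audit calls.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class PreCheckResult:
    """Hypothetical stand-in for the result of a policy pre-check."""
    approved: bool
    context_id: str
    block_reason: Optional[str] = None


def governed_call(
    query: str,
    pre_check: Callable[[str], PreCheckResult],
    call_llm: Callable[[str], str],
    audit: Callable[[str, str], None],
) -> str:
    """Run the pre-check, then the LLM call, then the audit step, in that order."""
    result = pre_check(query)
    if not result.approved:
        # Blocked requests never reach the LLM provider.
        raise PermissionError(f"Blocked: {result.block_reason}")
    response = call_llm(query)
    # The audit record is tied to the pre-check through its context ID.
    audit(result.context_id, response)
    return response


# Stubs for illustration only; real code would call AxonFlow and an LLM here.
audit_log = []


def fake_pre_check(query: str) -> PreCheckResult:
    if "confidential" in query.lower():
        return PreCheckResult(False, "ctx-2", "matched blocked pattern")
    return PreCheckResult(True, "ctx-1")


def fake_llm(query: str) -> str:
    return f"echo: {query}"


def fake_audit(context_id: str, response: str) -> None:
    audit_log.append((context_id, response))


answer = governed_call("summarize AI governance", fake_pre_check, fake_llm, fake_audit)
```

Passing the three steps in as callables keeps the ordering logic testable on its own; the SDK-based patterns later in this guide follow the same order.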
Quick Start
Prerequisites
- AxonFlow running locally or deployed (see Getting Started)
- Python 3.9+
- CrewAI installed (pip install crewai crewai-tools)
AxonFlow API Overview
AxonFlow Gateway Mode uses two main endpoints:
| Endpoint | Purpose |
|---|---|
| POST /api/policy/pre-check | Policy evaluation before LLM call |
| POST /api/audit/llm-call | Audit logging after LLM call completes |
Required Headers:
- Content-Type: application/json
- X-Client-Secret: your-client-secret
- X-License-Key: your-license-key (optional, for enterprise features)
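For callers that do not use an SDK, the endpoint and headers above could be combined roughly as follows. This is a sketch using only the standard library: `build_pre_check_request` is a hypothetical helper, and the payload field names (`user_token`, `query`) are assumptions that mirror the SDK arguments used later in this guide, not a documented request schema. The request is built but not sent.

```python
import json
import urllib.request
from typing import Optional


def build_pre_check_request(
    base_url: str,
    client_secret: str,
    payload: dict,
    license_key: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST request for the pre-check endpoint."""
    headers = {
        "Content-Type": "application/json",
        "X-Client-Secret": client_secret,
    }
    if license_key:
        # Optional header, only needed for enterprise features.
        headers["X-License-Key"] = license_key
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/policy/pre-check",
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# Payload field names are assumed for illustration.
req = build_pre_check_request(
    "http://localhost:8080",
    "your-client-secret",
    {"user_token": "user-123", "query": "topic: AI governance"},
)
```

Sending the request would be a matter of passing `req` to `urllib.request.urlopen` with a timeout. Note that `urllib` normalizes the capitalization of header names internally, which is harmless because HTTP header names are case-insensitive.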
Python SDK Integration
Install Dependencies
pip install axonflow crewai crewai-tools langchain-openai
Pattern 1: Governed CrewAI Agent
Create a governed crew runner using the AxonFlow Python SDK:
import os
import time
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from axonflow import AxonFlow, TokenUsage
class GovernedCrewAIRunner:
    """Run CrewAI crews with AxonFlow governance."""

    def __init__(
        self,
        axonflow_url: str,
        client_id: str,
        client_secret: str,
        openai_api_key: str,
        model: str = "gpt-4",
        license_key: str = None,
    ):
        self.axonflow = AxonFlow(
            agent_url=axonflow_url,
            client_id=client_id,
            client_secret=client_secret,
            license_key=license_key,
        )
        self.openai_api_key = openai_api_key
        self.model = model
        self.llm = ChatOpenAI(
            model=model,
            temperature=0.7,
            openai_api_key=openai_api_key,
        )

    def create_agent(
        self,
        role: str,
        goal: str,
        backstory: str,
        tools: list = None,
    ) -> Agent:
        """Create a CrewAI agent."""
        return Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            verbose=True,
            llm=self.llm,
            tools=tools or [],
            memory=True,
        )

    def run_governed_crew(
        self,
        user_token: str,
        crew: Crew,
        inputs: dict,
        context: dict = None,
    ) -> str:
        """Execute a CrewAI crew with AxonFlow governance (sync)."""
        start_time = time.time()
        # Build query from inputs for policy check
        query = " ".join(f"{k}: {v}" for k, v in inputs.items())
        with AxonFlow.sync(
            agent_url=self.axonflow.config.agent_url,
            client_id=self.axonflow.config.client_id,
            client_secret=self.axonflow.config.client_secret,
        ) as axonflow:
            # 1. Pre-check with AxonFlow
            ctx = axonflow.get_policy_approved_context(
                user_token=user_token,
                query=query,
                context={
                    **(context or {}),
                    "framework": "crewai",
                    "agent_count": len(crew.agents),
                    "task_count": len(crew.tasks),
                },
            )
            if not ctx.approved:
                raise PermissionError(f"Crew blocked: {ctx.block_reason}")
            try:
                # 2. Execute the crew
                result = crew.kickoff(inputs=inputs)
                latency_ms = int((time.time() - start_time) * 1000)
                # 3. Audit the execution
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=str(result)[:200],
                    provider="openai",
                    model=self.model,
                    # Placeholder token counts; replace with the usage actually reported for the run
                    token_usage=TokenUsage(
                        prompt_tokens=500,
                        completion_tokens=200,
                        total_tokens=700,
                    ),
                    latency_ms=latency_ms,
                    metadata={
                        "framework": "crewai",
                        "crew_agents": [a.role for a in crew.agents],
                    },
                )
                return str(result)
            except Exception as e:
                # Audit failures too, then re-raise so the caller still sees the error
                latency_ms = int((time.time() - start_time) * 1000)
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=f"Crew error: {str(e)}",
                    provider="openai",
                    model=self.model,
                    token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
                    latency_ms=latency_ms,
                    metadata={"error": str(e)},
                )
                raise
# Initialize runner
runner = GovernedCrewAIRunner(
axonflow_url="http://localhost:8080",
client_id="crewai-app",
client_secret="your-client-secret",
openai_api_key=os.environ["OPENAI_API_KEY"],
model="gpt-4",
)
# Create agents
researcher = runner.create_agent(
role="Research Analyst",
goal="Find accurate, up-to-date information",
backstory="Expert researcher with strong analytical skills",
)
writer = runner.create_agent(
role="Content Writer",
goal="Create clear, engaging content",
backstory="Technical writer specializing in documentation",
)
# Create tasks
research_task = Task(
description="Research the latest developments in {topic}",
expected_output="Comprehensive research summary",
agent=researcher,
)
writing_task = Task(
description="Write a blog post based on the research",
expected_output="Well-structured blog post (500-800 words)",
agent=writer,
)
# Create and run governed crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
verbose=True,
)
result = runner.run_governed_crew(
user_token="user-123",
crew=crew,
inputs={"topic": "AI governance in healthcare"},
context={"department": "marketing"},
)
print(result)
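In Pattern 1 the policy pre-check sees only the flattened crew inputs, not the prompts the agents generate during the run, so the way the query string is built determines what the policies can match. The sketch below shows one possible variant of that step; `build_policy_query` and its `max_chars` cap are hypothetical and not part of the AxonFlow SDK.

```python
def build_policy_query(inputs: dict, max_chars: int = 2000) -> str:
    """Flatten crew inputs into a single string for the policy pre-check.

    Keys are sorted so the same inputs always produce the same query,
    which makes audit records easier to compare. The length cap is an
    illustrative assumption, not an AxonFlow limit.
    """
    parts = [f"{key}: {inputs[key]}" for key in sorted(inputs)]
    return " ".join(parts)[:max_chars]


query = build_policy_query(
    {"topic": "AI governance in healthcare", "audience": "clinicians"}
)
```

With these inputs the query is `audience: clinicians topic: AI governance in healthcare`. Any sensitive value that should be checked by a policy has to appear in `inputs` (or in the `context` dictionary) for the pre-check to see it.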
Pattern 2: Per-Task Governance
Apply governance checks before each task execution:
import time
from crewai import Task
from langchain_openai import ChatOpenAI
from axonflow import AxonFlow, TokenUsage
class TaskGovernedCrew:
    """CrewAI runner with per-task governance."""

    def __init__(
        self,
        axonflow_url: str,
        client_id: str,
        client_secret: str,
        openai_api_key: str,
    ):
        self.axonflow_url = axonflow_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.llm = ChatOpenAI(
            model="gpt-4",
            openai_api_key=openai_api_key,
        )

    def execute_governed_task(
        self,
        user_token: str,
        task: Task,
        context: dict = None,
    ) -> str:
        """Execute a single task with governance."""
        start_time = time.time()
        with AxonFlow.sync(
            agent_url=self.axonflow_url,
            client_id=self.client_id,
            client_secret=self.client_secret,
        ) as axonflow:
            # Pre-check for this specific task
            ctx = axonflow.get_policy_approved_context(
                user_token=user_token,
                query=task.description,
                context={
                    **(context or {}),
                    "task_name": task.description[:50],
                    "agent_role": task.agent.role if task.agent else "unknown",
                },
            )
            if not ctx.approved:
                # A blocked task is reported as a string rather than an exception
                return f"Task blocked: {ctx.block_reason}"
            try:
                # Execute the task. The method name depends on the CrewAI version;
                # newer releases expose execute_sync() instead of execute().
                result = task.execute()
                latency_ms = int((time.time() - start_time) * 1000)
                # Audit (token counts are placeholders; replace with real usage)
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=str(result)[:200],
                    provider="openai",
                    model="gpt-4",
                    token_usage=TokenUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150),
                    latency_ms=latency_ms,
                )
                return str(result)
            except Exception as e:
                # Audit failures too, then re-raise
                latency_ms = int((time.time() - start_time) * 1000)
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=f"Task error: {str(e)}",
                    provider="openai",
                    model="gpt-4",
                    token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
                    latency_ms=latency_ms,
                    metadata={"error": str(e)},
                )
                raise
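Because Pattern 2 reports a blocked task as an ordinary string, a caller that chains tasks may want an explicit signal to stop. One option, sketched here under stated assumptions, is to raise a dedicated exception and halt the sequence at the first blocked task. `TaskBlocked`, `TaskRunReport`, and `run_tasks_in_order` are hypothetical helpers, and the `execute` callable stands in for a governed task runner.

```python
from dataclasses import dataclass, field
from typing import Callable, List


class TaskBlocked(Exception):
    """Raised when governance rejects a task (hypothetical helper)."""


@dataclass
class TaskRunReport:
    completed: List[str] = field(default_factory=list)
    blocked_at: str = ""


def run_tasks_in_order(
    task_names: List[str],
    execute: Callable[[str], str],
) -> TaskRunReport:
    """Run tasks one by one and stop at the first one governance blocks."""
    report = TaskRunReport()
    for name in task_names:
        try:
            execute(name)
        except TaskBlocked:
            # Later tasks depend on this one, so do not continue.
            report.blocked_at = name
            break
        report.completed.append(name)
    return report


def fake_execute(name: str) -> str:
    """Stub runner: pretends a policy blocks the booking task."""
    if name == "booking":
        raise TaskBlocked("payment data requires restricted tier")
    return f"{name} done"


report = run_tasks_in_order(["search", "compliance", "booking", "notify"], fake_execute)
```

Here `report.completed` holds the two tasks that ran and `report.blocked_at` names the task that was rejected, so downstream code never mistakes a block message for task output.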
Go SDK Integration
For Go-based services that orchestrate CrewAI crews:
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/getaxonflow/axonflow-go-sdk/axonflow"
)
// CrewAIGovernanceService provides governance for CrewAI crews
type CrewAIGovernanceService struct {
gateway *axonflow.GatewayClient
}
// NewCrewAIGovernanceService creates a new governance service
func NewCrewAIGovernanceService(axonflowURL, clientSecret string) *CrewAIGovernanceService {
return &CrewAIGovernanceService{
gateway: axonflow.NewGatewayClient(axonflow.Config{
AgentURL: axonflowURL,
ClientID: "crewai-service",
ClientSecret: clientSecret,
}),
}
}
// CrewRequest represents a CrewAI execution request
type CrewRequest struct {
UserToken string `json:"user_token"`
CrewID string `json:"crew_id"`
Inputs map[string]interface{} `json:"inputs"`
AgentRoles []string `json:"agent_roles"`
}
// CrewResponse represents the governance check result
type CrewResponse struct {
Approved bool `json:"approved"`
ContextID string `json:"context_id"`
Reason string `json:"reason,omitempty"`
}
// CheckCrewExecution validates a crew execution request
func (s *CrewAIGovernanceService) CheckCrewExecution(
ctx context.Context,
req CrewRequest,
) (*CrewResponse, error) {
	// Build query from inputs
	inputsJSON, err := json.Marshal(req.Inputs)
	if err != nil {
		return nil, fmt.Errorf("marshal inputs: %w", err)
	}
preCheck, err := s.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: req.UserToken,
Query: string(inputsJSON),
Context: map[string]interface{}{
"framework": "crewai",
"crew_id": req.CrewID,
"agent_roles": req.AgentRoles,
"agent_count": len(req.AgentRoles),
},
})
if err != nil {
return nil, fmt.Errorf("pre-check failed: %w", err)
}
return &CrewResponse{
Approved: preCheck.Approved,
ContextID: preCheck.ContextID,
Reason: preCheck.BlockReason,
}, nil
}
// AuditCrewCompletion logs crew execution completion
func (s *CrewAIGovernanceService) AuditCrewCompletion(
ctx context.Context,
contextID string,
result string,
latencyMs int,
metadata map[string]interface{},
) error {
return s.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: contextID,
ResponseSummary: truncate(result, 200),
Provider: "openai",
Model: "gpt-4",
TokenUsage: axonflow.TokenUsage{
PromptTokens: 500,
CompletionTokens: 200,
TotalTokens: 700,
},
LatencyMs: latencyMs,
Metadata: metadata,
})
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
// HTTP handler for crew governance
func (s *CrewAIGovernanceService) HandleCrewCheck(w http.ResponseWriter, r *http.Request) {
var req CrewRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
resp, err := s.CheckCrewExecution(r.Context(), req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
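func main() {
	service := NewCrewAIGovernanceService(
		"http://localhost:8080",
		"your-client-secret",
	)
	http.HandleFunc("/api/crew/check", service.HandleCrewCheck)
	// ListenAndServe only returns on failure, so report the error instead of dropping it
	if err := http.ListenAndServe(":9090", nil); err != nil {
		fmt.Println("server stopped:", err)
	}
}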
Python Client for Go Service
import requests
def check_crew_with_go_service(
    go_service_url: str,
    user_token: str,
    crew_id: str,
    inputs: dict,
    agent_roles: list,
) -> dict:
    """Check crew execution with Go governance service."""
    response = requests.post(
        f"{go_service_url}/api/crew/check",
        json={
            "user_token": user_token,
            "crew_id": crew_id,
            "inputs": inputs,
            "agent_roles": agent_roles,
        },
        timeout=10,  # avoid hanging if the governance service is unreachable
    )
    # The Go handler answers 400/500 with plain text, so fail here rather than in .json()
    response.raise_for_status()
    return response.json()
# Usage
result = check_crew_with_go_service(
go_service_url="http://localhost:9090",
user_token="user-123",
crew_id="research-crew",
inputs={"topic": "AI governance"},
agent_roles=["researcher", "writer"],
)
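if result["approved"]:
    # Run the crew (assumes `crew` was built as in Pattern 1)
    crew.kickoff(inputs={"topic": "AI governance"})
else:
    # "reason" is omitted from the JSON when empty, so read it defensively
    print(f"Crew blocked: {result.get('reason', 'no reason given')}")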
AxonFlow Policy Configuration
Create policies for different CrewAI agent roles:
{
"policies": [
{
"name": "crewai-researcher-policy",
"description": "Policy for researcher agents",
"enabled": true,
"rules": [
{
"type": "rate_limit",
"config": {
"requests_per_minute": 30,
"action": "throttle"
}
},
{
"type": "content_filter",
"config": {
"blocked_patterns": ["confidential", "internal only"],
"action": "block"
}
}
]
},
{
"name": "crewai-writer-policy",
"description": "Policy for writer agents with PII protection",
"enabled": true,
"rules": [
{
"type": "pii_protection",
"config": {
"fields": ["email", "phone", "name"],
"action": "mask"
}
}
]
},
{
"name": "crewai-analyst-policy",
"description": "Policy for analyst agents with data access",
"enabled": true,
"rules": [
{
"type": "content_filter",
"config": {
"blocked_patterns": ["DELETE", "DROP", "TRUNCATE"],
"action": "block"
}
}
]
}
]
}
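Before loading a policy file like the one above, it can help to run a quick local sanity check. The sketch below is not AxonFlow's validator: `summarize_policies` is a hypothetical helper that only assumes the structure visible in the example (a `policies` list whose rules each carry `type` and `config`).

```python
import json

REQUIRED_RULE_KEYS = {"type", "config"}


def summarize_policies(raw: str) -> dict:
    """Return {policy name: [(rule type, action), ...]} for enabled policies."""
    document = json.loads(raw)
    summary = {}
    for policy in document["policies"]:
        if not policy.get("enabled", False):
            continue  # disabled policies are skipped
        rules = []
        for rule in policy["rules"]:
            missing = REQUIRED_RULE_KEYS - rule.keys()
            if missing:
                raise ValueError(f"{policy['name']}: rule missing {sorted(missing)}")
            rules.append((rule["type"], rule["config"].get("action")))
        summary[policy["name"]] = rules
    return summary


example = """
{
  "policies": [
    {
      "name": "crewai-writer-policy",
      "enabled": true,
      "rules": [
        {"type": "pii_protection",
         "config": {"fields": ["email", "phone", "name"], "action": "mask"}}
      ]
    },
    {"name": "draft-policy", "enabled": false, "rules": []}
  ]
}
"""

summary = summarize_policies(example)
```

The resulting summary maps `crewai-writer-policy` to a single `("pii_protection", "mask")` rule and omits the disabled policy, which gives a quick overview of which actions each role is subject to.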
Use Case: Travel Booking Crew
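An example with compliance requirements. Note that it assumes an extended runner: AxonFlowGovernance, AxonFlowConfig, the governance constructor argument, and create_governed_agent (with agent_type and data_tier) are not defined in the patterns above and would need to be implemented before this code runs:
import os
from crewai import Agent, Task, Crew, Process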
# Initialize governance
governance = AxonFlowGovernance(AxonFlowConfig(
agent_url="http://localhost:8080",
client_id="travel-crew",
client_secret="your-client-secret",
))
runner = GovernedCrewAIRunner(
governance=governance,
openai_api_key=os.environ["OPENAI_API_KEY"],
)
# Search Agent
search_agent = runner.create_governed_agent(
role="Travel Search Specialist",
goal="Find the best travel options within budget",
backstory="Expert in searching multiple travel providers",
agent_type="search",
data_tier="external",
)
# Compliance Agent
compliance_agent = runner.create_governed_agent(
role="Compliance Officer",
goal="Ensure bookings meet regulatory requirements",
backstory="Verifies GDPR compliance and visa requirements",
agent_type="compliance",
data_tier="internal",
)
# Booking Agent
booking_agent = runner.create_governed_agent(
role="Booking Coordinator",
goal="Process bookings with proper verification",
backstory="Handles payment processing and confirmations",
agent_type="booking",
data_tier="restricted",
)
# Tasks
search_task = Task(
description="""Search for flights and hotels for:
- Destination: {destination}
- Dates: {travel_dates}
- Budget: {budget}
Return top 3 options with prices.""",
expected_output="List of 3 travel options",
agent=search_agent,
)
compliance_task = Task(
description="""Review travel options and verify:
- GDPR compliance for EU destinations
- Visa requirements
- Travel insurance requirements""",
expected_output="Compliance report",
agent=compliance_agent,
)
booking_task = Task(
description="""Process booking for approved option:
- Verify traveler information
- Process payment
- Send confirmation""",
expected_output="Booking confirmation with audit reference",
agent=booking_agent,
)
# Create crew
travel_crew = Crew(
agents=[search_agent, compliance_agent, booking_agent],
tasks=[search_task, compliance_task, booking_task],
process=Process.sequential,
verbose=True,
)
# Execute with governance
result = runner.run_governed_crew(
user_token="travel-agent-123",
crew=travel_crew,
inputs={
"destination": "Paris, France",
"travel_dates": "2025-03-15 to 2025-03-20",
"budget": "3000 EUR",
},
context={
"compliance_framework": "gdpr",
"transaction_type": "travel_booking",
},
)
Best Practices
1. Always Use Context IDs
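The context_id returned by the pre-check must be passed to the audit call so that the two records can be correlated. The governance object in these snippets is assumed to be a thin wrapper around the pre-check and audit calls shown earlier: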
pre_check = governance.pre_check(user_token, query)
context_id = pre_check["context_id"] # Store this immediately
# ... run crew ...
governance.audit_llm_call(context_id=context_id, ...)
2. Handle Blocked Requests Gracefully
pre_check = governance.pre_check(user_token, query)
if not pre_check.get("approved"):
    logger.warning(f"Crew blocked: {pre_check.get('block_reason')}")
    return "Crew execution blocked due to policy restrictions."
3. Always Audit, Even on Errors
try:
    result = crew.kickoff(inputs=inputs)
    governance.audit_llm_call(context_id, str(result), ...)
except Exception as e:
    governance.audit_llm_call(context_id, f"Error: {e}", ...)
    raise
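The same guarantee can be packaged so that callers cannot forget the audit step. The following is a minimal sketch using a context manager; `audited` is a hypothetical helper, and the `audit` callable stands in for whatever audit function the application uses.

```python
import time
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def audited(context_id: str, audit: Callable[..., None]) -> Iterator[dict]:
    """Guarantee exactly one audit call, whether the body succeeds or raises."""
    outcome = {"summary": ""}
    start = time.monotonic()
    try:
        yield outcome  # the body fills in outcome["summary"] on success
    except Exception as exc:
        outcome["summary"] = f"Error: {exc}"
        outcome["error"] = str(exc)
        raise  # the caller still sees the original exception
    finally:
        latency_ms = int((time.monotonic() - start) * 1000)
        audit(context_id=context_id, latency_ms=latency_ms, **outcome)


# Stub audit sink for illustration only.
records = []


def fake_audit(context_id, latency_ms, summary, error=None):
    records.append({"context_id": context_id, "summary": summary, "error": error})


with audited("ctx-1", fake_audit) as outcome:
    outcome["summary"] = "crew finished"
```

In real use the body of the `with` block would call `crew.kickoff(...)` and store a truncated result in `outcome["summary"]`. The `finally` clause runs on both paths, so the success and error branches cannot drift apart.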
4. Role-Based Policy Context
Use meaningful context for policy routing:
pre_check = governance.pre_check(
user_token=user_token,
query=query,
context={
"framework": "crewai",
"agent_role": agent.role,
"data_tier": "restricted",
"compliance_framework": "hipaa",
},
)
Troubleshooting
Common Issues
Issue: Pre-check returns 401 Unauthorized
- Verify the X-Client-Secret header is correct
- Check X-License-Key if using enterprise features
- Ensure client_id is registered in AxonFlow
Issue: Crew execution blocked unexpectedly
- Check which policy triggered the block
- Review context fields match policy conditions
- Verify agent roles have appropriate permissions
Issue: Audit calls failing
- Verify context_id is from a valid pre-check (not expired)
- Check that the AxonFlow agent is healthy (/health endpoint)
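A health probe along these lines can be run before retrying failed audit calls. This is a sketch using only the standard library; `is_healthy` is a hypothetical helper, and treating HTTP 200 as "healthy" is an assumption about the /health endpoint rather than documented behavior.

```python
import urllib.request


def is_healthy(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET <base_url>/health answers with HTTP 200."""
    url = base_url.rstrip("/") + "/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # URLError, HTTPError, and timeouts all derive from OSError.
        return False
```

For example, `is_healthy("http://localhost:8080")` would return False when nothing is listening on that port, without raising.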
Issue: High latency with multi-agent crews
- Consider batch governance checks for sequential tasks
- Use async audit calls so that audit logging does not block crew execution
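One way to keep audit calls off the critical path is to hand them to a small worker pool. The sketch below uses the standard library only; `submit_audit` is a hypothetical helper, the `audit` callable stands in for the real audit function, and it assumes that function is safe to call from a worker thread.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable

logger = logging.getLogger("audit")
_audit_pool = ThreadPoolExecutor(max_workers=2, thread_name_prefix="audit")


def _log_failure(future: Future) -> None:
    """Log audit failures instead of letting them vanish silently."""
    exc = future.exception()
    if exc is not None:
        logger.error("audit call failed: %s", exc)


def submit_audit(audit: Callable[..., None], **fields) -> Future:
    """Queue an audit call on a worker thread and return immediately."""
    future = _audit_pool.submit(audit, **fields)
    future.add_done_callback(_log_failure)
    return future


# Stub audit sink for illustration only.
sent = []


def fake_audit(context_id: str, response_summary: str) -> None:
    sent.append((context_id, response_summary))


pending = submit_audit(fake_audit, context_id="ctx-1", response_summary="done")
pending.result(timeout=5)  # shown for the example; a real caller would not wait here
```

The trade-off is durability: an audit record still queued when the process is killed is lost, so compliance-critical deployments may prefer to wait on the returned future at the end of each request.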