
CrewAI + AxonFlow Integration

Overview

CrewAI enables multi-agent collaboration where specialized AI agents work together on complex tasks. AxonFlow adds governance, compliance, and audit trails to ensure multi-agent systems operate within enterprise policies.

Together, they enable enterprises to deploy autonomous agent crews with full control and observability.


Why Use AxonFlow with CrewAI?

CrewAI Strengths

  • Role-based agent design (Researcher, Writer, Analyst, etc.)
  • Task delegation and collaboration between agents
  • Process types: sequential and hierarchical (consensual is planned)
  • Tool integration for each agent
  • Memory and context sharing

AxonFlow Strengths

  • Per-agent policy enforcement (different rules for different roles)
  • Cross-agent audit trails (track how agents interact)
  • Data isolation (agents only access permitted data)
  • Cost control (budget limits per agent or crew)
  • PII protection (mask sensitive data in agent communications)

The Perfect Combination

CrewAI handles: Agent orchestration, task delegation, collaboration
AxonFlow handles: Governance, compliance, audit, access control

Integration Architecture

AxonFlow integrates with CrewAI using Gateway Mode, which wraps LLM calls with policy pre-checks and audit logging:

[CrewAI Agent]
|
v
[AxonFlow Pre-Check] --> Policy Evaluation
|
v (if approved)
[LLM Provider (OpenAI/Anthropic)]
|
v
[AxonFlow Audit] --> Compliance Logging
|
v
[Response to CrewAI]

Note: AxonFlow uses its own API for governance, not an OpenAI-compatible endpoint. Integration requires wrapping your LLM calls with AxonFlow's pre-check and audit endpoints.
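
In practice, the wrap is a three-step pattern: pre-check, run, audit. Below is a minimal sketch using the Python SDK calls shown later in this guide (Pattern 1 fleshes it out for a full crew); call_llm is a placeholder for your actual OpenAI/Anthropic call:

from axonflow import AxonFlow, TokenUsage

with AxonFlow.sync(
    agent_url="http://localhost:8080",
    client_id="crewai-app",
    client_secret="your-client-secret",
) as axonflow:
    # 1. Pre-check: ask AxonFlow whether this request is allowed
    ctx = axonflow.get_policy_approved_context(user_token="user-123", query="...")
    if not ctx.approved:
        raise PermissionError(ctx.block_reason)

    # 2. Call the LLM provider as usual (placeholder)
    response = call_llm("...")

    # 3. Audit: record the call against the pre-check context
    axonflow.audit_llm_call(
        context_id=ctx.context_id,
        response_summary=str(response)[:200],
        provider="openai",
        model="gpt-4",
        token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
        latency_ms=0,
    )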


Quick Start

Prerequisites

  • AxonFlow running locally or deployed (see Getting Started)
  • Python 3.10+ (required by recent CrewAI releases)
  • CrewAI installed (pip install crewai crewai-tools)

AxonFlow API Overview

AxonFlow Gateway Mode uses two main endpoints:

Endpoint                       Purpose
POST /api/policy/pre-check     Policy evaluation before the LLM call
POST /api/audit/llm-call       Audit logging after the LLM call completes

Required Headers:

  • Content-Type: application/json
  • X-Client-Secret: your-client-secret
  • X-License-Key: your-license-key (optional, for enterprise features)
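
For reference, a direct call to the pre-check endpoint with requests might look like the sketch below. The request body fields (user_token, query, context) and the response fields are inferred from the SDK examples in this guide and are assumptions; verify them against your AxonFlow API reference:

import requests

response = requests.post(
    "http://localhost:8080/api/policy/pre-check",
    headers={
        "Content-Type": "application/json",
        "X-Client-Secret": "your-client-secret",
    },
    json={
        # Field names assumed from the SDK examples below
        "user_token": "user-123",
        "query": "Research AI governance in healthcare",
        "context": {"framework": "crewai"},
    },
    timeout=10,
)
result = response.json()
print(result)  # expected to include approved, context_id, and block_reason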

Python SDK Integration

Install Dependencies

pip install axonflow crewai crewai-tools langchain-openai

Pattern 1: Governed CrewAI Agent

Create a governed crew runner using the AxonFlow Python SDK:

import os
import time
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from axonflow import AxonFlow, TokenUsage


class GovernedCrewAIRunner:
    """Run CrewAI crews with AxonFlow governance."""

    def __init__(
        self,
        axonflow_url: str,
        client_id: str,
        client_secret: str,
        openai_api_key: str,
        model: str = "gpt-4",
        license_key: str = None,
    ):
        self.axonflow = AxonFlow(
            agent_url=axonflow_url,
            client_id=client_id,
            client_secret=client_secret,
            license_key=license_key,
        )
        self.openai_api_key = openai_api_key
        self.model = model
        self.llm = ChatOpenAI(
            model=model,
            temperature=0.7,
            openai_api_key=openai_api_key,
        )

    def create_agent(
        self,
        role: str,
        goal: str,
        backstory: str,
        tools: list = None,
    ) -> Agent:
        """Create a CrewAI agent."""
        return Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            verbose=True,
            llm=self.llm,
            tools=tools or [],
            memory=True,
        )

    def run_governed_crew(
        self,
        user_token: str,
        crew: Crew,
        inputs: dict,
        context: dict = None,
    ) -> str:
        """Execute a CrewAI crew with AxonFlow governance (sync)."""
        start_time = time.time()

        # Build query from inputs for policy check
        query = " ".join(f"{k}: {v}" for k, v in inputs.items())

        with AxonFlow.sync(
            agent_url=self.axonflow.config.agent_url,
            client_id=self.axonflow.config.client_id,
            client_secret=self.axonflow.config.client_secret,
        ) as axonflow:
            # 1. Pre-check with AxonFlow
            ctx = axonflow.get_policy_approved_context(
                user_token=user_token,
                query=query,
                context={
                    **(context or {}),
                    "framework": "crewai",
                    "agent_count": len(crew.agents),
                    "task_count": len(crew.tasks),
                },
            )

            if not ctx.approved:
                raise PermissionError(f"Crew blocked: {ctx.block_reason}")

            try:
                # 2. Execute the crew
                result = crew.kickoff(inputs=inputs)
                latency_ms = int((time.time() - start_time) * 1000)

                # 3. Audit the execution
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=str(result)[:200],
                    provider="openai",
                    model=self.model,
                    token_usage=TokenUsage(
                        # Placeholder counts; use real usage from your LLM callbacks if available
                        prompt_tokens=500,
                        completion_tokens=200,
                        total_tokens=700,
                    ),
                    latency_ms=latency_ms,
                    metadata={
                        "framework": "crewai",
                        "crew_agents": [a.role for a in crew.agents],
                    },
                )

                return str(result)

            except Exception as e:
                latency_ms = int((time.time() - start_time) * 1000)
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=f"Crew error: {str(e)}",
                    provider="openai",
                    model=self.model,
                    token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
                    latency_ms=latency_ms,
                    metadata={"error": str(e)},
                )
                raise


# Initialize runner
runner = GovernedCrewAIRunner(
    axonflow_url="http://localhost:8080",
    client_id="crewai-app",
    client_secret="your-client-secret",
    openai_api_key=os.environ["OPENAI_API_KEY"],
    model="gpt-4",
)

# Create agents
researcher = runner.create_agent(
    role="Research Analyst",
    goal="Find accurate, up-to-date information",
    backstory="Expert researcher with strong analytical skills",
)

writer = runner.create_agent(
    role="Content Writer",
    goal="Create clear, engaging content",
    backstory="Technical writer specializing in documentation",
)

# Create tasks
research_task = Task(
    description="Research the latest developments in {topic}",
    expected_output="Comprehensive research summary",
    agent=researcher,
)

writing_task = Task(
    description="Write a blog post based on the research",
    expected_output="Well-structured blog post (500-800 words)",
    agent=writer,
)

# Create and run governed crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    process=Process.sequential,
    verbose=True,
)

result = runner.run_governed_crew(
    user_token="user-123",
    crew=crew,
    inputs={"topic": "AI governance in healthcare"},
    context={"department": "marketing"},
)
print(result)

Pattern 2: Per-Task Governance

Apply governance checks before each task execution:

import time
from crewai import Task
from langchain_openai import ChatOpenAI
from axonflow import AxonFlow, TokenUsage


class TaskGovernedCrew:
    """CrewAI runner with per-task governance."""

    def __init__(
        self,
        axonflow_url: str,
        client_id: str,
        client_secret: str,
        openai_api_key: str,
    ):
        self.axonflow_url = axonflow_url
        self.client_id = client_id
        self.client_secret = client_secret
        self.llm = ChatOpenAI(
            model="gpt-4",
            openai_api_key=openai_api_key,
        )

    def execute_governed_task(
        self,
        user_token: str,
        task: Task,
        context: dict = None,
    ) -> str:
        """Execute a single task with governance."""
        start_time = time.time()

        with AxonFlow.sync(
            agent_url=self.axonflow_url,
            client_id=self.client_id,
            client_secret=self.client_secret,
        ) as axonflow:
            # Pre-check for this specific task
            ctx = axonflow.get_policy_approved_context(
                user_token=user_token,
                query=task.description,
                context={
                    **(context or {}),
                    "task_name": task.description[:50],
                    "agent_role": task.agent.role if task.agent else "unknown",
                },
            )

            if not ctx.approved:
                return f"Task blocked: {ctx.block_reason}"

            try:
                # Execute the task
                result = task.execute()
                latency_ms = int((time.time() - start_time) * 1000)

                # Audit
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=str(result)[:200],
                    provider="openai",
                    model="gpt-4",
                    # Placeholder token counts; use real usage if available
                    token_usage=TokenUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150),
                    latency_ms=latency_ms,
                )

                return str(result)

            except Exception as e:
                latency_ms = int((time.time() - start_time) * 1000)
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=f"Task error: {str(e)}",
                    provider="openai",
                    model="gpt-4",
                    token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
                    latency_ms=latency_ms,
                    metadata={"error": str(e)},
                )
                raise
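
A usage sketch for Pattern 2 (the agent and task shown here are illustrative and reuse the runner's LLM):

import os
from crewai import Agent, Task

crew_runner = TaskGovernedCrew(
    axonflow_url="http://localhost:8080",
    client_id="crewai-app",
    client_secret="your-client-secret",
    openai_api_key=os.environ["OPENAI_API_KEY"],
)

# Illustrative agent and task
analyst = Agent(
    role="Data Analyst",
    goal="Summarize quarterly metrics",
    backstory="Analyst focused on reporting",
    llm=crew_runner.llm,
)

report_task = Task(
    description="Summarize Q1 revenue trends for the leadership team",
    expected_output="A short summary of revenue trends",
    agent=analyst,
)

output = crew_runner.execute_governed_task(
    user_token="user-123",
    task=report_task,
    context={"department": "finance"},
)
print(output)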

Go SDK Integration

For Go-based services that orchestrate CrewAI crews:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/getaxonflow/axonflow-go-sdk/axonflow"
)

// CrewAIGovernanceService provides governance for CrewAI crews
type CrewAIGovernanceService struct {
	gateway *axonflow.GatewayClient
}

// NewCrewAIGovernanceService creates a new governance service
func NewCrewAIGovernanceService(axonflowURL, clientSecret string) *CrewAIGovernanceService {
	return &CrewAIGovernanceService{
		gateway: axonflow.NewGatewayClient(axonflow.Config{
			AgentURL:     axonflowURL,
			ClientID:     "crewai-service",
			ClientSecret: clientSecret,
		}),
	}
}

// CrewRequest represents a CrewAI execution request
type CrewRequest struct {
	UserToken  string                 `json:"user_token"`
	CrewID     string                 `json:"crew_id"`
	Inputs     map[string]interface{} `json:"inputs"`
	AgentRoles []string               `json:"agent_roles"`
}

// CrewResponse represents the governance check result
type CrewResponse struct {
	Approved  bool   `json:"approved"`
	ContextID string `json:"context_id"`
	Reason    string `json:"reason,omitempty"`
}

// CheckCrewExecution validates a crew execution request
func (s *CrewAIGovernanceService) CheckCrewExecution(
	ctx context.Context,
	req CrewRequest,
) (*CrewResponse, error) {
	// Build query from inputs
	inputsJSON, _ := json.Marshal(req.Inputs)

	preCheck, err := s.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
		UserToken: req.UserToken,
		Query:     string(inputsJSON),
		Context: map[string]interface{}{
			"framework":   "crewai",
			"crew_id":     req.CrewID,
			"agent_roles": req.AgentRoles,
			"agent_count": len(req.AgentRoles),
		},
	})
	if err != nil {
		return nil, fmt.Errorf("pre-check failed: %w", err)
	}

	return &CrewResponse{
		Approved:  preCheck.Approved,
		ContextID: preCheck.ContextID,
		Reason:    preCheck.BlockReason,
	}, nil
}

// AuditCrewCompletion logs crew execution completion
func (s *CrewAIGovernanceService) AuditCrewCompletion(
	ctx context.Context,
	contextID string,
	result string,
	latencyMs int,
	metadata map[string]interface{},
) error {
	return s.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
		ContextID:       contextID,
		ResponseSummary: truncate(result, 200),
		Provider:        "openai",
		Model:           "gpt-4",
		TokenUsage: axonflow.TokenUsage{
			PromptTokens:     500,
			CompletionTokens: 200,
			TotalTokens:      700,
		},
		LatencyMs: latencyMs,
		Metadata:  metadata,
	})
}

func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	return s[:maxLen]
}

// HandleCrewCheck is the HTTP handler for crew governance
func (s *CrewAIGovernanceService) HandleCrewCheck(w http.ResponseWriter, r *http.Request) {
	var req CrewRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	resp, err := s.CheckCrewExecution(r.Context(), req)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(resp)
}

func main() {
	service := NewCrewAIGovernanceService(
		"http://localhost:8080",
		"your-client-secret",
	)

	http.HandleFunc("/api/crew/check", service.HandleCrewCheck)
	http.ListenAndServe(":9090", nil)
}

Python Client for Go Service

import requests

def check_crew_with_go_service(
    go_service_url: str,
    user_token: str,
    crew_id: str,
    inputs: dict,
    agent_roles: list,
) -> dict:
    """Check crew execution with Go governance service."""
    response = requests.post(
        f"{go_service_url}/api/crew/check",
        json={
            "user_token": user_token,
            "crew_id": crew_id,
            "inputs": inputs,
            "agent_roles": agent_roles,
        },
    )
    return response.json()


# Usage
result = check_crew_with_go_service(
    go_service_url="http://localhost:9090",
    user_token="user-123",
    crew_id="research-crew",
    inputs={"topic": "AI governance"},
    agent_roles=["researcher", "writer"],
)

if result["approved"]:
    # Run the crew
    crew.kickoff(inputs={"topic": "AI governance"})
else:
    print(f"Crew blocked: {result['reason']}")

AxonFlow Policy Configuration

Create policies for different CrewAI agent roles:

{
  "policies": [
    {
      "name": "crewai-researcher-policy",
      "description": "Policy for researcher agents",
      "enabled": true,
      "rules": [
        {
          "type": "rate_limit",
          "config": {
            "requests_per_minute": 30,
            "action": "throttle"
          }
        },
        {
          "type": "content_filter",
          "config": {
            "blocked_patterns": ["confidential", "internal only"],
            "action": "block"
          }
        }
      ]
    },
    {
      "name": "crewai-writer-policy",
      "description": "Policy for writer agents with PII protection",
      "enabled": true,
      "rules": [
        {
          "type": "pii_protection",
          "config": {
            "fields": ["email", "phone", "name"],
            "action": "mask"
          }
        }
      ]
    },
    {
      "name": "crewai-analyst-policy",
      "description": "Policy for analyst agents with data access",
      "enabled": true,
      "rules": [
        {
          "type": "content_filter",
          "config": {
            "blocked_patterns": ["DELETE", "DROP", "TRUNCATE"],
            "action": "block"
          }
        }
      ]
    }
  ]
}

Use Case: Travel Booking Crew

A real-world example with compliance requirements, built on the GovernedCrewAIRunner from Pattern 1:

import os
from crewai import Task, Crew, Process

# Initialize the governed runner (Pattern 1)
runner = GovernedCrewAIRunner(
    axonflow_url="http://localhost:8080",
    client_id="travel-crew",
    client_secret="your-client-secret",
    openai_api_key=os.environ["OPENAI_API_KEY"],
)

# Search Agent
search_agent = runner.create_agent(
    role="Travel Search Specialist",
    goal="Find the best travel options within budget",
    backstory="Expert in searching multiple travel providers",
)

# Compliance Agent
compliance_agent = runner.create_agent(
    role="Compliance Officer",
    goal="Ensure bookings meet regulatory requirements",
    backstory="Verifies GDPR compliance and visa requirements",
)

# Booking Agent
booking_agent = runner.create_agent(
    role="Booking Coordinator",
    goal="Process bookings with proper verification",
    backstory="Handles payment processing and confirmations",
)

# Tasks
search_task = Task(
    description="""Search for flights and hotels for:
    - Destination: {destination}
    - Dates: {travel_dates}
    - Budget: {budget}
    Return top 3 options with prices.""",
    expected_output="List of 3 travel options",
    agent=search_agent,
)

compliance_task = Task(
    description="""Review travel options and verify:
    - GDPR compliance for EU destinations
    - Visa requirements
    - Travel insurance requirements""",
    expected_output="Compliance report",
    agent=compliance_agent,
)

booking_task = Task(
    description="""Process booking for approved option:
    - Verify traveler information
    - Process payment
    - Send confirmation""",
    expected_output="Booking confirmation with audit reference",
    agent=booking_agent,
)

# Create crew
travel_crew = Crew(
    agents=[search_agent, compliance_agent, booking_agent],
    tasks=[search_task, compliance_task, booking_task],
    process=Process.sequential,
    verbose=True,
)

# Execute with governance
result = runner.run_governed_crew(
    user_token="travel-agent-123",
    crew=travel_crew,
    inputs={
        "destination": "Paris, France",
        "travel_dates": "2025-03-15 to 2025-03-20",
        "budget": "3000 EUR",
    },
    context={
        "compliance_framework": "gdpr",
        "transaction_type": "travel_booking",
        "data_tier": "restricted",  # strictest data tier among the crew's agents
    },
)

Best Practices

1. Always Use Context IDs

The context_id from pre-check must be passed to audit for proper correlation:

ctx = axonflow.get_policy_approved_context(user_token=user_token, query=query)
context_id = ctx.context_id  # Store this immediately

# ... run crew ...

axonflow.audit_llm_call(context_id=context_id, ...)

2. Handle Blocked Requests Gracefully

ctx = axonflow.get_policy_approved_context(user_token=user_token, query=query)
if not ctx.approved:
    logger.warning(f"Crew blocked: {ctx.block_reason}")
    return "Crew execution blocked due to policy restrictions."

3. Always Audit, Even on Errors

try:
    result = crew.kickoff(inputs=inputs)
    axonflow.audit_llm_call(context_id=context_id, response_summary=str(result)[:200], ...)
except Exception as e:
    axonflow.audit_llm_call(context_id=context_id, response_summary=f"Error: {e}", ...)
    raise

4. Role-Based Policy Context

Use meaningful context for policy routing:

ctx = axonflow.get_policy_approved_context(
    user_token=user_token,
    query=query,
    context={
        "framework": "crewai",
        "agent_role": agent.role,
        "data_tier": "restricted",
        "compliance_framework": "hipaa",
    },
)

Troubleshooting

Common Issues

Issue: Pre-check returns 401 Unauthorized

  • Verify X-Client-Secret header is correct
  • Check X-License-Key if using enterprise features
  • Ensure client_id is registered in AxonFlow

Issue: Crew execution blocked unexpectedly

  • Check which policy triggered the block
  • Review context fields match policy conditions
  • Verify agent roles have appropriate permissions

Issue: Audit calls failing

  • Verify context_id is from a valid pre-check (not expired)
  • Check that AxonFlow agent is healthy (/health endpoint)

Issue: High latency with multi-agent crews

  • Consider batch governance checks for sequential tasks
  • Use async audit calls so logging does not block crew execution (see the sketch below)
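
A minimal sketch of offloading the audit call to a background thread, using only the SDK calls shown above (error handling omitted):

from concurrent.futures import ThreadPoolExecutor

# Shared executor for non-blocking audit logging
audit_executor = ThreadPoolExecutor(max_workers=2)

def audit_in_background(axonflow, **audit_kwargs):
    """Submit audit_llm_call on a worker thread so the crew is not blocked."""
    # Keep the AxonFlow client open until the background audit completes
    audit_executor.submit(axonflow.audit_llm_call, **audit_kwargs)

Inside run_governed_crew, replacing the direct axonflow.audit_llm_call(...) with audit_in_background(axonflow, ...) returns the crew result without waiting on the audit round trip.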

Additional Resources