# CrewAI + AxonFlow Integration

**Prerequisites:** Python 3.9+, a running AxonFlow instance (see Getting Started), and `pip install crewai axonflow`.
## What CrewAI Does Well
CrewAI is a framework for orchestrating autonomous AI agents that collaborate on complex tasks. Its strengths are compelling:
- **Role-Based Agents:** Define agents with specific roles (Researcher, Writer, Analyst). Each agent has a backstory, goals, and expertise.
- **Task Delegation:** Agents delegate subtasks to each other, so complex work is divided naturally.
- **Process Types:** Sequential, hierarchical, and consensual processes offer different collaboration patterns for different needs.
- **Tool Integration:** Each agent can use different tools: researchers search, writers format, analysts compute.
- **Memory and Context:** Agents remember context across interactions, so long-running collaborations work naturally.
- **Human-in-the-Loop:** Agents can ask for human input when needed; supervised autonomy is built in.
## What CrewAI Doesn't Try to Solve
CrewAI focuses on multi-agent collaboration. These concerns are explicitly out of scope:
| Production Requirement | CrewAI's Position |
|---|---|
| Policy enforcement before agent actions | Not provided—agents act based on their roles, not policies |
| PII detection in agent communications | Not addressed—agents share data freely |
| SQL injection prevention | Not provided—must implement at tool level |
| Per-agent or per-crew cost attribution | Not tracked—requires external monitoring |
| Audit trails for compliance | Not built in—conversations aren't logged by default |
| Cross-agent access control | Not addressed—agents can delegate to any other agent |
| Token budget enforcement | Not provided—crews can consume unlimited tokens |
This isn't a criticism—it's a design choice. CrewAI handles collaboration. Governance is a separate concern.
## Where Teams Hit Production Friction
Based on real enterprise deployments, here are the blockers that appear after the prototype works:
### 1. The Delegation Loop
A researcher agent delegates to an analyst. The analyst delegates back with a question. The researcher re-delegates. This continues. By Monday, 15,000 API calls have accumulated.
CrewAI processed every delegation as intended. Nothing was watching the cost of collaboration.
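AxonFlow's per-crew cost tracking closes this gap centrally. To make the missing control concrete, here is a minimal hand-rolled sketch of the kind of call budget that would have stopped the loop; all names here are illustrative, not part of the CrewAI or AxonFlow APIs:

```python
class DelegationBudget:
    """Illustrative guard: cap how many LLM/delegation calls one crew run may make."""

    def __init__(self, max_calls: int):
        self.max_calls = max_calls
        self.calls = 0

    def charge(self) -> None:
        """Count one call; raise once the budget is exhausted."""
        self.calls += 1
        if self.calls > self.max_calls:
            raise RuntimeError(
                f"Delegation budget exhausted: {self.calls} calls > limit of {self.max_calls}"
            )


budget = DelegationBudget(max_calls=100)
for _ in range(100):
    budget.charge()  # the first 100 calls pass

try:
    budget.charge()  # call 101 is rejected
except RuntimeError as e:
    print(e)
```

A real deployment would charge this budget from the LLM callback layer rather than by hand, but the principle is the same: the loop stops at the limit instead of at Monday morning.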
### 2. The Invisible Handoff
A customer service crew handles a complaint that is handed off across three agents. Later, the customer asks:
- Who handled my issue?
- What was discussed at each step?
- Who made the final decision?
CrewAI orchestrated the handoffs. Without custom logging, the collaboration history is gone.
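This is exactly what an audit trail has to capture. As an illustration only (the names are invented, not a CrewAI or AxonFlow API), a minimal handoff log would need at least the parties, the summary, and the time of each step:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class HandoffEvent:
    from_agent: str
    to_agent: str
    summary: str
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


class HandoffLog:
    """Illustrative log that can answer 'who handled my issue, and when?'"""

    def __init__(self):
        self.events: list[HandoffEvent] = []

    def record(self, from_agent: str, to_agent: str, summary: str) -> None:
        self.events.append(HandoffEvent(from_agent, to_agent, summary))

    def history(self) -> list[str]:
        return [f"{e.from_agent} -> {e.to_agent}: {e.summary}" for e in self.events]


log = HandoffLog()
log.record("Intake Agent", "Billing Agent", "Complaint about duplicate charge")
log.record("Billing Agent", "Resolution Agent", "Refund approved, needs confirmation")
print(log.history())
```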
### 3. The Cross-Agent Data Leak
An HR agent collects employee information. It delegates a summary to a reporting agent. The reporting agent, designed for external reports, now has internal HR data in its context.
CrewAI has no mechanism to filter data between agents based on sensitivity.
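To make the boundary concrete, here is an illustrative sketch of redacting output before it crosses an agent boundary. The two regexes are toy examples; production PII detection (the kind AxonFlow performs) needs far more than pattern matching:

```python
import re

# Illustrative patterns only; real PII detection needs much more than two regexes.
PII_PATTERNS = {
    "email": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}


def redact_for_handoff(text: str) -> str:
    """Redact PII before one agent's output enters another agent's context."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED-{label.upper()}]", text)
    return text


print(redact_for_handoff("Employee jane@corp.com, SSN 123-45-6789, requests leave."))
```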
### 4. The Security Review Block

```text
Security review: BLOCKED
- No audit trail for inter-agent delegation
- PII can flow between agents without filtering
- No policy enforcement per agent role
- Cost controls missing
- No role-based access for agent actions
```
The multi-agent crew worked perfectly. It can't ship.
### 5. The Autonomous Tool Abuse
An agent with database access decides to query extensively for a task. 10,000 database queries later, the task is complete—and the database is throttled.
CrewAI gave the agent autonomy. Nothing governed how much autonomy was appropriate.
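As a sketch of the missing control (illustrative only, not a CrewAI or AxonFlow API), a per-run cap on tool invocations might look like this:

```python
import functools


def limit_calls(max_calls: int):
    """Illustrative decorator: cap how often an agent may invoke a tool per run."""

    def decorator(tool_fn):
        count = {"n": 0}

        @functools.wraps(tool_fn)
        def wrapper(*args, **kwargs):
            count["n"] += 1
            if count["n"] > max_calls:
                raise RuntimeError(
                    f"{tool_fn.__name__} exceeded its budget of {max_calls} calls"
                )
            return tool_fn(*args, **kwargs)

        return wrapper

    return decorator


@limit_calls(max_calls=50)
def query_database(sql: str) -> str:
    return f"rows for: {sql}"  # stand-in for a real database call


print(query_database("SELECT 1"))  # one governed call
```

Call 51 raises instead of hammering the database; with AxonFlow the equivalent limit lives in policy rather than in every tool definition.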
## How AxonFlow Plugs In

AxonFlow doesn't replace CrewAI. It sits underneath it, providing the governance layer that CrewAI intentionally doesn't include:
```text
┌─────────────────┐
│    Your App     │
└────────┬────────┘
         │
         v
┌─────────────────┐
│     CrewAI      │  <-- Agents, Tasks, Delegation
└────────┬────────┘
         │
         v
┌─────────────────────────────────┐
│            AxonFlow             │
│  ┌───────────┐  ┌────────────┐  │
│  │  Policy   │  │   Audit    │  │
│  │  Enforce  │  │   Trail    │  │
│  └───────────┘  └────────────┘  │
│  ┌───────────┐  ┌────────────┐  │
│  │   PII     │  │   Cost     │  │
│  │ Detection │  │  Control   │  │
│  └───────────┘  └────────────┘  │
└────────────────┬────────────────┘
                 │
                 v
┌─────────────────┐
│  LLM Provider   │
└─────────────────┘
```
**What this gives you:**
- Every agent action logged with role and delegation context
- PII detected and blocked before crossing agent boundaries
- SQL injection attempts blocked in agent tools
- Cost tracked per agent, per crew, per user
- Compliance auditors can query the full collaboration history
**What stays the same:**
- Your CrewAI code doesn't change
- Agent definitions work as before
- No new abstractions to learn
## Integration Patterns

### Pattern 1: Governed Crew Runner (Python)

The recommended default for most teams: wrap CrewAI crews with AxonFlow governance.
```python
import os
import time
from typing import Optional

from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI

from axonflow import AxonFlow, TokenUsage


class GovernedCrewRunner:
    """Run CrewAI crews with AxonFlow governance."""

    def __init__(
        self,
        axonflow_url: Optional[str] = None,
        client_id: Optional[str] = None,
        client_secret: Optional[str] = None,
        model: str = "gpt-4",
    ):
        self.axonflow_endpoint = axonflow_url or os.getenv("AXONFLOW_ENDPOINT", "http://localhost:8080")
        self.client_id = client_id or os.getenv("AXONFLOW_CLIENT_ID", "crewai-app")
        self.client_secret = client_secret or os.getenv("AXONFLOW_CLIENT_SECRET")
        self.model = model
        self.llm = ChatOpenAI(
            model=model,
            temperature=0.7,
            openai_api_key=os.getenv("OPENAI_API_KEY"),
        )

    def create_agent(
        self,
        role: str,
        goal: str,
        backstory: str,
        tools: Optional[list] = None,
    ) -> Agent:
        """Create a CrewAI agent."""
        return Agent(
            role=role,
            goal=goal,
            backstory=backstory,
            verbose=True,
            llm=self.llm,
            tools=tools or [],
        )

    def run_governed_crew(
        self,
        user_token: str,
        crew: Crew,
        inputs: dict,
        context: Optional[dict] = None,
    ) -> str:
        """Execute a CrewAI crew with AxonFlow governance."""
        start_time = time.time()
        query = " ".join(f"{k}: {v}" for k, v in inputs.items())

        with AxonFlow.sync(
            endpoint=self.axonflow_endpoint,
            client_id=self.client_id,
            client_secret=self.client_secret,
        ) as axonflow:
            # 1. Pre-check: is this crew allowed to run for this user?
            ctx = axonflow.get_policy_approved_context(
                user_token=user_token,
                query=query,
                context={
                    **(context or {}),
                    "framework": "crewai",
                    "agent_count": len(crew.agents),
                    "agent_roles": [a.role for a in crew.agents],
                },
            )
            if not ctx.approved:
                raise PermissionError(f"Crew blocked: {ctx.block_reason}")

            try:
                # 2. Execute the crew
                result = crew.kickoff(inputs=inputs)
                latency_ms = int((time.time() - start_time) * 1000)

                # 3. Audit the completed run
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=str(result)[:200],
                    provider="openai",
                    model=self.model,
                    # Placeholder totals; substitute real usage from your LLM callbacks.
                    token_usage=TokenUsage(prompt_tokens=500, completion_tokens=200, total_tokens=700),
                    latency_ms=latency_ms,
                    metadata={"crew_agents": [a.role for a in crew.agents]},
                )
                return str(result)
            except Exception as e:
                # Audit failures too, so broken runs stay visible to compliance.
                axonflow.audit_llm_call(
                    context_id=ctx.context_id,
                    response_summary=f"Crew error: {e}",
                    provider="openai",
                    model=self.model,
                    token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
                    latency_ms=int((time.time() - start_time) * 1000),
                    metadata={"error": str(e)},
                )
                raise


# Usage
runner = GovernedCrewRunner()

# Create agents
researcher = runner.create_agent(
    role="Research Analyst",
    goal="Find accurate information",
    backstory="Expert researcher with analytical skills",
)
writer = runner.create_agent(
    role="Content Writer",
    goal="Create clear, engaging content",
    backstory="Technical writer specializing in documentation",
)

# Create tasks
research_task = Task(
    description="Research the latest developments in {topic}",
    expected_output="Comprehensive research summary",
    agent=researcher,
)
writing_task = Task(
    description="Write a blog post based on the research",
    expected_output="Well-structured blog post",
    agent=writer,
)

# Create and run the governed crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    process=Process.sequential,
)
result = runner.run_governed_crew(
    user_token="user-123",
    crew=crew,
    inputs={"topic": "AI governance in healthcare"},
    context={"department": "marketing"},
)
```
### Pattern 2: Go Service for Crew Governance

For service-oriented architectures where a Go service coordinates CrewAI workers:
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/getaxonflow/axonflow-sdk-go"
)

type CrewGovernanceService struct {
	client *axonflow.AxonFlowClient
}

func NewCrewGovernanceService(agentURL, clientSecret string) *CrewGovernanceService {
	return &CrewGovernanceService{
		client: axonflow.NewClient(axonflow.Config{
			Endpoint:     agentURL,
			ClientID:     "crewai-service",
			ClientSecret: clientSecret,
		}),
	}
}

type CrewRequest struct {
	UserToken  string                 `json:"user_token"`
	CrewID     string                 `json:"crew_id"`
	Inputs     map[string]interface{} `json:"inputs"`
	AgentRoles []string               `json:"agent_roles"`
}

type CrewResponse struct {
	Approved  bool   `json:"approved"`
	ContextID string `json:"context_id"`
	Reason    string `json:"reason,omitempty"`
}

func (s *CrewGovernanceService) CheckCrewExecution(ctx context.Context, req CrewRequest) (*CrewResponse, error) {
	inputsJSON, err := json.Marshal(req.Inputs)
	if err != nil {
		return nil, fmt.Errorf("marshal inputs: %w", err)
	}

	result, err := s.client.ExecuteQuery(
		req.UserToken,
		string(inputsJSON),
		"chat",
		map[string]interface{}{
			"framework":   "crewai",
			"crew_id":     req.CrewID,
			"agent_roles": req.AgentRoles,
			"agent_count": len(req.AgentRoles),
		},
	)
	if err != nil {
		return nil, fmt.Errorf("pre-check failed: %w", err)
	}

	return &CrewResponse{
		Approved:  !result.Blocked,
		ContextID: result.ContextID,
		Reason:    result.BlockReason,
	}, nil
}

func (s *CrewGovernanceService) AuditCrewCompletion(
	ctx context.Context,
	contextID, result string,
	latencyMs int,
	metadata map[string]interface{},
) error {
	return s.client.AuditLLMCall(axonflow.AuditRequest{
		ContextID:       contextID,
		ResponseSummary: truncate(result, 200),
		Provider:        "openai",
		Model:           "gpt-4",
		// Placeholder totals; substitute real usage where available.
		TokenUsage: axonflow.TokenUsage{PromptTokens: 500, CompletionTokens: 200, TotalTokens: 700},
		LatencyMs:  latencyMs,
		Metadata:   metadata,
	})
}

func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	return s[:maxLen]
}
```
### Pattern 3: Per-Task Governance

Advanced: for fine-grained control, apply a governance check before each individual task:
```python
import time
from typing import Optional

from crewai import Task

from axonflow import AxonFlow


class TaskGovernedCrew:
    """CrewAI with per-task governance checks."""

    def __init__(self, axonflow_url: str, client_id: str, client_secret: str):
        self.axonflow_endpoint = axonflow_url
        self.client_id = client_id
        self.client_secret = client_secret

    def execute_governed_task(
        self,
        user_token: str,
        task: Task,
        context: Optional[dict] = None,
    ) -> str:
        """Execute a single task with governance."""
        start_time = time.time()
        with AxonFlow.sync(
            endpoint=self.axonflow_endpoint,
            client_id=self.client_id,
            client_secret=self.client_secret,
        ) as axonflow:
            # Pre-check this specific task before its agent runs it
            ctx = axonflow.get_policy_approved_context(
                user_token=user_token,
                query=task.description,
                context={
                    **(context or {}),
                    "task_name": task.description[:50],
                    "agent_role": task.agent.role if task.agent else "unknown",
                },
            )
            if not ctx.approved:
                return f"Task blocked: {ctx.block_reason}"

            result = task.execute()

            axonflow.audit_llm_call(
                context_id=ctx.context_id,
                response_summary=str(result)[:200],
                provider="openai",
                model="gpt-4",
                latency_ms=int((time.time() - start_time) * 1000),
            )
            return str(result)
```
## Example Implementations

| Language | SDK | Example |
|---|---|---|
| Python | `axonflow` | `crewai/python` |
| Go | `axonflow-sdk-go` | `crewai/go` |