Skip to main content

CrewAI + AxonFlow Integration

Prerequisites: Python 3.9+, AxonFlow running (Getting Started), pip install crewai axonflow


What CrewAI Does Well

CrewAI is a framework for orchestrating autonomous AI agents that collaborate on complex tasks. Its strengths are compelling:

Role-Based Agents: Define agents with specific roles (Researcher, Writer, Analyst). Each agent has a backstory, goals, and expertise.

Task Delegation: Agents delegate subtasks to each other. Complex work is divided naturally.

Process Types: Sequential, hierarchical, and consensual processes. Different collaboration patterns for different needs.

Tool Integration: Each agent can use different tools. Researchers search, writers format, analysts compute.

Memory and Context: Agents remember context across interactions. Long-running collaborations work naturally.

Human-in-the-Loop: Agents can ask for human input when needed. Supervised autonomy is built in.


What CrewAI Doesn't Try to Solve

CrewAI focuses on multi-agent collaboration. These concerns are explicitly out of scope:

Production RequirementCrewAI's Position
Policy enforcement before agent actionsNot provided—agents act based on their roles, not policies
PII detection in agent communicationsNot addressed—agents share data freely
SQL injection preventionNot provided—must implement at tool level
Per-agent or per-crew cost attributionNot tracked—requires external monitoring
Audit trails for complianceNot built in—conversations aren't logged by default
Cross-agent access controlNot addressed—agents can delegate to any other agent
Token budget enforcementNot provided—crews can consume unlimited tokens

This isn't a criticism—it's a design choice. CrewAI handles collaboration. Governance is a separate concern.


Where Teams Hit Production Friction

Based on real enterprise deployments, here are the blockers that appear after the prototype works:

1. The Delegation Loop

A researcher agent delegates to an analyst. The analyst delegates back with a question. The researcher re-delegates. This continues. By Monday, 15,000 API calls have accumulated.

CrewAI processed every delegation as intended. Nothing was watching the cost of collaboration.

2. The Invisible Handoff

A customer service crew handles a complaint. The complaint gets handed off between three agents. The customer asks:

  • Who handled my issue?
  • What was discussed at each step?
  • Who made the final decision?

CrewAI orchestrated the handoffs. Without custom logging, the collaboration history is gone.

3. The Cross-Agent Data Leak

An HR agent collects employee information. It delegates a summary to a reporting agent. The reporting agent, designed for external reports, now has internal HR data in its context.

CrewAI has no mechanism to filter data between agents based on sensitivity.

4. The Security Review Block

Security review: BLOCKED
- No audit trail for inter-agent delegation
- PII can flow between agents without filtering
- No policy enforcement per agent role
- Cost controls missing
- No role-based access for agent actions

The multi-agent crew worked perfectly. It can't ship.

5. The Autonomous Tool Abuse

An agent with database access decides to query extensively for a task. 10,000 database queries later, the task is complete—and the database is throttled.

CrewAI gave the agent autonomy. Nothing governed how much autonomy was appropriate.


How AxonFlow Plugs In

AxonFlow doesn't replace CrewAI. It sits underneath it—providing the governance layer that CrewAI intentionally doesn't include:

┌─────────────────┐
│ Your App │
└────────┬────────┘

v
┌─────────────────┐
│ CrewAI │ <-- Agents, Tasks, Delegation
└────────┬────────┘

v
┌─────────────────────────────────┐
│ AxonFlow │
│ ┌───────────┐ ┌────────────┐ │
│ │ Policy │ │ Audit │ │
│ │ Enforce │ │ Trail │ │
│ └───────────┘ └────────────┘ │
│ ┌───────────┐ ┌────────────┐ │
│ │ PII │ │ Cost │ │
│ │ Detection│ │ Control │ │
│ └───────────┘ └────────────┘ │
└────────────────┬────────────────┘

v
┌─────────────────┐
│ LLM Provider │
└─────────────────┘

What this gives you:

  • Every agent action logged with role and delegation context
  • PII detected and blocked before crossing agent boundaries
  • SQL injection attempts blocked in agent tools
  • Cost tracked per agent, per crew, per user
  • Compliance auditors can query the full collaboration history

What stays the same:

  • Your CrewAI code doesn't change
  • Agent definitions work as before
  • No new abstractions to learn

Integration Patterns

Recommended default for most teams. Wrap CrewAI crews with AxonFlow governance:

import os
import time
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from axonflow import AxonFlow, TokenUsage


class GovernedCrewRunner:
"""Run CrewAI crews with AxonFlow governance."""

def __init__(
self,
axonflow_url: str = None,
client_id: str = None,
client_secret: str = None,
model: str = "gpt-4",
):
self.axonflow_endpoint = axonflow_url or os.getenv("AXONFLOW_ENDPOINT", "http://localhost:8080")
self.client_id = client_id or os.getenv("AXONFLOW_CLIENT_ID", "crewai-app")
self.client_secret = client_secret or os.getenv("AXONFLOW_CLIENT_SECRET")
self.model = model
self.llm = ChatOpenAI(
model=model,
temperature=0.7,
openai_api_key=os.getenv("OPENAI_API_KEY"),
)

def create_agent(
self,
role: str,
goal: str,
backstory: str,
tools: list = None,
) -> Agent:
"""Create a CrewAI agent."""
return Agent(
role=role,
goal=goal,
backstory=backstory,
verbose=True,
llm=self.llm,
tools=tools or [],
)

def run_governed_crew(
self,
user_token: str,
crew: Crew,
inputs: dict,
context: dict = None,
) -> str:
"""Execute a CrewAI crew with AxonFlow governance."""
start_time = time.time()
query = " ".join(f"{k}: {v}" for k, v in inputs.items())

with AxonFlow.sync(
endpoint=self.axonflow_endpoint,
client_id=self.client_id,
client_secret=self.client_secret,
) as axonflow:
# 1. Pre-check
ctx = axonflow.get_policy_approved_context(
user_token=user_token,
query=query,
context={
**(context or {}),
"framework": "crewai",
"agent_count": len(crew.agents),
"agent_roles": [a.role for a in crew.agents],
},
)

if not ctx.approved:
raise PermissionError(f"Crew blocked: {ctx.block_reason}")

try:
# 2. Execute crew
result = crew.kickoff(inputs=inputs)
latency_ms = int((time.time() - start_time) * 1000)

# 3. Audit
axonflow.audit_llm_call(
context_id=ctx.context_id,
response_summary=str(result)[:200],
provider="openai",
model=self.model,
token_usage=TokenUsage(prompt_tokens=500, completion_tokens=200, total_tokens=700),
latency_ms=latency_ms,
metadata={"crew_agents": [a.role for a in crew.agents]},
)

return str(result)

except Exception as e:
axonflow.audit_llm_call(
context_id=ctx.context_id,
response_summary=f"Crew error: {str(e)}",
provider="openai",
model=self.model,
token_usage=TokenUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0),
latency_ms=int((time.time() - start_time) * 1000),
metadata={"error": str(e)},
)
raise


# Usage
runner = GovernedCrewRunner()

# Create agents
researcher = runner.create_agent(
role="Research Analyst",
goal="Find accurate information",
backstory="Expert researcher with analytical skills",
)

writer = runner.create_agent(
role="Content Writer",
goal="Create clear, engaging content",
backstory="Technical writer specializing in documentation",
)

# Create tasks
research_task = Task(
description="Research the latest developments in {topic}",
expected_output="Comprehensive research summary",
agent=researcher,
)

writing_task = Task(
description="Write a blog post based on the research",
expected_output="Well-structured blog post",
agent=writer,
)

# Create and run governed crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
process=Process.sequential,
)

result = runner.run_governed_crew(
user_token="user-123",
crew=crew,
inputs={"topic": "AI governance in healthcare"},
context={"department": "marketing"},
)
Pattern 2: Go Service for Crew GovernanceFor service-oriented architectures

For Go services coordinating CrewAI:

package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/getaxonflow/axonflow-sdk-go"
)

type CrewGovernanceService struct {
client *axonflow.AxonFlowClient
}

func NewCrewGovernanceService(agentURL, clientSecret string) *CrewGovernanceService {
return &CrewGovernanceService{
client: axonflow.NewClient(axonflow.Config{
Endpoint: agentURL,
ClientID: "crewai-service",
ClientSecret: clientSecret,
}),
}
}

type CrewRequest struct {
UserToken string `json:"user_token"`
CrewID string `json:"crew_id"`
Inputs map[string]interface{} `json:"inputs"`
AgentRoles []string `json:"agent_roles"`
}

type CrewResponse struct {
Approved bool `json:"approved"`
ContextID string `json:"context_id"`
Reason string `json:"reason,omitempty"`
}

func (s *CrewGovernanceService) CheckCrewExecution(ctx context.Context, req CrewRequest) (*CrewResponse, error) {
inputsJSON, _ := json.Marshal(req.Inputs)

result, err := s.client.ExecuteQuery(
req.UserToken,
string(inputsJSON),
"chat",
map[string]interface{}{
"framework": "crewai",
"crew_id": req.CrewID,
"agent_roles": req.AgentRoles,
"agent_count": len(req.AgentRoles),
},
)
if err != nil {
return nil, fmt.Errorf("pre-check failed: %w", err)
}

return &CrewResponse{
Approved: !result.Blocked,
ContextID: result.ContextID,
Reason: result.BlockReason,
}, nil
}

func (s *CrewGovernanceService) AuditCrewCompletion(
ctx context.Context,
contextID, result string,
latencyMs int,
metadata map[string]interface{},
) error {
return s.client.AuditLLMCall(axonflow.AuditRequest{
ContextID: contextID,
ResponseSummary: truncate(result, 200),
Provider: "openai",
Model: "gpt-4",
TokenUsage: axonflow.TokenUsage{PromptTokens: 500, CompletionTokens: 200, TotalTokens: 700},
LatencyMs: latencyMs,
Metadata: metadata,
})
}

func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
Pattern 3: Per-Task GovernanceAdvanced: For fine-grained control

Apply governance before each task:

class TaskGovernedCrew:
"""CrewAI with per-task governance checks."""

def __init__(self, axonflow_url: str, client_id: str, client_secret: str):
self.axonflow_url = axonflow_url
self.client_id = client_id
self.client_secret = client_secret

def execute_governed_task(
self,
user_token: str,
task: Task,
context: dict = None,
) -> str:
"""Execute a single task with governance."""
start_time = time.time()

with AxonFlow.sync(
endpoint=self.axonflow_endpoint,
client_id=self.client_id,
client_secret=self.client_secret,
) as axonflow:
ctx = axonflow.get_policy_approved_context(
user_token=user_token,
query=task.description,
context={
**(context or {}),
"task_name": task.description[:50],
"agent_role": task.agent.role if task.agent else "unknown",
},
)

if not ctx.approved:
return f"Task blocked: {ctx.block_reason}"

result = task.execute()

axonflow.audit_llm_call(
context_id=ctx.context_id,
response_summary=str(result)[:200],
provider="openai",
model="gpt-4",
latency_ms=int((time.time() - start_time) * 1000),
)

return result

Example Implementations

LanguageSDKExample
Pythonaxonflowcrewai/python
Goaxonflow-sdk-gocrewai/go