DSPy + AxonFlow Integration
What DSPy Does Well
DSPy is Stanford's framework for programming language models with signatures, modules, and optimizers. It takes a fundamentally different approach to LLM application development:
- Declarative Signatures: Define what you want (inputs and outputs), not how to prompt for it. The framework handles prompt optimization.
- Composable Modules: Chain-of-thought, retrieval, and other reasoning patterns as first-class abstractions. Build complex pipelines from simple modules.
- Automatic Optimization: Compilers that optimize prompts using training examples. No manual prompt engineering required.
- Reproducibility: Signatures create testable contracts. Unit tests for AI behavior become possible.
- Provider Agnostic: Works with any LLM—OpenAI, Anthropic, local models. Swap providers without changing code.
- Academic Rigor: Research-backed patterns from Stanford NLP. Optimizers based on peer-reviewed methods.
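In code, the declarative style looks like this. A minimal sketch; the exact configuration call varies by DSPy version (this follows the 2.5+ style):

import dspy

# Configure a provider once; modules stay provider-agnostic.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class QA(dspy.Signature):
    """Answer the question concisely."""
    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

# Swap dspy.ChainOfThought for dspy.Predict to drop the rationale step.
qa = dspy.ChainOfThought(QA)
result = qa(question="Why is the sky blue?")
print(result.answer)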
What DSPy Doesn't Try to Solve
DSPy focuses on making LLM programming declarative and optimizable. These concerns are explicitly out of scope:
| Production Requirement | DSPy's Position |
|---|---|
| Policy enforcement before module execution | Not provided—modules execute based on signature contracts |
| PII detection in inputs/outputs | Not addressed—data flows through modules without filtering |
| SQL injection prevention in RAG | Not provided—retrieval happens without input validation |
| Per-module or per-user cost attribution | Not tracked—requires external monitoring |
| Audit trails for compliance | Not built in—must implement logging externally |
| Access control for different signatures | Not addressed—any signature can be used |
| Token budget enforcement | Not provided—optimizers can consume unlimited tokens |
This isn't a criticism—it's a design choice. DSPy handles declarative LLM programming. Governance is a separate concern.
Where Teams Hit Production Friction
Based on real enterprise deployments, here are the blockers that appear after the prototype works:
1. The Optimizer Loop
A compiler optimizes prompts using 1,000 training examples. Each example requires 5 LLM calls for bootstrapping. That's 5,000 API calls before deployment.
DSPy optimized the prompts as intended. Nothing was watching the cost of optimization.
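The arithmetic is worth making explicit. A back-of-envelope sketch; the token counts and pricing are assumptions:

examples = 1_000
calls_per_example = 5              # bootstrapping calls per training example
tokens_per_call = 1_500            # assumption: prompt + completion
usd_per_1k_tokens = 0.03           # assumption: GPT-4-class pricing
total_calls = examples * calls_per_example
cost = total_calls * tokens_per_call / 1_000 * usd_per_1k_tokens
print(f"{total_calls:,} calls, ~${cost:,.0f} before deployment")  # 5,000 calls, ~$225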
2. The RAG Leak
A RAG module retrieves documents containing customer data. The retrieved context is sent to the LLM. The LLM response is cached.
Now customer data is in your cache. DSPy has no mechanism to filter PII from retrieved documents.
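A minimal sketch of the failure mode. retrieve and llm are hypothetical stand-ins for your retriever and model client; the point is that nothing in this path inspects the retrieved text:

def retrieve(question: str) -> list[str]:
    # stand-in retriever; in practice this queries your vector store
    return [f"Account notes for Jane Doe, SSN 123-45-6789, re: {question}"]

def llm(prompt: str) -> str:
    # stand-in model call
    return f"Answer based on: {prompt[:40]}"

cache: dict[str, str] = {}

def rag_answer(question: str) -> str:
    docs = retrieve(question)                  # retrieved text may contain customer PII
    prompt = f"{question}\n\nContext: {docs}"
    if prompt not in cache:
        cache[prompt] = llm(prompt)            # PII now lives in the cache key and value
    return cache[prompt]

rag_answer("What is the status of Jane's account?")
assert any("123-45-6789" in key for key in cache)  # the leak, demonstrated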
3. The "What Signature Was Used?" Question
Compliance asks for an audit trail. A module produced a recommendation. They need:
- What was the signature?
- What inputs were provided?
- What was retrieved (for RAG)?
- What was the chain-of-thought?
DSPy executed the module. Without custom logging, the execution trace is gone.
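Concretely, compliance is asking for a record like this per execution. An illustrative shape, not a DSPy artifact; every field has to be captured by logging you write yourself:

audit_record = {
    "signature": "RAG(question -> answer)",
    "inputs": {"question": "..."},
    "retrieved": ["doc-123", "doc-456"],        # RAG context, by document ID
    "rationale": "...chain-of-thought text...",
    "output": {"answer": "..."},
    "timestamp": "2025-06-01T10:32:00Z",
    "user": "dspy-user",
}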
4. The Security Review Block
Security review: BLOCKED
- No audit trail for module executions
- PII can flow through RAG retrieval
- No policy enforcement before execution
- Cost controls missing (especially for optimizers)
- No input validation at signature level
The DSPy pipeline worked perfectly. It can't ship.
5. The Chain-of-Thought Exposure
A chain-of-thought module reasons through a problem. The rationale includes internal business logic. This rationale is visible in the API response.
DSPy provided transparency for debugging. That transparency is a compliance risk.
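One mitigation: keep the rationale server-side and return only the answer. A sketch using the GovernedChainOfThought wrapper defined later on this page:

def answer_endpoint(cot: "GovernedChainOfThought", question: str) -> dict:
    result = cot.forward(question=question)
    if result.blocked:
        return {"error": result.block_reason}
    # result.rationale never crosses the API boundary; persist it to your
    # audit store instead if reviewers need the full reasoning text
    return {"answer": result.output["answer"]}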
How AxonFlow Plugs In
AxonFlow doesn't replace DSPy. It sits underneath it—providing the governance layer that DSPy intentionally doesn't include:
┌─────────────────┐
│    Your App     │
└────────┬────────┘
         │
         v
┌─────────────────┐
│      DSPy       │  <-- Signatures, Modules, Optimizers
└────────┬────────┘
         │
         v
┌─────────────────────────────────┐
│            AxonFlow             │
│  ┌───────────┐  ┌────────────┐  │
│  │  Policy   │  │   Audit    │  │
│  │  Enforce  │  │   Trail    │  │
│  └───────────┘  └────────────┘  │
│  ┌───────────┐  ┌────────────┐  │
│  │    PII    │  │    Cost    │  │
│  │ Detection │  │  Control   │  │
│  └───────────┘  └────────────┘  │
└────────────────┬────────────────┘
                 │
                 v
┌─────────────────┐
│  LLM Provider   │
└─────────────────┘
What this gives you:
- Every module execution logged with signature details and inputs
- PII detected and blocked in both retrieval and generation
- SQL injection attempts blocked in RAG queries
- Cost tracked per module, per user, per optimization run
- Compliance auditors can query the full execution chain
What stays the same:
- Your DSPy code doesn't change
- Signatures and modules work as before
- No new abstractions to learn
Integration Patterns
Pattern 1: Governed Module (Python)
Wrap DSPy modules with AxonFlow governance:
from dataclasses import dataclass
from typing import Optional
from axonflow import AxonFlow
import time
@dataclass
class Signature:
name: str
input_fields: list[str]
output_fields: list[str]
@dataclass
class ModuleResult:
success: bool
output: Optional[dict] = None
blocked: bool = False
block_reason: Optional[str] = None
rationale: Optional[str] = None
class GovernedModule:
"""Base class for DSPy modules with AxonFlow governance."""
def __init__(self, signature: Signature, axonflow: AxonFlow, user_token: str):
self.signature = signature
self.axonflow = axonflow
self.user_token = user_token
def _check_policy(self, query: str, context: dict) -> tuple[bool, str, Optional[str]]:
"""Check policy before execution."""
ctx = self.axonflow.get_policy_approved_context(
user_token=self.user_token,
query=query,
context={
"module": self.signature.name,
"framework": "dspy",
"input_fields": self.signature.input_fields,
"output_fields": self.signature.output_fields,
**context
}
)
return ctx.approved, ctx.block_reason or "", ctx.context_id
    def _audit(self, context_id: str, response_summary: str, latency_ms: int, metadata: Optional[dict] = None):
"""Audit module execution."""
self.axonflow.audit_llm_call(
context_id=context_id,
response_summary=response_summary[:200],
            provider="openai",  # adjust to match your configured LM
            model="gpt-4",
latency_ms=latency_ms,
metadata={"module": self.signature.name, **(metadata or {})}
)
class GovernedPredict(GovernedModule):
"""Simple prediction with governance."""
def forward(self, **inputs) -> ModuleResult:
query = " ".join(f"{k}: {v}" for k, v in inputs.items())
start_time = time.time()
approved, block_reason, context_id = self._check_policy(query, {"operation": "predict"})
if not approved:
return ModuleResult(success=False, blocked=True, block_reason=block_reason)
# Execute prediction (your actual DSPy logic here)
output = {field: f"Predicted {field} for: {query[:50]}" for field in self.signature.output_fields}
latency_ms = int((time.time() - start_time) * 1000)
self._audit(context_id, str(output), latency_ms)
return ModuleResult(success=True, output=output)
class GovernedChainOfThought(GovernedModule):
"""Chain-of-thought reasoning with governance at each step."""
def forward(self, **inputs) -> ModuleResult:
query = " ".join(f"{k}: {v}" for k, v in inputs.items())
start_time = time.time()
# Step 1: Reasoning
approved, block_reason, context_id = self._check_policy(
query, {"operation": "reasoning", "step": 1}
)
if not approved:
return ModuleResult(success=False, blocked=True, block_reason=block_reason)
rationale = f"Let me think step by step about: {query[:100]}..."
# Step 2: Answer
approved, block_reason, answer_ctx = self._check_policy(
rationale, {"operation": "answer", "step": 2}
)
if not approved:
return ModuleResult(success=False, blocked=True, block_reason=block_reason)
output = {field: f"Answer for {field}" for field in self.signature.output_fields}
latency_ms = int((time.time() - start_time) * 1000)
        # Link both policy checks in the audit record
        self._audit(context_id, str(output), latency_ms, {"rationale_length": len(rationale), "answer_context": answer_ctx})
return ModuleResult(success=True, output=output, rationale=rationale)
class GovernedRAG(GovernedModule):
"""RAG with governance on both retrieval and generation."""
def forward(self, **inputs) -> ModuleResult:
query = " ".join(f"{k}: {v}" for k, v in inputs.items())
start_time = time.time()
# Step 1: Retrieval
approved, block_reason, retrieval_ctx = self._check_policy(
query, {"operation": "retrieval", "step": 1}
)
if not approved:
return ModuleResult(success=False, blocked=True, block_reason=f"Retrieval blocked: {block_reason}")
# Simulate retrieval (your actual retrieval here)
retrieved_docs = [f"Document about {query[:30]}"]
# Step 2: Generation
generation_query = f"{query}\n\nContext: {retrieved_docs}"
approved, block_reason, generation_ctx = self._check_policy(
generation_query, {"operation": "generation", "step": 2, "doc_count": len(retrieved_docs)}
)
if not approved:
return ModuleResult(success=False, blocked=True, block_reason=f"Generation blocked: {block_reason}")
output = {field: f"Based on retrieved documents: {field}" for field in self.signature.output_fields}
latency_ms = int((time.time() - start_time) * 1000)
        # Link both policy checks in the audit record
        self._audit(retrieval_ctx, str(output), latency_ms, {"retrieved_docs": len(retrieved_docs), "generation_context": generation_ctx})
return ModuleResult(success=True, output=output)
# Usage
with AxonFlow.sync(agent_url="http://localhost:8080") as client:
# Simple prediction
qa = GovernedPredict(
Signature("QA", ["question"], ["answer"]),
client,
"dspy-user"
)
result = qa.forward(question="What are the benefits of renewable energy?")
# Chain-of-thought
cot = GovernedChainOfThought(
Signature("ReasoningQA", ["question"], ["answer"]),
client,
"dspy-user"
)
result = cot.forward(question="Why is the sky blue?")
# RAG
rag = GovernedRAG(
Signature("RAG", ["question"], ["answer"]),
client,
"dspy-user"
)
result = rag.forward(question="What are AI safety best practices?")
Pattern 2: Governed Module (Go)
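The same wrapper in Go, using the AxonFlow Go SDK: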
package main
import (
"fmt"
"strings"
"time"
"github.com/getaxonflow/axonflow-sdk-go"
)
type Signature struct {
Name string
InputFields []string
OutputFields []string
}
type ModuleResult struct {
Success bool
Output map[string]string
Blocked bool
BlockReason string
}
type GovernedModule struct {
Signature Signature
Client *axonflow.AxonFlowClient
UserToken string
}
func (m *GovernedModule) checkPolicy(query string, ctx map[string]interface{}) (bool, string, string) {
context := map[string]interface{}{
"module": m.Signature.Name,
"framework": "dspy",
"input_fields": m.Signature.InputFields,
"output_fields": m.Signature.OutputFields,
}
for k, v := range ctx {
context[k] = v
}
result, err := m.Client.ExecuteQuery(m.UserToken, query, "chat", context)
if err != nil {
return false, err.Error(), ""
}
if result.Blocked {
return false, result.BlockReason, ""
}
return true, "", result.ContextID
}
type GovernedPredict struct {
GovernedModule
}
func (p *GovernedPredict) Forward(inputs map[string]string) ModuleResult {
var queryParts []string
for k, v := range inputs {
queryParts = append(queryParts, fmt.Sprintf("%s: %s", k, v))
}
query := strings.Join(queryParts, " ")
startTime := time.Now()
approved, blockReason, contextID := p.checkPolicy(query, map[string]interface{}{
"operation": "predict",
})
if !approved {
return ModuleResult{Blocked: true, BlockReason: blockReason}
}
// Execute prediction (your actual DSPy logic here)
output := make(map[string]string)
for _, field := range p.Signature.OutputFields {
output[field] = fmt.Sprintf("Predicted %s for: %s", field, truncate(query, 50))
}
latencyMs := int(time.Since(startTime).Milliseconds())
// Audit (fire and forget)
go p.Client.AuditLLMCall(axonflow.AuditRequest{
ContextID: contextID,
ResponseSummary: truncate(fmt.Sprint(output), 200),
Provider: "openai",
Model: "gpt-4",
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"module": p.Signature.Name},
})
return ModuleResult{Success: true, Output: output}
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
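To make the snippet compile as package main, here is a minimal usage sketch. The NewClient constructor and its arguments are assumptions; check the axonflow-sdk-go documentation for the real initialization:

func main() {
	// Assumption: constructor name and signature may differ in the actual SDK.
	client := axonflow.NewClient("http://localhost:8080")

	qa := &GovernedPredict{GovernedModule: GovernedModule{
		Signature: Signature{
			Name:         "QA",
			InputFields:  []string{"question"},
			OutputFields: []string{"answer"},
		},
		Client:    client,
		UserToken: "dspy-user",
	}}

	result := qa.Forward(map[string]string{"question": "What are the benefits of renewable energy?"})
	fmt.Printf("success=%v blocked=%v output=%v\n", result.Success, result.Blocked, result.Output)
}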
Pattern 3: Multi-Module Pipeline
Chain modules with governance at each step:
class GovernedPipeline:
"""Execute multiple modules in sequence with governance."""
def __init__(self, modules: list[GovernedModule]):
self.modules = modules
def forward(self, initial_input: dict) -> list[ModuleResult]:
results = []
current_input = initial_input
for module in self.modules:
result = module.forward(**current_input)
results.append(result)
if not result.success:
print(f"Pipeline halted at {module.signature.name}: {result.block_reason}")
break
# Pass output as next input
if result.output:
current_input = result.output
return results
# Usage
pipeline = GovernedPipeline([
GovernedPredict(Signature("QA", ["question"], ["answer"]), client, "user"),
GovernedPredict(Signature("Summarize", ["answer"], ["summary"]), client, "user"),
])
results = pipeline.forward({"question": "Explain machine learning"})
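Because the pipeline breaks on the first unsuccessful result, a module blocked by policy halts the chain before any downstream module can run or reach the LLM provider.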
Example Implementations
| Language | SDK | Example |
|---|---|---|
| Python | axonflow | dspy/python |
| Go | axonflow-sdk-go | dspy/go |