
DSPy + AxonFlow Integration

What DSPy Does Well

DSPy is Stanford's framework for programming language models with signatures, modules, and optimizers. It takes a fundamentally different approach to LLM application development:

Declarative Signatures: Define what you want (inputs and outputs), not how to prompt for it. The framework handles prompt optimization (see the sketch after this list).

Composable Modules: Chain-of-thought, retrieval, and other reasoning patterns as first-class abstractions. Build complex pipelines from simple modules.

Automatic Optimization: Compilers that optimize prompts using training examples. No manual prompt engineering required.

Reproducibility: Signatures create testable contracts. Unit tests for AI behavior become possible.

Provider Agnostic: Works with any LLM—OpenAI, Anthropic, local models. Swap providers without changing code.

Academic Rigor: Research-backed patterns from Stanford NLP. Optimizers based on peer-reviewed methods.
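
To make the declarative style concrete, here is a minimal plain-DSPy sketch of a question-answering signature and module. The model name is illustrative, and the configuration call has changed across DSPy releases, so check your version's docs:

import dspy

# Configure a language model (model name illustrative; the exact
# configuration API varies by DSPy version).
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class BasicQA(dspy.Signature):
    """Answer the question concisely."""
    question = dspy.InputField()
    answer = dspy.OutputField()

# A module built from the signature; swap in dspy.ChainOfThought(BasicQA)
# for step-by-step reasoning without touching the signature.
qa = dspy.Predict(BasicQA)
prediction = qa(question="What are the benefits of renewable energy?")
print(prediction.answer)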


What DSPy Doesn't Try to Solve

DSPy focuses on making LLM programming declarative and optimizable. These concerns are explicitly out of scope:

| Production Requirement | DSPy's Position |
| --- | --- |
| Policy enforcement before module execution | Not provided. Modules execute based on signature contracts. |
| PII detection in inputs/outputs | Not addressed. Data flows through modules without filtering. |
| SQL injection prevention in RAG | Not provided. Retrieval happens without input validation. |
| Per-module or per-user cost attribution | Not tracked. Requires external monitoring. |
| Audit trails for compliance | Not built in. Logging must be implemented externally. |
| Access control for different signatures | Not addressed. Any signature can be used. |
| Token budget enforcement | Not provided. Optimizers can consume unlimited tokens. |

This isn't a criticism—it's a design choice. DSPy handles declarative LLM programming. Governance is a separate concern.


Where Teams Hit Production Friction

Based on real enterprise deployments, here are the blockers that appear after the prototype works:

1. The Optimizer Loop

A compiler optimizes prompts using 1,000 training examples. Each example requires 5 LLM calls for bootstrapping. That's 5,000 API calls before deployment.

DSPy optimized the prompts as intended. Nothing was watching the cost of optimization.
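
A back-of-envelope sketch of that run shows why this surprises teams. All numbers below are hypothetical; plug in your own example count, trace length, and rates:

# Back-of-envelope cost of one optimization run (all numbers hypothetical)
examples = 1_000
calls_per_example = 5            # bootstrap calls per training example
tokens_per_call = 1_500          # assumed average prompt + completion tokens
price_per_1k_tokens = 0.01       # assumed blended $/1k tokens

total_calls = examples * calls_per_example
estimated_cost = total_calls * tokens_per_call / 1_000 * price_per_1k_tokens
print(f"{total_calls:,} calls, ~${estimated_cost:,.2f} per optimization run")
# 5,000 calls, ~$75.00 per optimization run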

2. The RAG Leak

A RAG module retrieves documents containing customer data. The retrieved context is sent to the LLM. The LLM response is cached.

Now customer data is in your cache. DSPy has no mechanism to filter PII from retrieved documents.
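
A minimal sketch of the failure mode; the retriever, LLM call, and cache below are all stand-ins, nothing here is DSPy API:

# Stand-in components to illustrate the leak
def retrieve(query: str) -> list[str]:
    # in production this hits your vector store, customer data included
    return [f"Account notes for {query!r}: jane@example.com, card ending 4242"]

def call_llm(prompt: str) -> str:
    return f"Answer derived from: {prompt[:60]}"

cache: dict[str, str] = {}

def answer(query: str) -> str:
    if query in cache:
        return cache[query]
    context = retrieve(query)                         # may contain PII
    response = call_llm(f"{query}\n\nContext: {context}")
    cache[query] = response                           # PII now persists in the cache
    return response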

3. The "What Signature Was Used?" Question

Compliance asks for an audit trail. A module produced a recommendation. They need:

  • What was the signature?
  • What inputs were provided?
  • What was retrieved (for RAG)?
  • What was the chain-of-thought?

DSPy executed the module. Without custom logging, the execution trace is gone.
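
Those four questions map onto a record like the following. The shape is hypothetical; DSPy does not emit it, so you would have to capture it yourself around every module call:

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class ModuleExecutionRecord:
    """Hypothetical audit record answering the compliance questions above."""
    signature: str                 # which signature was used
    inputs: dict                   # what inputs were provided
    retrieved: list[str]           # what RAG pulled in (empty for non-RAG)
    rationale: Optional[str]       # the chain-of-thought, if any
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))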

4. The Security Review Block

Security review: BLOCKED
- No audit trail for module executions
- PII can flow through RAG retrieval
- No policy enforcement before execution
- Cost controls missing (especially for optimizers)
- No input validation at signature level

The DSPy pipeline worked perfectly. It can't ship.

5. The Chain-of-Thought Exposure

A chain-of-thought module reasons through a problem. The rationale includes internal business logic. This rationale is visible in the API response.

DSPy provided transparency for debugging. That transparency is a compliance risk.
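
One mitigation, assuming a dict-shaped prediction (the function and response shape are illustrative, not DSPy API), is to strip the rationale at the service boundary and keep it only in an internal log:

import logging

logger = logging.getLogger("internal.rationale")

def to_public_response(prediction: dict) -> dict:
    """Remove the chain-of-thought before the response leaves your service."""
    rationale = prediction.pop("rationale", None)
    if rationale:
        logger.debug("rationale kept internal: %s", rationale)  # internal-only sink
    return prediction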


How AxonFlow Plugs In

AxonFlow doesn't replace DSPy. It sits underneath it—providing the governance layer that DSPy intentionally doesn't include:

┌─────────────────┐
│    Your App     │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│      DSPy       │  <-- Signatures, Modules, Optimizers
└────────┬────────┘
         │
         ▼
┌─────────────────────────────────┐
│            AxonFlow             │
│  ┌───────────┐  ┌────────────┐  │
│  │  Policy   │  │   Audit    │  │
│  │  Enforce  │  │   Trail    │  │
│  └───────────┘  └────────────┘  │
│  ┌───────────┐  ┌────────────┐  │
│  │   PII     │  │    Cost    │  │
│  │ Detection │  │  Control   │  │
│  └───────────┘  └────────────┘  │
└────────────────┬────────────────┘
                 │
                 ▼
┌─────────────────┐
│  LLM Provider   │
└─────────────────┘

What this gives you:

  • Every module execution logged with signature details and inputs
  • PII detected and blocked in both retrieval and generation
  • SQL injection attempts blocked in RAG queries
  • Cost tracked per module, per user, per optimization run
  • Compliance auditors can query the full execution chain

What stays the same:

  • Your DSPy code doesn't change
  • Signatures and modules work as before
  • No new abstractions to learn

Integration Patterns

Pattern 1: Governed Module (Python)

Wrap DSPy modules with AxonFlow governance:

from dataclasses import dataclass
from typing import Optional
from axonflow import AxonFlow
import time

@dataclass
class Signature:
    name: str
    input_fields: list[str]
    output_fields: list[str]

@dataclass
class ModuleResult:
    success: bool
    output: Optional[dict] = None
    blocked: bool = False
    block_reason: Optional[str] = None
    rationale: Optional[str] = None

class GovernedModule:
    """Base class for DSPy modules with AxonFlow governance."""

    def __init__(self, signature: Signature, axonflow: AxonFlow, user_token: str):
        self.signature = signature
        self.axonflow = axonflow
        self.user_token = user_token

    def _check_policy(self, query: str, context: dict) -> tuple[bool, str, Optional[str]]:
        """Check policy before execution."""
        ctx = self.axonflow.get_policy_approved_context(
            user_token=self.user_token,
            query=query,
            context={
                "module": self.signature.name,
                "framework": "dspy",
                "input_fields": self.signature.input_fields,
                "output_fields": self.signature.output_fields,
                **context,
            },
        )
        return ctx.approved, ctx.block_reason or "", ctx.context_id

    def _audit(self, context_id: str, response_summary: str, latency_ms: int, metadata: Optional[dict] = None):
        """Audit module execution."""
        self.axonflow.audit_llm_call(
            context_id=context_id,
            response_summary=response_summary[:200],
            provider="openai",
            model="gpt-4",
            latency_ms=latency_ms,
            metadata={"module": self.signature.name, **(metadata or {})},
        )


class GovernedPredict(GovernedModule):
    """Simple prediction with governance."""

    def forward(self, **inputs) -> ModuleResult:
        query = " ".join(f"{k}: {v}" for k, v in inputs.items())
        start_time = time.time()

        approved, block_reason, context_id = self._check_policy(query, {"operation": "predict"})

        if not approved:
            return ModuleResult(success=False, blocked=True, block_reason=block_reason)

        # Execute prediction (your actual DSPy logic here)
        output = {field: f"Predicted {field} for: {query[:50]}" for field in self.signature.output_fields}
        latency_ms = int((time.time() - start_time) * 1000)

        self._audit(context_id, str(output), latency_ms)

        return ModuleResult(success=True, output=output)


class GovernedChainOfThought(GovernedModule):
    """Chain-of-thought reasoning with governance at each step."""

    def forward(self, **inputs) -> ModuleResult:
        query = " ".join(f"{k}: {v}" for k, v in inputs.items())
        start_time = time.time()

        # Step 1: Reasoning
        approved, block_reason, context_id = self._check_policy(
            query, {"operation": "reasoning", "step": 1}
        )

        if not approved:
            return ModuleResult(success=False, blocked=True, block_reason=block_reason)

        rationale = f"Let me think step by step about: {query[:100]}..."

        # Step 2: Answer
        approved, block_reason, answer_ctx = self._check_policy(
            rationale, {"operation": "answer", "step": 2}
        )

        if not approved:
            return ModuleResult(success=False, blocked=True, block_reason=block_reason)

        output = {field: f"Answer for {field}" for field in self.signature.output_fields}
        latency_ms = int((time.time() - start_time) * 1000)

        self._audit(context_id, str(output), latency_ms, {"rationale_length": len(rationale)})

        return ModuleResult(success=True, output=output, rationale=rationale)


class GovernedRAG(GovernedModule):
    """RAG with governance on both retrieval and generation."""

    def forward(self, **inputs) -> ModuleResult:
        query = " ".join(f"{k}: {v}" for k, v in inputs.items())
        start_time = time.time()

        # Step 1: Retrieval
        approved, block_reason, retrieval_ctx = self._check_policy(
            query, {"operation": "retrieval", "step": 1}
        )

        if not approved:
            return ModuleResult(success=False, blocked=True, block_reason=f"Retrieval blocked: {block_reason}")

        # Simulate retrieval (your actual retrieval here)
        retrieved_docs = [f"Document about {query[:30]}"]

        # Step 2: Generation
        generation_query = f"{query}\n\nContext: {retrieved_docs}"
        approved, block_reason, generation_ctx = self._check_policy(
            generation_query, {"operation": "generation", "step": 2, "doc_count": len(retrieved_docs)}
        )

        if not approved:
            return ModuleResult(success=False, blocked=True, block_reason=f"Generation blocked: {block_reason}")

        output = {field: f"Based on retrieved documents: {field}" for field in self.signature.output_fields}
        latency_ms = int((time.time() - start_time) * 1000)

        self._audit(retrieval_ctx, str(output), latency_ms, {"retrieved_docs": len(retrieved_docs)})

        return ModuleResult(success=True, output=output)


# Usage
with AxonFlow.sync(agent_url="http://localhost:8080") as client:
    # Simple prediction
    qa = GovernedPredict(
        Signature("QA", ["question"], ["answer"]),
        client,
        "dspy-user",
    )
    result = qa.forward(question="What are the benefits of renewable energy?")

    # Chain-of-thought
    cot = GovernedChainOfThought(
        Signature("ReasoningQA", ["question"], ["answer"]),
        client,
        "dspy-user",
    )
    result = cot.forward(question="Why is the sky blue?")

    # RAG
    rag = GovernedRAG(
        Signature("RAG", ["question"], ["answer"]),
        client,
        "dspy-user",
    )
    result = rag.forward(question="What are AI safety best practices?")
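
Whichever module ran, branch on the policy decision before reading the output:

# Handle a policy block explicitly instead of assuming output exists
if result.blocked:
    print(f"Blocked by policy: {result.block_reason}")
else:
    print(result.output["answer"])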

Pattern 2: Governed Module (Go)

package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/getaxonflow/axonflow-sdk-go"
)

type Signature struct {
	Name         string
	InputFields  []string
	OutputFields []string
}

type ModuleResult struct {
	Success     bool
	Output      map[string]string
	Blocked     bool
	BlockReason string
}

type GovernedModule struct {
	Signature Signature
	Client    *axonflow.AxonFlowClient
	UserToken string
}

func (m *GovernedModule) checkPolicy(query string, ctx map[string]interface{}) (bool, string, string) {
	context := map[string]interface{}{
		"module":        m.Signature.Name,
		"framework":     "dspy",
		"input_fields":  m.Signature.InputFields,
		"output_fields": m.Signature.OutputFields,
	}
	for k, v := range ctx {
		context[k] = v
	}

	result, err := m.Client.ExecuteQuery(m.UserToken, query, "chat", context)
	if err != nil {
		return false, err.Error(), ""
	}
	if result.Blocked {
		return false, result.BlockReason, ""
	}
	return true, "", result.ContextID
}

type GovernedPredict struct {
	GovernedModule
}

func (p *GovernedPredict) Forward(inputs map[string]string) ModuleResult {
	var queryParts []string
	for k, v := range inputs {
		queryParts = append(queryParts, fmt.Sprintf("%s: %s", k, v))
	}
	query := strings.Join(queryParts, " ")
	startTime := time.Now()

	approved, blockReason, contextID := p.checkPolicy(query, map[string]interface{}{
		"operation": "predict",
	})

	if !approved {
		return ModuleResult{Blocked: true, BlockReason: blockReason}
	}

	// Execute prediction (your actual DSPy logic here)
	output := make(map[string]string)
	for _, field := range p.Signature.OutputFields {
		output[field] = fmt.Sprintf("Predicted %s for: %s", field, truncate(query, 50))
	}

	latencyMs := int(time.Since(startTime).Milliseconds())

	// Audit (fire and forget)
	go p.Client.AuditLLMCall(axonflow.AuditRequest{
		ContextID:       contextID,
		ResponseSummary: truncate(fmt.Sprint(output), 200),
		Provider:        "openai",
		Model:           "gpt-4",
		LatencyMs:       latencyMs,
		Metadata:        map[string]interface{}{"module": p.Signature.Name},
	})

	return ModuleResult{Success: true, Output: output}
}

func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	return s[:maxLen]
}
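
A wiring sketch for the Go module. It assumes you already hold a connected *axonflow.AxonFlowClient; client construction is omitted, so see the Go SDK documentation for the exact setup:

// Hypothetical wiring: continues the package above, client construction omitted.
func runQA(client *axonflow.AxonFlowClient) {
	qa := &GovernedPredict{GovernedModule{
		Signature: Signature{
			Name:         "QA",
			InputFields:  []string{"question"},
			OutputFields: []string{"answer"},
		},
		Client:    client,
		UserToken: "dspy-user",
	}}

	result := qa.Forward(map[string]string{"question": "What are the benefits of renewable energy?"})
	if result.Blocked {
		fmt.Println("Blocked by policy:", result.BlockReason)
		return
	}
	fmt.Println(result.Output["answer"])
}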

Pattern 3: Multi-Module Pipeline

Chain modules with governance at each step:

class GovernedPipeline:
    """Execute multiple modules in sequence with governance."""

    def __init__(self, modules: list[GovernedModule]):
        self.modules = modules

    def forward(self, initial_input: dict) -> list[ModuleResult]:
        results = []
        current_input = initial_input

        for module in self.modules:
            result = module.forward(**current_input)
            results.append(result)

            if not result.success:
                print(f"Pipeline halted at {module.signature.name}: {result.block_reason}")
                break

            # Pass output as next input
            if result.output:
                current_input = result.output

        return results


# Usage
pipeline = GovernedPipeline([
    GovernedPredict(Signature("QA", ["question"], ["answer"]), client, "user"),
    GovernedPredict(Signature("Summarize", ["answer"], ["summary"]), client, "user"),
])

results = pipeline.forward({"question": "Explain machine learning"})

Note that the chain works because the field names line up: QA's output field (answer) is exactly the input field Summarize declares. A mismatch would surface as an unexpected keyword argument.

Example Implementations

| Language | SDK | Example |
| --- | --- | --- |
| Python | axonflow | dspy/python |
| Go | axonflow-sdk-go | dspy/go |