LangChainGo + AxonFlow Integration
Overview
LangChainGo is the Go implementation of the popular LangChain framework, providing tools for building LLM-powered applications with modular components and support for multiple LLM providers and vector databases.
AxonFlow adds production-grade governance, audit trails, and compliance controls to ensure LangChainGo applications operate within enterprise policies.
Together, they enable Go developers to build governed AI applications with type safety, performance, and compliance.
Why Use AxonFlow with LangChainGo?
LangChainGo Strengths
- Native Go implementation with compile-time type checking
- High performance and low memory footprint
- Support for multiple LLM providers (OpenAI, Anthropic, Ollama)
- Vector store integrations (Pinecone, Weaviate, Chroma)
- Chains, agents, and memory abstractions
- Concurrent execution patterns
AxonFlow Strengths
- Real-time inference governance (policy enforcement at request time)
- Go-native SDK (idiomatic Go patterns)
- Cross-system audit trails (compliance logging)
- Cost control (budget limits per application)
- PII protection (automatic masking)
The Perfect Combination
LangChainGo handles: LLM orchestration, chains, agents, vector stores
AxonFlow handles: Governance, compliance, audit trails, access control
Integration Architecture
AxonFlow integrates with LangChainGo using Gateway Mode, which wraps LLM calls with policy pre-checks and audit logging:
[LangChainGo Chain/Agent]
|
v
[AxonFlow Pre-Check] --> Policy Evaluation
|
v (if approved)
[LLM Provider (OpenAI/Anthropic/Ollama)]
|
v
[AxonFlow Audit] --> Compliance Logging
|
v
[Response to LangChainGo]
Note: AxonFlow uses its own API for governance, not an OpenAI-compatible endpoint. Integration requires wrapping your LLM calls with AxonFlow's pre-check and audit endpoints.
Quick Start
Prerequisites
- AxonFlow running locally or deployed (see Getting Started)
- Go 1.21+
- LangChainGo installed
Install Dependencies
go get github.com/tmc/langchaingo
go get github.com/getaxonflow/axonflow-go-sdk/axonflow
AxonFlow API Overview
AxonFlow Gateway Mode uses two main endpoints:
| Endpoint | Purpose |
|---|---|
| POST /api/policy/pre-check | Policy evaluation before the LLM call |
| POST /api/audit/llm-call | Audit logging after the LLM call completes |
Required Headers:
- Content-Type: application/json
- X-Client-Secret: your-client-secret
- X-License-Key: your-license-key (optional, for enterprise features)
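For illustration, a minimal pre-check call with plain net/http is sketched below. The endpoint and headers come from the table above; the JSON body field names (user_token, query, context) are assumptions based on the SDK's PreCheckRequest fields shown later, so check the AxonFlow API reference for the exact schema. In practice, the Go SDK shown in the next section handles this for you.
package main
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)
func main() {
	// Hypothetical request body; field names assumed to mirror the SDK's PreCheckRequest.
	payload := map[string]interface{}{
		"user_token": "user-123",
		"query":      "What is the capital of France?",
		"context":    map[string]interface{}{"framework": "langchaingo"},
	}
	body, _ := json.Marshal(payload)
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080/api/policy/pre-check", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Client-Secret", "your-client-secret")
	// req.Header.Set("X-License-Key", "your-license-key") // optional, for enterprise features
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// Print the raw response so you can inspect the actual response schema.
	raw, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(raw))
}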
Go SDK Integration
Create Governed LLM Wrapper
package main
import (
"context"
"fmt"
"time"
"github.com/getaxonflow/axonflow-go-sdk/axonflow"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/openai"
)
// GovernedLLM wraps a LangChainGo LLM with AxonFlow governance
type GovernedLLM struct {
llm llms.Model
gateway *axonflow.GatewayClient
provider string
model string
}
// NewGovernedLLM creates a new governed LLM wrapper
func NewGovernedLLM(
axonflowURL string,
clientSecret string,
openaiKey string,
) (*GovernedLLM, error) {
// Initialize LangChainGo LLM
llm, err := openai.New(openai.WithToken(openaiKey), openai.WithModel("gpt-4"))
if err != nil {
return nil, fmt.Errorf("failed to create LLM: %w", err)
}
// Initialize AxonFlow gateway
gateway := axonflow.NewGatewayClient(axonflow.Config{
AgentURL: axonflowURL,
ClientID: "langchaingo-app",
ClientSecret: clientSecret,
})
return &GovernedLLM{
llm: llm,
gateway: gateway,
provider: "openai",
model: "gpt-4",
}, nil
}
// GovernedCallResult contains the result of a governed LLM call
type GovernedCallResult struct {
Response string
ContextID string
Blocked bool
Reason string
}
// Call executes a governed LLM call
func (g *GovernedLLM) Call(
ctx context.Context,
userToken string,
prompt string,
callContext map[string]interface{},
) (*GovernedCallResult, error) {
startTime := time.Now()
// 1. Pre-check with AxonFlow
preCheck, err := g.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: userToken,
Query: prompt,
Context: mergeContext(callContext, map[string]interface{}{"framework": "langchaingo"}),
})
if err != nil {
return nil, fmt.Errorf("pre-check failed: %w", err)
}
if !preCheck.Approved {
return &GovernedCallResult{
ContextID: preCheck.ContextID,
Blocked: true,
Reason: preCheck.BlockReason,
}, nil
}
// 2. Execute LangChainGo LLM call
response, err := llms.GenerateFromSinglePrompt(ctx, g.llm, prompt)
latencyMs := int(time.Since(startTime).Milliseconds())
if err != nil {
// Audit error
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: fmt.Sprintf("Error: %v", err),
Provider: g.provider,
Model: g.model,
TokenUsage: axonflow.TokenUsage{},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"error": true},
})
return nil, err
}
// 3. Audit success
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: truncate(response, 200),
Provider: g.provider,
Model: g.model,
TokenUsage: axonflow.TokenUsage{
PromptTokens: estimateTokens(prompt),
CompletionTokens: estimateTokens(response),
TotalTokens: estimateTokens(prompt) + estimateTokens(response),
},
LatencyMs: latencyMs,
})
return &GovernedCallResult{
Response: response,
ContextID: preCheck.ContextID,
}, nil
}
func mergeContext(a, b map[string]interface{}) map[string]interface{} {
result := make(map[string]interface{})
for k, v := range a {
result[k] = v
}
for k, v := range b {
result[k] = v
}
return result
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
func estimateTokens(s string) int {
// Rough estimate: ~4 chars per token
return len(s) / 4
}
func main() {
governedLLM, err := NewGovernedLLM(
"http://localhost:8080",
"your-client-secret",
"your-openai-key",
)
if err != nil {
panic(err)
}
result, err := governedLLM.Call(
context.Background(),
"user-123",
"What is the capital of France?",
map[string]interface{}{"department": "general"},
)
if err != nil {
panic(err)
}
if result.Blocked {
fmt.Printf("Request blocked: %s\n", result.Reason)
return
}
fmt.Printf("Response: %s\n", result.Response)
}
Integration Patterns
Pattern 1: Governed Chain Execution
Wrap LangChainGo chains with governance. This and the following patterns reuse the mergeContext, truncate, and estimateTokens helpers defined above:
package main
import (
"context"
"fmt"
"time"
"github.com/getaxonflow/axonflow-go-sdk/axonflow"
"github.com/tmc/langchaingo/chains"
"github.com/tmc/langchaingo/llms/openai"
"github.com/tmc/langchaingo/prompts"
)
// GovernedChain wraps a LangChainGo chain with governance
type GovernedChain struct {
chain chains.Chain
gateway *axonflow.GatewayClient
}
// NewGovernedLLMChain creates a governed LLM chain
func NewGovernedLLMChain(
axonflowURL string,
clientSecret string,
openaiKey string,
promptTemplate string,
) (*GovernedChain, error) {
llm, err := openai.New(openai.WithToken(openaiKey))
if err != nil {
return nil, err
}
prompt := prompts.NewPromptTemplate(promptTemplate, []string{"input"})
chain := chains.NewLLMChain(llm, prompt)
gateway := axonflow.NewGatewayClient(axonflow.Config{
AgentURL: axonflowURL,
ClientID: "langchaingo-chain",
ClientSecret: clientSecret,
})
return &GovernedChain{
chain: chain,
gateway: gateway,
}, nil
}
// Run executes the chain with governance
func (g *GovernedChain) Run(
ctx context.Context,
userToken string,
input string,
chainContext map[string]interface{},
) (string, error) {
startTime := time.Now()
// Pre-check
preCheck, err := g.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: userToken,
Query: input,
Context: mergeContext(chainContext, map[string]interface{}{
"framework": "langchaingo",
"chain_type": "llm_chain",
}),
})
if err != nil {
return "", fmt.Errorf("pre-check failed: %w", err)
}
if !preCheck.Approved {
return "", fmt.Errorf("request blocked: %s", preCheck.BlockReason)
}
// Execute chain
result, err := chains.Run(ctx, g.chain, input)
latencyMs := int(time.Since(startTime).Milliseconds())
if err != nil {
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: fmt.Sprintf("Chain error: %v", err),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"error": true},
})
return "", err
}
// Audit success
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: truncate(result, 200),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{
// Illustrative placeholder values; chains.Run does not expose token
// counts, so substitute real usage from your provider if available.
PromptTokens: 100,
CompletionTokens: 50,
TotalTokens: 150,
},
LatencyMs: latencyMs,
})
return result, nil
}
// Usage
func main() {
chain, err := NewGovernedLLMChain(
"http://localhost:8080",
"your-client-secret",
"your-openai-key",
"Write a summary about: {{.input}}",
)
if err != nil {
panic(err)
}
result, err := chain.Run(
context.Background(),
"user-123",
"artificial intelligence governance",
map[string]interface{}{"department": "research"},
)
if err != nil {
panic(err)
}
fmt.Println(result)
}
Pattern 2: Governed RAG with Vector Store
Implement governed RAG using LangChainGo vector stores:
package main
import (
"context"
"fmt"
"time"
"github.com/getaxonflow/axonflow-go-sdk/axonflow"
"github.com/tmc/langchaingo/embeddings"
"github.com/tmc/langchaingo/llms/openai"
"github.com/tmc/langchaingo/schema"
"github.com/tmc/langchaingo/vectorstores"
"github.com/tmc/langchaingo/vectorstores/chroma"
)
// GovernedRAG provides governed RAG capabilities
type GovernedRAG struct {
llm *openai.LLM
vectorStore vectorstores.VectorStore
gateway *axonflow.GatewayClient
}
// NewGovernedRAG creates a new governed RAG instance
func NewGovernedRAG(
axonflowURL string,
clientSecret string,
openaiKey string,
chromaURL string,
collectionName string,
) (*GovernedRAG, error) {
llm, err := openai.New(openai.WithToken(openaiKey))
if err != nil {
return nil, err
}
embedder, err := embeddings.NewEmbedder(llm)
if err != nil {
return nil, err
}
store, err := chroma.New(
chroma.WithChromaURL(chromaURL),
chroma.WithEmbedder(embedder),
chroma.WithNameSpace(collectionName),
)
if err != nil {
return nil, err
}
gateway := axonflow.NewGatewayClient(axonflow.Config{
AgentURL: axonflowURL,
ClientID: "langchaingo-rag",
ClientSecret: clientSecret,
})
return &GovernedRAG{
llm: llm,
vectorStore: store,
gateway: gateway,
}, nil
}
// RAGResult contains the RAG query result
type RAGResult struct {
Answer string
Sources []string
ContextID string
}
// Query performs a governed RAG query
func (g *GovernedRAG) Query(
ctx context.Context,
userToken string,
query string,
topK int,
ragContext map[string]interface{},
) (*RAGResult, error) {
startTime := time.Now()
// Pre-check
preCheck, err := g.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: userToken,
Query: query,
Context: mergeContext(ragContext, map[string]interface{}{
"framework": "langchaingo",
"query_type": "rag",
"top_k": topK,
}),
})
if err != nil {
return nil, fmt.Errorf("pre-check failed: %w", err)
}
if !preCheck.Approved {
return nil, fmt.Errorf("RAG query blocked: %s", preCheck.BlockReason)
}
// Retrieve relevant documents
docs, err := g.vectorStore.SimilaritySearch(ctx, query, topK)
if err != nil {
return nil, fmt.Errorf("similarity search failed: %w", err)
}
// Build context from retrieved documents
var contextText string
var sources []string
for _, doc := range docs {
contextText += doc.PageContent + "\n\n"
if source, ok := doc.Metadata["source"].(string); ok {
sources = append(sources, source)
}
}
// Generate answer
prompt := fmt.Sprintf(
"Based on the following context, answer the question.\n\nContext:\n%s\n\nQuestion: %s\n\nAnswer:",
contextText,
query,
)
answer, err := g.llm.Call(ctx, prompt)
latencyMs := int(time.Since(startTime).Milliseconds())
if err != nil {
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: fmt.Sprintf("RAG error: %v", err),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"error": true},
})
return nil, err
}
// Audit success
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: truncate(answer, 200),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{
PromptTokens: estimateTokens(prompt),
CompletionTokens: estimateTokens(answer),
TotalTokens: estimateTokens(prompt) + estimateTokens(answer),
},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{
"source_count": len(sources),
"sources": sources,
},
})
return &RAGResult{
Answer: answer,
Sources: sources,
ContextID: preCheck.ContextID,
}, nil
}
// AddDocuments adds documents to the vector store
func (g *GovernedRAG) AddDocuments(ctx context.Context, docs []schema.Document) error {
_, err := g.vectorStore.AddDocuments(ctx, docs)
return err
}
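A usage sketch for the RAG wrapper above. The Chroma URL, collection name, and sample documents are illustrative; adjust them to your environment.
// Usage
func main() {
	rag, err := NewGovernedRAG(
		"http://localhost:8080",
		"your-client-secret",
		"your-openai-key",
		"http://localhost:8000", // assumed local Chroma instance
		"docs",                  // assumed collection name
	)
	if err != nil {
		panic(err)
	}
	// Index a couple of sample documents before querying.
	err = rag.AddDocuments(context.Background(), []schema.Document{
		{PageContent: "AxonFlow enforces policies on LLM calls at request time.", Metadata: map[string]any{"source": "axonflow-overview"}},
		{PageContent: "LangChainGo provides chains, agents, and vector store integrations in Go.", Metadata: map[string]any{"source": "langchaingo-readme"}},
	})
	if err != nil {
		panic(err)
	}
	result, err := rag.Query(
		context.Background(),
		"user-123",
		"How does AxonFlow govern LLM calls?",
		3,
		map[string]interface{}{"department": "engineering"},
	)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Answer: %s\nSources: %v\n", result.Answer, result.Sources)
}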
Pattern 3: Governed Agent with Tools
Create a governed agent with tool execution:
package main
import (
"context"
"fmt"
"time"
"github.com/getaxonflow/axonflow-go-sdk/axonflow"
"github.com/tmc/langchaingo/agents"
"github.com/tmc/langchaingo/llms/openai"
"github.com/tmc/langchaingo/tools"
)
// GovernedAgent wraps a LangChainGo agent with governance
type GovernedAgent struct {
executor *agents.Executor
gateway *axonflow.GatewayClient
}
// NewGovernedAgent creates a governed agent
func NewGovernedAgent(
axonflowURL string,
clientSecret string,
openaiKey string,
agentTools []tools.Tool,
) (*GovernedAgent, error) {
llm, err := openai.New(openai.WithToken(openaiKey))
if err != nil {
return nil, err
}
agent := agents.NewOneShotAgent(llm, agentTools)
executor := agents.NewExecutor(agent)
gateway := axonflow.NewGatewayClient(axonflow.Config{
AgentURL: axonflowURL,
ClientID: "langchaingo-agent",
ClientSecret: clientSecret,
})
return &GovernedAgent{
executor: executor,
gateway: gateway,
}, nil
}
// Run executes the agent with governance
func (g *GovernedAgent) Run(
ctx context.Context,
userToken string,
input string,
agentContext map[string]interface{},
) (string, error) {
startTime := time.Now()
// Pre-check
preCheck, err := g.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: userToken,
Query: input,
Context: mergeContext(agentContext, map[string]interface{}{
"framework": "langchaingo",
"agent_type": "one_shot",
}),
})
if err != nil {
return "", fmt.Errorf("pre-check failed: %w", err)
}
if !preCheck.Approved {
return "", fmt.Errorf("agent execution blocked: %s", preCheck.BlockReason)
}
// Execute agent
result, err := g.executor.Call(ctx, map[string]any{"input": input})
latencyMs := int(time.Since(startTime).Milliseconds())
output := ""
if out, ok := result["output"].(string); ok {
output = out
}
if err != nil {
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: fmt.Sprintf("Agent error: %v", err),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"error": true},
})
return "", err
}
// Audit success
g.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: truncate(output, 200),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{
// Illustrative placeholder values; replace with actual token usage if your provider reports it.
PromptTokens: 200,
CompletionTokens: 100,
TotalTokens: 300,
},
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"agent_type": "one_shot"},
})
return output, nil
}
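A usage sketch, assuming LangChainGo's built-in tools.Calculator as the only tool; substitute your own tools.Tool implementations as needed.
// Usage
func main() {
	agent, err := NewGovernedAgent(
		"http://localhost:8080",
		"your-client-secret",
		"your-openai-key",
		[]tools.Tool{tools.Calculator{}},
	)
	if err != nil {
		panic(err)
	}
	output, err := agent.Run(
		context.Background(),
		"user-123",
		"What is 15% of 2400?",
		map[string]interface{}{"department": "finance"},
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(output)
}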
HTTP API Service
Expose governed LangChainGo calls as an HTTP service:
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"time"
"github.com/getaxonflow/axonflow-go-sdk/axonflow"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/openai"
)
type Server struct {
llm llms.Model
gateway *axonflow.GatewayClient
}
type ChatRequest struct {
UserToken string `json:"user_token"`
Message string `json:"message"`
Context map[string]interface{} `json:"context,omitempty"`
}
type ChatResponse struct {
Response string `json:"response,omitempty"`
ContextID string `json:"context_id"`
Error string `json:"error,omitempty"`
Blocked bool `json:"blocked,omitempty"`
Reason string `json:"reason,omitempty"`
}
func (s *Server) handleChat(w http.ResponseWriter, r *http.Request) {
var req ChatRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
ctx := r.Context()
startTime := time.Now()
// Pre-check
preCheck, err := s.gateway.PreCheck(ctx, axonflow.PreCheckRequest{
UserToken: req.UserToken,
Query: req.Message,
Context: req.Context,
})
if err != nil {
json.NewEncoder(w).Encode(ChatResponse{Error: err.Error()})
return
}
if !preCheck.Approved {
json.NewEncoder(w).Encode(ChatResponse{
ContextID: preCheck.ContextID,
Blocked: true,
Reason: preCheck.BlockReason,
})
return
}
// Generate response
response, err := llms.GenerateFromSinglePrompt(ctx, s.llm, req.Message)
latencyMs := int(time.Since(startTime).Milliseconds())
if err != nil {
s.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: "Error: " + err.Error(),
Provider: "openai",
Model: "gpt-3.5-turbo",
LatencyMs: latencyMs,
Metadata: map[string]interface{}{"error": true},
})
json.NewEncoder(w).Encode(ChatResponse{
ContextID: preCheck.ContextID,
Error: err.Error(),
})
return
}
// Audit
s.gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: preCheck.ContextID,
ResponseSummary: truncate(response, 200),
Provider: "openai",
Model: "gpt-3.5-turbo",
TokenUsage: axonflow.TokenUsage{
PromptTokens: estimateTokens(req.Message),
CompletionTokens: estimateTokens(response),
},
LatencyMs: latencyMs,
})
json.NewEncoder(w).Encode(ChatResponse{
Response: response,
ContextID: preCheck.ContextID,
})
}
func main() {
llm, err := openai.New(openai.WithToken(os.Getenv("OPENAI_API_KEY")))
if err != nil {
log.Fatalf("failed to create LLM: %v", err)
}
server := &Server{
llm: llm,
gateway: axonflow.NewGatewayClient(axonflow.Config{
AgentURL: os.Getenv("AXONFLOW_URL"),
ClientID: "langchaingo-api",
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
}),
}
http.HandleFunc("/api/chat", server.handleChat)
log.Println("LangChainGo API running on :8090")
log.Fatal(http.ListenAndServe(":8090", nil))
}
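To exercise the service locally, a small client sketch is shown below. The request body follows the ChatRequest JSON tags defined above; the endpoint, port, and payload values are the example defaults.
package main
import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)
func main() {
	// Body matches the ChatRequest JSON tags used by the server above.
	body, _ := json.Marshal(map[string]interface{}{
		"user_token": "user-123",
		"message":    "What is the capital of France?",
		"context":    map[string]interface{}{"department": "general"},
	})
	resp, err := http.Post("http://localhost:8090/api/chat", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// Decode into a generic map and print the governed response.
	var out map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}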
AxonFlow Policy Configuration
Create policies for LangChainGo applications:
{
"policies": [
{
"name": "langchaingo-policy",
"description": "Policy for LangChainGo applications",
"enabled": true,
"rules": [
{
"type": "rate_limit",
"config": {
"requests_per_minute": 60,
"action": "throttle"
}
},
{
"type": "pii_protection",
"config": {
"fields": ["email", "phone", "ssn"],
"action": "mask"
}
}
]
},
{
"name": "langchaingo-rag-policy",
"description": "Policy for RAG queries",
"enabled": true,
"rules": [
{
"type": "content_filter",
"config": {
"blocked_patterns": ["confidential", "internal only"],
"action": "block"
}
}
]
}
]
}
Best Practices
1. Always Use Context IDs
preCheck, _ := gateway.PreCheck(ctx, axonflow.PreCheckRequest{...})
contextID := preCheck.ContextID // Store immediately
// ... execute LLM call ...
gateway.AuditLLMCall(ctx, axonflow.AuditRequest{ContextID: contextID, ...})
2. Handle Errors with Audit
result, err := llm.Call(ctx, prompt)
if err != nil {
gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
ContextID: contextID,
ResponseSummary: fmt.Sprintf("Error: %v", err),
// ...
})
return err
}
3. Use Goroutines for Non-Blocking Audit
go func() {
gateway.AuditLLMCall(context.Background(), auditRequest)
}()
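Because the request context may be cancelled before a background audit finishes, one option is to detach the audit from the request and bound it with its own timeout. A sketch (the 5-second timeout is an arbitrary choice):
go func() {
	// Detach from the request context so the audit is not cancelled when the
	// handler returns, but still bound it with a timeout.
	auditCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	gateway.AuditLLMCall(auditCtx, auditRequest)
}()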
4. Include Metadata for Debugging
gateway.AuditLLMCall(ctx, axonflow.AuditRequest{
// ...
Metadata: map[string]interface{}{
"chain_type": "llm_chain",
"source_count": len(sources),
"top_k": topK,
},
})
Troubleshooting
Common Issues
Issue: Pre-check returns 401 Unauthorized
- Verify client secret is correct
- Check license key if using enterprise features
- Ensure client_id is registered in AxonFlow
Issue: Audit calls failing
- Verify context_id is from a valid pre-check
- Check that the AxonFlow agent is healthy (/health endpoint)
Issue: High latency
- Use goroutines for non-blocking audit
- Check network latency to AxonFlow endpoint