Skip to main content

Go SDK - Getting Started

Current Version: 5.5.0

Use the Go SDK when you want AxonFlow governance inside production Go services, agent runtimes, API backends, or internal developer platforms. The current SDK supports Proxy Mode, Gateway Mode, MCP connector operations, multi-agent planning, and Go interceptor wrappers for existing provider clients.

Installation

go get github.com/getaxonflow/axonflow-sdk-go/v5

Quick Start

Use Proxy Mode when you want AxonFlow to evaluate policy, route the LLM call, and return a governed response in a single SDK call.

package main

import (
"fmt"
"log"
"os"

"github.com/getaxonflow/axonflow-sdk-go/v5"
)

func main() {
client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
})

resp, err := client.ProxyLLMCall(
"user-token",
"What is the capital of France?",
"chat",
map[string]interface{}{
"temperature": 0.2,
"max_tokens": 120,
},
)
if err != nil {
log.Fatalf("ProxyLLMCall failed: %v", err)
}

if resp.Blocked {
log.Printf("Request blocked: %s", resp.BlockReason)
return
}

if !resp.Success {
log.Printf("Request failed: %s", resp.Error)
return
}

fmt.Printf("Result: %v\n", resp.Data)
}

Advanced Configuration

import (
"os"
"time"

"github.com/getaxonflow/axonflow-sdk-go/v5"
)

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Mode: "production",
Debug: true,
Timeout: 60 * time.Second,
MapTimeout: 120 * time.Second,
Retry: axonflow.RetryConfig{
Enabled: true,
MaxAttempts: 3,
InitialDelay: 1 * time.Second,
},
Cache: axonflow.CacheConfig{
Enabled: true,
TTL: 60 * time.Second,
},
})

MAP plan timeout

MAP (Multi-Agent Planning) plans chain multiple LLM calls end-to-end. A five-step plan at ~15s/step takes about 60-75s by itself, longer than the default Timeout of 60s. The SDK uses a separate MapTimeout (default 120s) for every plan-lifecycle call (GeneratePlan, ExecutePlan, GetPlan, UpdatePlan, CancelPlan, ResumePlan) so those calls aren't cut off by the single-request timeout.

For plans that push beyond 120s, raise MapTimeout on the client and raise the matching knobs on the server side or the connection will be killed before the plan finishes:

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
MapTimeout: 300 * time.Second, // 5-minute plan budget
})

Keep these three values moving together. The smallest link breaks the chain:

LayerKnobDefault
SDKMapTimeout120s
OrchestratorAXONFLOW_MAP_MAX_TIMEOUT_SECONDS300s
Front-door ALBidle_timeout.timeout_seconds (AlbIdleTimeoutSeconds CFN)300s

The SDK's MapTimeout must be ≤ the orchestrator cap, which must be ≤ the ALB idle timeout. The orchestrator cap clamps to 60..1800s.

Production Features

Retry

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Retry: axonflow.RetryConfig{
Enabled: true,
MaxAttempts: 3,
InitialDelay: 1 * time.Second,
},
})

Cache

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Cache: axonflow.CacheConfig{
Enabled: true,
TTL: 60 * time.Second,
},
})

first, _ := client.ProxyLLMCall("token", "query", "chat", nil)
second, _ := client.ProxyLLMCall("token", "query", "chat", nil)

_ = first
_ = second

Fail-Open in Production

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Mode: "production",
})

When AxonFlow is unavailable and the error qualifies for fail-open handling, the SDK returns a successful response with an informational Error field instead of blocking your application path.

Debug Logging

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Debug: true,
})

Sandbox Mode

For local testing, the SDK exposes a convenience sandbox client:

client := axonflow.Sandbox("demo-key")

resp, err := client.ProxyLLMCall(
"",
"Test query with sensitive data: SSN 123-45-6789",
"chat",
nil,
)
if err != nil {
panic(err)
}

if resp.Blocked {
fmt.Printf("Blocked: %s\n", resp.BlockReason)
}

Environment Variables

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
})

Typical values:

AXONFLOW_ENDPOINT=https://staging-eu.getaxonflow.com
AXONFLOW_CLIENT_ID=your-org-id
AXONFLOW_CLIENT_SECRET=your-client-secret

VPC Private Endpoint

For private deployments, point the client at the internal AxonFlow endpoint exposed inside your VPC:

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: "https://YOUR_VPC_IP:8443",
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Mode: "production",
})

Error Handling

resp, err := client.ProxyLLMCall(
"user-123",
"Analyze this data",
"chat",
nil,
)
if err != nil {
log.Printf("Request failed: %v", err)
return
}

if resp.Blocked {
log.Printf("Blocked: %s", resp.BlockReason)
if resp.PolicyInfo != nil {
log.Printf("Policies evaluated: %v", resp.PolicyInfo.PoliciesEvaluated)
}
return
}

if !resp.Success {
log.Printf("Downstream failure: %s", resp.Error)
return
}

fmt.Printf("Result: %v\n", resp.Data)

MCP Connector Integration

List Available Connectors

connectors, err := client.ListConnectors()
if err != nil {
log.Fatalf("Failed to list connectors: %v", err)
}

for _, conn := range connectors {
fmt.Printf("Connector: %s (%s)\n", conn.Name, conn.Type)
fmt.Printf(" Description: %s\n", conn.Description)
fmt.Printf(" Installed: %v\n", conn.Installed)
}

Install a Connector

err := client.InstallConnector(axonflow.ConnectorInstallRequest{
ConnectorID: "amadeus-travel",
Name: "amadeus-prod",
TenantID: "demo-tenant",
Options: map[string]interface{}{
"environment": "production",
"region": "europe",
},
Credentials: map[string]string{
"api_key": os.Getenv("AMADEUS_API_KEY"),
"api_secret": os.Getenv("AMADEUS_API_SECRET"),
},
})
if err != nil {
log.Fatalf("Failed to install connector: %v", err)
}

Query a Connector

resp, err := client.QueryConnector(
"user-123",
"amadeus-prod",
"Find flights from Paris to Amsterdam on 2025-12-15",
map[string]interface{}{
"origin": "CDG",
"destination": "AMS",
"date": "2025-12-15",
"adults": 1,
},
)
if err != nil {
log.Fatalf("Connector query failed: %v", err)
}

if resp.Success {
fmt.Printf("Connector data: %v\n", resp.Data)
} else {
fmt.Printf("Query failed: %s\n", resp.Error)
}

Multi-Agent Planning (MAP)

Generate a Plan

plan, err := client.GeneratePlan(
"Plan a 3-day business trip to Paris with hotel, transport, and dinner recommendations",
"travel",
"user-123",
)
if err != nil {
log.Fatalf("Plan generation failed: %v", err)
}

fmt.Printf("Generated plan %s with %d steps\n", plan.PlanID, len(plan.Steps))
fmt.Printf("Complexity: %d, Parallel: %v\n", plan.Complexity, plan.Parallel)

Execute a Plan

execResp, err := client.ExecutePlan(plan.PlanID, "user-123")
if err != nil {
log.Fatalf("Plan execution failed: %v", err)
}

fmt.Printf("Status: %s\n", execResp.Status)
fmt.Printf("Result: %s\n", execResp.Result)

Poll Plan Status

status, err := client.GetPlanStatus(plan.PlanID)
if err != nil {
log.Fatalf("Failed to get plan status: %v", err)
}

fmt.Printf("Current status: %s\n", status.Status)

Gateway Mode

Gateway Mode is the right fit when you want AxonFlow to evaluate policy before the call, while your application keeps direct ownership of the provider request.

Pre-Check Policy Approval

policyCtx, err := client.GetPolicyApprovedContext(
"user-jwt-token",
"Analyze customer churn patterns",
[]string{"postgres"},
map[string]interface{}{
"department": "analytics",
},
)
if err != nil {
log.Fatalf("Pre-check failed: %v", err)
}

if !policyCtx.Approved {
log.Fatalf("Blocked: %s", policyCtx.BlockReason)
}

Direct LLM Call

import (
"context"
"fmt"
"time"

openai "github.com/sashabaranov/go-openai"
)

start := time.Now()
resp, err := openaiClient.CreateChatCompletion(
context.Background(),
openai.ChatCompletionRequest{
Model: openai.GPT4oMini,
Messages: []openai.ChatCompletionMessage{
{Role: "user", Content: fmt.Sprintf("%v", policyCtx.ApprovedData)},
},
},
)
latencyMs := time.Since(start).Milliseconds()
if err != nil {
log.Fatalf("OpenAI call failed: %v", err)
}

Audit the Call

_, err = client.AuditLLMCall(
policyCtx.ContextID,
resp.Choices[0].Message.Content[:min(100, len(resp.Choices[0].Message.Content))],
"openai",
"gpt-4o-mini",
axonflow.TokenUsage{
PromptTokens: resp.Usage.PromptTokens,
CompletionTokens: resp.Usage.CompletionTokens,
TotalTokens: resp.Usage.TotalTokens,
},
latencyMs,
map[string]interface{}{
"workflow": "churn-analysis",
},
)
if err != nil {
log.Printf("Audit logging failed: %v", err)
}

See Choosing a Mode for the broader trade-offs.

LLM Interceptors

Use interceptors when you already have provider client code and want governance with minimal application changes.

import (
"context"
"log"

"github.com/getaxonflow/axonflow-sdk-go/v5/interceptors"
)

wrapped := interceptors.WrapOpenAIClient(yourOpenAIClient, client, "user-123")

resp, err := wrapped.CreateChatCompletion(context.Background(), interceptors.ChatCompletionRequest{
Model: "gpt-4o-mini",
Messages: []interceptors.ChatMessage{
{Role: "user", Content: "Summarize these notes"},
},
})
if err != nil {
if interceptors.IsPolicyViolationError(err) {
violation, _ := interceptors.GetPolicyViolation(err)
log.Printf("Blocked: %s (policies: %v)", violation.BlockReason, violation.Policies)
return
}
log.Fatal(err)
}

_ = resp

Supported Go wrappers:

  • WrapOpenAIClient
  • WrapAnthropicClient
  • WrapGeminiModel
  • WrapOllamaClient
  • WrapBedrockClient

See LLM Interceptors for provider-specific details.

Health Check

if err := client.HealthCheck(); err != nil {
log.Printf("AxonFlow is unhealthy: %v", err)
} else {
log.Println("AxonFlow is healthy")
}

Middleware Pattern

For HTTP services, keep a shared AxonFlowClient and call Proxy Mode or Gateway Mode inside request handlers:

func axonflowMiddleware(client *axonflow.AxonFlowClient) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
userToken := r.Header.Get("Authorization")

resp, err := client.ProxyLLMCall(
userToken,
r.FormValue("prompt"),
"chat",
nil,
)
if err != nil {
http.Error(w, "governance check failed", http.StatusInternalServerError)
return
}
if resp.Blocked {
http.Error(w, resp.BlockReason, http.StatusForbidden)
return
}

next.ServeHTTP(w, r)
})
}
}

Concurrent Request Handling

The client is safe to reuse across goroutines.

func processQueriesConcurrently(client *axonflow.AxonFlowClient, queries []string) {
var wg sync.WaitGroup
results := make(chan *axonflow.ClientResponse, len(queries))

for _, query := range queries {
wg.Add(1)
go func(q string) {
defer wg.Done()

resp, err := client.ProxyLLMCall("user-123", q, "chat", nil)
if err != nil {
log.Printf("Query failed: %v", err)
return
}

results <- resp
}(query)
}

wg.Wait()
close(results)

for resp := range results {
fmt.Printf("Result: %v\n", resp.Data)
}
}

Configuration Reference

FieldTypeDefaultDescription
EndpointstringRequiredAxonFlow endpoint URL
ClientIDstringOptional in community, required for enterprise featuresOrganization or client identifier
ClientSecretstringOptional in community, required for enterprise featuresAuthentication secret
Modestring"production""production" or "sandbox"
DebugboolfalseEnable SDK debug logging
Timeouttime.Duration60sStandard request timeout
MapTimeouttime.Duration120sLonger timeout for multi-agent planning operations
Retry.EnabledbooltrueEnable retry logic
Retry.MaxAttemptsint3Maximum retry attempts
Retry.InitialDelaytime.Duration1sInitial retry delay
Cache.EnabledbooltrueEnable in-memory response cache
Cache.TTLtime.Duration60sCache time-to-live
InsecureSkipTLSVerifyboolfalseSkip TLS verification for local/dev scenarios only

Production Best Practices

1. Load Credentials from the Environment

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
})

2. Use Mode: "production" for Resilience

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Mode: "production",
})

3. Tune Retry and Cache for Real Traffic

client := axonflow.NewClient(axonflow.AxonFlowConfig{
Endpoint: os.Getenv("AXONFLOW_ENDPOINT"),
ClientID: os.Getenv("AXONFLOW_CLIENT_ID"),
ClientSecret: os.Getenv("AXONFLOW_CLIENT_SECRET"),
Retry: axonflow.RetryConfig{
Enabled: true,
MaxAttempts: 3,
InitialDelay: 1 * time.Second,
},
Cache: axonflow.CacheConfig{
Enabled: true,
TTL: 60 * time.Second,
},
})

4. Disable Debug in Production

Use Debug: true during development and troubleshooting, but keep it off in steady-state production services.

Examples

Testing

Mock the client behavior behind your own interface so handler and service tests do not depend on live AxonFlow endpoints:

type GovernanceClient interface {
ProxyLLMCall(userToken, query, requestType string, context map[string]interface{}) (*axonflow.ClientResponse, error)
}