Custom Provider SDK
The AxonFlow LLM Provider SDK provides a comprehensive framework for building custom LLM providers with built-in support for authentication, rate limiting, retry logic, and circuit breaker patterns.
Quick Start
Using the Fluent Builder
The simplest way to create a custom provider:
package main
import (
	"context"
	"fmt"
	"log"
	"os"

	"axonflow/platform/orchestrator/llm"
	"axonflow/platform/orchestrator/llm/sdk"
)
func main() {
// Create a custom provider using the fluent builder
provider := sdk.NewProviderBuilder("my-provider", llm.ProviderTypeCustom).
WithModel("my-model-v1").
WithEndpoint("https://api.myprovider.com/v1").
WithAuth(sdk.NewAPIKeyAuth(os.Getenv("MY_API_KEY"))).
WithRateLimiter(sdk.NewRateLimiter(100, 100)).
WithRetry(sdk.DefaultRetryConfig()).
WithCompleteFunc(myCompletionHandler).
Build()
	// Use the provider
	ctx := context.Background()
	resp, err := provider.Complete(ctx, llm.CompletionRequest{
		Prompt:    "Hello, world!",
		MaxTokens: 100,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Content)
}
func myCompletionHandler(ctx context.Context, req llm.CompletionRequest) (*llm.CompletionResponse, error) {
// Your implementation here
return &llm.CompletionResponse{
Content: "Response from custom provider",
Model: req.Model,
Usage: llm.UsageStats{
PromptTokens: 10,
CompletionTokens: 20,
TotalTokens: 30,
},
}, nil
}
All Builder Options
The fluent builder supports the following options:
provider := sdk.NewProviderBuilder("my-provider", llm.ProviderTypeCustom).
// Required
WithCompleteFunc(handler). // Completion handler function
// Configuration
WithModel("gpt-4"). // Default model
WithEndpoint("https://api.example.com"). // API endpoint
WithTimeout(30 * time.Second). // Request timeout
// Authentication & networking
WithAuth(sdk.NewAPIKeyAuth("key")). // Authentication provider
WithHTTPClient(customClient). // Custom HTTP client
// Resilience
WithRateLimiter(limiter). // Rate limiting
WithRetry(retryConfig). // Retry configuration
// Features
WithStreaming(true). // Enable streaming support
WithCapabilities(llm.CapabilityChat, llm.CapabilityCompletion).
// Customization
WithLogger(logger). // Custom logger
WithCostEstimator(estimator). // Custom cost estimation
WithHealthChecker(checker). // Custom health check
Build()
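The customClient passed to WithHTTPClient above is any *http.Client, so you control TLS, proxies, and connection pooling yourself. A minimal sketch using only the standard library (pool sizes are illustrative; tune them for your traffic):

import (
	"net/http"
	"time"
)

// customClient tunes connection pooling for sustained request volume.
var customClient = &http.Client{
	Timeout: 60 * time.Second, // generous: LLM responses can be slow
	Transport: &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     90 * time.Second,
	},
}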
Authentication Providers
The SDK supports multiple authentication methods:
API Key Authentication
// API Key in Authorization header (Bearer token) - most common for LLMs
auth := sdk.NewAPIKeyAuth("sk-your-api-key")
// API Key in custom header
auth := sdk.NewAPIKeyAuthWithHeader("sk-xxx", "X-API-Key")
// API Key as query parameter
auth := sdk.NewAPIKeyAuthWithQuery("sk-xxx", "api_key")
Other Methods
// Basic authentication
auth := sdk.NewBasicAuth("username", "password")
// Bearer token
auth := sdk.NewBearerTokenAuth("your-token")
// No authentication (for local/internal providers like Ollama)
auth := sdk.NewNoAuth()
// Chain multiple auth providers
auth := sdk.NewChainedAuth(apiKeyAuth, customHeaderAuth)
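You can also implement the AuthProvider interface yourself for schemes the SDK doesn't ship. A sketch of a request-signing provider, assuming the interface is satisfied by an Apply(*http.Request) error method (check the interface definition in your SDK version; HMACAuth is a hypothetical example):

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"net/http"
)

// HMACAuth signs each request with a shared secret.
type HMACAuth struct {
	keyID  string
	secret []byte
}

func (a *HMACAuth) Apply(req *http.Request) error {
	// Sign the method and path; real schemes usually cover more of the request.
	mac := hmac.New(sha256.New, a.secret)
	mac.Write([]byte(req.Method + " " + req.URL.Path))
	req.Header.Set("X-Key-ID", a.keyID)
	req.Header.Set("X-Signature", hex.EncodeToString(mac.Sum(nil)))
	return nil
}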
Rate Limiting
Token bucket rate limiting to respect API quotas:
// 100 requests per second with burst of 100
limiter := sdk.NewRateLimiter(100, 100)
// Wait for permission (blocks until available)
if err := limiter.Wait(ctx); err != nil {
return err // Context cancelled
}
// Try without blocking
if !limiter.TryAcquire() {
return errors.New("rate limited")
}
// Check available tokens
available := limiter.Available()
// Dynamically adjust rate
limiter.SetRate(50) // Reduce to 50 rps
limiter.SetBurst(200) // Increase burst capacity
Multi-Tenant Rate Limiting
For multi-tenant SaaS applications, give each tenant its own limiter:
// Create per-tenant rate limiters
mtLimiter := sdk.NewMultiTenantRateLimiter(func() *sdk.RateLimiter {
return sdk.NewRateLimiter(10, 10) // 10 rps per tenant
})
// Wait for specific tenant
err := mtLimiter.Wait(ctx, "tenant-123")
// Or try without blocking
if mtLimiter.TryAcquire("tenant-456") {
// Proceed with request
}
// Clean up inactive tenants
mtLimiter.RemoveTenant("tenant-123")
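In an HTTP service you would typically key the limiter on an authenticated tenant ID. A sketch using only net/http and the TryAcquire method shown above (the X-Tenant-ID header is an assumption; use whatever your auth layer provides):

func rateLimitMiddleware(mt *sdk.MultiTenantRateLimiter, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		tenant := r.Header.Get("X-Tenant-ID") // hypothetical tenant header
		if tenant == "" {
			http.Error(w, "missing tenant", http.StatusBadRequest)
			return
		}
		// Fail fast with 429 rather than blocking the request
		if !mt.TryAcquire(tenant) {
			http.Error(w, "rate limited", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}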
Retry with Exponential Backoff
Automatic retry with configurable backoff:
// Use default configuration
config := sdk.DefaultRetryConfig()
// Or customize
config := sdk.RetryConfig{
MaxRetries: 5,
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 30 * time.Second,
BackoffFactor: 2.0,
Jitter: 0.1, // 10% jitter to avoid thundering herd
RetryIf: sdk.DefaultRetryable,
}
// Execute with retry
result, err := sdk.RetryWithBackoff(ctx, config, func(ctx context.Context) (*Response, error) {
return callAPI(ctx)
})
// Custom retry conditions
config.RetryIf = func(err error) bool {
// Only retry rate limit errors
if apiErr, ok := err.(*sdk.APIError); ok {
return apiErr.StatusCode == 429
}
return false
}
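With an InitialBackoff of 100 ms and a BackoffFactor of 2.0, successive waits grow geometrically: roughly 100 ms, 200 ms, 400 ms, 800 ms, and so on, each perturbed by up to 10% jitter and capped at MaxBackoff.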
Circuit Breaker
Prevent cascading failures:
// Open circuit after 5 failures, reset after 30 seconds
cb := sdk.NewCircuitBreaker(5, 30*time.Second)
if cb.Allow() {
resp, err := callAPI()
if err != nil {
cb.RecordFailure()
// Circuit opens after threshold
} else {
cb.RecordSuccess()
// Circuit resets to closed
}
} else {
// Circuit is open, fail fast
return errors.New("service unavailable")
}
// Check circuit state
switch cb.State() {
case sdk.CircuitClosed:
// Normal operation
case sdk.CircuitOpen:
// Blocking requests
case sdk.CircuitHalfOpen:
// Testing with a single request
}
// Manual reset if needed
cb.Reset()
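A small generic helper keeps the breaker bookkeeping out of call sites. A sketch built only on the Allow/RecordFailure/RecordSuccess methods above (ErrCircuitOpen is a hypothetical sentinel error, not part of the SDK):

var ErrCircuitOpen = errors.New("circuit open: failing fast")

// callWithBreaker gates a call behind the breaker and records the outcome.
func callWithBreaker[T any](cb *sdk.CircuitBreaker, call func() (T, error)) (T, error) {
	var zero T
	if !cb.Allow() {
		return zero, ErrCircuitOpen
	}
	result, err := call()
	if err != nil {
		cb.RecordFailure()
		return zero, err
	}
	cb.RecordSuccess()
	return result, nil
}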
Implementing Provider Interface
For complex providers, implement the llm.Provider interface directly:
type MyProvider struct {
name string
client *http.Client
auth sdk.AuthProvider
limiter *sdk.RateLimiter
endpoint string
}
func (p *MyProvider) Name() string { return p.name }
func (p *MyProvider) Type() llm.ProviderType { return llm.ProviderTypeCustom }
func (p *MyProvider) Complete(ctx context.Context, req llm.CompletionRequest) (*llm.CompletionResponse, error) {
// Apply rate limiting
if err := p.limiter.Wait(ctx); err != nil {
return nil, err
}
	// Execute with retry; build the HTTP request inside the closure so the
	// request body can be recreated on each attempt
	return sdk.RetryWithBackoff(ctx, sdk.DefaultRetryConfig(), func(ctx context.Context) (*llm.CompletionResponse, error) {
		body := buildRequestBody(req)
		httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, p.endpoint+"/completions", body)
		if err != nil {
			return nil, err
		}
		p.auth.Apply(httpReq)
		httpReq.Header.Set("Content-Type", "application/json")

		resp, err := p.client.Do(httpReq)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		return parseResponse(resp)
	})
}
func (p *MyProvider) HealthCheck(ctx context.Context) (*llm.HealthCheckResult, error) {
	// Implement a lightweight liveness probe here (e.g. a cheap request to the
	// endpoint) and populate the result fields your SDK version defines
	return &llm.HealthCheckResult{}, nil
}
func (p *MyProvider) Capabilities() []llm.Capability {
return []llm.Capability{llm.CapabilityChat, llm.CapabilityCompletion}
}
func (p *MyProvider) SupportsStreaming() bool { return false }
func (p *MyProvider) EstimateCost(req llm.CompletionRequest) *llm.CostEstimate {
	// Estimate cost from expected prompt/completion tokens and your pricing,
	// populating the fields your SDK version defines
	return &llm.CostEstimate{}
}
Registering with the Factory
Register your custom provider with the factory system:
// Register the factory function
llm.RegisterFactory(llm.ProviderTypeCustom, func(config llm.ProviderConfig) (llm.Provider, error) {
return sdk.NewProviderBuilder(config.Name, llm.ProviderTypeCustom).
WithModel(config.Model).
WithEndpoint(config.Endpoint).
WithAuth(sdk.NewAPIKeyAuth(config.APIKey)).
WithRateLimiter(sdk.NewRateLimiter(100, 100)).
WithRetry(sdk.DefaultRetryConfig()).
WithCompleteFunc(myHandler).
Build(), nil
})
// Now it can be used via the registry
registry := llm.NewRegistry()
registry.Register(ctx, &llm.ProviderConfig{
Name: "my-custom-llm",
Type: llm.ProviderTypeCustom,
APIKey: os.Getenv("MY_API_KEY"),
Model: "my-model-v1",
Endpoint: "https://api.myprovider.com",
Enabled: true,
})
provider, _ := registry.Get(ctx, "my-custom-llm")
Configuration via YAML
Providers can be configured via a YAML file with environment variable expansion:
# axonflow.yaml
version: "1.0"
llm_providers:
my_custom_provider:
enabled: true
display_name: "My Custom LLM"
config:
model: "my-model-v1"
endpoint: "https://api.myprovider.com/v1"
max_tokens: 4096
credentials:
api_key: ${MY_PROVIDER_API_KEY}
priority: 10
weight: 0.5
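If you load such a file yourself rather than through AxonFlow's own loader, the ${VAR} expansion can be done with os.ExpandEnv before unmarshalling. A sketch using gopkg.in/yaml.v3 (loadConfig and the generic map shape are assumptions, not SDK API):

import (
	"os"

	"gopkg.in/yaml.v3"
)

func loadConfig(path string) (map[string]any, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	// Substitute ${MY_PROVIDER_API_KEY}-style references from the environment
	expanded := os.ExpandEnv(string(raw))

	var cfg map[string]any
	if err := yaml.Unmarshal([]byte(expanded), &cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}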
Testing
The builder makes it easy to create mock providers for unit tests:
func TestMyProvider(t *testing.T) {
// Create a mock completion function
provider := sdk.NewProviderBuilder("test", llm.ProviderTypeCustom).
WithCompleteFunc(func(ctx context.Context, req llm.CompletionRequest) (*llm.CompletionResponse, error) {
return &llm.CompletionResponse{
Content: "mock response",
Model: req.Model,
}, nil
}).
Build()
ctx := context.Background()
resp, err := provider.Complete(ctx, llm.CompletionRequest{
Prompt: "test",
})
require.NoError(t, err)
assert.Equal(t, "mock response", resp.Content)
}
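The same approach covers error paths; here the mock returns the APIError type used earlier for retry classification:

func TestMyProviderRateLimitError(t *testing.T) {
	provider := sdk.NewProviderBuilder("test", llm.ProviderTypeCustom).
		WithCompleteFunc(func(ctx context.Context, req llm.CompletionRequest) (*llm.CompletionResponse, error) {
			return nil, &sdk.APIError{StatusCode: 429} // simulate upstream rate limiting
		}).
		Build()

	_, err := provider.Complete(context.Background(), llm.CompletionRequest{Prompt: "test"})
	require.Error(t, err)
}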
Best Practices
- Always use rate limiting to respect API quotas
- Implement retries for transient failures (rate limits, 5xx errors)
- Use circuit breakers for production deployments
- Set appropriate timeouts - LLM calls can be slow
- Implement health checks to enable automatic failover
- Log errors but don't expose API keys in logs
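A sketch that ties these practices together in one builder chain (values are illustrative; handler and checker stand in for your own implementations):

provider := sdk.NewProviderBuilder("prod-provider", llm.ProviderTypeCustom).
	WithEndpoint("https://api.example.com/v1").
	WithAuth(sdk.NewAPIKeyAuth(os.Getenv("PROVIDER_API_KEY"))). // key from env, never logged
	WithTimeout(60 * time.Second).                              // LLM calls can be slow
	WithRateLimiter(sdk.NewRateLimiter(50, 100)).               // stay inside the quota
	WithRetry(sdk.RetryConfig{
		MaxRetries:     3,
		InitialBackoff: 250 * time.Millisecond,
		MaxBackoff:     10 * time.Second,
		BackoffFactor:  2.0,
		Jitter:         0.2,
		RetryIf:        sdk.DefaultRetryable, // transient failures only
	}).
	WithHealthChecker(checker). // enables automatic failover
	WithCompleteFunc(handler).
	Build()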
Next Steps
- LLM Providers Overview - All supported providers
- AWS Bedrock Setup - HIPAA-compliant deployment
- Ollama Setup - Self-hosted deployment