Connector SDK
The AxonFlow Connector SDK provides a framework for building custom MCP connectors. Use it to integrate AxonFlow with your internal systems, databases, or third-party APIs.
Overview
The SDK provides:
- BaseConnector: Embeddable base with lifecycle, metrics, and health checks
- Authentication: OAuth 2.0, API Key, IAM, and custom auth providers
- Rate Limiting: Token bucket rate limiter with adaptive limits
- Retry Logic: Exponential backoff with jitter
- Metrics: Prometheus-compatible metrics collection
- Testing: Mock connector and test harness
Quick Start
1. Create Your Connector
package myconnector
import (
"context"
"axonflow/platform/connectors/sdk"
"axonflow/platform/connectors/base"
)
type MyConnector struct {
sdk.BaseConnector
client *http.Client
apiKey string
}
func NewMyConnector() *MyConnector {
return &MyConnector{
BaseConnector: sdk.BaseConnector{
ConnectorType: "myconnector",
ConnectorVersion: "1.0.0",
ConnectorCaps: []string{"query", "execute"},
},
client: &http.Client{Timeout: 30 * time.Second},
}
}
2. Implement Connect
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
// Call base connect for validation
if err := c.BaseConnector.Connect(ctx, config); err != nil {
return err
}
// Get credentials
c.apiKey = config.Credentials["api_key"]
if c.apiKey == "" {
return &base.ConnectorError{
Code: "MISSING_CREDENTIALS",
Message: "api_key is required",
}
}
c.SetConnected(true)
return nil
}
3. Implement Query
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
if !c.IsConnected() {
return nil, &base.ConnectorError{
Code: "NOT_CONNECTED",
Message: "connector is not connected",
}
}
start := time.Now()
defer func() {
c.GetMetrics().RecordQuery(time.Since(start))
}()
// Build request
req, err := http.NewRequestWithContext(ctx, "GET",
fmt.Sprintf("https://api.example.com/%s", q.Statement), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
// Execute request
resp, err := c.client.Do(req)
if err != nil {
c.GetMetrics().RecordQueryError()
return nil, err
}
defer resp.Body.Close()
// Parse response
var data map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return &base.QueryResult{
Rows: []map[string]interface{}{data},
}, nil
}
4. Implement Execute
func (c *MyConnector) Execute(ctx context.Context, cmd *base.Command) (*base.CommandResult, error) {
if !c.IsConnected() {
return nil, &base.ConnectorError{
Code: "NOT_CONNECTED",
Message: "connector is not connected",
}
}
start := time.Now()
defer func() {
c.GetMetrics().RecordExecute(time.Since(start))
}()
// Build request body
body, _ := json.Marshal(cmd.Parameters)
req, err := http.NewRequestWithContext(ctx, "POST",
fmt.Sprintf("https://api.example.com/%s", cmd.Action),
bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
c.GetMetrics().RecordExecuteError()
return nil, err
}
defer resp.Body.Close()
return &base.CommandResult{
Success: resp.StatusCode < 300,
AffectedRows: 1,
}, nil
}
Authentication Providers
API Key
auth := sdk.NewAPIKeyAuth("your-api-key", "X-API-Key", sdk.APIKeyHeader)
// Or in query parameter
auth := sdk.NewAPIKeyAuth("your-api-key", "api_key", sdk.APIKeyQuery)
Bearer Token
auth := sdk.NewBearerTokenAuth("your-token")
Basic Auth
auth := sdk.NewBasicAuth("username", "password")
OAuth 2.0
auth := sdk.NewOAuthAuth(
"https://auth.example.com/token",
"client-id",
"client-secret",
[]string{"read", "write"},
)
// Token auto-refresh is handled automatically
AWS IAM (Signature V4)
auth := sdk.NewIAMAuth(
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"us-east-1",
"execute-api",
)
Using Auth Providers
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
c.auth = sdk.NewOAuthAuth(
config.Options["token_url"].(string),
config.Credentials["client_id"],
config.Credentials["client_secret"],
[]string{"read"},
)
return nil
}
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
// Apply authentication
if err := c.auth.Apply(req); err != nil {
return nil, err
}
// Make request...
}
Rate Limiting
Basic Rate Limiter
// 100 requests per second, burst of 10
limiter := sdk.NewRateLimiter(100, 10)
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
// Wait for rate limit token
if err := limiter.Wait(ctx); err != nil {
return nil, err // Context cancelled
}
// Make request...
}
Adaptive Rate Limiter
Automatically adjusts based on API response headers:
limiter := sdk.NewAdaptiveRateLimiter(100, 10)
resp, err := client.Do(req)
if err == nil {
// Update limits from response headers
limiter.UpdateFromHeaders(resp.Header)
}
Multi-Tenant Rate Limiting
Per-tenant rate limiting:
limiter := sdk.NewMultiTenantRateLimiter(func() *sdk.RateLimiter {
return sdk.NewRateLimiter(10, 5) // 10 req/s per tenant
})
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
tenantID := q.Parameters["tenant_id"].(string)
if err := limiter.Wait(ctx, tenantID); err != nil {
return nil, err
}
// Make request...
}
Retry Logic
Basic Retry
config := sdk.DefaultRetryConfig()
result, err := sdk.RetryWithBackoff(ctx, config, func(ctx context.Context) (string, error) {
return callExternalAPI()
})
Custom Retry Config
config := sdk.RetryConfig{
MaxRetries: 5,
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 30 * time.Second,
BackoffFactor: 2.0,
Jitter: 0.1,
RetryIf: sdk.DefaultRetryable, // Retry on 429, 500-599
}
Circuit Breaker
cb := sdk.NewCircuitBreaker(5, 30*time.Second) // Open after 5 failures
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
if !cb.Allow() {
return nil, errors.New("circuit breaker open")
}
result, err := c.doQuery(ctx, q)
if err != nil {
cb.RecordFailure()
return nil, err
}
cb.RecordSuccess()
return result, nil
}
Metrics
Recording Metrics
func NewMyConnector() *MyConnector {
c := &MyConnector{
BaseConnector: sdk.BaseConnector{
ConnectorType: "myconnector",
},
}
// Metrics are automatically initialized by BaseConnector
return c
}
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
start := time.Now()
result, err := c.doQuery(ctx, q)
if err != nil {
c.GetMetrics().RecordQueryError()
} else {
c.GetMetrics().RecordQuery(time.Since(start))
}
return result, err
}
Available Metrics
metrics.RecordQuery(duration) // Query latency
metrics.RecordQueryError() // Query errors
metrics.RecordExecute(duration) // Execute latency
metrics.RecordExecuteError() // Execute errors
metrics.RecordHealthCheck(dur, ok) // Health check
metrics.RecordConnect() // Connection established
metrics.RecordDisconnect() // Disconnected
// Get stats
stats := metrics.GetStats()
// Returns: query_count, query_errors, avg_latency, etc.
Testing
Mock Connector
func TestMyHandler(t *testing.T) {
mock := sdk.NewMockConnector()
mock.SetQueryResult(&base.QueryResult{
Rows: []map[string]interface{}{
{"id": 1, "name": "test"},
},
})
// Use mock in your handler
handler := NewHandler(mock)
result, err := handler.HandleQuery(ctx, query)
// Assert results
assert.NoError(t, err)
assert.Equal(t, 1, len(result.Rows))
}
Test Harness
func TestMyConnector(t *testing.T) {
conn := NewMyConnector()
harness := sdk.NewTestHarness(conn)
config := &base.ConnectorConfig{
Name: "test",
Credentials: map[string]string{
"api_key": "test-key",
},
}
// Test connection
harness.TestConnection(t, config)
// Test query
harness.TestQuery(t, &base.Query{
Statement: "users",
})
// Test execute
harness.TestExecute(t, &base.Command{
Action: "create_user",
Parameters: map[string]interface{}{
"name": "test",
},
})
}
Benchmark Harness
func BenchmarkMyConnector(b *testing.B) {
conn := NewMyConnector()
conn.Connect(ctx, config)
harness := sdk.NewBenchmarkHarness(conn)
harness.BenchmarkQuery(b, &base.Query{Statement: "users"})
}
Registration
Register your connector with the MCP registry:
// In platform/agent/mcp_handler.go
func registerMyConnector() error {
config := loadMyConnectorConfig()
if config == nil {
return nil // Skip if not configured
}
conn := myconnector.NewMyConnector()
if err := conn.Connect(context.Background(), config); err != nil {
return err
}
return mcpRegistry.Register("my-connector", conn)
}
Best Practices
- Always call BaseConnector methods - They handle lifecycle and metrics
- Use context for cancellation - Pass context through all operations
- Record metrics - Use
GetMetrics()for all operations - Handle errors gracefully - Return
ConnectorErrorwith codes - Implement health checks - Override
HealthCheck()for meaningful checks - Write tests - Use the test harness for comprehensive testing
- Document capabilities - Set accurate
ConnectorCapsarray
Next Steps
- MCP Overview - Understanding MCP architecture
- S3 Connector - Example Community connector
- Connector Setup - Enterprise configuration (Enterprise)
Enterprise Feature
The Connector SDK will be extracted to a separate repository (getaxonflow/axonflow-connector-sdk) to enable third-party connector development. See Issue #325 for details.