Skip to main content

Connector SDK

The AxonFlow Connector SDK provides a framework for building custom MCP connectors. Use it to integrate AxonFlow with your internal systems, databases, or third-party APIs.

Overview

The SDK provides:

  • BaseConnector: Embeddable base with lifecycle, metrics, and health checks
  • Authentication: OAuth 2.0, API Key, IAM, and custom auth providers
  • Rate Limiting: Token bucket rate limiter with adaptive limits
  • Retry Logic: Exponential backoff with jitter
  • Metrics: Prometheus-compatible metrics collection
  • Testing: Mock connector and test harness

Quick Start

1. Create Your Connector

package myconnector

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"axonflow/platform/connectors/base"
	"axonflow/platform/connectors/sdk"
)

// MyConnector is the example custom connector used throughout this guide.
// It embeds sdk.BaseConnector and talks to a remote API over HTTP.
type MyConnector struct {
// Embedded SDK base. The example calls Connect, IsConnected, SetConnected
// and GetMetrics through it.
sdk.BaseConnector
// client is the HTTP client used for every outbound request.
client *http.Client
// apiKey is read from config.Credentials["api_key"] in Connect and sent as
// a Bearer token on each request.
apiKey string
}

// NewMyConnector allocates a MyConnector, fills in the base connector's
// type, version and capability list, and attaches an HTTP client with a
// 30-second timeout.
func NewMyConnector() *MyConnector {
	c := new(MyConnector)

	// Identity and capabilities live on the embedded base connector.
	c.BaseConnector.ConnectorType = "myconnector"
	c.BaseConnector.ConnectorVersion = "1.0.0"
	c.BaseConnector.ConnectorCaps = []string{"query", "execute"}

	c.client = &http.Client{Timeout: 30 * time.Second}
	return c
}

2. Implement Connect

// Connect runs the base connector's Connect first, then stores the
// "api_key" credential from config. It returns a MISSING_CREDENTIALS
// ConnectorError when that credential is empty; otherwise it marks the
// connector as connected.
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
	// Let the base connector validate before touching any state.
	baseErr := c.BaseConnector.Connect(ctx, config)
	if baseErr != nil {
		return baseErr
	}

	// Store the credential, then reject an empty one.
	if c.apiKey = config.Credentials["api_key"]; len(c.apiKey) == 0 {
		missing := &base.ConnectorError{
			Code:    "MISSING_CREDENTIALS",
			Message: "api_key is required",
		}
		return missing
	}

	c.SetConnected(true)
	return nil
}

3. Implement Query

// Query sends a GET request for q.Statement to the example API and returns
// the decoded JSON object as a single-row result.
//
// It returns a NOT_CONNECTED ConnectorError when the connector is not
// connected and an UPSTREAM_ERROR ConnectorError when the API answers with a
// non-2xx status. Transport, status and decode failures are all counted via
// RecordQueryError. The call latency is recorded via RecordQuery on every
// path once the connection check has passed.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	if !c.IsConnected() {
		return nil, &base.ConnectorError{
			Code:    "NOT_CONNECTED",
			Message: "connector is not connected",
		}
	}

	start := time.Now()
	defer func() {
		c.GetMetrics().RecordQuery(time.Since(start))
	}()

	// Build request.
	// NOTE(review): q.Statement is interpolated into the URL path unescaped;
	// confirm callers only pass trusted, path-safe values.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		fmt.Sprintf("https://api.example.com/%s", q.Statement), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.apiKey)

	// Execute request.
	resp, err := c.client.Do(req)
	if err != nil {
		c.GetMetrics().RecordQueryError()
		return nil, err
	}
	defer resp.Body.Close()

	// A non-2xx response is a failure; do not decode its body as a row.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		c.GetMetrics().RecordQueryError()
		return nil, &base.ConnectorError{
			Code:    "UPSTREAM_ERROR",
			Message: fmt.Sprintf("unexpected HTTP status %d", resp.StatusCode),
		}
	}

	// Parse response.
	var data map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		c.GetMetrics().RecordQueryError()
		return nil, fmt.Errorf("decoding response: %w", err)
	}

	return &base.QueryResult{
		Rows: []map[string]interface{}{data},
	}, nil
}

4. Implement Execute

// Execute POSTs cmd.Parameters as JSON to the endpoint named by cmd.Action.
//
// It returns a NOT_CONNECTED ConnectorError when the connector is not
// connected, and an error when the parameters cannot be encoded or the
// request cannot be built or sent. A completed HTTP exchange always yields a
// CommandResult with a nil error: Success is true only for 2xx responses,
// and AffectedRows is 1 on success and 0 otherwise. Encoding failures,
// transport failures and non-2xx responses are counted via
// RecordExecuteError. The call latency is recorded via RecordExecute.
func (c *MyConnector) Execute(ctx context.Context, cmd *base.Command) (*base.CommandResult, error) {
	if !c.IsConnected() {
		return nil, &base.ConnectorError{
			Code:    "NOT_CONNECTED",
			Message: "connector is not connected",
		}
	}

	start := time.Now()
	defer func() {
		c.GetMetrics().RecordExecute(time.Since(start))
	}()

	// Build request body. Parameters may hold values JSON cannot encode, so
	// the error must not be dropped.
	body, err := json.Marshal(cmd.Parameters)
	if err != nil {
		c.GetMetrics().RecordExecuteError()
		return nil, fmt.Errorf("encoding command parameters: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		fmt.Sprintf("https://api.example.com/%s", cmd.Action),
		bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		c.GetMetrics().RecordExecuteError()
		return nil, err
	}
	defer resp.Body.Close()

	// Only 2xx counts as success; a failed call affected nothing.
	result := &base.CommandResult{
		Success: resp.StatusCode >= 200 && resp.StatusCode < 300,
	}
	if result.Success {
		result.AffectedRows = 1
	} else {
		c.GetMetrics().RecordExecuteError()
	}
	return result, nil
}

Authentication Providers

API Key

// Pass the key with sdk.APIKeyHeader under the name "X-API-Key"
// (presumably sent as a request header with that name).
auth := sdk.NewAPIKeyAuth("your-api-key", "X-API-Key", sdk.APIKeyHeader)
// Or as a query parameter named "api_key". This is an alternative to the
// line above: both declare auth with :=, so use only one of them per scope.
auth := sdk.NewAPIKeyAuth("your-api-key", "api_key", sdk.APIKeyQuery)

Bearer Token

auth := sdk.NewBearerTokenAuth("your-token")

Basic Auth

auth := sdk.NewBasicAuth("username", "password")

OAuth 2.0

auth := sdk.NewOAuthAuth(
"https://auth.example.com/token",
"client-id",
"client-secret",
[]string{"read", "write"},
)

// Token refresh is handled automatically

AWS IAM (Signature V4)

// NOTE(review): the two key strings below are placeholder values (both end
// in "EXAMPLE"). Load real credentials from configuration or a secret store
// rather than hard-coding them in source.
auth := sdk.NewIAMAuth(
"AKIAIOSFODNN7EXAMPLE", // presumably the access key ID
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", // presumably the secret access key
"us-east-1", // presumably the AWS region used for signing
"execute-api", // presumably the AWS service name used for signing
)

Using Auth Providers

// Connect validates config through the base connector, builds an OAuth 2.0
// auth provider from the "token_url" option and the "client_id" and
// "client_secret" credentials, and marks the connector as connected.
//
// It returns an INVALID_CONFIG ConnectorError when "token_url" is missing,
// empty, or not a string, instead of panicking on the type assertion.
//
// This example assumes MyConnector declares an auth field that holds the
// provider returned by sdk.NewOAuthAuth.
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
	// Call base connect for validation
	if err := c.BaseConnector.Connect(ctx, config); err != nil {
		return err
	}

	// Options holds untyped values, so check the assertion instead of
	// letting a bad or absent entry panic.
	tokenURL, ok := config.Options["token_url"].(string)
	if !ok || tokenURL == "" {
		return &base.ConnectorError{
			Code:    "INVALID_CONFIG",
			Message: "token_url option must be a non-empty string",
		}
	}

	c.auth = sdk.NewOAuthAuth(
		tokenURL,
		config.Credentials["client_id"],
		config.Credentials["client_secret"],
		[]string{"read"},
	)

	c.SetConnected(true)
	return nil
}

// Query shows how to apply the configured auth provider to an outgoing
// request. The request itself is elided, and url stands for the endpoint
// the caller wants to reach.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	// Check the construction error: on failure req is nil and must not be
	// handed to the auth provider.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	// Apply authentication
	if err := c.auth.Apply(req); err != nil {
		return nil, err
	}

	// Make request...
}

Rate Limiting

Basic Rate Limiter

// 100 requests per second, burst of 10
limiter := sdk.NewRateLimiter(100, 10)

// Query waits on the shared limiter before doing any work and returns the
// limiter's error unchanged if the wait fails. The request itself is elided.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	// Block until the limiter hands out a token.
	waitErr := limiter.Wait(ctx)
	if waitErr != nil {
		// Context cancelled
		return nil, waitErr
	}

	// Make request...
}

Adaptive Rate Limiter

Automatically adjusts based on API response headers:

limiter := sdk.NewAdaptiveRateLimiter(100, 10)

resp, err := client.Do(req)
if err == nil {
// Update limits from response headers
limiter.UpdateFromHeaders(resp.Header)
}

Multi-Tenant Rate Limiting

Per-tenant rate limiting:

limiter := sdk.NewMultiTenantRateLimiter(func() *sdk.RateLimiter {
return sdk.NewRateLimiter(10, 5) // 10 req/s per tenant
})

// Query rate-limits per tenant: it reads the "tenant_id" query parameter and
// waits on that tenant's limiter before doing any work. The request itself
// is elided.
//
// It returns an INVALID_PARAMETERS ConnectorError when "tenant_id" is
// missing or not a string, instead of panicking on the type assertion.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	// Parameters holds untyped values, so check the assertion.
	tenantID, ok := q.Parameters["tenant_id"].(string)
	if !ok {
		return nil, &base.ConnectorError{
			Code:    "INVALID_PARAMETERS",
			Message: "tenant_id parameter must be a string",
		}
	}

	if err := limiter.Wait(ctx, tenantID); err != nil {
		return nil, err
	}
	// Make request...
}

Retry Logic

Basic Retry

config := sdk.DefaultRetryConfig()

result, err := sdk.RetryWithBackoff(ctx, config, func(ctx context.Context) (string, error) {
return callExternalAPI()
})

Custom Retry Config

config := sdk.RetryConfig{
MaxRetries: 5,
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 30 * time.Second,
BackoffFactor: 2.0,
Jitter: 0.1,
RetryIf: sdk.DefaultRetryable, // Retry on 429, 500-599
}

Circuit Breaker

cb := sdk.NewCircuitBreaker(5, 30*time.Second) // Open after 5 failures

// Query guards c.doQuery with the circuit breaker cb. It refuses the call
// with a "circuit breaker open" error when cb.Allow reports false, and
// otherwise reports the outcome of the query back to the breaker.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	if allowed := cb.Allow(); !allowed {
		return nil, errors.New("circuit breaker open")
	}

	res, queryErr := c.doQuery(ctx, q)
	if queryErr == nil {
		// Successful call: tell the breaker and pass the result through.
		cb.RecordSuccess()
		return res, nil
	}

	// Failed call: tell the breaker and drop any partial result.
	cb.RecordFailure()
	return nil, queryErr
}

Metrics

Recording Metrics

// NewMyConnector returns a connector whose embedded base has its
// ConnectorType set to "myconnector".
func NewMyConnector() *MyConnector {
	// Metrics are automatically initialized by BaseConnector
	return &MyConnector{
		BaseConnector: sdk.BaseConnector{ConnectorType: "myconnector"},
	}
}

// Query wraps c.doQuery with metrics. A failed call increments the query
// error counter, and a successful call records its latency. The result and
// error from doQuery are returned unchanged.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	began := time.Now()
	res, queryErr := c.doQuery(ctx, q)

	if queryErr != nil {
		c.GetMetrics().RecordQueryError()
		return res, queryErr
	}

	// Latency is only recorded for successful calls.
	c.GetMetrics().RecordQuery(time.Since(began))
	return res, queryErr
}

Available Metrics

metrics.RecordQuery(duration)      // Query latency
metrics.RecordQueryError() // Query errors
metrics.RecordExecute(duration) // Execute latency
metrics.RecordExecuteError() // Execute errors
metrics.RecordHealthCheck(dur, ok) // Health check
metrics.RecordConnect() // Connection established
metrics.RecordDisconnect() // Disconnected

// Get stats
stats := metrics.GetStats()
// Returns: query_count, query_errors, avg_latency, etc.

Testing

Mock Connector

// TestMyHandler shows how to drive a handler with the SDK's mock connector.
// The mock is primed with a single row, and the test checks that the handler
// returns that row without error.
func TestMyHandler(t *testing.T) {
	mock := sdk.NewMockConnector()
	mock.SetQueryResult(&base.QueryResult{
		Rows: []map[string]interface{}{
			{"id": 1, "name": "test"},
		},
	})

	// Use mock in your handler
	handler := NewHandler(mock)

	ctx := context.Background()
	// NOTE(review): assumes HandleQuery accepts a *base.Query; adjust to
	// your handler's signature.
	query := &base.Query{Statement: "users"}
	result, err := handler.HandleQuery(ctx, query)

	// Assert results. Stop on error so a nil result is never dereferenced.
	if !assert.NoError(t, err) {
		return
	}
	assert.Equal(t, 1, len(result.Rows))
}

Test Harness

func TestMyConnector(t *testing.T) {
conn := NewMyConnector()
harness := sdk.NewTestHarness(conn)

config := &base.ConnectorConfig{
Name: "test",
Credentials: map[string]string{
"api_key": "test-key",
},
}

// Test connection
harness.TestConnection(t, config)

// Test query
harness.TestQuery(t, &base.Query{
Statement: "users",
})

// Test execute
harness.TestExecute(t, &base.Command{
Action: "create_user",
Parameters: map[string]interface{}{
"name": "test",
},
})
}

Benchmark Harness

func BenchmarkMyConnector(b *testing.B) {
conn := NewMyConnector()
conn.Connect(ctx, config)

harness := sdk.NewBenchmarkHarness(conn)
harness.BenchmarkQuery(b, &base.Query{Statement: "users"})
}

Registration

Register your connector with the MCP registry:

// In platform/agent/mcp_handler.go

// registerMyConnector loads the connector's configuration, connects a new
// MyConnector with it, and registers the connector under "my-connector".
// A nil configuration means the connector is not configured, and the
// function returns nil without registering anything.
func registerMyConnector() error {
	cfg := loadMyConnectorConfig()
	if cfg == nil {
		// Skip if not configured
		return nil
	}

	connector := myconnector.NewMyConnector()
	connectErr := connector.Connect(context.Background(), cfg)
	if connectErr != nil {
		return connectErr
	}

	return mcpRegistry.Register("my-connector", connector)
}

Best Practices

  1. Always call BaseConnector methods - They handle lifecycle and metrics
  2. Use context for cancellation - Pass context through all operations
  3. Record metrics - Use GetMetrics() for all operations
  4. Handle errors gracefully - Return ConnectorError with codes
  5. Implement health checks - Override HealthCheck() for meaningful checks
  6. Write tests - Use the test harness for comprehensive testing
  7. Document capabilities - Set accurate ConnectorCaps array

Next Steps

Enterprise Feature

The Connector SDK will be extracted to a separate repository (getaxonflow/axonflow-connector-sdk) to enable third-party connector development. See Issue #325 for details.