Skip to main content

Connector SDK

The AxonFlow Connector SDK provides a framework for building custom MCP connectors. Use it to integrate AxonFlow with your internal systems, databases, or third-party APIs.

Overview

The SDK provides:

  • BaseConnector: Embeddable base with lifecycle, metrics, and health checks
  • Authentication: OAuth 2.0, API Key, IAM, and custom auth providers
  • Rate Limiting: Token bucket rate limiter with adaptive limits
  • Retry Logic: Exponential backoff with jitter
  • Metrics: Prometheus-compatible metrics collection
  • Testing: Mock connector and test harness

Quick Start

1. Create Your Connector

package myconnector

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"axonflow/platform/connectors/base"
	"axonflow/platform/connectors/sdk"
)

// MyConnector is an example connector that embeds sdk.BaseConnector and
// calls an HTTP API authenticated with an API key.
type MyConnector struct {
sdk.BaseConnector
client *http.Client // HTTP client used for all outbound requests
apiKey string // read from config.Credentials["api_key"] in Connect
}

// NewMyConnector returns an unconnected MyConnector that advertises the
// "query" and "execute" capabilities and uses an HTTP client with a
// 30-second timeout.
func NewMyConnector() *MyConnector {
	httpClient := &http.Client{Timeout: 30 * time.Second}

	return &MyConnector{
		client: httpClient,
		BaseConnector: sdk.BaseConnector{
			ConnectorType:    "myconnector",
			ConnectorVersion: "1.0.0",
			ConnectorCaps:    []string{"query", "execute"},
		},
	}
}

2. Implement Connect

// Connect runs the embedded BaseConnector.Connect first, then stores the
// "api_key" credential and marks the connector as connected.
//
// It returns the base connector's error unchanged, or a *base.ConnectorError
// with code MISSING_CREDENTIALS when no api_key is configured.
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
	// Let the base connector run its own checks before touching credentials.
	baseErr := c.BaseConnector.Connect(ctx, config)
	if baseErr != nil {
		return baseErr
	}

	// The key is stored before it is checked, so a failed Connect leaves
	// apiKey empty.
	c.apiKey = config.Credentials["api_key"]
	if len(c.apiKey) == 0 {
		missing := &base.ConnectorError{
			Code:    "MISSING_CREDENTIALS",
			Message: "api_key is required",
		}
		return missing
	}

	c.SetConnected(true)
	return nil
}

3. Implement Query

// Query performs a GET against the remote API and returns the decoded JSON
// document as a single result row.
//
// q.Statement is used verbatim as the URL path under
// https://api.example.com/. A *base.ConnectorError with code NOT_CONNECTED
// is returned when the connector is not connected. Transport failures,
// non-2xx responses and undecodable bodies are counted with RecordQueryError
// before the error is returned; latency is recorded for every attempt.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	if !c.IsConnected() {
		return nil, &base.ConnectorError{
			Code:    "NOT_CONNECTED",
			Message: "connector is not connected",
		}
	}

	start := time.Now()
	defer func() {
		c.GetMetrics().RecordQuery(time.Since(start))
	}()

	// Build request
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		fmt.Sprintf("https://api.example.com/%s", q.Statement), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.apiKey)

	// Execute request
	resp, err := c.client.Do(req)
	if err != nil {
		c.GetMetrics().RecordQueryError()
		return nil, err
	}
	defer resp.Body.Close()

	// An error payload must never be surfaced as a data row, so reject
	// anything outside 2xx before decoding.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		c.GetMetrics().RecordQueryError()

		code := "INTERNAL_ERROR"
		switch resp.StatusCode {
		case http.StatusUnauthorized, http.StatusForbidden:
			code = "AUTH_ERROR"
		case http.StatusNotFound:
			code = "NOT_FOUND"
		case http.StatusTooManyRequests:
			code = "RATE_LIMITED"
		}
		return nil, &base.ConnectorError{
			Code:    code,
			Message: fmt.Sprintf("unexpected HTTP status %d", resp.StatusCode),
			// Same policy as the SDK's default retry rule: 429 and 5xx.
			Retryable: resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500,
		}
	}

	// Parse response
	var data map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		c.GetMetrics().RecordQueryError()
		return nil, err
	}

	return &base.QueryResult{
		Rows: []map[string]interface{}{data},
	}, nil
}

4. Implement Execute

// Execute POSTs cmd.Parameters as a JSON body to
// https://api.example.com/<cmd.Action> and reports whether the API accepted
// the command.
//
// A *base.ConnectorError is returned when the connector is not connected
// (NOT_CONNECTED) or when the parameters cannot be encoded as JSON
// (VALIDATION_ERROR). Transport errors are returned unchanged. A non-2xx
// response is not an error: it yields Success == false with zero affected
// rows and is counted with RecordExecuteError.
func (c *MyConnector) Execute(ctx context.Context, cmd *base.Command) (*base.CommandResult, error) {
	if !c.IsConnected() {
		return nil, &base.ConnectorError{
			Code:    "NOT_CONNECTED",
			Message: "connector is not connected",
		}
	}

	start := time.Now()
	defer func() {
		c.GetMetrics().RecordExecute(time.Since(start))
	}()

	// Build request body. Parameters may hold values JSON cannot represent
	// (channels, funcs, NaN), so the error must not be discarded.
	body, err := json.Marshal(cmd.Parameters)
	if err != nil {
		c.GetMetrics().RecordExecuteError()
		return nil, &base.ConnectorError{
			Code:    "VALIDATION_ERROR",
			Message: "command parameters cannot be encoded as JSON",
			Cause:   err,
		}
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		fmt.Sprintf("https://api.example.com/%s", cmd.Action),
		bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.apiKey)
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		c.GetMetrics().RecordExecuteError()
		return nil, err
	}
	defer resp.Body.Close()

	result := &base.CommandResult{
		Success: resp.StatusCode >= 200 && resp.StatusCode <= 299,
	}
	if result.Success {
		result.AffectedRows = 1
	} else {
		// A rejected command changed nothing; count it as an error.
		c.GetMetrics().RecordExecuteError()
	}
	return result, nil
}

Authentication Providers

API Key

// The two calls below are alternatives: both declare auth, so keep only one.
// Send the key in the X-API-Key request header:
auth := sdk.NewAPIKeyAuth("your-api-key", "X-API-Key", sdk.APIKeyHeader)
// Or send it as the api_key query parameter:
auth := sdk.NewAPIKeyAuth("your-api-key", "api_key", sdk.APIKeyQuery)

Bearer Token

// "your-token" is a placeholder; load the real token from configuration
// rather than hardcoding it.
auth := sdk.NewBearerTokenAuth("your-token")

Basic Auth

// "username" and "password" are placeholders; load real credentials from
// configuration rather than hardcoding them.
auth := sdk.NewBasicAuth("username", "password")

OAuth 2.0

// NOTE(review): the arguments appear to be the token endpoint URL, client ID,
// client secret and requested scopes — confirm against sdk.NewOAuthAuth.
auth := sdk.NewOAuthAuth(
"https://auth.example.com/token",
"client-id",
"client-secret",
[]string{"read", "write"},
)

// Token refresh is handled automatically.

AWS IAM (Signature V4)

// The key pair below is a documentation placeholder. Never hardcode real AWS
// credentials in source; load them from configuration or the environment.
// NOTE(review): the arguments appear to be access key ID, secret access key,
// region and service name — confirm against sdk.NewIAMAuth.
auth := sdk.NewIAMAuth(
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"us-east-1",
"execute-api",
)

Using Auth Providers

// Connect builds an OAuth auth provider from the connector configuration and
// stores it on the connector.
//
// The token endpoint is read from config.Options["token_url"]; the client ID
// and secret come from config.Credentials. A *base.ConnectorError with code
// VALIDATION_ERROR is returned when token_url is missing, empty, or not a
// string, instead of panicking on a failed type assertion.
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
	tokenURL, ok := config.Options["token_url"].(string)
	if !ok || tokenURL == "" {
		return &base.ConnectorError{
			Code:    "VALIDATION_ERROR",
			Message: "token_url option is required and must be a non-empty string",
		}
	}

	c.auth = sdk.NewOAuthAuth(
		tokenURL,
		config.Credentials["client_id"],
		config.Credentials["client_secret"],
		[]string{"read"},
	)
	return nil
}

// Query shows where the auth provider is applied: after the request is built
// and before it is sent. The request itself is elided.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	// A malformed URL or invalid method yields a nil request, so the error
	// must be checked before the request is used.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}

	// Apply authentication
	if err := c.auth.Apply(req); err != nil {
		return nil, err
	}

	// Make request...
}

Rate Limiting

Basic Rate Limiter

// 100 requests per second, burst of 10
limiter := sdk.NewRateLimiter(100, 10)

// Query waits for a rate-limit token before issuing the request, and gives up
// when the wait fails. The request itself is elided.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	// Wait for rate limit token
	waitErr := limiter.Wait(ctx)
	if waitErr != nil {
		return nil, waitErr // Context cancelled
	}

	// Make request...
}

Adaptive Rate Limiter

Automatically adjusts based on API response headers:

// NOTE(review): the arguments presumably mirror sdk.NewRateLimiter (rate,
// burst) — confirm against the SDK.
limiter := sdk.NewAdaptiveRateLimiter(100, 10)

// resp and err remain in scope for the surrounding code, which is still
// responsible for handling err and closing resp.Body.
resp, err := client.Do(req)
if err == nil {
// Update limits from response headers
limiter.UpdateFromHeaders(resp.Header)
}

Multi-Tenant Rate Limiting

Per-tenant rate limiting:

limiter := sdk.NewMultiTenantRateLimiter(func() *sdk.RateLimiter {
return sdk.NewRateLimiter(10, 5) // 10 req/s per tenant
})

// Query waits on the calling tenant's rate limiter before issuing the
// request. The request itself is elided.
//
// The tenant is read from q.Parameters["tenant_id"]. A *base.ConnectorError
// with code VALIDATION_ERROR is returned when it is missing or not a string,
// instead of panicking on a failed type assertion.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	tenantID, ok := q.Parameters["tenant_id"].(string)
	if !ok {
		return nil, &base.ConnectorError{
			Code:    "VALIDATION_ERROR",
			Message: "tenant_id parameter is required and must be a string",
		}
	}

	if err := limiter.Wait(ctx, tenantID); err != nil {
		return nil, err
	}
	// Make request...
}

Retry Logic

Basic Retry

config := sdk.DefaultRetryConfig()

// The operation to retry; it receives the context passed to RetryWithBackoff.
operation := func(ctx context.Context) (string, error) {
	return callExternalAPI()
}

result, err := sdk.RetryWithBackoff(ctx, config, operation)

Custom Retry Config

// Custom retry policy.
// NOTE(review): the field meanings noted below are inferred from the field
// names — confirm against sdk.RetryConfig.
config := sdk.RetryConfig{
MaxRetries: 5,
InitialBackoff: 100 * time.Millisecond, // presumably the delay before the first retry
MaxBackoff: 30 * time.Second, // presumably the upper bound on any single delay
BackoffFactor: 2.0, // presumably the multiplier applied after each attempt
Jitter: 0.1,
RetryIf: sdk.DefaultRetryable, // Retry on 429, 500-599
}

Circuit Breaker

cb := sdk.NewCircuitBreaker(5, 30*time.Second) // Open after 5 failures

// Query runs doQuery behind the circuit breaker: it fails immediately while
// the breaker disallows calls, and reports each outcome back to the breaker.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	if allowed := cb.Allow(); !allowed {
		return nil, errors.New("circuit breaker open")
	}

	res, queryErr := c.doQuery(ctx, q)
	if queryErr == nil {
		cb.RecordSuccess()
		return res, nil
	}

	// The partial result is dropped on failure; only the error is returned.
	cb.RecordFailure()
	return nil, queryErr
}

Metrics

Recording Metrics

// NewMyConnector returns a connector of type "myconnector". No metrics setup
// is needed here.
func NewMyConnector() *MyConnector {
	// Metrics are automatically initialized by BaseConnector
	return &MyConnector{
		BaseConnector: sdk.BaseConnector{ConnectorType: "myconnector"},
	}
}

// Query wraps doQuery with metrics: a failure increments the query error
// counter, a success records the elapsed time. The result and error from
// doQuery are returned unchanged.
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
	began := time.Now()

	res, queryErr := c.doQuery(ctx, q)
	if queryErr != nil {
		c.GetMetrics().RecordQueryError()
		return res, queryErr
	}

	c.GetMetrics().RecordQuery(time.Since(began))
	return res, nil
}

Available Metrics

// metrics is the recorder returned by the connector's GetMetrics().
metrics.RecordQuery(duration)      // Query latency
metrics.RecordQueryError() // Query errors
metrics.RecordExecute(duration) // Execute latency
metrics.RecordExecuteError() // Execute errors
metrics.RecordHealthCheck(dur, ok) // Health check
metrics.RecordConnect() // Connection established
metrics.RecordDisconnect() // Disconnected

// Get stats
stats := metrics.GetStats()
// Returns: query_count, query_errors, avg_latency, etc.

Testing

Mock Connector

// TestMyHandler checks that a handler backed by a mock connector returns the
// single row the mock was configured with.
func TestMyHandler(t *testing.T) {
	mock := sdk.NewMockConnector()
	mock.SetQueryResult(&base.QueryResult{
		Rows: []map[string]interface{}{
			{"id": 1, "name": "test"},
		},
	})

	// Use mock in your handler
	handler := NewHandler(mock)
	result, err := handler.HandleQuery(ctx, query)

	// assert.NoError only marks the test as failed and keeps going, so stop
	// here on error rather than dereferencing a possibly nil result below.
	if !assert.NoError(t, err) {
		return
	}
	assert.Len(t, result.Rows, 1)
}

Test Harness

func TestMyConnector(t *testing.T) {
conn := NewMyConnector()
harness := sdk.NewTestHarness(conn)

config := &base.ConnectorConfig{
Name: "test",
Credentials: map[string]string{
"api_key": "test-key",
},
}

// Test connection
harness.TestConnection(t, config)

// Test query
harness.TestQuery(t, &base.Query{
Statement: "users",
})

// Test execute
harness.TestExecute(t, &base.Command{
Action: "create_user",
Parameters: map[string]interface{}{
"name": "test",
},
})
}

Benchmark Harness

// BenchmarkMyConnector measures Query through the SDK benchmark harness. The
// benchmark aborts when the connector cannot connect, so a failed setup is
// not reported as a timing result.
func BenchmarkMyConnector(b *testing.B) {
	conn := NewMyConnector()
	if err := conn.Connect(ctx, config); err != nil {
		b.Fatalf("connect: %v", err)
	}

	harness := sdk.NewBenchmarkHarness(conn)
	harness.BenchmarkQuery(b, &base.Query{Statement: "users"})
}

Registration

Register your connector with the MCP registry:

// In platform/agent/mcp_handler.go

// registerMyConnector connects the connector and adds it to the MCP registry
// under the name "my-connector".
//
// It returns nil without registering anything when no configuration is
// present. Connect and Register errors are returned unchanged; when Register
// fails, the connected connector is disconnected so it is not leaked.
func registerMyConnector() error {
	config := loadMyConnectorConfig()
	if config == nil {
		return nil // Skip if not configured
	}

	conn := myconnector.NewMyConnector()
	if err := conn.Connect(context.Background(), config); err != nil {
		return err
	}

	if err := mcpRegistry.Register("my-connector", conn); err != nil {
		// Best-effort cleanup: the registration error is the one worth
		// reporting, so a Disconnect failure is deliberately ignored.
		_ = conn.Disconnect(context.Background())
		return err
	}
	return nil
}

Error Types

The SDK defines standard error types for consistent error handling across connectors:

| Error Code | Description | Retryable | Example |
| --- | --- | --- | --- |
| CONNECTION_ERROR | Failed to establish connection to the backend service | Yes | Network unreachable, DNS resolution failure |
| AUTH_ERROR | Authentication or authorization failure | No | Invalid API key, expired token, insufficient permissions |
| TIMEOUT_ERROR | Operation exceeded the configured timeout | Yes | Slow query, unresponsive service |
| VALIDATION_ERROR | Invalid input parameters or configuration | No | Missing required field, invalid query syntax |
| NOT_CONNECTED | Connector has not been connected or connection was lost | Yes | Calling Query before Connect, connection dropped |
| RATE_LIMITED | Backend service rate limit exceeded | Yes | Too many API requests in time window |
| NOT_FOUND | Requested resource does not exist | No | Table not found, object deleted |
| CONFLICT | Write conflict or duplicate resource | No | Duplicate key, optimistic lock failure |
| INTERNAL_ERROR | Unexpected internal error in the connector | No | Unhandled exception, serialization failure |

Usage:

// Retryable: false tells the retry logic not to retry this failure.
// Cause carries the underlying error (err) alongside the code and message.
return nil, &base.ConnectorError{
Code: "AUTH_ERROR",
Message: "OAuth token expired",
Retryable: false,
Cause: err,
}

The retry logic (see Retry Logic) automatically checks the Retryable field when deciding whether to retry failed operations.

SDK Interface Reference

The connector SDK defines the following core interfaces and methods:

Connector Interface

| Method | Signature | Description |
| --- | --- | --- |
| Connect | `Connect(ctx context.Context, config *base.ConnectorConfig) error` | Initialize the connector with configuration and credentials. Must be called before Query/Execute. |
| Disconnect | `Disconnect(ctx context.Context) error` | Gracefully close connections and release resources. |
| Query | `Query(ctx context.Context, q *base.Query) (*base.QueryResult, error)` | Execute a read operation and return results. |
| Execute | `Execute(ctx context.Context, cmd *base.Command) (*base.CommandResult, error)` | Execute a write operation (create, update, delete). |
| HealthCheck | `HealthCheck(ctx context.Context) (*base.HealthResult, error)` | Check connector health and return status details. |

BaseConnector Methods

| Method | Description |
| --- | --- |
| `SetConnected(bool)` | Set the connection state |
| `IsConnected() bool` | Check if the connector is currently connected |
| `GetMetrics() *Metrics` | Access the metrics recorder for the connector |
| `GetConfig() *ConnectorConfig` | Retrieve the current connector configuration |

Key Types

| Type | Fields | Description |
| --- | --- | --- |
| ConnectorConfig | Name, Type, ConnectionURL, Credentials, Options, TimeoutMs, MaxRetries, TenantID | Configuration passed to Connect() |
| Query | Statement, Parameters, Options | Read operation input |
| QueryResult | Rows, RowCount, DurationMs, Metadata | Read operation output |
| Command | Action, Statement, Parameters, Options | Write operation input |
| CommandResult | Success, AffectedRows, Message, DurationMs | Write operation output |
| HealthResult | Healthy, LatencyMs, Details, Timestamp | Health check output |
| ConnectorError | Code, Message, Retryable, Cause | Structured error with retry hint |

Best Practices

  1. Always call BaseConnector methods - They handle lifecycle and metrics
  2. Use context for cancellation - Pass context through all operations
  3. Record metrics - Use GetMetrics() for all operations
  4. Handle errors gracefully - Return ConnectorError with codes
  5. Implement health checks - Override HealthCheck() for meaningful checks
  6. Write tests - Use the test harness for comprehensive testing
  7. Document capabilities - Set accurate ConnectorCaps array

Next Steps

Enterprise Feature

The Connector SDK will be extracted to a separate repository (getaxonflow/axonflow-connector-sdk) to enable third-party connector development. See Issue #325 for details.