Connector SDK
The AxonFlow Connector SDK provides a framework for building custom MCP connectors. Use it to integrate AxonFlow with your internal systems, databases, or third-party APIs.
Overview
The SDK provides:
- BaseConnector: Embeddable base with lifecycle, metrics, and health checks
- Authentication: OAuth 2.0, API Key, IAM, and custom auth providers
- Rate Limiting: Token bucket rate limiter with adaptive limits
- Retry Logic: Exponential backoff with jitter
- Metrics: Prometheus-compatible metrics collection
- Testing: Mock connector and test harness
Quick Start
1. Create Your Connector
package myconnector
import (
"context"
"axonflow/platform/connectors/sdk"
"axonflow/platform/connectors/base"
)
type MyConnector struct {
sdk.BaseConnector
client *http.Client
apiKey string
}
func NewMyConnector() *MyConnector {
return &MyConnector{
BaseConnector: sdk.BaseConnector{
ConnectorType: "myconnector",
ConnectorVersion: "1.0.0",
ConnectorCaps: []string{"query", "execute"},
},
client: &http.Client{Timeout: 30 * time.Second},
}
}
2. Implement Connect
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
// Call base connect for validation
if err := c.BaseConnector.Connect(ctx, config); err != nil {
return err
}
// Get credentials
c.apiKey = config.Credentials["api_key"]
if c.apiKey == "" {
return &base.ConnectorError{
Code: "MISSING_CREDENTIALS",
Message: "api_key is required",
}
}
c.SetConnected(true)
return nil
}
3. Implement Query
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
if !c.IsConnected() {
return nil, &base.ConnectorError{
Code: "NOT_CONNECTED",
Message: "connector is not connected",
}
}
start := time.Now()
defer func() {
c.GetMetrics().RecordQuery(time.Since(start))
}()
// Build request
req, err := http.NewRequestWithContext(ctx, "GET",
fmt.Sprintf("https://api.example.com/%s", q.Statement), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
// Execute request
resp, err := c.client.Do(req)
if err != nil {
c.GetMetrics().RecordQueryError()
return nil, err
}
defer resp.Body.Close()
// Parse response
var data map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, err
}
return &base.QueryResult{
Rows: []map[string]interface{}{data},
}, nil
}
4. Implement Execute
func (c *MyConnector) Execute(ctx context.Context, cmd *base.Command) (*base.CommandResult, error) {
if !c.IsConnected() {
return nil, &base.ConnectorError{
Code: "NOT_CONNECTED",
Message: "connector is not connected",
}
}
start := time.Now()
defer func() {
c.GetMetrics().RecordExecute(time.Since(start))
}()
// Build request body
body, _ := json.Marshal(cmd.Parameters)
req, err := http.NewRequestWithContext(ctx, "POST",
fmt.Sprintf("https://api.example.com/%s", cmd.Action),
bytes.NewReader(body))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+c.apiKey)
req.Header.Set("Content-Type", "application/json")
resp, err := c.client.Do(req)
if err != nil {
c.GetMetrics().RecordExecuteError()
return nil, err
}
defer resp.Body.Close()
return &base.CommandResult{
Success: resp.StatusCode < 300,
AffectedRows: 1,
}, nil
}
Authentication Providers
API Key
auth := sdk.NewAPIKeyAuth("your-api-key", "X-API-Key", sdk.APIKeyHeader)
// Or in query parameter
auth := sdk.NewAPIKeyAuth("your-api-key", "api_key", sdk.APIKeyQuery)
Bearer Token
auth := sdk.NewBearerTokenAuth("your-token")
Basic Auth
auth := sdk.NewBasicAuth("username", "password")
OAuth 2.0
auth := sdk.NewOAuthAuth(
"https://auth.example.com/token",
"client-id",
"client-secret",
[]string{"read", "write"},
)
// Token auto-refresh is handled automatically
AWS IAM (Signature V4)
auth := sdk.NewIAMAuth(
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"us-east-1",
"execute-api",
)
Using Auth Providers
func (c *MyConnector) Connect(ctx context.Context, config *base.ConnectorConfig) error {
c.auth = sdk.NewOAuthAuth(
config.Options["token_url"].(string),
config.Credentials["client_id"],
config.Credentials["client_secret"],
[]string{"read"},
)
return nil
}
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
// Apply authentication
if err := c.auth.Apply(req); err != nil {
return nil, err
}
// Make request...
}
Rate Limiting
Basic Rate Limiter
// 100 requests per second, burst of 10
limiter := sdk.NewRateLimiter(100, 10)
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
// Wait for rate limit token
if err := limiter.Wait(ctx); err != nil {
return nil, err // Context cancelled
}
// Make request...
}
Adaptive Rate Limiter
Automatically adjusts based on API response headers:
limiter := sdk.NewAdaptiveRateLimiter(100, 10)
resp, err := client.Do(req)
if err == nil {
// Update limits from response headers
limiter.UpdateFromHeaders(resp.Header)
}
Multi-Tenant Rate Limiting
Per-tenant rate limiting:
limiter := sdk.NewMultiTenantRateLimiter(func() *sdk.RateLimiter {
return sdk.NewRateLimiter(10, 5) // 10 req/s per tenant
})
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
tenantID := q.Parameters["tenant_id"].(string)
if err := limiter.Wait(ctx, tenantID); err != nil {
return nil, err
}
// Make request...
}
Retry Logic
Basic Retry
config := sdk.DefaultRetryConfig()
result, err := sdk.RetryWithBackoff(ctx, config, func(ctx context.Context) (string, error) {
return callExternalAPI()
})
Custom Retry Config
config := sdk.RetryConfig{
MaxRetries: 5,
InitialBackoff: 100 * time.Millisecond,
MaxBackoff: 30 * time.Second,
BackoffFactor: 2.0,
Jitter: 0.1,
RetryIf: sdk.DefaultRetryable, // Retry on 429, 500-599
}
Circuit Breaker
cb := sdk.NewCircuitBreaker(5, 30*time.Second) // Open after 5 failures
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
if !cb.Allow() {
return nil, errors.New("circuit breaker open")
}
result, err := c.doQuery(ctx, q)
if err != nil {
cb.RecordFailure()
return nil, err
}
cb.RecordSuccess()
return result, nil
}
Metrics
Recording Metrics
func NewMyConnector() *MyConnector {
c := &MyConnector{
BaseConnector: sdk.BaseConnector{
ConnectorType: "myconnector",
},
}
// Metrics are automatically initialized by BaseConnector
return c
}
func (c *MyConnector) Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) {
start := time.Now()
result, err := c.doQuery(ctx, q)
if err != nil {
c.GetMetrics().RecordQueryError()
} else {
c.GetMetrics().RecordQuery(time.Since(start))
}
return result, err
}
Available Metrics
metrics.RecordQuery(duration) // Query latency
metrics.RecordQueryError() // Query errors
metrics.RecordExecute(duration) // Execute latency
metrics.RecordExecuteError() // Execute errors
metrics.RecordHealthCheck(dur, ok) // Health check
metrics.RecordConnect() // Connection established
metrics.RecordDisconnect() // Disconnected
// Get stats
stats := metrics.GetStats()
// Returns: query_count, query_errors, avg_latency, etc.
Testing
Mock Connector
func TestMyHandler(t *testing.T) {
mock := sdk.NewMockConnector()
mock.SetQueryResult(&base.QueryResult{
Rows: []map[string]interface{}{
{"id": 1, "name": "test"},
},
})
// Use mock in your handler
handler := NewHandler(mock)
result, err := handler.HandleQuery(ctx, query)
// Assert results
assert.NoError(t, err)
assert.Equal(t, 1, len(result.Rows))
}
Test Harness
func TestMyConnector(t *testing.T) {
conn := NewMyConnector()
harness := sdk.NewTestHarness(conn)
config := &base.ConnectorConfig{
Name: "test",
Credentials: map[string]string{
"api_key": "test-key",
},
}
// Test connection
harness.TestConnection(t, config)
// Test query
harness.TestQuery(t, &base.Query{
Statement: "users",
})
// Test execute
harness.TestExecute(t, &base.Command{
Action: "create_user",
Parameters: map[string]interface{}{
"name": "test",
},
})
}
Benchmark Harness
func BenchmarkMyConnector(b *testing.B) {
conn := NewMyConnector()
conn.Connect(ctx, config)
harness := sdk.NewBenchmarkHarness(conn)
harness.BenchmarkQuery(b, &base.Query{Statement: "users"})
}
Registration
Register your connector with the MCP registry:
// In platform/agent/mcp_handler.go
func registerMyConnector() error {
config := loadMyConnectorConfig()
if config == nil {
return nil // Skip if not configured
}
conn := myconnector.NewMyConnector()
if err := conn.Connect(context.Background(), config); err != nil {
return err
}
return mcpRegistry.Register("my-connector", conn)
}
Error Types
The SDK defines standard error types for consistent error handling across connectors:
| Error Code | Description | Retryable | Example |
|---|---|---|---|
CONNECTION_ERROR | Failed to establish connection to the backend service | Yes | Network unreachable, DNS resolution failure |
AUTH_ERROR | Authentication or authorization failure | No | Invalid API key, expired token, insufficient permissions |
TIMEOUT_ERROR | Operation exceeded the configured timeout | Yes | Slow query, unresponsive service |
VALIDATION_ERROR | Invalid input parameters or configuration | No | Missing required field, invalid query syntax |
NOT_CONNECTED | Connector has not been connected or connection was lost | Yes | Calling Query before Connect, connection dropped |
RATE_LIMITED | Backend service rate limit exceeded | Yes | Too many API requests in time window |
NOT_FOUND | Requested resource does not exist | No | Table not found, object deleted |
CONFLICT | Write conflict or duplicate resource | No | Duplicate key, optimistic lock failure |
INTERNAL_ERROR | Unexpected internal error in the connector | No | Unhandled exception, serialization failure |
Usage:
return nil, &base.ConnectorError{
Code: "AUTH_ERROR",
Message: "OAuth token expired",
Retryable: false,
Cause: err,
}
The retry logic (see Retry Logic) automatically checks the Retryable field when deciding whether to retry failed operations.
SDK Interface Reference
The connector SDK defines the following core interfaces and methods:
Connector Interface
| Method | Signature | Description |
|---|---|---|
Connect | Connect(ctx context.Context, config *base.ConnectorConfig) error | Initialize the connector with configuration and credentials. Must be called before Query/Execute. |
Disconnect | Disconnect(ctx context.Context) error | Gracefully close connections and release resources. |
Query | Query(ctx context.Context, q *base.Query) (*base.QueryResult, error) | Execute a read operation and return results. |
Execute | Execute(ctx context.Context, cmd *base.Command) (*base.CommandResult, error) | Execute a write operation (create, update, delete). |
HealthCheck | HealthCheck(ctx context.Context) (*base.HealthResult, error) | Check connector health and return status details. |
BaseConnector Methods
| Method | Description |
|---|---|
SetConnected(bool) | Set the connection state |
IsConnected() bool | Check if the connector is currently connected |
GetMetrics() *Metrics | Access the metrics recorder for the connector |
GetConfig() *ConnectorConfig | Retrieve the current connector configuration |
Key Types
| Type | Fields | Description |
|---|---|---|
ConnectorConfig | Name, Type, ConnectionURL, Credentials, Options, TimeoutMs, MaxRetries, TenantID | Configuration passed to Connect() |
Query | Statement, Parameters, Options | Read operation input |
QueryResult | Rows, RowCount, DurationMs, Metadata | Read operation output |
Command | Action, Statement, Parameters, Options | Write operation input |
CommandResult | Success, AffectedRows, Message, DurationMs | Write operation output |
HealthResult | Healthy, LatencyMs, Details, Timestamp | Health check output |
ConnectorError | Code, Message, Retryable, Cause | Structured error with retry hint |
Best Practices
- Always call BaseConnector methods - They handle lifecycle and metrics
- Use context for cancellation - Pass context through all operations
- Record metrics - Use
GetMetrics()for all operations - Handle errors gracefully - Return
ConnectorErrorwith codes - Implement health checks - Override
HealthCheck()for meaningful checks - Write tests - Use the test harness for comprehensive testing
- Document capabilities - Set accurate
ConnectorCapsarray
Next Steps
- MCP Overview - Understanding MCP architecture
- S3 Connector - Example Community connector
- Connector Setup - Enterprise configuration (Enterprise)
The Connector SDK will be extracted to a separate repository (getaxonflow/axonflow-connector-sdk) to enable third-party connector development. See Issue #325 for details.
