Saga Integration Examples
Practical code examples for integrating Saga service discovery with your microservices. These examples demonstrate real-world patterns and best practices.
This guide covers:
- Service registration on startup
- Heartbeat implementation
- Service discovery with fallbacks
- Error handling strategies
- Testing with mocks
Gateway Integration
Registering Gateway with Saga
When the gateway starts, register it with Saga to make it discoverable:
use saga_client::SagaServiceDiscovery;
use std::env;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Initialize Saga client
let saga_url = env::var("SAGA_SERVICE_URL")
.unwrap_or_else(|_| "http://localhost:8030".to_string());
let redis_url = env::var("REDIS_URL").ok();
let saga_client = SagaServiceDiscovery::new(
saga_url,
redis_url,
Some(30), // Cache TTL
).expect("Failed to initialize Saga client");
// Register gateway service on startup
if let Err(e) = saga_client.register_service(
"gateway",
"http://localhost:8000",
&["rest", "graphql", "grpc"],
).await {
eprintln!("Failed to register gateway: {}", e);
// Continue startup - service can work without registration
}
// Start HTTP server
HttpServer::new(|| {
App::new()
.route("/health", web::get().to(health_check))
})
.bind("0.0.0.0:8000")?
.run()
.await
}
Key Points:
- ✅ Register on startup
- ✅ Handle registration failures gracefully
- ✅ Continue startup even if registration fails
- ✅ Declare all service capabilities
import os
import requests
import atexit
from typing import Optional
class SagaClient:
    """Minimal synchronous Saga client that remembers the name it registered.

    Network errors are caught and printed so that a Saga outage never crashes
    the calling service.
    """

    def __init__(self, saga_url: str):
        self.saga_url = saga_url
        # Name registered through this client: None until register() succeeds,
        # and reset to None once unregister() is confirmed by Saga.
        self.service_name: Optional[str] = None

    def register(
        self,
        service_name: str,
        service_url: str,
        capabilities: list[str]
    ) -> bool:
        """Register service with Saga.

        Returns:
            True when Saga answers with a 2xx status, False on any error
            (the error is printed, never raised).
        """
        try:
            response = requests.post(
                f"{self.saga_url}/api/v1/services/register",
                json={
                    "service_name": service_name,
                    "service_url": service_url,
                    "capabilities": capabilities,
                },
                timeout=5,
            )
            response.raise_for_status()
            self.service_name = service_name
            print(f"Registered {service_name} with Saga")
            return True
        except Exception as e:
            print(f"Failed to register: {e}")
            return False

    def unregister(self) -> None:
        """Unregister service from Saga.

        No-op when nothing has been registered. A non-2xx answer is reported as
        a failure (instead of being printed as success), and the stored name is
        cleared only after Saga confirms the removal, so a repeated call does
        not issue a second DELETE.
        """
        if not self.service_name:
            return
        try:
            response = requests.delete(
                f"{self.saga_url}/api/v1/services/{self.service_name}",
                timeout=5,
            )
            response.raise_for_status()
            print(f"Unregistered {self.service_name} from Saga")
            self.service_name = None
        except Exception as e:
            print(f"Failed to unregister: {e}")
# Startup: point the client at Saga (env override, local default) and
# announce the gateway together with every protocol it serves.
saga_url = os.getenv("SAGA_SERVICE_URL", "http://localhost:8030")
saga_client = SagaClient(saga_url)
saga_client.register("gateway", "http://localhost:8000", ["rest", "graphql", "grpc"])

# Shutdown: best-effort deregistration when the interpreter exits normally.
atexit.register(saga_client.unregister)
Discovering Services from Gateway
Discover other services for routing:
use saga_client::SagaServiceDiscovery;

/// Sends `GET {base}{path}` to `service_name` and returns the response body.
///
/// The base URL is resolved through Saga. When discovery fails, the static table in
/// [`get_fallback_url`] is used instead, and the request is still performed against it.
///
/// # Errors
/// Returns an error when discovery fails *and* no fallback exists for `service_name`,
/// when the HTTP request fails, or when the service answers with a non-success status.
async fn route_to_service(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
    path: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    let url = match saga_client.discover_service(service_name).await {
        Ok(info) => format!("{}{}", info.service_url, path),
        Err(e) => {
            tracing::warn!("Service discovery failed: {}. Using fallback.", e);
            // Resolve the static URL and fall through to the request: callers expect
            // the response body here, not the URL itself.
            get_fallback_url(service_name, path)?
        }
    };

    let client = reqwest::Client::new();
    // Treat 4xx/5xx answers as errors instead of returning an error page as the body.
    let response = client.get(&url).send().await?.error_for_status()?;
    Ok(response.text().await?)
}
/// Builds `{base}{path}` from a hard-coded base URL for `service_name`.
///
/// # Errors
/// Returns an error whose message is `"Unknown service"` when `service_name` has no
/// static entry.
fn get_fallback_url(service_name: &str, path: &str) -> Result<String, Box<dyn std::error::Error>> {
    let base_url = match service_name {
        "authentication" => Some("http://localhost:8001"),
        "payment" => Some("http://localhost:8002"),
        _ => None,
    }
    .ok_or("Unknown service")?;
    Ok(format!("{base_url}{path}"))
}
import aiohttp
from typing import Optional
async def discover_and_call(
    saga_url: str,
    service_name: str,
    path: str
) -> Optional[dict]:
    """Discover `service_name` through Saga and GET `path` from it.

    The static fallback (``fallback_call``) is used only when discovery itself
    fails: Saga answers 404, answers with another error status, or cannot be
    reached. Errors from the discovered service are no longer misreported as
    discovery failures and re-routed to the fallback.

    Returns:
        The JSON body returned by the service.

    Raises:
        aiohttp.ClientResponseError: if the discovered service answers with an
            error status.
        ValueError: from ``fallback_call`` when there is no static entry.
    """
    async with aiohttp.ClientSession() as session:
        service_url: Optional[str] = None
        try:
            async with session.get(
                f"{saga_url}/api/v1/services/{service_name}"
            ) as response:
                if response.status != 404:
                    response.raise_for_status()
                    service = await response.json()
                    service_url = service["service_url"]
        except Exception as e:
            print(f"Discovery failed: {e}")

        if service_url is None:
            # Not registered (404) or discovery errored: use static configuration.
            return await fallback_call(service_name, path)

        # Make request to discovered service
        async with session.get(f"{service_url}{path}") as service_response:
            service_response.raise_for_status()
            return await service_response.json()
async def fallback_call(service_name: str, path: str) -> dict:
    """GET `path` from the statically configured URL for `service_name`.

    Returns:
        The JSON body returned by the service.

    Raises:
        ValueError: if `service_name` has no static fallback entry.
        aiohttp.ClientResponseError: if the service answers with an error status.
    """
    fallback_urls = {
        "authentication": "http://localhost:8001",
        "payment": "http://localhost:8002",
    }
    base_url = fallback_urls.get(service_name)
    if not base_url:
        raise ValueError(f"Unknown service: {service_name}")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}{path}") as response:
            # Surface HTTP errors instead of trying to decode an error page as JSON.
            response.raise_for_status()
            return await response.json()
Service Registration on Startup
Rust Service Example
Complete example of a Rust service that registers with Saga:
use actix_web::{web, App, HttpServer};
use saga_client::SagaServiceDiscovery;
use std::sync::Arc;
use tokio::time::{interval, Duration};
struct AppState {
saga_client: Arc<SagaServiceDiscovery>,
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Initialize Saga client
let saga_url = std::env::var("SAGA_SERVICE_URL")
.unwrap_or_else(|_| "http://localhost:8030".to_string());
let redis_url = std::env::var("REDIS_URL").ok();
let saga_client = Arc::new(
SagaServiceDiscovery::new(saga_url, redis_url, Some(30))
.expect("Failed to initialize Saga client")
);
// Register service on startup
let service_name = "my-service";
let service_url = "http://localhost:8001";
match saga_client.register_service(
service_name,
service_url,
&["rest"],
).await {
Ok(info) => {
println!("Registered {} with ID: {}", service_name, info.service_id);
}
Err(e) => {
eprintln!("Registration failed: {}", e);
// Continue startup - service can work without registration
}
}
// Start heartbeat task
let saga_client_clone = saga_client.clone();
let service_name_clone = service_name.to_string();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = saga_client_clone.refresh_registration(&service_name_clone).await {
eprintln!("Heartbeat failed: {}", e);
// Attempt to re-register
if let Err(e) = saga_client_clone.register_service(
&service_name_clone,
service_url,
&["rest"],
).await {
eprintln!("Re-registration failed: {}", e);
}
}
}
});
// Start HTTP server
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(AppState {
saga_client: saga_client.clone(),
}))
.route("/health", web::get().to(health_check))
})
.bind("0.0.0.0:8001")?
.run()
.await
}
Features:
- ✅ Registration on startup
- ✅ Automatic heartbeat
- ✅ Re-registration on failure
- ✅ Graceful error handling
use saga_client::SagaServiceDiscovery;
#[tokio::main]
async fn main() {
let saga_client = SagaServiceDiscovery::new(
"http://localhost:8030".to_string(),
Some("redis://localhost:6379".to_string()),
Some(30),
)?;
// Register
saga_client.register_service(
"my-service",
"http://localhost:8001",
&["rest"],
).await?;
// Your service code here
}
Python Service Example
Complete Python service with Saga integration:
import os
import asyncio
import aiohttp
from fastapi import FastAPI
from contextlib import asynccontextmanager
# Saga endpoint and this service's identity; both URLs may be overridden
# through the environment, the name is fixed.
SAGA_URL = os.environ.get("SAGA_SERVICE_URL", "http://localhost:8030")
SERVICE_NAME = "my-python-service"
SERVICE_URL = os.environ.get("SERVICE_URL", "http://localhost:8002")
async def register_service():
    """Register this service with Saga.

    Returns True when Saga accepts the registration and False on any error;
    errors are printed, never raised.
    """
    payload = {
        "service_name": SERVICE_NAME,
        "service_url": SERVICE_URL,
        "capabilities": ["rest"],
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                f"{SAGA_URL}/api/v1/services/register", json=payload
            ) as response:
                response.raise_for_status()
                body = await response.json()
                print(f"Registered {SERVICE_NAME}: {body['service_id']}")
                return True
        except Exception as e:
            print(f"Registration failed: {e}")
            return False
async def unregister_service():
    """Remove this service's registration from Saga.

    Best-effort: failures (including a non-2xx answer) are printed, not raised.
    """
    endpoint = f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}"
    async with aiohttp.ClientSession() as session:
        try:
            async with session.delete(endpoint) as response:
                response.raise_for_status()
                print(f"Unregistered {SERVICE_NAME}")
        except Exception as e:
            print(f"Unregistration failed: {e}")
async def heartbeat_loop():
    """Send a heartbeat to Saga every 30 seconds until the task is cancelled.

    A non-200 answer or a request error triggers an immediate re-registration
    attempt; nothing is raised.
    """
    heartbeat_endpoint = f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}/heartbeat"
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.post(heartbeat_endpoint) as response:
                    if response.status != 200:
                        print(f"Heartbeat failed: {response.status}")
                        # Saga may have dropped the registration; announce again.
                        await register_service()
                    else:
                        print("Heartbeat sent")
            except Exception as e:
                print(f"Heartbeat error: {e}")
                await register_service()
            await asyncio.sleep(30)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook.

    Startup: register with Saga and start the heartbeat task.
    Shutdown: stop the heartbeat task, then unregister. Shutdown runs in a
    ``finally`` so it also happens when the application exits with an error.
    """
    # Startup
    await register_service()
    heartbeat_task = asyncio.create_task(heartbeat_loop())
    try:
        yield
    finally:
        # Shutdown. cancel() only *requests* cancellation; awaiting the task
        # makes sure the heartbeat loop has actually stopped (and its HTTP
        # session is closed) before the service is unregistered.
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass
        await unregister_service()
app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health():
    """Liveness probe: always reports the service as healthy."""
    body = {"status": "healthy"}
    return body


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, port=8002, host="0.0.0.0")
from flask import Flask
import atexit
import requests
import threading
import time
# Saga endpoint and this service's registration identity (hard-coded here).
SAGA_URL = "http://localhost:8030"
SERVICE_NAME = "my-flask-service"
SERVICE_URL = "http://localhost:8003"

app = Flask(__name__)
def register_service():
    """Register this Flask service with Saga; errors are printed, never raised."""
    registration = {
        "service_name": SERVICE_NAME,
        "service_url": SERVICE_URL,
        "capabilities": ["rest"],
    }
    try:
        reply = requests.post(
            f"{SAGA_URL}/api/v1/services/register",
            json=registration,
            timeout=5,
        )
        reply.raise_for_status()
        print(f"Registered {SERVICE_NAME}")
    except Exception as e:
        print(f"Registration failed: {e}")
def unregister_service():
    """Remove this service's registration from Saga.

    Best-effort: errors, including a non-2xx answer, are printed and never
    raised, so interpreter shutdown is not interrupted.
    """
    try:
        response = requests.delete(
            f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}",
            timeout=5
        )
        # Without this check a 404/500 would still be reported as a success.
        response.raise_for_status()
        print(f"Unregistered {SERVICE_NAME}")
    except Exception as e:
        print(f"Unregistration failed: {e}")
def heartbeat_loop():
    """Send a heartbeat to Saga every 30 seconds, forever.

    Intended to run in a background thread. A non-200 answer or a request
    error is logged and followed by a re-registration attempt.
    """
    while True:
        try:
            response = requests.post(
                f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}/heartbeat",
                timeout=5
            )
            if response.status_code == 200:
                print("Heartbeat sent")
            else:
                # Log the status before re-registering so failures are visible,
                # matching the other heartbeat examples.
                print(f"Heartbeat failed: {response.status_code}")
                register_service()  # Re-register on failure
        except Exception as e:
            print(f"Heartbeat error: {e}")
            register_service()
        time.sleep(30)
# Startup: register once, then keep the registration alive from a daemon
# thread (daemon=True so it never blocks interpreter exit).
register_service()
heartbeat_thread = threading.Thread(daemon=True, target=heartbeat_loop)
heartbeat_thread.start()

# Shutdown: best-effort deregistration at normal interpreter exit.
atexit.register(unregister_service)
@app.route("/health")
def health():
    """Liveness probe returning a static healthy status."""
    status = {"status": "healthy"}
    return status


if __name__ == "__main__":
    app.run(port=8003, host="0.0.0.0")
Heartbeat Implementation
Automatic Heartbeat
Keep service registration alive with periodic heartbeats:
use tokio::time::{interval, Duration};
async fn start_heartbeat(
saga_client: Arc<SagaServiceDiscovery>,
service_name: String,
service_url: String,
capabilities: Vec<&str>,
) {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
// Try to refresh registration
match saga_client.refresh_registration(&service_name).await {
Ok(_) => {
tracing::debug!("Heartbeat sent for {}", service_name);
}
Err(e) => {
tracing::warn!("Heartbeat failed: {}. Attempting re-registration.", e);
// Attempt to re-register
match saga_client.register_service(
&service_name,
&service_url,
&capabilities,
).await {
Ok(_) => {
tracing::info!("Re-registered {}", service_name);
}
Err(e) => {
tracing::error!("Re-registration failed: {}", e);
}
}
}
}
}
}
Usage:
// Share the client with the heartbeat task; `Arc::clone` only bumps a refcount.
let saga_client = Arc::new(saga_client);
let heartbeat = start_heartbeat(
    Arc::clone(&saga_client),
    String::from("my-service"),
    String::from("http://localhost:8001"),
    vec!["rest"],
);
// Detach the heartbeat so it runs in the background for the life of the process.
tokio::spawn(heartbeat);
import asyncio
import aiohttp
async def heartbeat_loop(saga_url: str, service_name: str):
    """Post a heartbeat for `service_name` to Saga every 30 seconds, forever.

    Failures (non-200 status or request errors) are printed and the loop
    keeps going; this variant does not re-register.
    """
    url = f"{saga_url}/api/v1/services/{service_name}/heartbeat"
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.post(url) as response:
                    if response.status != 200:
                        print(f"Heartbeat failed: {response.status}")
                    else:
                        print(f"Heartbeat sent for {service_name}")
            except Exception as e:
                print(f"Heartbeat error: {e}")
            await asyncio.sleep(30)
# Start heartbeat in background. asyncio.create_task() needs a running event
# loop, so this line must execute inside async code (e.g. a startup hook).
# Keep a reference to the task: the event loop only holds a weak reference,
# so an unreferenced task can be garbage-collected while it is still running.
heartbeat_task = asyncio.create_task(heartbeat_loop("http://localhost:8030", "my-service"))
- Send heartbeats every 30 seconds (half of the registration TTL), so a single missed heartbeat does not let the registration expire
- Handle failures gracefully: re-register if a heartbeat fails
- Use background tasks: don't block the main application
- Monitor heartbeat success: log failures for debugging
Service Discovery Patterns
Cache-First Discovery
Use the Saga client's built-in caching for optimal performance:
use saga_client::SagaServiceDiscovery;

/// Returns the base URL Saga reports for `service_name`.
///
/// Lookups go through the client's built-in cache (its TTL is the one passed to
/// `SagaServiceDiscovery::new`), so repeated calls are presumably served from the
/// cache until it expires — confirm the exact cache layering in the saga_client docs.
///
/// # Errors
/// Propagates any discovery error from the Saga client.
async fn discover_with_cache(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    saga_client
        .discover_service(service_name)
        .await
        .map(|service| service.service_url)
        .map_err(Into::into)
}
Benefits:
- ✅ Fast subsequent lookups (cache hit)
- ✅ Reduced Redis load
- ✅ Automatic cache refresh
from functools import lru_cache
import time
class CachedSagaClient:
    """Saga discovery client with a small in-process TTL cache.

    A discovered record is served from memory for `cache_ttl` seconds after it
    was fetched; the next lookup after that queries Saga again.
    """

    def __init__(self, saga_url: str, cache_ttl: int = 30):
        self.saga_url = saga_url
        self.cache_ttl = cache_ttl
        self._cache = {}        # service_name -> JSON record returned by Saga
        self._cache_times = {}  # service_name -> time.monotonic() of that fetch

    async def discover(self, service_name: str) -> dict:
        """Discover service with caching.

        Raises:
            aiohttp.ClientResponseError: on a cache miss, if Saga answers with
                an error status.
        """
        # monotonic() is unaffected by wall-clock adjustments (NTP, manual
        # changes), which would otherwise expire entries early or pin them.
        now = time.monotonic()

        # Check cache
        cached_time = self._cache_times.get(service_name)
        if (
            service_name in self._cache
            and cached_time is not None
            and now - cached_time < self.cache_ttl
        ):
            return self._cache[service_name]

        # Cache miss - query Saga. Imported here because this snippet's own
        # imports do not include aiohttp.
        import aiohttp

        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.saga_url}/api/v1/services/{service_name}"
            ) as response:
                response.raise_for_status()
                service = await response.json()

        # Update cache
        self._cache[service_name] = service
        self._cache_times[service_name] = now
        return service
# Usage: `await` is only valid inside a coroutine (a bare top-level `await` is a
# SyntaxError in a regular script), so wrap the call and drive it with asyncio.run().
async def example_discover() -> dict:
    client = CachedSagaClient("http://localhost:8030")
    return await client.discover("authentication")


if __name__ == "__main__":
    import asyncio

    service = asyncio.run(example_discover())
Fallback Pattern
Implement fallback to static configuration:
use std::collections::HashMap;
async fn discover_with_fallback(
saga_client: &SagaServiceDiscovery,
service_name: &str,
) -> String {
// Try Saga discovery first
match saga_client.discover_service(service_name).await {
Ok(service_info) => {
tracing::info!("Discovered {} via Saga", service_name);
return service_info.service_url;
}
Err(e) => {
tracing::warn!("Saga discovery failed: {}. Using fallback.", e);
}
}
// Fallback to static configuration
let fallback_urls: HashMap<&str, &str> = [
("authentication", "http://localhost:8001"),
("payment", "http://localhost:8002"),
("gateway", "http://localhost:8000"),
].iter().cloned().collect();
fallback_urls.get(service_name)
.map(|url| url.to_string())
.unwrap_or_else(|| {
panic!("Unknown service: {} and no fallback", service_name)
})
}
# Static base URLs used whenever Saga cannot resolve a service.
FALLBACK_URLS = {
    "authentication": "http://localhost:8001",
    "payment": "http://localhost:8002",
    "gateway": "http://localhost:8000",
}


async def discover_with_fallback(
    saga_url: str,
    service_name: str
) -> str:
    """Return the service URL from Saga, or from FALLBACK_URLS if discovery fails.

    Any non-200 answer, request error or malformed record falls back to the
    static table.

    Raises:
        ValueError: if discovery fails and `service_name` has no static entry.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{saga_url}/api/v1/services/{service_name}"
            ) as response:
                if response.status == 200:
                    return (await response.json())["service_url"]
    except Exception as e:
        print(f"Discovery failed: {e}")

    fallback = FALLBACK_URLS.get(service_name)
    if fallback is None:
        raise ValueError(f"Unknown service: {service_name}")
    return fallback
Error Handling
Graceful Degradation
Handle failures gracefully to ensure service availability:
use saga_client::SagaServiceDiscovery;

/// GETs `path` from `service_name` with discovery fallback and up to three attempts.
///
/// The base URL comes from Saga, or from the static fallback table when discovery
/// fails. Transport errors and non-success HTTP statuses are retried with a 1 s pause
/// between attempts.
///
/// # Errors
/// Returns the last request error after the final attempt. Also returns an error
/// when discovery fails and no fallback exists.
async fn call_service_safely(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
    path: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    const MAX_ATTEMPTS: u32 = 3;

    // Try to discover service
    let service_url = match saga_client.discover_service(service_name).await {
        Ok(info) => {
            tracing::info!("Discovered {}: {}", service_name, info.service_url);
            info.service_url
        }
        Err(e) => {
            tracing::warn!("Service discovery failed: {}. Using fallback.", e);
            // `get_fallback_url(service_name, path)` appends its second argument;
            // pass "" to get only the base URL, since `path` is appended below.
            get_fallback_url(service_name, "")?
        }
    };

    let client = reqwest::Client::new();
    let url = format!("{}{}", service_url, path);

    for attempt in 1..=MAX_ATTEMPTS {
        // `error_for_status` turns 4xx/5xx answers into errors, so they are retried
        // with a delay and reported, instead of looping silently and losing the status.
        match client.get(&url).send().await.and_then(|r| r.error_for_status()) {
            Ok(response) => return Ok(response.text().await?),
            Err(e) if attempt == MAX_ATTEMPTS => return Err(e.into()),
            Err(e) => {
                tracing::warn!("Request failed (attempt {}): {}", attempt, e);
                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        }
    }

    // Only reachable if MAX_ATTEMPTS were 0.
    Err("Max retries exceeded".into())
}
import asyncio

import aiohttp
from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_exponential


@retry(
    # ValueError means "unknown service" (raised by discover_with_fallback); it
    # cannot succeed on retry, so it is raised immediately instead of retried.
    retry=retry_if_not_exception_type(ValueError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def call_service_safely(
    saga_url: str,
    service_name: str,
    path: str
) -> dict:
    """Call `path` on `service_name` with retry logic.

    Transient failures are retried up to 3 attempts with exponential backoff
    (1-10 s). Every failure is printed before being re-raised to the retry
    decorator.

    Raises:
        ValueError: immediately, if the service is unknown.
        tenacity.RetryError: when all attempts fail with a retryable error.
    """
    try:
        # Discover service
        service_url = await discover_with_fallback(saga_url, service_name)
        # Make request
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{service_url}{path}") as response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        print(f"Service call failed: {e}")
        raise
Testing Integration
Mock Saga for Testing
Test your services without a running Saga instance:
#[cfg(test)]
mod tests {
    // `json!` comes from serde_json; without this import the module does not compile.
    use serde_json::json;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock Saga server answering `GET /api/v1/services/authentication`
    /// with a canned registration record.
    async fn setup_mock_saga() -> MockServer {
        let mock_server = MockServer::start().await;

        // Mock service discovery endpoint
        Mock::given(method("GET"))
            .and(path("/api/v1/services/authentication"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "service_name": "authentication",
                "service_url": "http://localhost:8001",
                "service_id": "test-id",
                "registered_at": "2025-01-01T00:00:00Z",
                "last_heartbeat": "2025-01-01T00:00:00Z",
                "capabilities": ["rest"]
            })))
            .mount(&mock_server)
            .await;

        mock_server
    }

    #[tokio::test]
    async fn test_service_discovery() {
        let mock_saga = setup_mock_saga().await;
        let saga_url = mock_saga.uri();
        // Test your service discovery logic against `saga_url`
        // ...
    }
}
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import aiohttp


@pytest.fixture
def mock_saga():
    """Patch ``aiohttp.ClientSession.get`` to return a canned Saga record.

    The fixture is deliberately synchronous. It awaits nothing, and under
    pytest-asyncio's strict mode a plain ``@pytest.fixture`` on an
    ``async def`` is not awaited, so the test would receive an async generator.
    Yields the patched ``get`` mock so tests can inspect the calls made.
    """
    with patch("aiohttp.ClientSession.get") as mock_get:
        # Plain MagicMock: aiohttp's `status` and `raise_for_status()` are
        # synchronous; only `json()` is a coroutine.
        mock_response = MagicMock()
        mock_response.status = 200
        mock_response.json = AsyncMock(return_value={
            "service_name": "authentication",
            "service_url": "http://localhost:8001",
            "service_id": "test-id",
            "capabilities": ["rest"]
        })
        # `async with session.get(...) as response` yields __aenter__'s result.
        mock_get.return_value.__aenter__.return_value = mock_response
        yield mock_get


@pytest.mark.asyncio
async def test_service_discovery(mock_saga):
    """Test service discovery with mock"""
    # Your test code here
    pass
Best Practices Summary
- **Register on Startup**
- Register your service when it starts
- Handle registration failures gracefully
- Continue startup even if registration fails
- **Implement Heartbeats**
- Send heartbeats every 30 seconds
- Handle heartbeat failures
- Re-register if heartbeat fails
- **Handle Failures Gracefully**
- Implement fallback mechanisms
- Use static configuration as backup
- Log failures for monitoring
- **Use Caching**
- Leverage the Saga client's built-in caching
- Implement client-side caching if needed
- Monitor cache hit ratios
- **Monitor Health**
- Check Saga health endpoint regularly
- Monitor service registration status
- Alert on discovery failures
- **Unregister on Shutdown**
- Clean up registrations when services stop
- Use graceful shutdown handlers
- Handle shutdown errors
Next Steps
- Review API Reference for endpoint details
- Check Configuration for environment setup
- See Troubleshooting for common issues