Skip to main content

Saga Integration Examples

Practical code examples for integrating Saga service discovery with your microservices. These examples demonstrate real-world patterns and best practices.

Integration Patterns

This guide covers:

  • Service registration on startup
  • Heartbeat implementation
  • Service discovery with fallbacks
  • Error handling strategies
  • Testing with mocks

Gateway Integration

Registering Gateway with Saga

When the gateway starts, register it with Saga to make it discoverable:

use saga_client::SagaServiceDiscovery;
use std::env;

/// Gateway entry point: registers the gateway with Saga, then serves HTTP on port 8000.
///
/// Registration is best-effort. A failure is written to stderr and startup
/// continues, so the gateway keeps working without service discovery.
///
/// # Errors
/// Returns the `std::io::Error` produced while binding or running the server.
///
/// # Panics
/// Panics if the Saga client cannot be constructed.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // Initialize Saga client
    let saga_url = env::var("SAGA_SERVICE_URL")
        .unwrap_or_else(|_| "http://localhost:8030".to_string());
    // Redis is optional: an unset REDIS_URL is passed through as `None`.
    let redis_url = env::var("REDIS_URL").ok();

    let saga_client = SagaServiceDiscovery::new(
        saga_url,
        redis_url,
        Some(30), // Cache TTL
    )
    .expect("Failed to initialize Saga client");

    // Register gateway service on startup
    if let Err(e) = saga_client
        .register_service(
            "gateway",
            "http://localhost:8000",
            &["rest", "graphql", "grpc"],
        )
        .await
    {
        eprintln!("Failed to register gateway: {}", e);
        // Continue startup - service can work without registration
    }

    // Start HTTP server. The actix-web items are fully qualified because this
    // snippet only imports `saga_client` and `std::env`.
    // NOTE(review): `health_check` is assumed to be a handler defined elsewhere.
    actix_web::HttpServer::new(|| {
        actix_web::App::new().route("/health", actix_web::web::get().to(health_check))
    })
    .bind("0.0.0.0:8000")?
    .run()
    .await
}

Key Points:

  • ✅ Register on startup
  • ✅ Handle registration failures gracefully
  • ✅ Continue startup even if registration fails
  • ✅ Declare all service capabilities
import os
import requests
import atexit
from typing import Optional

class SagaClient:
    """Minimal synchronous HTTP client for registering a service with Saga."""

    def __init__(self, saga_url: str):
        """Store the Saga base URL.

        Args:
            saga_url: Base URL of the Saga service. A trailing slash is
                removed so endpoint paths can be appended verbatim.
        """
        self.saga_url = saga_url.rstrip("/")
        # Name registered through this client; None until register() succeeds.
        self.service_name: Optional[str] = None

    def register(
        self,
        service_name: str,
        service_url: str,
        capabilities: list[str]
    ) -> bool:
        """Register service with Saga.

        Args:
            service_name: Name to register under.
            service_url: URL at which the service can be reached.
            capabilities: Capability labels sent with the registration.

        Returns:
            True when Saga accepted the registration, False on any failure.
            Failures are printed rather than raised so startup can continue.
        """
        try:
            response = requests.post(
                f"{self.saga_url}/api/v1/services/register",
                json={
                    "service_name": service_name,
                    "service_url": service_url,
                    "capabilities": capabilities
                },
                timeout=5
            )
            response.raise_for_status()
            # Remember the name only once Saga accepted it, so unregister()
            # never targets a service that was never registered.
            self.service_name = service_name
            print(f"Registered {service_name} with Saga")
            return True
        except Exception as e:
            print(f"Failed to register: {e}")
            return False

    def unregister(self) -> None:
        """Unregister service from Saga.

        Does nothing when no registration succeeded through this client.
        Errors are printed, never raised, because this typically runs from
        an exit handler.
        """
        if not self.service_name:
            return

        try:
            response = requests.delete(
                f"{self.saga_url}/api/v1/services/{self.service_name}",
                timeout=5
            )
            # Without this check an HTTP error response would still be
            # reported as a successful unregistration.
            response.raise_for_status()
            print(f"Unregistered {self.service_name} from Saga")
            self.service_name = None
        except Exception as e:
            print(f"Failed to unregister: {e}")

# Build the client from the environment, defaulting to a local Saga instance.
saga_url = os.environ.get("SAGA_SERVICE_URL", "http://localhost:8030")
saga_client = SagaClient(saga_url)

# Announce the gateway as soon as the module is loaded.
saga_client.register(
    service_name="gateway",
    service_url="http://localhost:8000",
    capabilities=["rest", "graphql", "grpc"],
)

# Remove the registration again when the interpreter exits.
atexit.register(saga_client.unregister)

Discovering Services from Gateway

Discover other services for routing:

use saga_client::SagaServiceDiscovery;

/// Resolves `service_name` and returns the body of a GET request to `path` on it.
///
/// The target URL comes from Saga discovery. If discovery fails, the URL is
/// taken from `get_fallback_url` instead and the request is still made.
/// Previously the fallback URL itself was returned as if it were the
/// response body.
///
/// # Errors
/// Returns an error when the service is unknown to the fallback table, when
/// the request cannot be sent, or when the response body cannot be read.
async fn route_to_service(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
    path: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    // Discover service, degrading to static configuration on failure
    let url = match saga_client.discover_service(service_name).await {
        Ok(info) => format!("{}{}", info.service_url, path),
        Err(e) => {
            tracing::warn!("Service discovery failed: {}. Using fallback.", e);
            get_fallback_url(service_name, path)?
        }
    };

    // Make request to the resolved service
    let client = reqwest::Client::new();
    let response = client.get(&url).send().await?;

    Ok(response.text().await?)
}

/// Builds a request URL for `service_name` from the static fallback table.
///
/// The result is the service's fixed base URL with `path` appended verbatim.
///
/// # Errors
/// Returns an `"Unknown service"` error when `service_name` is not in the table.
fn get_fallback_url(service_name: &str, path: &str) -> Result<String, Box<dyn std::error::Error>> {
    // Static configuration consulted when discovery is unavailable.
    const STATIC_ROUTES: [(&str, &str); 2] = [
        ("authentication", "http://localhost:8001"),
        ("payment", "http://localhost:8002"),
    ];

    let base = STATIC_ROUTES
        .iter()
        .find(|(name, _)| *name == service_name)
        .map(|(_, url)| *url)
        .ok_or("Unknown service")?;

    let mut full_url = String::with_capacity(base.len() + path.len());
    full_url.push_str(base);
    full_url.push_str(path);
    Ok(full_url)
}
import aiohttp
from typing import Optional

async def discover_and_call(
    saga_url: str,
    service_name: str,
    path: str
) -> Optional[dict]:
    """Discover service and make request.

    Looks the service up in Saga and issues a GET for ``path`` against the
    discovered URL. When the service is not registered (HTTP 404), or when
    discovery or the request fails, the call is retried once against the
    static fallback configuration.

    Args:
        saga_url: Base URL of the Saga service.
        service_name: Name of the service to call.
        path: Request path appended to the service URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: If the fallback is needed and ``service_name`` has no
            static fallback URL.
    """
    try:
        async with aiohttp.ClientSession() as session:
            # Discover service
            async with session.get(
                f"{saga_url}/api/v1/services/{service_name}"
            ) as response:
                if response.status == 404:
                    # Service not found - handled by the single fallback below
                    service = None
                else:
                    response.raise_for_status()
                    service = await response.json()

            if service is not None:
                # Make request to discovered service
                async with session.get(
                    f"{service['service_url']}{path}"
                ) as service_response:
                    return await service_response.json()
    except Exception as e:
        print(f"Discovery failed: {e}")

    # The fallback sits outside the try block so a failing fallback is
    # reported to the caller instead of being attempted a second time.
    return await fallback_call(service_name, path)

async def fallback_call(service_name: str, path: str) -> dict:
    """Fallback to static configuration.

    Args:
        service_name: Name of the service to call.
        path: Request path appended to the static base URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        ValueError: If ``service_name`` has no static fallback URL.
    """
    fallback_urls = {
        "authentication": "http://localhost:8001",
        "payment": "http://localhost:8002",
    }

    base_url = fallback_urls.get(service_name)
    if base_url is None:
        # Fail before any network activity when the service is unknown.
        raise ValueError(f"Unknown service: {service_name}")

    async with aiohttp.ClientSession() as session:
        async with session.get(f"{base_url}{path}") as response:
            return await response.json()

Service Registration on Startup

Rust Service Example

Complete example of a Rust service that registers with Saga:

use actix_web::{web, App, HttpServer};
use saga_client::SagaServiceDiscovery;
use std::sync::Arc;
use tokio::time::{interval, Duration};

/// Application state registered with the HTTP server via `app_data`.
struct AppState {
/// Saga client handle, reference-counted so it can be cloned per worker.
saga_client: Arc<SagaServiceDiscovery>,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Initialize Saga client
let saga_url = std::env::var("SAGA_SERVICE_URL")
.unwrap_or_else(|_| "http://localhost:8030".to_string());
let redis_url = std::env::var("REDIS_URL").ok();

let saga_client = Arc::new(
SagaServiceDiscovery::new(saga_url, redis_url, Some(30))
.expect("Failed to initialize Saga client")
);

// Register service on startup
let service_name = "my-service";
let service_url = "http://localhost:8001";

match saga_client.register_service(
service_name,
service_url,
&["rest"],
).await {
Ok(info) => {
println!("Registered {} with ID: {}", service_name, info.service_id);
}
Err(e) => {
eprintln!("Registration failed: {}", e);
// Continue startup - service can work without registration
}
}

// Start heartbeat task
let saga_client_clone = saga_client.clone();
let service_name_clone = service_name.to_string();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
if let Err(e) = saga_client_clone.refresh_registration(&service_name_clone).await {
eprintln!("Heartbeat failed: {}", e);
// Attempt to re-register
if let Err(e) = saga_client_clone.register_service(
&service_name_clone,
service_url,
&["rest"],
).await {
eprintln!("Re-registration failed: {}", e);
}
}
}
});

// Start HTTP server
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(AppState {
saga_client: saga_client.clone(),
}))
.route("/health", web::get().to(health_check))
})
.bind("0.0.0.0:8001")?
.run()
.await
}

Features:

  • ✅ Registration on startup
  • ✅ Automatic heartbeat
  • ✅ Re-registration on failure
  • ✅ Graceful error handling
use saga_client::SagaServiceDiscovery;

#[tokio::main]
async fn main() {
let saga_client = SagaServiceDiscovery::new(
"http://localhost:8030".to_string(),
Some("redis://localhost:6379".to_string()),
Some(30),
)?;

// Register
saga_client.register_service(
"my-service",
"http://localhost:8001",
&["rest"],
).await?;

// Your service code here
}

Python Service Example

Complete Python service with Saga integration:

import os
import asyncio
import aiohttp
from fastapi import FastAPI
from contextlib import asynccontextmanager

# Base URL of the Saga service; overridable through SAGA_SERVICE_URL.
SAGA_URL = os.getenv("SAGA_SERVICE_URL", "http://localhost:8030")
# Name this service uses in its Saga registration, heartbeat and unregister calls.
SERVICE_NAME = "my-python-service"
# URL sent to Saga as this service's address; overridable through SERVICE_URL.
SERVICE_URL = os.getenv("SERVICE_URL", "http://localhost:8002")

async def register_service():
    """Register service with Saga.

    Returns:
        True when Saga accepted the registration, False on any failure.
        Failures are printed rather than raised.
    """
    # Bound the request so an unresponsive Saga cannot stall startup or the
    # heartbeat loop that calls this for re-registration.
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.post(
                f"{SAGA_URL}/api/v1/services/register",
                json={
                    "service_name": SERVICE_NAME,
                    "service_url": SERVICE_URL,
                    "capabilities": ["rest"]
                }
            ) as response:
                response.raise_for_status()
                result = await response.json()
                print(f"Registered {SERVICE_NAME}: {result['service_id']}")
                return True
        except Exception as e:
            print(f"Registration failed: {e}")
            return False

async def unregister_service():
    """Unregister service from Saga.

    Best-effort: any failure is printed and swallowed so shutdown can proceed.
    """
    # Bound the request so shutdown cannot hang on an unresponsive Saga.
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.delete(
                f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}"
            ) as response:
                response.raise_for_status()
                print(f"Unregistered {SERVICE_NAME}")
        except Exception as e:
            print(f"Unregistration failed: {e}")

async def heartbeat_loop():
    """Send periodic heartbeats.

    Runs until cancelled. Every 30 seconds it posts a heartbeat for
    SERVICE_NAME; on a non-200 response or any exception it attempts to
    register the service again.
    """
    # A per-request time limit keeps one hung request from delaying every
    # later heartbeat.
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        while True:
            try:
                async with session.post(
                    f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}/heartbeat"
                ) as response:
                    if response.status == 200:
                        print("Heartbeat sent")
                    else:
                        print(f"Heartbeat failed: {response.status}")
                        # Attempt re-registration
                        await register_service()
            except Exception as e:
                print(f"Heartbeat error: {e}")
                await register_service()

            await asyncio.sleep(30)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: register and heartbeat on startup, clean up on shutdown."""
    # Startup
    await register_service()
    heartbeat_task = asyncio.create_task(heartbeat_loop())

    try:
        yield
    finally:
        # Shutdown. This runs even when the application exits with an error,
        # so the registration is not left behind.
        heartbeat_task.cancel()
        try:
            # Wait for the cancellation to be delivered before unregistering,
            # so a late heartbeat cannot re-register the service.
            await heartbeat_task
        except asyncio.CancelledError:
            pass
        await unregister_service()

# Wire the Saga registration lifecycle into the application.
app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health():
    """Health endpoint; always reports a healthy status."""
    return {"status": "healthy"}


if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)
from flask import Flask
import atexit
import requests
import threading
import time

app = Flask(__name__)
# Base URL of the Saga service (hard-coded in this example).
SAGA_URL = "http://localhost:8030"
# Name this service uses in its Saga registration, heartbeat and unregister calls.
SERVICE_NAME = "my-flask-service"
# URL sent to Saga as this service's address.
SERVICE_URL = "http://localhost:8003"

def register_service():
    """Register service with Saga.

    Best-effort: failures are printed, never raised, so the caller (startup
    code or the heartbeat thread) keeps running.
    """
    try:
        response = requests.post(
            f"{SAGA_URL}/api/v1/services/register",
            json={
                "service_name": SERVICE_NAME,
                "service_url": SERVICE_URL,
                "capabilities": ["rest"]
            },
            timeout=5
        )
        response.raise_for_status()
        print(f"Registered {SERVICE_NAME}")
    except Exception as e:
        print(f"Registration failed: {e}")

def unregister_service():
    """Unregister service from Saga.

    Best-effort: failures are printed, never raised, because this runs from
    an exit handler.
    """
    try:
        response = requests.delete(
            f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}",
            timeout=5
        )
        # Without this check an HTTP error response would still be reported
        # as a successful unregistration.
        response.raise_for_status()
        print(f"Unregistered {SERVICE_NAME}")
    except Exception as e:
        print(f"Unregistration failed: {e}")

def heartbeat_loop():
    """Send periodic heartbeats.

    Loops forever. Every 30 seconds it posts a heartbeat for SERVICE_NAME;
    on a non-200 response or any exception it registers the service again.
    """
    while True:
        try:
            response = requests.post(
                f"{SAGA_URL}/api/v1/services/{SERVICE_NAME}/heartbeat",
                timeout=5
            )
            if response.status_code == 200:
                print("Heartbeat sent")
            else:
                # Log the status so a rejected heartbeat is visible, matching
                # the async example.
                print(f"Heartbeat failed: {response.status_code}")
                register_service()  # Re-register on failure
        except Exception as e:
            print(f"Heartbeat error: {e}")
            register_service()

        time.sleep(30)

# Register on startup
register_service()

# Start heartbeat thread. It is a daemon thread so it never blocks
# interpreter exit.
heartbeat_thread = threading.Thread(target=heartbeat_loop, daemon=True)
heartbeat_thread.start()

# Unregister on shutdown
atexit.register(unregister_service)


@app.route("/health")
def health():
    """Health endpoint; always reports a healthy status."""
    return {"status": "healthy"}


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8003)

Heartbeat Implementation

Automatic Heartbeat

Keep service registration alive with periodic heartbeats:

use tokio::time::{interval, Duration};

/// Keeps a Saga registration alive, never returning.
///
/// Every 30 seconds the registration for `service_name` is refreshed. When a
/// refresh fails, the service is registered again using `service_url` and
/// `capabilities`. All outcomes are reported through `tracing`.
async fn start_heartbeat(
    saga_client: Arc<SagaServiceDiscovery>,
    service_name: String,
    service_url: String,
    capabilities: Vec<&str>,
) {
    let mut ticker = interval(Duration::from_secs(30));

    loop {
        ticker.tick().await;

        // A successful refresh needs no further work this round.
        let refresh_err = match saga_client.refresh_registration(&service_name).await {
            Ok(_) => {
                tracing::debug!("Heartbeat sent for {}", service_name);
                continue;
            }
            Err(e) => e,
        };
        tracing::warn!("Heartbeat failed: {}. Attempting re-registration.", refresh_err);

        // The refresh was rejected, so register from scratch.
        let retry = saga_client
            .register_service(&service_name, &service_url, &capabilities)
            .await;
        if let Err(e) = retry {
            tracing::error!("Re-registration failed: {}", e);
        } else {
            tracing::info!("Re-registered {}", service_name);
        }
    }
}

Usage:

// Share one client between the heartbeat task and the rest of the service.
let saga_client = Arc::new(saga_client);

// Run the heartbeat as a detached background task.
tokio::spawn(start_heartbeat(
    Arc::clone(&saga_client),
    String::from("my-service"),
    String::from("http://localhost:8001"),
    vec!["rest"],
));
import asyncio
import aiohttp

async def heartbeat_loop(saga_url: str, service_name: str):
    """Send periodic heartbeats to Saga.

    Runs until cancelled, posting one heartbeat every 30 seconds. Failures
    are printed and the loop continues.

    Args:
        saga_url: Base URL of the Saga service.
        service_name: Name of the registered service to keep alive.
    """
    # A per-request time limit keeps one hung request from delaying every
    # later heartbeat.
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        while True:
            try:
                async with session.post(
                    f"{saga_url}/api/v1/services/{service_name}/heartbeat"
                ) as response:
                    if response.status == 200:
                        print(f"Heartbeat sent for {service_name}")
                    else:
                        print(f"Heartbeat failed: {response.status}")
            except Exception as e:
                print(f"Heartbeat error: {e}")

            await asyncio.sleep(30)

# Start heartbeat in background. This must run inside a running event loop
# (for example from within a coroutine). Keep the returned task referenced:
# the event loop holds only a weak reference, so an unreferenced task can be
# garbage-collected before it finishes.
heartbeat_task = asyncio.create_task(
    heartbeat_loop("http://localhost:8030", "my-service")
)
Heartbeat Best Practices
  1. Send a heartbeat every 30 seconds (half of the registration TTL)
  2. Handle failures gracefully - Re-register if heartbeat fails
  3. Use background tasks - Don't block main application
  4. Monitor heartbeat success - Log failures for debugging

Service Discovery Patterns

Cache-First Discovery

Use Saga client's built-in caching for optimal performance:

use saga_client::SagaServiceDiscovery;

/// Returns the URL Saga reports for `service_name`.
///
/// Caching is left to the client: this function simply calls
/// `discover_service` and extracts the URL.
///
/// # Errors
/// Propagates any error returned by `discover_service`.
async fn discover_with_cache(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    // NOTE(review): per the guide, the first lookup reaches Redis and later
    // ones are served from the client's cache; confirm against the client.
    Ok(saga_client.discover_service(service_name).await?.service_url)
}

Benefits:

  • ✅ Fast subsequent lookups (cache hit)
  • ✅ Reduced Redis load
  • ✅ Automatic cache refresh
from functools import lru_cache
import time

class CachedSagaClient:
    """Saga discovery client with a small in-memory, time-limited cache."""

    def __init__(self, saga_url: str, cache_ttl: int = 30):
        """Create a client.

        Args:
            saga_url: Base URL of the Saga service.
            cache_ttl: Seconds a discovered entry stays valid.
        """
        self.saga_url = saga_url
        self.cache_ttl = cache_ttl
        # service name -> last discovery response
        self._cache = {}
        # service name -> time.monotonic() reading when the entry was stored
        self._cache_times = {}

    async def discover(self, service_name: str) -> dict:
        """Discover service with caching.

        Args:
            service_name: Name of the service to look up.

        Returns:
            The discovery response for the service, served from the cache
            while the entry is younger than ``cache_ttl`` seconds.

        Raises:
            aiohttp.ClientResponseError: If Saga answers with an error status.
        """
        # Monotonic time is unaffected by wall-clock adjustments, which would
        # otherwise expire entries early or keep them alive too long.
        now = time.monotonic()

        # Check cache
        if service_name in self._cache:
            cached_time = self._cache_times.get(service_name, 0)
            if now - cached_time < self.cache_ttl:
                return self._cache[service_name]

        # Imported here because this snippet's import block does not bring in
        # aiohttp; it is only needed on a cache miss.
        import aiohttp

        # Cache miss - query Saga
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.saga_url}/api/v1/services/{service_name}"
            ) as response:
                response.raise_for_status()
                service = await response.json()

        # Update cache. The timestamp is taken after the fetch so the entry's
        # age does not include the request time.
        self._cache[service_name] = service
        self._cache_times[service_name] = time.monotonic()

        return service

# Usage
# NOTE(review): `await` is only valid inside a coroutine (or an asyncio REPL);
# run these lines from async code rather than at module level.
client = CachedSagaClient("http://localhost:8030")
service = await client.discover("authentication")

Fallback Pattern

Implement fallback to static configuration:

use std::collections::HashMap;

/// Resolves the URL of `service_name`, preferring Saga over static configuration.
///
/// # Panics
/// Panics when discovery fails and `service_name` has no static fallback entry.
async fn discover_with_fallback(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
) -> String {
    match saga_client.discover_service(service_name).await {
        // Saga answered: use what it reported.
        Ok(found) => {
            tracing::info!("Discovered {} via Saga", service_name);
            found.service_url
        }
        // Saga unavailable or service unknown to it: consult the static table.
        Err(e) => {
            tracing::warn!("Saga discovery failed: {}. Using fallback.", e);

            let static_url = match service_name {
                "authentication" => "http://localhost:8001",
                "payment" => "http://localhost:8002",
                "gateway" => "http://localhost:8000",
                _ => panic!("Unknown service: {} and no fallback", service_name),
            };
            static_url.to_string()
        }
    }
}
# Static service URLs used when Saga discovery is unavailable.
FALLBACK_URLS = {
    "authentication": "http://localhost:8001",
    "payment": "http://localhost:8002",
    "gateway": "http://localhost:8000",
}


async def discover_with_fallback(
    saga_url: str,
    service_name: str
) -> str:
    """Discover service with fallback.

    Args:
        saga_url: Base URL of the Saga service.
        service_name: Name of the service to resolve.

    Returns:
        The URL reported by Saga, or the static entry from FALLBACK_URLS when
        discovery fails or does not answer with HTTP 200.

    Raises:
        ValueError: If discovery did not succeed and the service has no
            fallback entry.
    """
    try:
        # Bound the lookup so an unresponsive Saga degrades to the fallback
        # promptly instead of blocking the caller.
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(
                f"{saga_url}/api/v1/services/{service_name}"
            ) as response:
                if response.status == 200:
                    service = await response.json()
                    return service["service_url"]
                # Make a rejected lookup visible before falling back.
                print(f"Discovery failed: HTTP {response.status}")
    except Exception as e:
        print(f"Discovery failed: {e}")

    # Fallback
    if service_name in FALLBACK_URLS:
        return FALLBACK_URLS[service_name]

    raise ValueError(f"Unknown service: {service_name}")

Error Handling

Graceful Degradation

Handle failures gracefully to ensure service availability:

use saga_client::SagaServiceDiscovery;

/// Calls `path` on `service_name` with discovery fallback and bounded retries.
///
/// The target URL comes from Saga discovery, or from `get_fallback_url` when
/// discovery fails. The GET request is attempted up to three times with a
/// one-second pause between attempts, whether an attempt failed in transport
/// or returned a non-success status.
///
/// # Errors
/// Returns an error when no fallback URL exists for the service, when the
/// final attempt fails in transport, when a successful response body cannot
/// be read, or with `"Max retries exceeded"` when the final attempt returned
/// a non-success status.
async fn call_service_safely(
    saga_client: &SagaServiceDiscovery,
    service_name: &str,
    path: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    const MAX_ATTEMPTS: u32 = 3;
    const RETRY_DELAY: tokio::time::Duration = tokio::time::Duration::from_secs(1);

    // Try to discover service
    let url = match saga_client.discover_service(service_name).await {
        Ok(info) => {
            tracing::info!("Discovered {}: {}", service_name, info.service_url);
            format!("{}{}", info.service_url, path)
        }
        Err(e) => {
            tracing::warn!("Service discovery failed: {}. Using fallback.", e);
            // `get_fallback_url` takes the path and returns the complete URL.
            get_fallback_url(service_name, path)?
        }
    };

    // Make request with retry logic
    let client = reqwest::Client::new();

    for attempt in 1..=MAX_ATTEMPTS {
        match client.get(&url).send().await {
            Ok(response) if response.status().is_success() => {
                return Ok(response.text().await?);
            }
            Ok(response) => {
                tracing::warn!(
                    "Request returned status {} (attempt {})",
                    response.status(),
                    attempt
                );
            }
            // Surface the transport error itself once retries are exhausted.
            Err(e) if attempt == MAX_ATTEMPTS => return Err(e.into()),
            Err(e) => {
                tracing::warn!("Request failed (attempt {}): {}", attempt, e);
            }
        }

        // Pause before the next attempt for every kind of failure; skip the
        // pause after the last attempt since nothing follows it.
        if attempt < MAX_ATTEMPTS {
            tokio::time::sleep(RETRY_DELAY).await;
        }
    }

    Err("Max retries exceeded".into())
}
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def call_service_safely(
    saga_url: str,
    service_name: str,
    path: str
) -> dict:
    """Call service with retry logic.

    Resolves the service URL (with static fallback) and issues a GET for
    ``path``. Any exception is printed and re-raised so the ``retry``
    decorator can schedule another attempt, up to three in total.

    Args:
        saga_url: Base URL of the Saga service.
        service_name: Name of the service to call.
        path: Request path appended to the resolved service URL.

    Returns:
        The decoded JSON body of a successful response.
    """
    try:
        # Discover service
        service_url = await discover_with_fallback(saga_url, service_name)

        # Make request
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{service_url}{path}") as response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        print(f"Service call failed: {e}")
        # Re-raise so the decorator sees the failure and retries.
        raise

Testing Integration

Mock Saga for Testing

Test your services without a running Saga instance:

#[cfg(test)]
mod tests {
    // `json!` builds the mocked response body; the original used it without
    // bringing it into scope.
    use serde_json::json;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// Starts a mock HTTP server that answers the discovery lookup for the
    /// `authentication` service with a fixed registration record.
    async fn setup_mock_saga() -> MockServer {
        let mock_server = MockServer::start().await;

        // Mock service discovery endpoint
        Mock::given(method("GET"))
            .and(path("/api/v1/services/authentication"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "service_name": "authentication",
                "service_url": "http://localhost:8001",
                "service_id": "test-id",
                "registered_at": "2025-01-01T00:00:00Z",
                "last_heartbeat": "2025-01-01T00:00:00Z",
                "capabilities": ["rest"]
            })))
            .mount(&mock_server)
            .await;

        mock_server
    }

    #[tokio::test]
    async fn test_service_discovery() {
        let mock_saga = setup_mock_saga().await;
        // Point the code under test at this URL instead of a real Saga.
        let saga_url = mock_saga.uri();
        assert!(!saga_url.is_empty());

        // Test your service discovery logic
        // ...
    }
}
import pytest
from unittest.mock import AsyncMock, patch
import aiohttp

@pytest.fixture
def mock_saga():
    """Mock Saga server for testing.

    Patches ``aiohttp.ClientSession.get`` so that entering the returned
    context manager yields a canned discovery response for the
    ``authentication`` service. Yields the patched ``get`` mock so tests can
    inspect the calls made.

    The fixture is synchronous: nothing in the setup awaits, so it needs no
    async-fixture support and no real ``ClientSession`` is opened.
    """
    with patch('aiohttp.ClientSession.get') as mock_get:
        # Mock responses
        mock_response = AsyncMock()
        mock_response.status = 200
        mock_response.json = AsyncMock(return_value={
            "service_name": "authentication",
            "service_url": "http://localhost:8001",
            "service_id": "test-id",
            "capabilities": ["rest"]
        })
        # `async with session.get(...) as response` binds this mock response.
        mock_get.return_value.__aenter__.return_value = mock_response

        yield mock_get

@pytest.mark.asyncio
async def test_service_discovery(mock_saga):
    """Test service discovery with mock"""
    # Your test code here. `mock_saga` is the patched `ClientSession.get`,
    # so discovery code under test receives the canned response.
    pass

Best Practices Summary

Integration Best Practices
  1. Register on Startup

    • Register your service when it starts
    • Handle registration failures gracefully
    • Continue startup even if registration fails
  2. Implement Heartbeats

    • Send heartbeats every 30 seconds
    • Handle heartbeat failures
    • Re-register if heartbeat fails
  3. Handle Failures Gracefully

    • Implement fallback mechanisms
    • Use static configuration as backup
    • Log failures for monitoring
  4. Use Caching

    • Leverage Saga's built-in caching
    • Implement client-side caching if needed
    • Monitor cache hit ratios
  5. Monitor Health

    • Check Saga health endpoint regularly
    • Monitor service registration status
    • Alert on discovery failures
  6. Unregister on Shutdown

    • Clean up registrations when services stop
    • Use graceful shutdown handlers
    • Handle shutdown errors

Next Steps