Microservice Patterns: Building Resilient Distributed Systems

Author: Internet

2026-03-29

What Is Microservice Patterns?

The Microservice Patterns skill gives developers a structured framework for moving from a monolithic architecture to a scalable distributed system. Using Openclaw Skills, developers can apply proven strategies for service decomposition, data management, and inter-service communication without falling into common architectural traps such as the "distributed monolith".

The skill covers core patterns such as the Strangler Fig for migration, the Saga for distributed transactions, and resilience patterns like circuit breakers and bulkheads. It provides a technical blueprint for building systems that are independently deployable, scalable, and tolerant of partial failures in complex cloud environments.

Download: https://github.com/openclaw/skills/tree/main/skills/wpank/microservice-patterns

Installation and Download

1. ClawHub CLI

The fastest way to install the skill directly from the source.

npx clawhub@latest install microservice-patterns

2. Manual Installation

Copy the skill folder into one of the following locations:

Global mode: ~/.openclaw/skills/
Workspace:   /skills/

Priority: workspace > local > built-in

3. Prompt-Based Installation

Copy this prompt into OpenClaw to install automatically.

Please help me install microservice-patterns using Clawhub. If Clawhub is not installed yet, install it first (npm i -g clawhub).

Microservice Patterns: Use Cases

  • Decompose a monolithic application into manageable microservices.
  • Implement reliable inter-service communication with synchronous REST/gRPC or asynchronous event-driven models.
  • Manage data consistency across distributed databases with the Saga pattern.
  • Build fault-tolerant systems that prevent cascading failures through resilience patterns.
  • Establish clear service boundaries and contracts in multi-team development environments.

Microservice Patterns: How It Works

  1. Analyze the existing system to identify business capabilities and candidate bounded contexts for service extraction.
  2. Define a communication strategy, choosing between synchronous queries for immediate responses and asynchronous events for decoupling.
  3. Implement the database-per-service pattern to ensure data isolation and independent scalability.
  4. Use Openclaw Skills to apply migration strategies such as the Strangler Fig, moving functionality from the monolith to new services incrementally.
  5. Integrate resilience patterns such as circuit breakers and retries to keep the system stable during network or service failures.
  6. Monitor system health with standardized liveness and readiness probes integrated into the API gateway.

Microservice Patterns: Configuration Guide

To use these patterns in your development workflow, make sure your environment is compatible with Openclaw Skills. Install the Python dependencies required by the code examples:

pip install httpx aiokafka tenacity fastapi uvicorn

Configure your service mesh or API gateway to handle routing and service discovery according to the decomposition patterns provided.
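
As a minimal sketch of that configuration step, a static registry can stand in for real service discovery. The service names and URLs below are illustrative, not part of the skill itself:

```python
# Minimal static service registry. In production this lookup would be backed
# by a service mesh or discovery system (Consul, Kubernetes DNS, etc.); the
# entries here are illustrative.

SERVICE_REGISTRY = {
    "order-service": "http://order-service:8000",
    "payment-service": "http://payment-service:8001",
    "inventory-service": "http://inventory-service:8002",
}

def resolve(service_name: str) -> str:
    """Resolve a logical service name to a base URL."""
    try:
        return SERVICE_REGISTRY[service_name]
    except KeyError:
        raise LookupError(f"Unknown service: {service_name}") from None

# Usage: callers resolve names instead of hardcoding URLs
base_url = resolve("payment-service")  # "http://payment-service:8001"
```

Swapping the dict for a real discovery backend changes only `resolve`, not the callers.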

Microservice Patterns: Data Architecture and Taxonomy

The skill advocates a database-per-service approach in which each microservice manages its own schema. Data consistency is maintained through domain events and Sagas.

Pattern              | Data Organization              | Consistency Model
Database per service | Private schema per service     | Eventual consistency (via events)
Saga pattern         | Distributed state machine      | Compensating transactions
Event bus            | Topic-based event streams      | Publish/subscribe
CQRS                 | Separate read and write models | Eventual consistency

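
The CQRS row can be illustrated with a small in-memory sketch: the write model accepts commands and emits events, and a separate read model is projected from those events. All class and field names here are illustrative:

```python
# Minimal CQRS sketch: commands mutate the write model, and the read-side
# projection is rebuilt from the events the write side emits. Eventual
# consistency arises because the projection is updated after the fact.

class OrderWriteModel:
    """Write side: validates commands and emits domain events."""
    def __init__(self):
        self.events = []

    def create_order(self, order_id: str, total: float) -> dict:
        event = {"type": "OrderCreated", "order_id": order_id, "total": total}
        self.events.append(event)
        return event

class OrderReadModel:
    """Read side: a denormalized projection optimized for queries."""
    def __init__(self):
        self.orders_by_id = {}

    def apply(self, event: dict):
        if event["type"] == "OrderCreated":
            self.orders_by_id[event["order_id"]] = {"total": event["total"]}

    def get_order(self, order_id: str):
        return self.orders_by_id.get(order_id)

# Usage: events flow from the write model into the projection
write, read = OrderWriteModel(), OrderReadModel()
read.apply(write.create_order("o-1", 99.0))
```

In a real system the events would travel over the event bus described above rather than being applied in-process.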
name: microservices-patterns
model: reasoning

Microservices Patterns

WHAT

Patterns for building distributed systems: service decomposition, inter-service communication, data management, and resilience. Helps you avoid the "distributed monolith" anti-pattern.

WHEN

  • Decomposing a monolith into microservices
  • Designing service boundaries and contracts
  • Implementing inter-service communication
  • Managing distributed transactions
  • Building resilient distributed systems

KEYWORDS

microservices, service mesh, event-driven, saga, circuit breaker, API gateway, service discovery, distributed transactions, eventual consistency, CQRS


Decision Framework: When to Use Microservices

If you have...                          | Then...
Small team (<5 devs), simple domain     | Start with monolith
Need independent deployment/scaling     | Consider microservices
Multiple teams, clear domain boundaries | Microservices work well
Tight deadlines, unknown requirements   | Monolith first, extract later

Rule of thumb: If you can't define clear service boundaries, you're not ready for microservices.


Service Decomposition Patterns

Pattern 1: By Business Capability

Organize services around business functions, not technical layers.

E-commerce Example:
├── order-service       # Order lifecycle
├── payment-service     # Payment processing
├── inventory-service   # Stock management
├── shipping-service    # Fulfillment
└── notification-service # Emails, SMS

Pattern 2: Strangler Fig (Monolith Migration)

Gradually extract from monolith without big-bang rewrites.

1. Identify bounded context to extract
2. Create new microservice
3. Route new traffic to microservice
4. Gradually migrate existing functionality
5. Remove from monolith when complete
# API Gateway routing during migration
async def route_orders(request):
    if request.path.startswith("/api/orders/v2"):
        return await new_order_service.forward(request)
    else:
        return await legacy_monolith.forward(request)

Communication Patterns

Pattern 1: Synchronous (REST/gRPC)

Use for: Queries, when you need immediate response.

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

class ServiceClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.client = httpx.AsyncClient(timeout=5.0)
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
    async def get(self, path: str):
        """GET with automatic retries."""
        response = await self.client.get(f"{self.base_url}{path}")
        response.raise_for_status()
        return response.json()

# Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.get(f"/payments/{payment_id}")

Pattern 2: Asynchronous (Events)

Use for: Commands, when eventual consistency is acceptable.

import json
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List

from aiokafka import AIOKafkaProducer

@dataclass
class DomainEvent:
    event_id: str
    event_type: str
    aggregate_id: str
    occurred_at: datetime
    data: dict

class EventBus:
    def __init__(self, bootstrap_servers: List[str]):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    async def publish(self, event: DomainEvent):
        await self.producer.send_and_wait(
            event.event_type,  # Topic = event type
            value=asdict(event),
            key=event.aggregate_id.encode()
        )

# Order service publishes
await event_bus.publish(DomainEvent(
    event_id=str(uuid.uuid4()),
    event_type="OrderCreated",
    aggregate_id=order.id,
    occurred_at=datetime.now(),
    data={"order_id": order.id, "customer_id": order.customer_id}
))

# Inventory service subscribes and reacts
async def handle_order_created(event_data: dict):
    order_id = event_data["data"]["order_id"]
    items = event_data["data"]["items"]
    await reserve_inventory(order_id, items)
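
The subscriber side also needs consumer wiring. A sketch with aiokafka, assuming one topic per event type as in the EventBus above; the handler registry and `on_order_created` reaction are illustrative:

```python
import asyncio
import json

# Handlers keyed by event type; the event type doubles as the topic name,
# matching the EventBus publisher above.
HANDLERS = {}

def subscribe(event_type: str):
    """Register an async handler for one event type."""
    def decorator(func):
        HANDLERS[event_type] = func
        return func
    return decorator

@subscribe("OrderCreated")
async def on_order_created(event: dict) -> str:
    # Illustrative reaction; a real inventory service would reserve stock here.
    return f"reserved:{event['data']['order_id']}"

async def dispatch(event: dict):
    """Route one deserialized event to its registered handler."""
    handler = HANDLERS.get(event["event_type"])
    return await handler(event) if handler else None

async def consume(bootstrap_servers):
    """Long-running consumer loop. aiokafka is imported here so the
    dispatch logic stays testable without a running broker."""
    from aiokafka import AIOKafkaConsumer
    consumer = AIOKafkaConsumer(
        *HANDLERS,  # subscribe to every registered event type
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda v: json.loads(v.decode()),
    )
    await consumer.start()
    try:
        async for msg in consumer:
            await dispatch(msg.value)
    finally:
        await consumer.stop()
```

Unknown event types are simply ignored, so producers can add new events without breaking existing consumers.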

When to Use Each

Synchronous               | Asynchronous
Need immediate response   | Fire-and-forget
Simple query/response     | Long-running operations
Low latency required      | Decoupling is priority
Tight coupling acceptable | Eventual consistency OK

Data Patterns

Database Per Service

Each service owns its data. No shared databases.

order-service     → orders_db (PostgreSQL)
payment-service   → payments_db (PostgreSQL)
product-service   → products_db (MongoDB)
analytics-service → analytics_db (ClickHouse)
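
A practical consequence of this rule: data that used to come from a SQL join must now be composed over APIs. A sketch, with in-memory stand-ins for real HTTP clients (all class and field names are illustrative):

```python
# With a shared database, "order plus its payment status" would be one JOIN.
# With database-per-service, each service answers only for its own data and
# the caller composes the result (API composition).

class OrderService:
    def __init__(self):
        self._orders = {"o-1": {"order_id": "o-1", "customer_id": "c-9"}}

    def get_order(self, order_id: str):
        return self._orders.get(order_id)

class PaymentService:
    def __init__(self):
        self._payments = {"o-1": {"order_id": "o-1", "status": "captured"}}

    def get_payment_for_order(self, order_id: str):
        return self._payments.get(order_id)

def order_with_payment(order_id: str, orders: OrderService, payments: PaymentService):
    """API composition: replaces the cross-schema JOIN."""
    order = orders.get_order(order_id)
    if order is None:
        return None
    payment = payments.get_payment_for_order(order_id)
    return {**order, "payment_status": payment["status"] if payment else "unknown"}
```

The API Gateway `aggregate` method later in this document is the same idea applied over real HTTP calls.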

Saga Pattern (Distributed Transactions)

For operations spanning multiple services that need rollback capability.

import logging
from dataclasses import dataclass, field
from typing import Callable, List, Optional

logger = logging.getLogger(__name__)

@dataclass
class SagaResult:
    status: str
    error: Optional[str] = None
    data: dict = field(default_factory=dict)

class SagaStep:
    def __init__(self, name: str, action: Callable, compensation: Callable):
        self.name = name
        self.action = action
        self.compensation = compensation

class OrderFulfillmentSaga:
    def __init__(self):
        self.steps = [
            SagaStep("create_order", self.create_order, self.cancel_order),
            SagaStep("reserve_inventory", self.reserve_inventory, self.release_inventory),
            SagaStep("process_payment", self.process_payment, self.refund_payment),
            SagaStep("confirm_order", self.confirm_order, self.cancel_confirmation),
        ]
    
    async def execute(self, order_data: dict) -> SagaResult:
        completed_steps = []
        context = {"order_data": order_data}
        
        for step in self.steps:
            try:
                result = await step.action(context)
                if not result.success:
                    await self.compensate(completed_steps, context)
                    return SagaResult(status="failed", error=result.error)
                completed_steps.append(step)
                context.update(result.data)
            except Exception as e:
                await self.compensate(completed_steps, context)
                return SagaResult(status="failed", error=str(e))
        
        return SagaResult(status="completed", data=context)
    
    async def compensate(self, completed_steps: List[SagaStep], context: dict):
        """Execute compensating actions in reverse order."""
        for step in reversed(completed_steps):
            try:
                await step.compensation(context)
            except Exception as e:
                # Log but continue compensating
                logger.error(f"Compensation failed for {step.name}: {e}")

Resilience Patterns

Circuit Breaker

Fail fast when a service is down. Prevents cascade failures.

from enum import Enum
from datetime import datetime, timedelta
from typing import Callable

class CircuitBreakerOpen(Exception):
    """Raised when the circuit is open and calls are rejected."""

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open" # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None
    
    async def call(self, func: Callable, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpen("Service unavailable")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise
    
    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0
    
    def _on_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
    
    def _should_attempt_reset(self) -> bool:
        return datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout)

# Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def call_payment_service(data: dict):
    return await breaker.call(payment_client.post, "/payments", json=data)

Retry with Exponential Backoff

For transient failures.

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

client = httpx.AsyncClient(base_url="http://user-service:8003")  # illustrative endpoint

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError))
)
async def fetch_user(user_id: str):
    response = await client.get(f"/users/{user_id}")
    response.raise_for_status()
    return response.json()

Bulkhead

Isolate resources to limit impact of failures.

import asyncio
from typing import Callable

class Bulkhead:
    def __init__(self, max_concurrent: int):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def call(self, func: Callable, *args, **kwargs):
        async with self.semaphore:
            return await func(*args, **kwargs)

# Limit concurrent calls to each service
payment_bulkhead = Bulkhead(max_concurrent=10)
inventory_bulkhead = Bulkhead(max_concurrent=20)

result = await payment_bulkhead.call(payment_service.charge, amount)

API Gateway Pattern

Single entry point for all clients.

import asyncio

import httpx
from fastapi import FastAPI
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    def __init__(self):
        self.clients = {
            "orders": httpx.AsyncClient(base_url="http://order-service:8000"),
            "payments": httpx.AsyncClient(base_url="http://payment-service:8001"),
            "inventory": httpx.AsyncClient(base_url="http://inventory-service:8002"),
        }
    
    @circuit(failure_threshold=5, recovery_timeout=30)
    async def forward(self, service: str, path: str, **kwargs):
        client = self.clients[service]
        response = await client.request(**kwargs, url=path)
        response.raise_for_status()
        return response.json()
    
    async def aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        results = await asyncio.gather(
            self.forward("orders", f"/orders/{order_id}", method="GET"),
            self.forward("payments", f"/payments/order/{order_id}", method="GET"),
            self.forward("inventory", f"/reservations/order/{order_id}", method="GET"),
            return_exceptions=True
        )
        
        return {
            "order": results[0] if not isinstance(results[0], Exception) else None,
            "payment": results[1] if not isinstance(results[1], Exception) else None,
            "inventory": results[2] if not isinstance(results[2], Exception) else None,
        }

gateway = APIGateway()

@app.get("/api/orders/{order_id}")
async def get_order_aggregate(order_id: str):
    return await gateway.aggregate(order_id)

Health Checks

Every service needs liveness and readiness probes.

@app.get("/health/live")
async def liveness():
    """Is the process running?"""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Can we serve traffic?"""
    checks = {
        # check_database / check_redis are app-specific dependency probes
        "database": await check_database(),
        "cache": await check_redis(),
    }
    
    all_healthy = all(checks.values())
    status = "ready" if all_healthy else "not_ready"
    
    return {"status": status, "checks": checks}

NEVER

  • Shared Databases: Creates tight coupling, defeats the purpose
  • Synchronous Chains: A → B → C → D = fragile, slow
  • No Circuit Breakers: One service down takes everything down
  • Distributed Monolith: Services that must deploy together
  • Ignoring Network Failures: Assume the network WILL fail
  • No Compensation Logic: Can't undo failed distributed transactions
  • Starting with Microservices: Always start with a well-structured monolith
  • Chatty Services: Too many inter-service calls = latency death