5.4.1 服务注册与发现 #
服务发现是微服务架构的基础设施组件,它解决了在动态环境中服务实例如何相互发现和通信的问题。在传统的单体应用中,服务间的调用通过静态配置的 IP 和端口进行,但在微服务环境中,服务实例会动态创建和销毁,需要一个自动化的机制来管理服务的位置信息。
服务发现基础概念 #
服务发现的核心问题 #
在微服务架构中,服务发现需要解决以下核心问题:
- 服务注册:服务实例启动时如何向注册中心声明自己的存在
- 服务发现:客户端如何找到可用的服务实例
- 健康检查:如何确保返回的服务实例是健康可用的
- 负载均衡:如何在多个服务实例间分配请求
- 故障处理:如何处理服务实例的故障和恢复
服务发现模式 #
客户端发现模式:
┌─────────────┐   1. 查询服务    ┌──────────────────┐
│   Client    │ ───────────────→ │ Service Registry │
│             │ ←─────────────── │                  │
└─────────────┘   2. 返回实例    └──────────────────┘
       │
       │ 3. 直接调用
       ▼
┌─────────────┐
│   Service   │
│  Instance   │
└─────────────┘
服务端发现模式:
┌─────────────┐    1. 请求     ┌─────────────────┐
│   Client    │ ─────────────→ │  Load Balancer  │
│             │                │    (Router)     │
└─────────────┘                └─────────────────┘
                                        │
                                        │ 2. 路由请求
                                        ▼
                               ┌─────────────────┐
                               │     Service     │
                               │    Instance     │
                               └─────────────────┘
服务注册中心实现 #
基础服务注册中心 #
// registry/service_registry.go
package registry
import (
"context"
"fmt"
"sync"
"time"
)
// ServiceInstance describes one running instance of a service.
type ServiceInstance struct {
	ID       string            `json:"id"`
	Name     string            `json:"name"`
	Address  string            `json:"address"`
	Port     int               `json:"port"`
	Metadata map[string]string `json:"metadata"`
	Health   HealthStatus      `json:"health"`

	// Bookkeeping fields stamped by the registry.
	LastHeartbeat time.Time `json:"last_heartbeat"`
	RegisterTime  time.Time `json:"register_time"`
}

// clone returns a copy of s with its own Metadata map, so an instance stored in
// the registry never shares mutable state with the code that registered or
// discovered it.
func (s *ServiceInstance) clone() *ServiceInstance {
	c := *s
	if s.Metadata != nil {
		c.Metadata = make(map[string]string, len(s.Metadata))
		for k, v := range s.Metadata {
			c.Metadata[k] = v
		}
	}
	return &c
}

// HealthStatus is the health state recorded for an instance.
type HealthStatus string

// Health states. InMemoryRegistry only returns HealthStatusHealthy instances
// from Discover and in Watch snapshots.
const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusUnknown   HealthStatus = "unknown"
)

// ServiceRegistry is the contract between services and a registry backend.
type ServiceRegistry interface {
	// Register adds an instance, or replaces one with the same ID.
	Register(ctx context.Context, instance *ServiceInstance) error
	// Deregister removes the instance with the given ID.
	Deregister(ctx context.Context, instanceID string) error
	// Discover returns the currently healthy instances of serviceName.
	Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error)
	// Watch delivers snapshots of the healthy instances of serviceName as they change.
	Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error)
	// Heartbeat reports that the instance with the given ID is still alive.
	Heartbeat(ctx context.Context, instanceID string) error
}

// InMemoryRegistry is a process-local ServiceRegistry guarded by one RWMutex.
// Instances that stop heartbeating are marked unhealthy after heartbeatTimeout
// and evicted after twice that, by a background sweep every cleanupInterval.
type InMemoryRegistry struct {
	// services maps service name -> instance ID -> stored (registry-owned) instance.
	services map[string]map[string]*ServiceInstance
	// watchers maps service name -> channels of active Watch calls.
	watchers map[string][]chan []*ServiceInstance
	mutex    sync.RWMutex

	// Health-check configuration.
	heartbeatTimeout time.Duration
	cleanupInterval  time.Duration

	// done is closed by Close to stop the background goroutines.
	done      chan struct{}
	closeOnce sync.Once
}

var _ ServiceRegistry = (*InMemoryRegistry)(nil)

// NewInMemoryRegistry creates an empty registry (2 min heartbeat timeout, 30 s
// sweep interval) and starts its cleanup goroutine. Call Close to stop it.
func NewInMemoryRegistry() *InMemoryRegistry {
	r := &InMemoryRegistry{
		services:         make(map[string]map[string]*ServiceInstance),
		watchers:         make(map[string][]chan []*ServiceInstance),
		heartbeatTimeout: 2 * time.Minute,
		cleanupInterval:  30 * time.Second,
		done:             make(chan struct{}),
	}
	go r.startCleanup()
	return r
}

// Close stops the background cleanup goroutine. It is safe to call more than once.
func (r *InMemoryRegistry) Close() {
	r.closeOnce.Do(func() {
		if r.done != nil {
			close(r.done)
		}
	})
}

// Register stores a copy of instance under instance.Name/instance.ID, marking
// it healthy, and notifies watchers of that service.
//
// RegisterTime, LastHeartbeat and Health are also stamped on the caller's
// instance. Later changes the caller makes to its own instance (including its
// Metadata map) are NOT seen by the registry, because it keeps a private copy.
// A nil instance is rejected with an error.
func (r *InMemoryRegistry) Register(ctx context.Context, instance *ServiceInstance) error {
	if instance == nil {
		return fmt.Errorf("instance must not be nil")
	}

	r.mutex.Lock()
	defer r.mutex.Unlock()

	now := time.Now()
	instance.RegisterTime = now
	instance.LastHeartbeat = now
	instance.Health = HealthStatusHealthy

	if r.services[instance.Name] == nil {
		r.services[instance.Name] = make(map[string]*ServiceInstance)
	}
	r.services[instance.Name][instance.ID] = instance.clone()

	r.notifyWatchers(instance.Name)
	return nil
}

// Deregister removes the instance with instanceID from whichever service holds
// it, drops the service once it has no instances, and notifies its watchers.
// It returns an error if no such instance is registered.
func (r *InMemoryRegistry) Deregister(ctx context.Context, instanceID string) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	for serviceName, instances := range r.services {
		if _, ok := instances[instanceID]; !ok {
			continue
		}
		delete(instances, instanceID)
		if len(instances) == 0 {
			delete(r.services, serviceName)
		}
		r.notifyWatchers(serviceName)
		return nil
	}
	return fmt.Errorf("instance %s not found", instanceID)
}

// Discover returns copies of the healthy instances of serviceName (an empty,
// non-nil slice if there are none). Callers may modify the result freely.
func (r *InMemoryRegistry) Discover(ctx context.Context, serviceName string) ([]*ServiceInstance, error) {
	r.mutex.RLock()
	defer r.mutex.RUnlock()
	return r.healthySnapshot(serviceName), nil
}

// Watch returns a channel that receives a snapshot of the healthy instances of
// serviceName whenever the set changes. If the service already exists, the
// current snapshot is delivered immediately.
//
// The channel holds at most one snapshot. A slow reader never blocks the
// registry: an unread snapshot is replaced by the newer one, so the reader
// always ends up with the latest state.
//
// When ctx is cancelled the watcher is unregistered and receives no further
// updates. The channel is not closed, so readers should stop on their own ctx.
// ctx must be non-nil. A context that is never cancelled keeps the watcher
// registered for the registry's lifetime.
func (r *InMemoryRegistry) Watch(ctx context.Context, serviceName string) (<-chan []*ServiceInstance, error) {
	ch := make(chan []*ServiceInstance, 1)

	r.mutex.Lock()
	r.watchers[serviceName] = append(r.watchers[serviceName], ch)
	if _, ok := r.services[serviceName]; ok {
		// Fresh channel with capacity 1: this send cannot block.
		ch <- r.healthySnapshot(serviceName)
	}
	r.mutex.Unlock()

	go func() {
		select {
		case <-ctx.Done():
			r.removeWatcher(serviceName, ch)
		case <-r.done:
		}
	}()
	return ch, nil
}

// Heartbeat refreshes LastHeartbeat of the instance with instanceID. If the
// instance had been marked unhealthy it becomes healthy again and watchers are
// notified; a routine heartbeat does not notify anyone. Returns an error if no
// such instance is registered.
func (r *InMemoryRegistry) Heartbeat(ctx context.Context, instanceID string) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	for serviceName, instances := range r.services {
		inst, ok := instances[instanceID]
		if !ok {
			continue
		}
		inst.LastHeartbeat = time.Now()
		if inst.Health != HealthStatusHealthy {
			inst.Health = HealthStatusHealthy
			r.notifyWatchers(serviceName)
		}
		return nil
	}
	return fmt.Errorf("instance %s not found", instanceID)
}

// healthySnapshot returns copies of the healthy instances of serviceName.
// The caller must hold r.mutex (read or write).
func (r *InMemoryRegistry) healthySnapshot(serviceName string) []*ServiceInstance {
	instances := r.services[serviceName]
	snapshot := make([]*ServiceInstance, 0, len(instances))
	for _, inst := range instances {
		if inst.Health == HealthStatusHealthy {
			snapshot = append(snapshot, inst.clone())
		}
	}
	return snapshot
}

// notifyWatchers pushes the current healthy snapshot of serviceName to every
// watcher of that service. The caller must hold r.mutex for writing.
//
// The same snapshot slice is shared by all watchers of one notification;
// readers should treat it as read-only.
func (r *InMemoryRegistry) notifyWatchers(serviceName string) {
	watchers := r.watchers[serviceName]
	if len(watchers) == 0 {
		return
	}

	snapshot := r.healthySnapshot(serviceName)
	for _, w := range watchers {
		// Discard an unread, now-stale snapshot so the newest one always
		// fits. All sends happen under r.mutex, so after the drain the
		// buffer has room; the default branch is purely defensive.
		select {
		case <-w:
		default:
		}
		select {
		case w <- snapshot:
		default:
		}
	}
}

// removeWatcher unregisters ch from the watchers of serviceName.
func (r *InMemoryRegistry) removeWatcher(serviceName string, ch chan []*ServiceInstance) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	list := r.watchers[serviceName]
	for i, w := range list {
		if w != ch {
			continue
		}
		copy(list[i:], list[i+1:])
		list[len(list)-1] = nil // don't keep the removed channel reachable
		list = list[:len(list)-1]
		break
	}
	if len(list) == 0 {
		delete(r.watchers, serviceName)
	} else {
		r.watchers[serviceName] = list
	}
}

// startCleanup runs cleanup every cleanupInterval until Close is called.
func (r *InMemoryRegistry) startCleanup() {
	ticker := time.NewTicker(r.cleanupInterval)
	defer ticker.Stop()
	for {
		select {
		case <-r.done:
			return
		case <-ticker.C:
			r.cleanup()
		}
	}
}

// cleanup marks instances silent for longer than heartbeatTimeout as
// unhealthy, evicts those silent for longer than 2*heartbeatTimeout, drops
// services left without instances, and notifies watchers of every service
// that changed.
func (r *InMemoryRegistry) cleanup() {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	now := time.Now()
	for serviceName, instances := range r.services {
		changed := false
		for instanceID, inst := range instances {
			silence := now.Sub(inst.LastHeartbeat)
			switch {
			case silence > 2*r.heartbeatTimeout:
				delete(instances, instanceID)
				changed = true
			case silence > r.heartbeatTimeout && inst.Health == HealthStatusHealthy:
				inst.Health = HealthStatusUnhealthy
				changed = true
			}
		}
		if len(instances) == 0 {
			delete(r.services, serviceName)
			changed = true
		}
		if changed {
			r.notifyWatchers(serviceName)
		}
	}
}
服务注册客户端 #
// client/registry_client.go
package client
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"github.com/example/service-discovery/registry"
)
// RegistryClient 注册中心客户端
type RegistryClient struct {
registry registry.ServiceRegistry
instance *registry.ServiceInstance
heartbeatInterval time.Duration
// 控制协程
ctx context.Context
cancel context.CancelFunc
}
// NewRegistryClient 创建注册中心客户端
func NewRegistryClient(reg registry.ServiceRegistry, serviceName, address string, port int) *RegistryClient {
ctx, cancel := context.WithCancel(context.Background())
// 生成实例 ID
hostname, _ := os.Hostname()
instanceID := fmt.Sprintf("%s-%s-%d-%d", serviceName, hostname, port, time.Now().Unix())
instance := ®istry.ServiceInstance{
ID: instanceID,
Name: serviceName,
Address: address,
Port: port,
Metadata: make(map[string]string),
}
return &RegistryClient{
registry: reg,
instance: instance,
heartbeatInterval: time.Second * 30,
ctx: ctx,
cancel: cancel,
}
}
// Start 启动客户端
func (c *RegistryClient) Start() error {
// 注册服务实例
if err := c.registry.Register(c.ctx, c.instance); err != nil {
return fmt.Errorf("failed to register service: %v", err)
}
log.Printf("Service registered: %s (%s:%d)", c.instance.ID, c.instance.Address, c.instance.Port)
// 启动心跳
go c.startHeartbeat()
// 监听退出信号
go c.handleShutdown()
return nil
}
// Stop 停止客户端
func (c *RegistryClient) Stop() error {
c.cancel()
// 注销服务实例
if err := c.registry.Deregister(context.Background(), c.instance.ID); err != nil {
log.Printf("Failed to deregister service: %v", err)
return err
}
log.Printf("Service deregistered: %s", c.instance.ID)
return nil
}
// SetMetadata 设置元数据
func (c *RegistryClient) SetMetadata(key, value string) {
c.instance.Metadata[key] = value
}
// startHeartbeat 启动心跳
func (c *RegistryClient) startHeartbeat() {
ticker := time.NewTicker(c.heartbeatInterval)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
if err := c.registry.Heartbeat(c.ctx, c.instance.ID); err != nil {
log.Printf("Heartbeat failed: %v", err)
}
}
}
}
// handleShutdown 处理优雅关闭
func (c *RegistryClient) handleShutdown() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("Received shutdown signal")
c.Stop()
case <-c.ctx.Done():
return
}
}
健康检查机制 #
HTTP 健康检查 #
// health/http_checker.go
package health
import (
"context"
"fmt"
"net/http"
"time"
)
// HTTPHealthChecker probes an instance with an HTTP GET against a health
// endpoint; any 2xx status counts as healthy.
type HTTPHealthChecker struct {
	client  *http.Client
	timeout time.Duration
}

// NewHTTPHealthChecker returns a checker whose HTTP client applies timeout to
// each request.
func NewHTTPHealthChecker(timeout time.Duration) *HTTPHealthChecker {
	return &HTTPHealthChecker{
		client:  &http.Client{Timeout: timeout},
		timeout: timeout,
	}
}

// Check sends GET http://address:port<path> and returns nil for a 2xx status,
// otherwise the transport error or a status error.
//
// path is normalized to begin with "/". A value such as "health" (easy to put
// in instance metadata) would otherwise be glued onto the port and produce an
// invalid URL. An empty path probes "/".
func (h *HTTPHealthChecker) Check(ctx context.Context, address string, port int, path string) error {
	if path == "" || path[0] != '/' {
		path = "/" + path
	}
	url := fmt.Sprintf("http://%s:%d%s", address, port, path)

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	resp, err := h.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("health check failed with status: %d", resp.StatusCode)
	}
	return nil
}
TCP 健康检查 #
// health/tcp_checker.go
package health
import (
"context"
"fmt"
"net"
"time"
)
// TCPHealthChecker treats an instance as healthy if a TCP connection to it can
// be established.
type TCPHealthChecker struct {
	timeout time.Duration
}

// NewTCPHealthChecker returns a checker that gives up dialing after timeout.
func NewTCPHealthChecker(timeout time.Duration) *TCPHealthChecker {
	return &TCPHealthChecker{
		timeout: timeout,
	}
}

// Check dials address:port over TCP and returns the dial error, if any. The
// connection is closed immediately on success.
//
// The target is built with net.JoinHostPort so IPv6 literals such as "::1"
// become "[::1]:port"; plain "%s:%d" formatting yields an undialable address.
func (t *TCPHealthChecker) Check(ctx context.Context, address string, port int) error {
	dialer := &net.Dialer{Timeout: t.timeout}

	target := net.JoinHostPort(address, fmt.Sprint(port))
	conn, err := dialer.DialContext(ctx, "tcp", target)
	if err != nil {
		return err
	}
	// Only reachability matters; a Close error says nothing about health.
	_ = conn.Close()
	return nil
}
健康检查管理器 #
// health/manager.go
package health
import (
"context"
"log"
"sync"
"time"
"github.com/example/service-discovery/registry"
)
// HealthCheckManager periodically probes registered instances with HTTP or
// TCP checks and logs failures.
type HealthCheckManager struct {
	registry      registry.ServiceRegistry
	httpChecker   *HTTPHealthChecker
	tcpChecker    *TCPHealthChecker
	checkInterval time.Duration

	// ctx is cancelled by Stop. wg tracks the check loop, which in turn
	// waits for every probe of a round, so Stop waits for all of them.
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// NewHealthCheckManager creates a manager with a 5 s HTTP timeout, a 3 s TCP
// timeout and a 10 s check interval. Call Start to begin checking.
func NewHealthCheckManager(reg registry.ServiceRegistry) *HealthCheckManager {
	ctx, cancel := context.WithCancel(context.Background())
	return &HealthCheckManager{
		registry:      reg,
		httpChecker:   NewHTTPHealthChecker(5 * time.Second),
		tcpChecker:    NewTCPHealthChecker(3 * time.Second),
		checkInterval: 10 * time.Second,
		ctx:           ctx,
		cancel:        cancel,
	}
}

// Start launches the periodic check loop in a background goroutine.
func (h *HealthCheckManager) Start() {
	h.wg.Add(1)
	go h.runHealthChecks()
}

// Stop cancels all checks and blocks until the loop and any in-flight probes
// have returned.
func (h *HealthCheckManager) Stop() {
	h.cancel()
	h.wg.Wait()
}

// runHealthChecks runs one round of checks every checkInterval until Stop.
func (h *HealthCheckManager) runHealthChecks() {
	defer h.wg.Done()

	ticker := time.NewTicker(h.checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-h.ctx.Done():
			return
		case <-ticker.C:
			h.performHealthChecks()
		}
	}
}

// performHealthChecks probes every discovered instance concurrently and
// returns only when all probes of this round have finished. Waiting here keeps
// rounds from piling up when probes are slow, and lets Stop (which waits for
// the loop) also wait for the probes.
//
// Only "example-service" is checked: the ServiceRegistry interface offers no
// way to list all services, so a real implementation needs to extend it.
func (h *HealthCheckManager) performHealthChecks() {
	instances, err := h.registry.Discover(h.ctx, "example-service")
	if err != nil {
		if h.ctx.Err() == nil { // errors caused by Stop are expected
			log.Printf("Failed to discover services: %v", err)
		}
		return
	}

	var round sync.WaitGroup
	for _, instance := range instances {
		round.Add(1)
		go func(inst *registry.ServiceInstance) {
			defer round.Done()
			h.checkInstance(inst)
		}(instance)
	}
	round.Wait()
}

// checkInstance probes one instance. The metadata key "health_check_type"
// selects the probe: "http" issues a GET to "health_check_path" (default
// "/health"); "tcp" or any other value uses a TCP dial.
func (h *HealthCheckManager) checkInstance(instance *registry.ServiceInstance) {
	var err error
	switch instance.Metadata["health_check_type"] {
	case "http":
		path := instance.Metadata["health_check_path"]
		if path == "" {
			path = "/health"
		}
		err = h.httpChecker.Check(h.ctx, instance.Address, instance.Port, path)
	default: // "tcp" and anything unrecognised
		err = h.tcpChecker.Check(h.ctx, instance.Address, instance.Port)
	}

	// Probes aborted because Stop cancelled h.ctx are not real failures.
	if err != nil && h.ctx.Err() == nil {
		log.Printf("Health check failed for %s: %v", instance.ID, err)
		// TODO: write the result back to the registry (mark the instance
		// unhealthy) once ServiceRegistry exposes an operation for it.
	}
}
服务发现客户端 #
服务发现客户端实现 #
// discovery/client.go
package discovery
import (
"context"
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/example/service-discovery/registry"
)
// DiscoveryClient resolves service names to instances. Results are cached per
// service and kept fresh by a background Watch on the registry.
type DiscoveryClient struct {
	registry registry.ServiceRegistry

	// cache holds the latest known instances per service name; guarded by mutex.
	cache map[string][]*registry.ServiceInstance
	mutex sync.RWMutex

	// watchers holds the cancel func of each service's watch goroutine; guarded by mutex.
	watchers map[string]context.CancelFunc

	// cacheTimeout is set by the constructor but not consulted by any method
	// here; cache entries are refreshed only through the watch.
	cacheTimeout time.Duration
}

// NewDiscoveryClient creates a client with an empty cache.
func NewDiscoveryClient(reg registry.ServiceRegistry) *DiscoveryClient {
	return &DiscoveryClient{
		registry:     reg,
		cache:        make(map[string][]*registry.ServiceInstance),
		watchers:     make(map[string]context.CancelFunc),
		cacheTimeout: 5 * time.Minute,
	}
}

// GetService returns the known instances of serviceName. On a cache miss it
// queries the registry, caches the result and starts a watch. The returned
// slice is a copy, so callers may reorder or truncate it without affecting
// the cache.
func (d *DiscoveryClient) GetService(serviceName string) ([]*registry.ServiceInstance, error) {
	d.mutex.RLock()
	cached, ok := d.cache[serviceName]
	d.mutex.RUnlock()
	if ok {
		return append([]*registry.ServiceInstance(nil), cached...), nil
	}

	instances, err := d.registry.Discover(context.Background(), serviceName)
	if err != nil {
		return nil, err
	}

	d.mutex.Lock()
	d.cache[serviceName] = instances
	d.mutex.Unlock()

	d.startWatcher(serviceName)
	return append([]*registry.ServiceInstance(nil), instances...), nil
}

// GetRandomInstance returns a uniformly random instance of serviceName, or an
// error if discovery fails or no instance is available.
func (d *DiscoveryClient) GetRandomInstance(serviceName string) (*registry.ServiceInstance, error) {
	instances, err := d.GetService(serviceName)
	if err != nil {
		return nil, err
	}
	if len(instances) == 0 {
		return nil, fmt.Errorf("no available instances for service: %s", serviceName)
	}
	return instances[rand.Intn(len(instances))], nil
}

// startWatcher starts, at most once per service, a goroutine that copies every
// snapshot from registry.Watch into the cache.
//
// If Watch fails or its channel is closed, the cache entry and watcher
// registration are dropped so the next GetService refetches and re-watches,
// instead of serving a stale list forever. Once Stop has cancelled the
// goroutine it never writes to the cache again, so it cannot repopulate the
// maps Stop just reset.
func (d *DiscoveryClient) startWatcher(serviceName string) {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	if _, exists := d.watchers[serviceName]; exists {
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	d.watchers[serviceName] = cancel

	go func() {
		defer cancel()

		ch, err := d.registry.Watch(ctx, serviceName)
		if err != nil {
			log.Printf("Failed to watch service %s: %v", serviceName, err)
			d.forgetWatcher(ctx, serviceName)
			return
		}

		for {
			select {
			case <-ctx.Done():
				return
			case instances, ok := <-ch:
				if !ok {
					// A closed channel would otherwise yield nil forever (busy loop).
					d.forgetWatcher(ctx, serviceName)
					return
				}
				d.mutex.Lock()
				if ctx.Err() != nil { // Stop ran while we waited for the lock
					d.mutex.Unlock()
					return
				}
				d.cache[serviceName] = instances
				d.mutex.Unlock()
				log.Printf("Service %s instances updated: %d instances", serviceName, len(instances))
			}
		}
	}()
}

// forgetWatcher removes the cache entry and watcher registration for
// serviceName. If ctx is already cancelled, it does nothing: Stop has reset
// the maps, and the entries may belong to a newer watcher.
func (d *DiscoveryClient) forgetWatcher(ctx context.Context, serviceName string) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	if ctx.Err() != nil {
		return
	}
	delete(d.watchers, serviceName)
	delete(d.cache, serviceName)
}

// Stop cancels all watch goroutines and clears the cache.
func (d *DiscoveryClient) Stop() {
	d.mutex.Lock()
	defer d.mutex.Unlock()

	for _, cancel := range d.watchers {
		cancel()
	}
	d.watchers = make(map[string]context.CancelFunc)
	d.cache = make(map[string][]*registry.ServiceInstance)
}
实际应用示例 #
HTTP 服务示例 #
// example/http_service.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strconv"
"time"
"github.com/example/service-discovery/client"
"github.com/example/service-discovery/discovery"
"github.com/example/service-discovery/registry"
)
// main runs one instance of an example HTTP service. It registers itself with
// an in-process registry and serves /health, /info and /call.
//
// Environment: SERVICE_NAME (default "example-service"), PORT (default
// "8080"), ADDRESS (default "localhost").
//
// NOTE: the registry is a registry.NewInMemoryRegistry created here, so /call
// can only discover instances registered inside this same process.
func main() {
	serviceName := getEnv("SERVICE_NAME", "example-service")
	portStr := getEnv("PORT", "8080")
	port, err := strconv.Atoi(portStr)
	if err != nil || port <= 0 || port > 65535 {
		// An unparseable PORT would otherwise be advertised as port 0.
		log.Fatalf("invalid PORT %q", portStr)
	}
	address := getEnv("ADDRESS", "localhost")

	reg := registry.NewInMemoryRegistry()

	regClient := client.NewRegistryClient(reg, serviceName, address, port)
	regClient.SetMetadata("version", "1.0.0")
	regClient.SetMetadata("health_check_type", "http")
	regClient.SetMetadata("health_check_path", "/health")

	discoveryClient := discovery.NewDiscoveryClient(reg)

	mux := http.NewServeMux()

	// Health-check endpoint.
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		json.NewEncoder(w).Encode(map[string]string{
			"status": "healthy",
			"time":   time.Now().Format(time.RFC3339),
		})
	})

	// Instance information endpoint.
	mux.HandleFunc("/info", func(w http.ResponseWriter, r *http.Request) {
		hostname, _ := os.Hostname() // best effort; empty on failure
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(map[string]interface{}{
			"service":  serviceName,
			"hostname": hostname,
			"address":  address,
			"port":     port,
			"time":     time.Now().Format(time.RFC3339),
		})
	})

	// Calls /info on a random instance of ?service=<name>.
	mux.HandleFunc("/call", func(w http.ResponseWriter, r *http.Request) {
		targetService := r.URL.Query().Get("service")
		if targetService == "" {
			http.Error(w, "service parameter required", http.StatusBadRequest)
			return
		}

		instance, err := discoveryClient.GetRandomInstance(targetService)
		if err != nil {
			http.Error(w, fmt.Sprintf("Service discovery failed: %v", err), http.StatusServiceUnavailable)
			return
		}

		// Bound the downstream call and tie it to the inbound request, so a
		// hung instance or a disconnected caller cannot pin this handler.
		ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
		defer cancel()

		url := fmt.Sprintf("http://%s:%d/info", instance.Address, instance.Port)
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			http.Error(w, fmt.Sprintf("Service call failed: %v", err), http.StatusServiceUnavailable)
			return
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			http.Error(w, fmt.Sprintf("Service call failed: %v", err), http.StatusServiceUnavailable)
			return
		}
		defer resp.Body.Close()

		// Decode before writing the status line so a malformed body can still
		// be reported as an error.
		var result map[string]interface{}
		if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
			http.Error(w, fmt.Sprintf("Invalid response from %s: %v", targetService, err), http.StatusBadGateway)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(resp.StatusCode)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"caller": map[string]interface{}{
				"service": serviceName,
				"address": address,
				"port":    port,
			},
			"target": result,
		})
	})

	if err := regClient.Start(); err != nil {
		log.Fatal("Failed to start registry client:", err)
	}

	server := &http.Server{
		Addr:              fmt.Sprintf(":%d", port),
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		WriteTimeout:      15 * time.Second, // above the 5 s /call budget
	}

	log.Printf("Starting HTTP server on port %d", port)
	serveErr := server.ListenAndServe()

	// log.Fatal exits without running deferred calls, so release the watch
	// goroutines and deregister explicitly before exiting.
	discoveryClient.Stop()
	regClient.Stop()
	log.Fatal(serveErr)
}
// getEnv returns the value of environment variable key, falling back to
// defaultValue when the variable is unset or empty.
func getEnv(key, defaultValue string) string {
	value := os.Getenv(key)
	if value == "" {
		return defaultValue
	}
	return value
}
使用示例 #
# Start the first user-service instance (each command runs in the foreground,
# so use a separate terminal per instance)
SERVICE_NAME=user-service PORT=8080 go run example/http_service.go
# Start a second user-service instance
SERVICE_NAME=user-service PORT=8081 go run example/http_service.go
# Start another service
SERVICE_NAME=order-service PORT=8082 go run example/http_service.go
# Test service discovery.
# URLs are quoted because an unquoted "?" is a glob character in shells such as zsh.
# NOTE: every process above creates its own in-memory registry, so /call only
# finds services registered in the same process; across separate processes it
# answers 503 "Service discovery failed: ...". Cross-process discovery needs a
# shared registry.
curl "http://localhost:8080/info"
curl "http://localhost:8080/call?service=order-service"
监控和指标 #
// metrics/registry_metrics.go
package metrics
import (
"sync"
"time"
)
// RegistryMetrics aggregates counters and gauges about registry activity.
// Every method takes the internal mutex. Read values through GetMetrics,
// because accessing the exported fields directly bypasses that lock.
type RegistryMetrics struct {
	// Gauges, overwritten by UpdateServiceStats.
	TotalServices    int64
	TotalInstances   int64
	HealthyInstances int64

	// Operation counters, each bumped by its Increment* method.
	RegisterCount   int64
	DeregisterCount int64
	DiscoverCount   int64
	HeartbeatCount  int64

	// AvgResponseTime is the running mean of all durations passed to
	// RecordResponseTime (zero until the first sample).
	AvgResponseTime time.Duration

	responseSamples int64 // number of samples folded into AvgResponseTime
	mutex           sync.RWMutex
}

// NewRegistryMetrics returns a collector with all values at zero.
func NewRegistryMetrics() *RegistryMetrics {
	return &RegistryMetrics{}
}

// increment adds one to *counter under the lock. counter must point at a
// field of m.
func (m *RegistryMetrics) increment(counter *int64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	*counter++
}

// IncrementRegister counts one Register call.
func (m *RegistryMetrics) IncrementRegister() { m.increment(&m.RegisterCount) }

// IncrementDeregister counts one Deregister call.
func (m *RegistryMetrics) IncrementDeregister() { m.increment(&m.DeregisterCount) }

// IncrementDiscover counts one Discover call.
func (m *RegistryMetrics) IncrementDiscover() { m.increment(&m.DiscoverCount) }

// IncrementHeartbeat counts one Heartbeat call.
func (m *RegistryMetrics) IncrementHeartbeat() { m.increment(&m.HeartbeatCount) }

// RecordResponseTime folds one observed operation latency into
// AvgResponseTime. The mean is updated incrementally (avg += (d-avg)/n), so
// no running total is kept that could overflow.
func (m *RegistryMetrics) RecordResponseTime(d time.Duration) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.responseSamples++
	m.AvgResponseTime += (d - m.AvgResponseTime) / time.Duration(m.responseSamples)
}

// UpdateServiceStats replaces the service/instance gauges.
func (m *RegistryMetrics) UpdateServiceStats(services, instances, healthy int64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.TotalServices = services
	m.TotalInstances = instances
	m.HealthyInstances = healthy
}

// GetMetrics returns a consistent snapshot of all values. Every entry is an
// int64; "avg_response_time" is in whole milliseconds.
func (m *RegistryMetrics) GetMetrics() map[string]interface{} {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	return map[string]interface{}{
		"total_services":    m.TotalServices,
		"total_instances":   m.TotalInstances,
		"healthy_instances": m.HealthyInstances,
		"register_count":    m.RegisterCount,
		"deregister_count":  m.DeregisterCount,
		"discover_count":    m.DiscoverCount,
		"heartbeat_count":   m.HeartbeatCount,
		"avg_response_time": m.AvgResponseTime.Milliseconds(),
	}
}
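通过实现这个完整的服务注册与发现系统,我们为微服务架构提供了基础的服务管理能力。这个系统支持服务的动态注册和发现、健康检查、以及基本的监控功能,为后续的负载均衡实现奠定了基础。需要注意两点局限:示例中的 InMemoryRegistry 只存在于单个进程内,不同进程之间无法互相发现;健康检查管理器目前只记录检查失败,尚未把结果回写到注册中心。在生产环境中,应换用多个进程共享的注册中心(如 Consul、etcd),并补全健康状态的回写逻辑。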