5.8.4 Knative 函数计算 #
Knative 是一个基于 Kubernetes 的开源平台,用于构建、部署和管理现代无服务器工作负载。它提供了一套构建块,使开发者能够在 Kubernetes 上运行无服务器容器。
Knative 架构概述 #
核心组件 #
Knative 主要包含两个核心组件:
- Knative Serving:管理无服务器容器的部署和扩缩容
- Knative Eventing:提供事件驱动的架构支持
┌─────────────────────────────────────────────────────────┐
│ Knative 平台 │
├─────────────────────────────────────────────────────────┤
│ Knative Serving │ Knative Eventing │
│ - 自动扩缩容 │ - 事件路由 │
│ - 流量分割 │ - 事件过滤 │
│ - 版本管理 │ - 事件转换 │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Kubernetes │
└─────────────────────────────────────────────────────────┘
Knative Serving 资源模型 #
# knative-service-example.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: hello-go
namespace: default
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "10"
autoscaling.knative.dev/target: "100"
spec:
containers:
- image: gcr.io/knative-samples/hello-go:latest
ports:
- containerPort: 8080
env:
- name: TARGET
value: "Knative"
resources:
requests:
memory: "64Mi"
cpu: "100m"
limits:
memory: "128Mi"
cpu: "200m"
开发 Knative 函数 #
基础 HTTP 函数 #
// knative-http-function.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strconv"
"time"
"github.com/gorilla/mux"
)
type Request struct {
Name string `json:"name"`
Data map[string]interface{} `json:"data"`
}
type Response struct {
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
ProcessedBy string `json:"processed_by"`
Version string `json:"version"`
}
type HealthResponse struct {
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
Uptime string `json:"uptime"`
}
var startTime = time.Now()
func healthHandler(w http.ResponseWriter, r *http.Request) {
uptime := time.Since(startTime)
response := HealthResponse{
Status: "healthy",
Timestamp: time.Now(),
Uptime: uptime.String(),
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
}
func functionHandler(w http.ResponseWriter, r *http.Request) {
// 记录请求开始时间
requestStart := time.Now()
// 获取环境变量
target := os.Getenv("TARGET")
if target == "" {
target = "World"
}
version := os.Getenv("VERSION")
if version == "" {
version = "v1.0.0"
}
podName := os.Getenv("HOSTNAME")
if podName == "" {
podName = "unknown"
}
var req Request
if r.Method == "POST" {
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
return
}
} else {
// GET 请求从查询参数获取数据
req.Name = r.URL.Query().Get("name")
if req.Name == "" {
req.Name = target
}
}
// 模拟一些处理时间
processingTime := r.Header.Get("X-Processing-Time")
if processingTime != "" {
if duration, err := strconv.Atoi(processingTime); err == nil {
time.Sleep(time.Duration(duration) * time.Millisecond)
}
}
response := Response{
Message: fmt.Sprintf("Hello %s from Knative!", req.Name),
Timestamp: time.Now(),
ProcessedBy: podName,
Version: version,
}
// 添加处理时间头部
processingDuration := time.Since(requestStart)
w.Header().Set("X-Processing-Duration", processingDuration.String())
w.Header().Set("Content-Type", "application/json")
// 记录请求日志
log.Printf("Processed request for %s in %v", req.Name, processingDuration)
json.NewEncoder(w).Encode(response)
}
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 记录请求信息
log.Printf("Started %s %s from %s", r.Method, r.URL.Path, r.RemoteAddr)
// 创建响应记录器来捕获状态码
recorder := &responseRecorder{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(recorder, r)
duration := time.Since(start)
log.Printf("Completed %s %s with %d in %v",
r.Method, r.URL.Path, recorder.statusCode, duration)
})
}
type responseRecorder struct {
http.ResponseWriter
statusCode int
}
func (r *responseRecorder) WriteHeader(statusCode int) {
r.statusCode = statusCode
r.ResponseWriter.WriteHeader(statusCode)
}
func main() {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
r := mux.NewRouter()
// 健康检查端点
r.HandleFunc("/health", healthHandler).Methods("GET")
r.HandleFunc("/", functionHandler).Methods("GET", "POST")
// 添加日志中间件
handler := loggingMiddleware(r)
log.Printf("Knative function starting on port %s", port)
log.Printf("Target: %s", os.Getenv("TARGET"))
log.Printf("Version: %s", os.Getenv("VERSION"))
if err := http.ListenAndServe(":"+port, handler); err != nil {
log.Fatal("Server failed to start:", err)
}
}
CloudEvents 函数 #
// knative-cloudevents-function.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
)
type EventData struct {
UserID string `json:"user_id"`
Action string `json:"action"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
type ProcessingResult struct {
Status string `json:"status"`
ProcessedAt time.Time `json:"processed_at"`
ProcessedBy string `json:"processed_by"`
Message string `json:"message"`
}
// 处理 CloudEvents
func handleCloudEvent(ctx context.Context, event event.Event) (*event.Event, error) {
log.Printf("Received CloudEvent: Type=%s, Source=%s, ID=%s",
event.Type(), event.Source(), event.ID())
// 解析事件数据
var eventData EventData
if err := event.DataAs(&eventData); err != nil {
log.Printf("Failed to parse event data: %v", err)
return nil, fmt.Errorf("invalid event data: %v", err)
}
// 处理不同类型的事件
var result ProcessingResult
switch event.Type() {
case "com.example.user.created":
result = processUserCreated(eventData)
case "com.example.order.placed":
result = processOrderPlaced(eventData)
case "com.example.payment.completed":
result = processPaymentCompleted(eventData)
default:
result = ProcessingResult{
Status: "ignored",
ProcessedAt: time.Now(),
ProcessedBy: os.Getenv("HOSTNAME"),
Message: fmt.Sprintf("Unknown event type: %s", event.Type()),
}
}
// 创建响应事件
responseEvent := cloudevents.NewEvent()
responseEvent.SetID(fmt.Sprintf("response-%s", event.ID()))
responseEvent.SetSource("knative-function")
responseEvent.SetType("com.example.processing.completed")
responseEvent.SetTime(time.Now())
if err := responseEvent.SetData(cloudevents.ApplicationJSON, result); err != nil {
return nil, fmt.Errorf("failed to set response data: %v", err)
}
log.Printf("Processing completed: %s", result.Status)
return &responseEvent, nil
}
func processUserCreated(data EventData) ProcessingResult {
log.Printf("Processing user creation for user: %s", data.UserID)
// 模拟用户创建后的处理逻辑
// 例如:发送欢迎邮件、创建用户档案等
time.Sleep(100 * time.Millisecond)
return ProcessingResult{
Status: "success",
ProcessedAt: time.Now(),
ProcessedBy: os.Getenv("HOSTNAME"),
Message: fmt.Sprintf("User %s created successfully", data.UserID),
}
}
func processOrderPlaced(data EventData) ProcessingResult {
log.Printf("Processing order placement for user: %s", data.UserID)
// 模拟订单处理逻辑
// 例如:库存检查、支付处理等
time.Sleep(200 * time.Millisecond)
return ProcessingResult{
Status: "success",
ProcessedAt: time.Now(),
ProcessedBy: os.Getenv("HOSTNAME"),
Message: fmt.Sprintf("Order processed for user %s", data.UserID),
}
}
func processPaymentCompleted(data EventData) ProcessingResult {
log.Printf("Processing payment completion for user: %s", data.UserID)
// 模拟支付完成后的处理逻辑
// 例如:更新订单状态、发送确认邮件等
time.Sleep(150 * time.Millisecond)
return ProcessingResult{
Status: "success",
ProcessedAt: time.Now(),
ProcessedBy: os.Getenv("HOSTNAME"),
Message: fmt.Sprintf("Payment processed for user %s", data.UserID),
}
}
func main() {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
// 创建 CloudEvents 客户端
client, err := cloudevents.NewClientHTTP(
cloudevents.WithPort(port),
)
if err != nil {
log.Fatal("Failed to create CloudEvents client:", err)
}
log.Printf("CloudEvents function starting on port %s", port)
log.Printf("Hostname: %s", os.Getenv("HOSTNAME"))
// 启动事件接收器
if err := client.StartReceiver(context.Background(), handleCloudEvent); err != nil {
log.Fatal("Failed to start CloudEvents receiver:", err)
}
}
部署配置 #
Dockerfile #
# Dockerfile
FROM golang:1.21-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o function .
FROM alpine:latest
RUN apk --no-cache add ca-certificates tzdata
WORKDIR /root/
COPY --from=builder /app/function .
EXPOSE 8080
CMD ["./function"]
Knative Service 配置 #
# knative-service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: go-function
namespace: default
annotations:
serving.knative.dev/creator: "[email protected]"
serving.knative.dev/lastModifier: "[email protected]"
spec:
template:
metadata:
annotations:
# 扩缩容配置
autoscaling.knative.dev/minScale: "0"
autoscaling.knative.dev/maxScale: "100"
autoscaling.knative.dev/target: "10"
autoscaling.knative.dev/targetUtilizationPercentage: "70"
# 冷启动配置
autoscaling.knative.dev/scaleToZeroGracePeriod: "30s"
autoscaling.knative.dev/scaleDownDelay: "0s"
# 并发配置
autoscaling.knative.dev/containerConcurrency: "10"
spec:
containerConcurrency: 10
timeoutSeconds: 300
containers:
- name: function
image: your-registry/go-function:latest
ports:
- containerPort: 8080
protocol: TCP
env:
- name: TARGET
value: "Knative User"
- name: VERSION
value: "v1.0.0"
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "200m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 10
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
事件驱动架构 #
Knative Eventing 配置 #
# knative-eventing.yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: default
namespace: default
spec:
config:
apiVersion: v1
kind: ConfigMap
name: config-br-default-channel
namespace: knative-eventing
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: user-events-trigger
namespace: default
spec:
broker: default
filter:
attributes:
type: com.example.user.created
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: user-processor
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: order-events-trigger
namespace: default
spec:
broker: default
filter:
attributes:
type: com.example.order.placed
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: order-processor
事件发布器 #
// event-publisher.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/gorilla/mux"
)
type EventPublisher struct {
client cloudevents.Client
brokerURL string
}
func NewEventPublisher(brokerURL string) (*EventPublisher, error) {
client, err := cloudevents.NewClientHTTP()
if err != nil {
return nil, fmt.Errorf("failed to create CloudEvents client: %v", err)
}
return &EventPublisher{
client: client,
brokerURL: brokerURL,
}, nil
}
func (ep *EventPublisher) PublishUserCreated(userID, email string) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("user-created-%s-%d", userID, time.Now().Unix()))
event.SetSource("user-service")
event.SetType("com.example.user.created")
event.SetTime(time.Now())
eventData := map[string]interface{}{
"user_id": userID,
"email": email,
"action": "created",
"timestamp": time.Now(),
}
if err := event.SetData(cloudevents.ApplicationJSON, eventData); err != nil {
return fmt.Errorf("failed to set event data: %v", err)
}
ctx := cloudevents.ContextWithTarget(context.Background(), ep.brokerURL)
if result := ep.client.Send(ctx, event); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event: %v", result)
}
log.Printf("Published user created event for user: %s", userID)
return nil
}
func (ep *EventPublisher) PublishOrderPlaced(userID, orderID string, amount float64) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("order-placed-%s-%d", orderID, time.Now().Unix()))
event.SetSource("order-service")
event.SetType("com.example.order.placed")
event.SetTime(time.Now())
eventData := map[string]interface{}{
"user_id": userID,
"order_id": orderID,
"amount": amount,
"action": "placed",
"timestamp": time.Now(),
}
if err := event.SetData(cloudevents.ApplicationJSON, eventData); err != nil {
return fmt.Errorf("failed to set event data: %v", err)
}
ctx := cloudevents.ContextWithTarget(context.Background(), ep.brokerURL)
if result := ep.client.Send(ctx, event); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event: %v", result)
}
log.Printf("Published order placed event for order: %s", orderID)
return nil
}
func main() {
brokerURL := os.Getenv("BROKER_URL")
if brokerURL == "" {
brokerURL = "http://broker-ingress.knative-eventing.svc.cluster.local/default/default"
}
publisher, err := NewEventPublisher(brokerURL)
if err != nil {
log.Fatal("Failed to create event publisher:", err)
}
r := mux.NewRouter()
// 用户创建端点
r.HandleFunc("/users", func(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
userID := req.FormValue("user_id")
email := req.FormValue("email")
if userID == "" || email == "" {
http.Error(w, "user_id and email are required", http.StatusBadRequest)
return
}
if err := publisher.PublishUserCreated(userID, email); err != nil {
log.Printf("Failed to publish user created event: %v", err)
http.Error(w, "Failed to publish event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte("User created event published"))
}).Methods("POST")
// 订单创建端点
r.HandleFunc("/orders", func(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
userID := req.FormValue("user_id")
orderID := req.FormValue("order_id")
if userID == "" || orderID == "" {
http.Error(w, "user_id and order_id are required", http.StatusBadRequest)
return
}
if err := publisher.PublishOrderPlaced(userID, orderID, 99.99); err != nil {
log.Printf("Failed to publish order placed event: %v", err)
http.Error(w, "Failed to publish event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
w.Write([]byte("Order placed event published"))
}).Methods("POST")
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
log.Printf("Event publisher starting on port %s", port)
log.Printf("Broker URL: %s", brokerURL)
if err := http.ListenAndServe(":"+port, r); err != nil {
log.Fatal("Server failed to start:", err)
}
}
流量分割和版本管理 #
蓝绿部署 #
# blue-green-deployment.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: go-function-bg
spec:
template:
metadata:
name: go-function-v2
annotations:
autoscaling.knative.dev/minScale: "1"
spec:
containers:
- image: your-registry/go-function:v2
env:
- name: VERSION
value: "v2.0.0"
traffic:
- tag: blue
revisionName: go-function-v1
percent: 100
- tag: green
revisionName: go-function-v2
percent: 0
金丝雀发布 #
# canary-deployment.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: go-function-canary
spec:
template:
metadata:
name: go-function-v2-canary
spec:
containers:
- image: your-registry/go-function:v2-canary
env:
- name: VERSION
value: "v2.0.0-canary"
traffic:
- revisionName: go-function-v1
percent: 90
- revisionName: go-function-v2-canary
percent: 10
监控和可观测性 #
自定义指标 #
// metrics.go - 自定义指标收集
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
var (
// 请求计数器
requestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "knative_function_requests_total",
Help: "Total number of requests processed",
},
[]string{"method", "status"},
)
// 请求持续时间
requestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "knative_function_request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"method"},
)
// 当前处理中的请求
requestsInFlight = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "knative_function_requests_in_flight",
Help: "Current number of requests being processed",
},
)
// 业务指标
businessMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "knative_function_business_operations_total",
Help: "Total number of business operations",
},
[]string{"operation", "result"},
)
)
func init() {
// 注册指标
prometheus.MustRegister(requestsTotal)
prometheus.MustRegister(requestDuration)
prometheus.MustRegister(requestsInFlight)
prometheus.MustRegister(businessMetric)
}
// 指标中间件
func metricsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
requestsInFlight.Inc()
defer requestsInFlight.Dec()
// 创建响应记录器
recorder := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK}
next.ServeHTTP(recorder, r)
duration := time.Since(start)
// 记录指标
requestsTotal.WithLabelValues(r.Method, fmt.Sprintf("%d", recorder.statusCode)).Inc()
requestDuration.WithLabelValues(r.Method).Observe(duration.Seconds())
})
}
type statusRecorder struct {
http.ResponseWriter
statusCode int
}
func (r *statusRecorder) WriteHeader(statusCode int) {
r.statusCode = statusCode
r.ResponseWriter.WriteHeader(statusCode)
}
func businessHandler(w http.ResponseWriter, r *http.Request) {
operation := r.URL.Query().Get("operation")
if operation == "" {
operation = "default"
}
// 模拟业务处理
success := processBusinessLogic(operation)
result := "success"
if !success {
result = "failure"
http.Error(w, "Business logic failed", http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Business operation completed"))
}
// 记录业务指标
businessMetric.WithLabelValues(operation, result).Inc()
}
func processBusinessLogic(operation string) bool {
// 模拟业务逻辑处理
time.Sleep(100 * time.Millisecond)
return true // 假设总是成功
}
func main() {
http.Handle("/metrics", promhttp.Handler())
http.HandleFunc("/business", businessHandler)
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("OK"))
})
// 应用指标中间件
handler := metricsMiddleware(http.DefaultServeMux)
port := "8080"
log.Printf("Function with metrics starting on port %s", port)
log.Printf("Metrics available at /metrics")
if err := http.ListenAndServe(":"+port, handler); err != nil {
log.Fatal("Server failed to start:", err)
}
}
部署和管理 #
使用 kn CLI 部署 #
# 安装 kn CLI
curl -L https://github.com/knative/client/releases/download/knative-v1.11.0/kn-linux-amd64 -o kn
chmod +x kn
sudo mv kn /usr/local/bin/
# 部署函数
kn service create go-function \
--image your-registry/go-function:latest \
--env TARGET="Knative" \
--env VERSION="v1.0.0" \
--port 8080 \
--annotation autoscaling.knative.dev/minScale=0 \
--annotation autoscaling.knative.dev/maxScale=10
# 更新函数
kn service update go-function \
--image your-registry/go-function:v2 \
--env VERSION="v2.0.0"
# 流量分割
kn service update go-function \
--traffic go-function-v1=50,go-function-v2=50
# 查看服务状态
kn service list
kn service describe go-function
# 查看修订版本
kn revision list
# 删除服务
kn service delete go-function
CI/CD 集成 #
# .github/workflows/knative-deploy.yml
name: Deploy to Knative
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.21
- name: Build and test
run: |
go mod download
go test ./...
go build -o function .
- name: Build Docker image
run: |
docker build -t ${{ secrets.REGISTRY }}/go-function:${{ github.sha }} .
docker push ${{ secrets.REGISTRY }}/go-function:${{ github.sha }}
- name: Deploy to Knative
run: |
echo "${{ secrets.KUBECONFIG }}" | base64 -d > kubeconfig
export KUBECONFIG=kubeconfig
kn service update go-function \
--image ${{ secrets.REGISTRY }}/go-function:${{ github.sha }} \
--env VERSION=${{ github.sha }} \
--annotation deployment.kubernetes.io/revision=${{ github.run_number }}
通过本节的学习,你已经掌握了 Knative 平台的核心概念和使用方法,包括函数开发、事件驱动架构、流量管理和监控等。Knative 为在 Kubernetes 上构建现代无服务器应用提供了强大而灵活的平台。
总结 #
本章我们深入学习了服务网格和 Serverless 两个重要的云原生技术:
- Istio 服务网格:提供了透明的服务间通信管理
- 流量管理:实现了精细化的流量控制和策略
- Serverless 基础:理解了无服务器计算的核心概念
- Knative 函数计算:掌握了在 Kubernetes 上构建 Serverless 应用
这些技术为构建现代云原生应用提供了强大的基础设施支持,帮助开发者专注于业务逻辑的实现。