Featured image of post Go 无侵入实现读写分离

Go 无侵入实现读写分离

本文详细讲解如何使用go-zero框架实现无侵入式的数据库读写分离,涵盖使用场景、核心原理、实战示例及最佳实践。

核心内容点:

  • 读写分离通过将读操作路由到从库、写操作路由到主库,有效解决高并发场景下的数据库性能瓶颈。
  • go-zero框架通过配置文件和上下文管理器实现简单灵活的读写分离,支持轮询/随机负载均衡策略。
  • 重点介绍强制主库读取(强一致性场景)、事务处理、故障转移等关键实现细节和最佳实践。

源自 | kevwan微服务实践 2025-07-15 09:29

在高并发的现代应用中,数据库往往成为系统的瓶颈。读写分离作为一种有效的数据库优化策略,能够显著提升系统的性能和可用性。本文将深入讲解读写分离的核心概念、实现原理,并通过go-zero框架提供详细的实战示例。

1. 读写分离的使用场景和必要性

1.1 什么是读写分离

读写分离是一种数据库架构模式,它将数据库操作分为两类:

  • • 写操作:INSERT、UPDATE、DELETE等修改数据的操作,路由到主库(Master)

  • • 读操作(强一致性要求):SELECT查询操作,路由到主库(Master)

  • • 读操作(非强一致性要求):SELECT查询操作,路由到从库(Replica/Slave)

1.2 核心使用场景

高读写比例的应用

大多数 Web 应用的 DB 操作都是读多写少,典型场景包括:

  • • 电商平台:商品浏览远多于下单操作

  • • 内容平台:文章阅读远多于发布操作

  • • 社交媒体:内容消费远多于内容创建

  • • 新闻网站:新闻浏览远多于新闻发布

数据库负载分担需求

  • • 主库压力过大:单一数据库无法承受高并发访问

  • • 读写操作互相影响:大量读操作影响写操作性能

  • • 资源利用不均:数据库服务器资源未充分利用

1.3 读写分离的必要性

性能提升

  • • 传统单库模式:

  • • 读写操作 → 主库 (100%负载)

  • • 读写分离模式:

  • • 写操作 → 主库 (小负载,但无法横向扩容)

  • • 读操作 → 从库 (大负载,但可以分散到多个从库)

可用性增强

  • • 故障隔离:读操作故障不影响写操作

  • • 负载均衡:多个从库分担读取压力

  • • 灾备能力:从库可作为备份数据源

扩展性提升

  • • 水平扩展:可通过增加从库处理更多读请求

  • • 成本效益:从库可使用较低配置的硬件

  • • 维护便利:可在从库上进行复杂查询和报表生成

2. 读写分离的实现原理

2.1 整体架构

2.2 核心组件

连接路由器 (Connection Router)

负责根据SQL操作类型决定使用哪个数据库连接:

  • • 根据上下文模式选择连接

  • • 管理连接池和负载均衡

负载均衡器 (Load Balancer)

在多个从库之间分配读请求:

  • • 轮询策略:按顺序依次访问从库

  • • 随机策略:随机选择从库

上下文管理器 (Context Manager)

通过上下文传递读写模式信息:

  • • 显式指定读主库场景

  • • 显式指定读从库场景

  • • 写操作强制使用主库

2.3 数据一致性处理

最终一致性

  • • 主从同步存在延迟(通常几毫秒到几秒)

  • • 适用于对数据实时性要求不严格的场景

强一致性需求处理

1
2
3
// 写入后立即读取,使用主库
ctx := sqlx.WithReadPrimary(context.Background())
result, err := db.QueryRowCtx(ctx, &user, "SELECT * FROM users WHERE id = ?", userID)

3. 使用go-zero读写分离的示例

3.1 配置读写分离

配置文件设置

1
2
3
4
5
6
7
8
9
# config.yaml
DB:
DataSource:"user:password@tcp(master:3306)/database"
DriverName:mysql# 默认值,可不写
Policy:"round-robin"# 负载均衡策略:round-robin 或 random,默认 round-robin
Replicas:
    - "user:password@tcp(replica1:3306)/database"
    - "user:password@tcp(replica2:3306)/database"
    - "user:password@tcp(replica3:3306)/database"

配置结构体定义

1
2
3
4
5
6
7
package config

import "github.com/zeromicro/go-zero/core/stores/sqlx"

type Config struct {
    DB sqlx.SqlConf
}

3.2 初始化数据库连接

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main

import (
    "context"
    "log"

    "github.com/zeromicro/go-zero/core/conf"
    "github.com/zeromicro/go-zero/core/stores/sqlx"
)

type UserModel struct {
    conn sqlx.SqlConn
}

func NewUserModel(conn sqlx.SqlConn) *UserModel {
    return &UserModel{
        conn: conn,
    }
}

func main() {
    var c Config
    conf.MustLoad("config.yaml", &c)

    // 创建支持读写分离的数据库连接
    conn := sqlx.MustNewConn(c.DB)
    userModel := NewUserModel(conn)

    // 示例1:普通读操作(路由到从库)
    user, err := userModel.FindUserFromReplica(ctx, 1)
    if err != nil {
        log.Fatal(err)
    }

    // 示例2:写操作(自动路由到主库)
    err = userModel.CreateUser(context.Background(), &User{Name: "张三", Email: "zhangsan@example.com"})
    if err != nil {
        log.Fatal(err)
    }

    // 示例3:写入后立即读取(强制使用主库)
    user, err = userModel.FindUserFromPrimary(ctx, 1)
    if err != nil {
        log.Fatal(err)
    }
}

3.3 模型层实现

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
package model

import (
    "context"
    "database/sql"
    "fmt"

    "github.com/zeromicro/go-zero/core/stores/sqlx"
)

type User struct {
    ID       int64`db:"id"`
    Name     string`db:"name"`
    Email    string`db:"email"`
    CreateAt int64`db:"create_at"`
    UpdateAt int64`db:"update_at"`
}

type UserModel struct {
    conn sqlx.SqlConn
}

func NewUserModel(conn sqlx.SqlConn) *UserModel {
    return &UserModel{
        conn: conn,
    }
}

// findUser 查询用户
func (m *UserModel) FindUser(ctx context.Context, id int64) (*User, error) {
    var user User
    query := "SELECT id, name, email, create_at, update_at FROM users WHERE id = ?"

    err := m.conn.QueryRowCtx(ctx, &user, query, id)
    if err != nil {
        if err == sql.ErrNoRows {
            returnnil, fmt.Errorf("user not found")
        }
        returnnil, err
    }

    return &user, nil
}

// FindUserFromMaster 强制从主库查询用户
func (m *UserModel) FindUserFromMaster(ctx context.Context, id int64) (*User, error) {
    // 强制使用主库
    masterCtx := sqlx.WithReadPrimary(ctx)
    return m.FindUser(masterCtx, id)
}

// FindUserFromReplica 强制从从库查询用户
func (m *UserModel) FindUserFromReplica(ctx context.Context, id int64) (*User, error) {
    // 强制使用从库
    replicaCtx := sqlx.WithReadReplica(ctx)
    return m.FindUser(replicaCtx, id)
}

// CreateUser 创建用户(自动使用主库)
func (m *UserModel) CreateUser(ctx context.Context, user *User) error {
    query := "INSERT INTO users (name, email, create_at, update_at) VALUES (?, ?, UNIX_TIMESTAMP(), UNIX_TIMESTAMP())"

    result, err := m.conn.ExecCtx(sqlx.WithWrite(ctx), query, user.Name, user.Email)
    if err != nil {
        return err
    }

    id, err := result.LastInsertId()
    if err != nil {
        return err
    }

    user.ID = id
    returnnil
}

// UpdateUser 更新用户(自动使用主库)
func (m *UserModel) UpdateUser(ctx context.Context, user *User) error {
    query := "UPDATE users SET name = ?, email = ?, update_at = UNIX_TIMESTAMP() WHERE id = ?"

    _, err := m.conn.ExecCtx(sqlx.WithWrite(ctx), query, user.Name, user.Email, user.ID)
    return err
}

// DeleteUser 删除用户(自动使用主库)
func (m *UserModel) DeleteUser(ctx context.Context, id int64) error {
    query := "DELETE FROM users WHERE id = ?"

    _, err := m.conn.ExecCtx(sqlx.WithWrite(ctx), query, id)
    return err
}

// ListUsers 查询用户列表(使用从库)
func (m *UserModel) ListUsers(ctx context.Context, limit, offset int) ([]*User, error) {
    var users []*User
    query := "SELECT id, name, email, create_at, update_at FROM users LIMIT ? OFFSET ?"

    err := m.conn.QueryRowsCtx(sqlx.WithReadReplica(ctx), &users, query, limit, offset)
    if err != nil {
        returnnil, err
    }

    return users, nil
}

3.4 服务层最佳实践

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package service

import (
    "context"
    "time"

    "github.com/zeromicro/go-zero/core/stores/sqlx"
)

type UserService struct {
    userModel *UserModel
}

func NewUserService(userModel *UserModel) *UserService {
    return &UserService{
        userModel: userModel,
    }
}

// 场景1:用户注册后立即返回用户信息
func (s *UserService) RegisterUser(ctx context.Context, name, email string) (*User, error) {
    user := &User{
        Name:  name,
        Email: email,
    }

    // 1. 创建用户(写操作,使用主库)
    err := s.userModel.CreateUser(ctx, user)
    if err != nil {
        returnnil, err
    }

    // 2. 立即返回用户信息(读操作,但需要最新数据,使用主库)
    masterCtx := sqlx.WithReadPrimary(ctx)
    return s.userModel.FindUser(masterCtx, user.ID)
}

// 场景2:用户更新后需要验证更新结果
func (s *UserService) UpdateUserProfile(ctx context.Context, userID int64, name, email string) (*User, error) {
    // 1. 更新用户信息(写操作,使用主库)
    user := &User{
        ID:    userID,
        Name:  name,
        Email: email,
    }

    err := s.userModel.UpdateUser(ctx, user)
    if err != nil {
        returnnil, err
    }

    // 2. 返回更新后的用户信息(读操作,需要最新数据,使用主库)
    masterCtx := sqlx.WithReadPrimary(ctx)
    return s.userModel.FindUser(masterCtx, userID)
}

// 场景3:用户列表查询(可以接受从库的延迟数据)
func (s *UserService) GetUserList(ctx context.Context, page, pageSize int) ([]*User, error) {
    offset := (page - 1) * pageSize

    // 使用从库查询,可以接受轻微的数据延迟
    replicaCtx := sqlx.WithReadReplica(ctx)
    return s.userModel.ListUsers(replicaCtx, pageSize, offset)
}

// 场景4:事务处理(读写操作都在主库)
func (s *UserService) TransferUserData(ctx context.Context, fromUserID, toUserID int64) error {
    // 事务中的所有操作都在主库执行
    ctx = sqlx.WithWrite(ctx)

    return s.userModel.conn.TransactCtx(ctx, func(ctx context.Context, session sqlx.Session)error {
        // 查询源用户
        var fromUser User
        err := session.QueryRowCtx(ctx, &fromUser, "SELECT * FROM users WHERE id = ?", fromUserID)
        if err != nil {
            return err
        }

        // 查询目标用户
        var toUser User
        err = session.QueryRowCtx(ctx, &toUser, "SELECT * FROM users WHERE id = ?", toUserID)
        if err != nil {
            return err
        }

        // 执行业务逻辑...
        // 更新操作
        _, err = session.ExecCtx(ctx, "UPDATE users SET update_at = UNIX_TIMESTAMP() WHERE id IN (?, ?)", fromUserID, toUserID)
        return err
    })
}

3.6 监控和调试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main
import (
    "context"
    "log"
    "time"

    "github.com/zeromicro/go-zero/core/stores/sqlx"
)

// 监控读写分离效果
func MonitorReadWriteSeparation(conn sqlx.SqlConn) {
    ctx := context.Background()

    // 测试读操作路由
    log.Println("=== 测试读操作路由 ===")

    // 普通读操作(应该路由到从库)
    replicaCtx := sqlx.WithReadReplica(ctx)
    start := time.Now()
    var count int
    err := conn.QueryRowCtx(replicaCtx, &count, "SELECT COUNT(*) FROM users")
    log.Printf("从库查询耗时: %v, 错误: %v", time.Since(start), err)

    // 强制主库读操作
    masterCtx := sqlx.WithReadPrimary(ctx)
    start = time.Now()
    err = conn.QueryRowCtx(masterCtx, &count, "SELECT COUNT(*) FROM users")
    log.Printf("主库查询耗时: %v, 错误: %v", time.Since(start), err)

    // 测试写操作路由
    log.Println("=== 测试写操作路由 ===")

    // 写操作(应该自动路由到主库)
    writeCtx := sqlx.WithWrite(ctx)
    start = time.Now()
    _, err = conn.ExecCtx(writeCtx, "UPDATE users SET update_at = UNIX_TIMESTAMP() WHERE id = 1")
    log.Printf("写操作耗时: %v, 错误: %v", time.Since(start), err)
}

4. 故障转移

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// 实现主从切换的故障转移机制
func (m *UserModel) FindUserWithFailover(ctx context.Context, id int64) (*User, error) {
    // 优先尝试从库
    replicaCtx := sqlx.WithReadReplica(ctx)
    user, err := m.FindUser(replicaCtx, id)
    if err == nil {
        return user, nil
    }

    // 从库失败,回退到主库
    log.Printf("从库查询失败,回退到主库: %v", err)
    masterCtx := sqlx.WithReadPrimary(ctx)
    return m.FindUser(masterCtx, id)
}

5. 总结

读写分离是提升数据库性能的重要手段,go-zero框架提供了优雅的读写分离实现:

5.1 核心优势

  • • 简单配置:通过配置文件即可启用读写分离

  • • 自动路由:框架自动识别读写操作并路由到合适的数据库

  • • 灵活控制:支持通过上下文强制指定读写模式

  • • 负载均衡:支持轮询和随机负载均衡策略

5.2 使用建议

合理配置从库数量:根据读写比例确定从库数量

监控主从延迟:确保业务可接受的数据延迟

选择合适的负载均衡策略:根据从库性能选择轮询或随机

处理数据一致性:在需要强一致性的场景使用主库读取

通过合理的读写分离配置和使用,可以显著提升系统的并发处理能力和整体性能。

项目地址

https://github.com/zeromicro/go-zero

欢迎使用 go-zero 并 star 支持我们!