2.2、分布式之性能与伸缩性:异步处理

一、简介

异步处理,是提升服务吞吐量的最佳方法,无论在单机请求处理中,还是分布式系统中,异步处理的思路经常被用到。

  • 单机异步化:比如callback,开启多个线程去处理任务,响应处理结果。我们平时开发也经常使用这种方式。
  • 微服务服务之间流程异步化:比如运营在cms上选择了一批视频id,想要查询这批视频的信息,因为需要聚合大量信息,查询时间过长,让运营等待是很久是一个很差的体验。异步化处理的方式就是运营提交id列表后,可以继续去其他运营操作,等到系统查询完成后,通知用户去看结果。

归根结底,异步处理为什么可以提升性能?主要是因为异步处理可以使被动变主动。更好的调度资源,让系统拥有统一调控的能力。

二、异步设计

  • push 模式:将任务派发给下游处理,因为对下游状态是无感知的避免压垮下游(当然也有池子化的,例如线程池,协程池),所以做好限流。当然任务派发者也可以主动感知下游的状态,但是会增加系统的复杂度。
  • pull 模式:下游主动去获取任务,相对于push,可以根据自身情况去主动承担更多的任务处理,但是缺少调度能力。

push、pull各有各的优势,但是在我们的异步设计中,其实push与pull都是结合使用的。由分发器做好调度将任务推送到队列,handle主动去队列中拉取任务处理。例如下面这种结构。

根据以上设计我们分别实现单机异步处理与服务间异步处理设计。

二、单机异步

设计一个通用的http并发请求数据,异步处理response。具体实现直接在注释中标记。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package Asynchttp

import (
"context"
"errors"
"io/ioutil"
"net/http"
"sync"
"time"

syncrate "golang.org/x/time/rate"
)

const (
REQNULL = -7001 // Req为空
HANDLENULL = -7002 // handle为空
)

//处理函数
type Handle func(response *AsResponse)

//组件结构体
type AsyncHttp struct {
httpClient *http.Client
handel Handle
limter *syncrate.Limiter

responses chan *AsResponse
abort chan string
retry bool
rate int32

Wg sync.WaitGroup
}
// 响应结构
type AsResponse struct {
RespBody []byte
Mark string
Err error
}

// 通过NewAsyncHttp可以获取AsyncHttp实例。
func NewAsyncHttp(httpClient *http.Client, handel Handle, rate int, retry bool) (int32, *AsyncHttp, error) {
asyncHttp := new(AsyncHttp)
//如果Client为空,则默认创建,超时时间3秒
if httpClient == nil {
asyncHttp.httpClient = &http.Client{
Timeout: time.Second * time.Duration(3),
}
}
if handel == nil {
return HANDLENULL, nil, errors.New("asynchttp handel is nil please check http.Request")
}

asyncHttp.limter = syncrate.NewLimiter(syncrate.Limit(rate), rate)
asyncHttp.handel = handel
asyncHttp.responses = make(chan *AsResponse)
asyncHttp.abort = make(chan string)
asyncHttp.retry = retry

//启动接收,用于异步接收请求结果
go asyncHttp.receive(&asyncHttp.Wg)

return 0, asyncHttp, nil
}

// 请求入口
func (a *AsyncHttp) AsyncHttpReq(req *http.Request, mark string) (int32, error) {

if req == nil {
return REQNULL, errors.New("asynchttp req is nil please check http.Request")
}
//限流处理。
a.limter.Wait(context.TODO())
// 添加WaitGroup数
a.Wg.Add(1)
// 新建一个goroutine执行请求
go a.exec(req, mark)
return 0, nil
}

//exec支持重试,失败后指数级退避重试。
func (a *AsyncHttp) exec(req *http.Request, mark string) {
retryNum := 1
resp := new(AsResponse)
resp.Mark = mark
for true {
// 发起请求
resp.RespBody, resp.Err = a.httpReq(req)
if resp.Err != nil {
if a.retry {
if retryNum >= 64 {
retryNum = retryNum + 1
} else {
retryNum = retryNum * 2
}

time.Sleep(time.Second * time.Duration(retryNum))
continue
} else {
a.responses <- resp
break
}
}
//结果写入队列,等待handle处理
a.responses <- resp
break
}
}

func (a *AsyncHttp) receive(wg *sync.WaitGroup) {
// 多路复用,读取channel中数据,并通过handel处理。
for true {
select {
case r := <-a.responses:
a.handel(r)
wg.Done()

case abort := <-a.abort:
if abort == "exit" {
break
}
}
}
}

// http请求。
func (a *AsyncHttp) httpReq(req *http.Request) ([]byte, error) {

resp, err := a.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
}

// 退出处理。
func (a *AsyncHttp) Close() {
a.abort <- "exit"
}

以上就是一个通用的http并发请求,异步处理的组件。这里receive中是单协程处理response的。也可以在新建协程去处理。

测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package Asynchttp

import (
"fmt"
"net/http"
"testing"
)

func TestAsyncHttp(t *testing.T) {
_, ay, _ := NewAsyncHttp(nil, Tests, 1000, false)
defer ay.Close()
for i := 0; i < 1000; i++ {
req, _ := http.NewRequest("post", "https://www.baidu.com/", nil)
ay.AsyncHttpReq(req,"test")
}
ay.Wg.Waitl
}

//handle
func Tests(resp *AsResponse) {
fmt.Println(resp)
}

三、服务间异步化

​ 比如观看视频,并且关注了作者,此时需要存储关注列表+1,这是一个同步过程,接着这种关注动作需要记录流水,可能还需要同步推荐测,甚至直接跟活动挂钩,关注了指定用户,同步给活动系统发放奖品等等。如果这一系列的动作都是一个同步的过程,想想都知道体验会很差。

回到最开始的模式,我们可以并发的去请求这些系统,通知他们这个关注动作,也就是push模式,这些系统也可以遍历用户关注列表,增加了就触发相关逻辑,也就是pull模式。相对来看,都不是一个很好的方案。

看下push与pull结合的方案,实线为同步过程,虚线为异步。

观看视频,修改存储后,直接向消息队列中发布消息(比如kafka),后面的推荐系统,活动系统等,直接订阅消息根据自身能力主动去处理数据就可以了。

四、异步化的问题

异步处理可以理解是将强一致性转变为最终一致性,整改异步处理的过程可能出现,消息丢失的问题,通知出现问题,handle处理失败等等。

  • 消息丢失:并非所有的业务都需要100%消息不丢失的。如果想要保证数据不丢失,需要记录流水,按天来做对账。
  • handle处理失败:可以选择重试,也可以选择重新消费。比如使用kafka,重置offset位置。要做好幂等设计。

具体可以看下:容错设计:补偿事务

五、总结

异步处理是提升性能和体验的有效手段,本质上因为异步处理可以使被动变主动,更好的调度资源,让系统拥有统一调控的能力。异步处理也是把强一致性转换为最终一致性的处理手段。