We recently adopted InfluxDB as the TSDB for a project. With a multi-active InfluxDB setup in mind, we found influxdb-relay, a high-availability layer published by the InfluxDB team; this open-source project was put forward by them but has long been unmaintained since its release.
When we tested a single InfluxDB node, overall performance was good. After adding influxdb-relay, large-volume tests revealed serious problems: memory consumption soared during inserts, insert speed dropped, and writes became a severe bottleneck.
Since influxdb-relay is written in Go, I searched around online and, after finding the material below, modified the source code following its steps; the large-volume tests then performed well again.
I'm sharing it here for everyone who uses influxdb-relay.
Background
To make InfluxDB highly available, I considered putting a layer in front of it, such as nginx. Then I found that the project had already built something called influxdb-relay, so I decided to simply use that. First, take a look at the architecture diagram in the official README.md.
[ASCII architecture diagram from the official influxdb-relay README.md omitted]
influxdb-relay only handles redundant writes: when a backend InfluxDB goes down, it buffers the data in memory, and once InfluxDB recovers it writes the data accumulated during the outage back to InfluxDB.
When a client accesses InfluxDB, it actually talks to a Load Balancer such as Nginx, and the Load Balancer routes to InfluxDB or influxdb-relay depending on the path.
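For illustration, here is a minimal Go sketch of that routing idea using httputil.ReverseProxy. It assumes the relay listens on 127.0.0.1:9096 (as in the tests below), InfluxDB on its default 8086, and the proxy itself on :8087; in practice this role is usually played by Nginx or HAProxy.

```go
// Sketch of the load-balancer routing described above: writes go to
// influxdb-relay, everything else (e.g. /query, /ping) goes straight to
// InfluxDB. Addresses and the listen port are assumptions for the sketch.
package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
)

func main() {
	relayURL, _ := url.Parse("http://127.0.0.1:9096")  // influxdb-relay
	influxURL, _ := url.Parse("http://127.0.0.1:8086") // InfluxDB itself

	relayProxy := httputil.NewSingleHostReverseProxy(relayURL)
	influxProxy := httputil.NewSingleHostReverseProxy(influxURL)

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/write" {
			relayProxy.ServeHTTP(w, r) // the relay duplicates writes to all backends
			return
		}
		influxProxy.ServeHTTP(w, r) // queries and pings hit InfluxDB directly
	})

	log.Fatal(http.ListenAndServe(":8087", nil))
}
```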
However, the performance of influxdb-relay turned out to be rather odd:

```
POST /write
Requests [total, rate] 180000, 3000.02
Duration [total, attack, wait] 1m20.11586256s, 59.999606572s, 20.116255988s
Latencies [mean, 50, 95, 99, max] 1.404856127s, 1.214236ms, 5.609988425s, 43.430687195s, 50.347228848s
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 3958680, 21.99
Success [ratio] 54.98%
Status Codes [code:count] 204:98967 0:81033

GET /ping
Requests [total, rate] 180000, 3000.02
Duration [total, attack, wait] 59.999785977s, 59.999606599s, 179.378µs
Latencies [mean, 50, 95, 99, max] 150.317µs, 145.68µs, 166.688µs, 194.65µs, 2.344456ms
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 0, 0.00
Success [ratio] 100.00%
Status Codes [code:count] 204:180000
```
Process
As mentioned in a previous post, the latest InfluxDB can maintain a success ratio above 99% even under a vegeta attack of 5000/s, yet influxdb-relay already falls over at 3000/s.
As a front end for InfluxDB, it should reasonably match or exceed InfluxDB's own throughput. So I ran a comparison experiment and added ping handling to http.go, which processes the HTTP requests.
The GET /ping numbers above show that Go's HTTP library is not the slow part, so the problem has to be in how the relay handles writes.
The /ping URL is not handled in the official code; I added it in relay/http.go myself, as follows:

```go
func (h *HTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	if r.URL.Path == "/ping" && (r.Method == "GET" || r.Method == "HEAD") {
		w.Header().Add("X-InfluxDB-Version", "relay")
		w.WriteHeader(http.StatusNoContent)
		return
	}

	// ... the original write-handling logic continues here ...
}
```
Strictly speaking, this code is not needed under influxdb-relay's design, since /ping is meant to be routed to InfluxDB directly.
Now look back at the Error Set in vegeta's report:

```
Error Set:
Post http://127.0.0.1:9096/write?db=test: dial tcp 0.0.0.0:0->127.0.0.1:9096: bind: can't assign requested address
Post http://127.0.0.1:9096/write?db=test: dial tcp 0.0.0.0:0->127.0.0.1:9096: socket: too many open files
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59803->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59805->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:59806->127.0.0.1:9096: read: connection reset by peer
```
There is socket: too many open files, read: connection reset by peer, and bind: can't assign requested address. Now consider these together with the code:

```go
var responses = make(chan *responseData, len(h.backends))

for _, b := range h.backends {
	b := b
	go func() {
		defer wg.Done()
		resp, err := b.post(outBytes, query, authHeader)
		if err != nil {
			log.Printf("Problem posting to relay %q backend %q: %v", h.Name(), b.name, err)
		} else {
			if resp.StatusCode/100 == 5 {
				log.Printf("5xx response for relay %q backend %q: %v", h.Name(), b.name, resp.StatusCode)
			}
			responses <- resp
		}
	}()
}

go func() {
	wg.Wait()
	close(responses)
	putBuf(outBuf)
}()

var errResponse *responseData

for resp := range responses {
```
First of all, a channel is created here: var responses = make(chan *responseData, len(h.backends)). Only after every backend has replied is the whole responses channel closed, and only then does the client get a result.
But if any backend stalls, the request has to wait for Go's HTTP client timeout, which defaults to 10s, so the client has to wait at least 10 seconds — and in practice it is even worse than that. Now look at part of the code in retry.go:
```go
interval := r.initialInterval
for {
	resp, err := r.p.post(buf.Bytes(), batch.query, batch.auth)
	if err == nil && resp.StatusCode/100 != 5 {
		batch.resp = resp
		atomic.StoreInt32(&r.buffering, 0)
		batch.wg.Done()
		break
	}

	if interval != r.maxInterval {
		interval *= r.multiplier
		if interval > r.maxInterval {
			interval = r.maxInterval
		}
	}

	time.Sleep(interval)
}
```
When a timeout or another error with statusCode >= 500 occurs, the retry logic puts the request into a buffer, and the run method later takes a batch from it and sends it to the backend InfluxDB.
The logic is: whenever a request fails, sleep for a while, where "a while" is the initial interval multiplied by a growth factor (2 by default). The client therefore keeps waiting and waiting, and eventually times out — and under vegeta's relentless attack there is no room for that kind of waiting.
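To get a feel for how quickly those sleeps add up, here is a small standalone sketch of the same capped exponential backoff. The concrete numbers (100ms start, 10s cap, multiplier 2) are illustrative assumptions, not necessarily the relay's defaults.

```go
// Standalone sketch of the capped exponential backoff used in retry.go.
// The interval values below are assumptions for illustration only.
package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 100 * time.Millisecond
	maxInterval := 10 * time.Second
	multiplier := time.Duration(2)

	var total time.Duration
	for attempt := 1; attempt <= 8; attempt++ {
		total += interval
		fmt.Printf("attempt %d: sleep %v, cumulative wait %v\n", attempt, interval, total)
		if interval != maxInterval {
			interval *= multiplier
			if interval > maxInterval {
				interval = maxInterval
			}
		}
	}
}
```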
So I changed the logic in http.go: after the client sends a request, return 204 immediately so the client no longer has to wait.

```go
(&responseData{
	StatusCode: 204,
}).Write(w)
```
I also removed the responses channel and its related code. That seemed to bring some improvement:

```
Requests [total, rate] 180000, 3000.02
Duration [total, attack, wait] 1m17.299212505s, 59.999606586s, 17.299605919s
Latencies [mean, 50, 95, 99, max] 672.645729ms, 185.598µs, 345.300005ms, 30.003589182s, 36.777965011s
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 6231240, 34.62
Success [ratio] 86.55%
Status Codes [code:count] 204:155781 0:24219
Error Set:
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57421->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57406->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57407->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57404->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57399->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57413->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57418->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57416->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57398->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57396->127.0.0.1:9096: read: connection reset by peer
Post http://127.0.0.1:9096/write?db=test: write tcp 127.0.0.1:57402->127.0.0.1:9096: write: broken pipe
Post http://127.0.0.1:9096/write?db=test: read tcp 127.0.0.1:57415->127.0.0.1:9096: read: connection reset by peer
```
But it was still bad; there was still a clear gap compared with the earlier InfluxDB-only numbers. So I turned my attention to retry.go:

```go
func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
	if atomic.LoadInt32(&r.buffering) == 0 {
		resp, err := r.p.post(buf, query, auth)
		// TODO A 5xx caused by the point data could cause the relay to buffer forever
		if err == nil && resp.StatusCode/100 != 5 {
			return resp, err
		}

		atomic.StoreInt32(&r.buffering, 1)
	}

	// already buffering or failed request
	batch, err := r.list.add(buf, query, auth)
	if err != nil {
		return nil, err
	}

	batch.wg.Wait()
	return batch.resp, nil
}
```
If it is not currently buffering, the request is sent straight to InfluxDB; otherwise the request goes into the buffer, and if the buffer is full an error is returned. Since we now return 204 to the client immediately anyway, there is no need to keep treating the non-buffered "raw" request as a special case —
we might as well put everything into the buffer. The buffer has a nice property: it can merge several requests into a single request to the backend InfluxDB, which reduces the number of requests. I changed the code to the following:
```go
func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
	batch, err := r.list.add(buf, query, auth)
	if err != nil {
		return nil, err
	}

	batch.wg.Wait()
	return batch.resp, nil
}
```
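As a side note, here is a toy illustration (not the relay's actual code) of why that merging helps: several small line-protocol payloads become a single request body, assuming each payload already ends with a trailing newline.

```go
// Toy illustration of request coalescing: three /write bodies that would
// otherwise be three HTTP requests are concatenated into one POST body.
package main

import (
	"bytes"
	"fmt"
)

func main() {
	payloads := [][]byte{
		[]byte("cpu,host=a value=1 1471100000000000000\n"),
		[]byte("cpu,host=b value=2 1471100000000000001\n"),
		[]byte("cpu,host=c value=3 1471100000000000002\n"),
	}

	// One body, one POST to /write, instead of three.
	var body bytes.Buffer
	for _, p := range payloads {
		body.Write(p)
	}
	fmt.Printf("one request, %d bytes:\n%s", body.Len(), body.String())
}
```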
Testing at 2000/s gave the following result:

```
Requests [total, rate] 120000, 2000.02
Duration [total, attack, wait] 1m0.000271382s, 59.999499926s, 771.456µs
Latencies [mean, 50, 95, 99, max] 304.395µs, 259.447µs, 460.682µs, 1.044402ms, 42.391318ms
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 4800000, 40.00
Success [ratio] 100.00%
Status Codes [code:count] 204:120000
Error Set:
```
Actually, I could not test at a higher rate; at 3000/s the following problem appears:

```
2016/08/13 17:52:22 starting relays...
2016/08/13 17:52:22 Starting HTTP relay "example-http" on 127.0.0.1:9096
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x837d8]

goroutine 38179 [running]:
panic(0x370fc0, 0xc820014200)
	/Users/shane/.gvm/gos/go1.6.2/src/runtime/panic.go:481 +0x3e6
github.com/influxdata/influxdb-relay/relay.(*retryBuffer).post(0xc820010b90, 0xc8202de254, 0x3c, 0x40, 0xc820393700, 0x7, 0x0, 0x0, 0xc82002d500, 0x0, ...)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/retry.go:56 +0x118
github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP.func1(0xc820393710, 0xc8200c9ce0, 0xc8202de254, 0x3c, 0x40, 0xc820393700, 0x7, 0x0, 0x0, 0xc820022280)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:210 +0xe8
created by github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:218 +0xce6
```
This is exactly the code I modified, and the failing line is:

```go
batch.wg.Wait()
```
invalid memory address or nil — so I added a few lines in front of it:

```go
if batch == nil {
	log.Print("batch is nil")
}
batch.wg.Wait()
```
Sure enough, the log line appeared:

```
2016/08/13 18:06:28 batch is nil
```
This error is interesting: batch is obtained through bufferList's add method, and near the end of that method there is a nil check:

```go
if *cur == nil {
	// new tail element
	*cur = newBatch(buf, query, auth)
} else {
	// append to current batch
	b := *cur
	b.size += len(buf)
	b.bufs = append(b.bufs, buf)
}

l.cond.L.Unlock()
return *cur, nil
```
First I had to rule out my own change as the cause, so I reverted the code and tested at 2000/s again. Unfortunately, at that rate influxdb-relay simply hangs, so I went ahead and changed the code in http.go that calls InfluxDB:

```go
func (b *simplePoster) post(buf []byte, query string, auth string) (*responseData, error) {
	time.Sleep(time.Microsecond * time.Duration(rand.Intn(400)))
	if auth == "hello" {
		return &responseData{
			StatusCode: 204,
		}, nil
	} else {
		return &responseData{
			StatusCode: 502,
		}, nil
	}
}
```
The goal is to simulate a scenario where every first request fails while every request made from the run method succeeds, with time.Sleep standing in for request latency. To tell the two callers apart, I abuse the auth parameter a little.
So the call in retry.go's run method has to be adjusted to pass "hello" as the auth argument to simplePoster.post:

```go
for {
	resp, err := r.p.post(buf.Bytes(), batch.query, "hello")
	if err == nil && resp.StatusCode/100 != 5 {
```
Then, testing at 2000/s, the problem indeed reproduced:

```
2016/08/14 09:11:40 starting relays...
2016/08/14 09:11:40 Starting HTTP relay "example-http" on 127.0.0.1:9096
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x83463]

goroutine 77131 [running]:
panic(0x370cc0, 0xc820014200)
	/Users/shane/.gvm/gos/go1.6.2/src/runtime/panic.go:481 +0x3e6
github.com/influxdata/influxdb-relay/relay.(*retryBuffer).post(0xc820010b90, 0xc820164000, 0x3c, 0x200, 0xc8205efbb0, 0x7, 0x0, 0x0, 0xc82002c000, 0x0, ...)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/retry.go:66 +0x273
github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP.func1(0xc8205efbc0, 0xc8200d5d00, 0xc820164000, 0x3c, 0x200, 0xc8205efbb0, 0x7, 0x0, 0x0, 0xc820022280)
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:211 +0xe8
created by github.com/influxdata/influxdb-relay/relay.(*HTTP).ServeHTTP
	/Users/shane/Documents/gosrc/influxdb-relay/src/github.com/influxdata/influxdb-relay/relay/http.go:219 +0xce6
```
Then I removed the time.Sleep that simulated HTTP latency, and the panic stopped happening. With my rather shaky Go skills I could not spot the cause right away, but intuition matters: what would happen if I added a time.Sleep right after l.cond.L.Unlock in bufferList.add?

```go
func (l *bufferList) add(buf []byte, query string, auth string) (*batch, error) {
	// ...
	l.cond.L.Unlock()
	time.Sleep(time.Microsecond * time.Duration(rand.Intn(100)))
	return *cur, nil
}
```
After restarting, the very first request triggered the panic. After some careful thought I reached a conclusion: bufferList.add returns a pointer to the batch, but once Unlock has been called, bufferList.pop can modify the data inside the bufferList. By the time post uses it, the batch that the returned address points to may already have been changed by pop and may well be nil, hence the panic. Once the cause is known the fix is relatively easy: move the Unlock later so that it only runs after the return value has been taken, i.e. after post has obtained the value:

```go
func (l *bufferList) add(buf []byte, query string, auth string) (*batch, error) {
	// ...
	defer l.cond.L.Unlock()
	return *cur, nil
}
```
After testing, the earlier error indeed no longer occurred.
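To make the race easier to see in isolation, here is a minimal Go sketch with toy types (not the relay's actual bufferList) of the same pattern: reading a shared pointer after releasing the lock, versus reading it under a deferred unlock.

```go
// Toy reproduction of the race described above, not the relay's real code.
// addRacy releases the lock and only then reads the shared pointer, so a
// concurrent pop can nil it out in between; addFixed reads it before the
// deferred Unlock runs, so the caller can never see nil.
package main

import (
	"fmt"
	"sync"
)

type batch struct{ id int }

type list struct {
	mu  sync.Mutex
	cur *batch
}

func (l *list) addRacy() *batch {
	l.mu.Lock()
	if l.cur == nil {
		l.cur = &batch{id: 1}
	}
	l.mu.Unlock()
	return l.cur // read happens after the lock is released (intentional race)
}

func (l *list) addFixed() *batch {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.cur == nil {
		l.cur = &batch{id: 1}
	}
	return l.cur // read happens while the lock is still held
}

func (l *list) pop() {
	l.mu.Lock()
	l.cur = nil
	l.mu.Unlock()
}

func main() {
	l := &list{}
	var wg sync.WaitGroup
	var mu sync.Mutex
	nils := 0

	for i := 0; i < 100000; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			if l.addRacy() == nil { // swap in addFixed and this never triggers
				mu.Lock()
				nils++
				mu.Unlock()
			}
		}()
		go func() {
			defer wg.Done()
			l.pop()
		}()
	}
	wg.Wait()
	fmt.Println("nil batches observed:", nils)
}
```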
Back to the main story. I now throw every request into the bufferList, so at a relatively high send rate requests inevitably get merged, which reduces the request count and makes InfluxDB more stable.
But the buffer can fill up — that happens whenever the request rate exceeds the consumption rate (for example when InfluxDB is down). Under the earlier logic, the client would have no way of knowing that its request had failed because the bufferList was full.
To fix this, I added the response-handling code in http.go back and modified the post method in retry.go:
```go
func (r *retryBuffer) post(buf []byte, query string, auth string) (*responseData, error) {
	pb := getBuf()
	pb.Write(buf)

	batch, err := r.list.add(pb.Bytes(), query, auth)
	if err != nil {
		putBuf(pb)
		return nil, err
	}

	go func() {
		batch.wg.Wait()
		putBuf(pb)
	}()

	return &responseData{
		StatusCode: 204,
	}, nil
}
```
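Presumably the copy into a freshly pooled buffer (getBuf / pb.Write) is there because post now returns immediately: the caller in http.go will recycle its own outBuf right away, so the retry buffer keeps its own copy of the data and only returns it to the pool once the batch's WaitGroup completes.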
Below are the vegeta results at 3000/s, 5000/s and 10000/s respectively:

```
Requests [total, rate] 180000, 3000.02
Duration [total, attack, wait] 59.999890163s, 59.999606586s, 283.577µs
Latencies [mean, 50, 95, 99, max] 290.602µs, 232.224µs, 402.502µs, 1.371521ms, 16.056569ms
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 7200000, 40.00
Success [ratio] 100.00%
Status Codes [code:count] 204:180000
Error Set:

> select count(value) from cpu
name: cpu
---------
time	count
0	180000

Requests [total, rate] 300000, 5000.02
Duration [total, attack, wait] 1m0.000013963s, 59.999799896s, 214.067µs
Latencies [mean, 50, 95, 99, max] 258.591µs, 191.622µs, 350.592µs, 1.479882ms, 14.940625ms
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 12000000, 40.00
Success [ratio] 100.00%
Status Codes [code:count] 204:300000
Error Set:

> select count(value) from cpu
name: cpu
---------
time	count
0	299997

Requests [total, rate] 600000, 10000.02
Duration [total, attack, wait] 1m0.000158017s, 59.999899912s, 258.105µs
Latencies [mean, 50, 95, 99, max] 329.228µs, 185.111µs, 745.028µs, 4.522189ms, 18.195853ms
Bytes In [total, mean] 0, 0.00
Bytes Out [total, mean] 24000000, 40.00
Success [ratio] 100.00%
Status Codes [code:count] 204:600000
Error Set:

> select count(value) from cpu
name: cpu
---------
time	count
0	599989
```
My InfluxDB cannot withstand a vegeta attack above 6000/s, yet with influxdb-relay in front it now handles 10000/s and more. Real-world scenarios are of course more complex, especially when reads and writes happen together,
but judging from the experiments above, the modified influxdb-relay basically meets our needs.
Reference: https://xusheng.org/blog/2016/08/12/influxdb-relay-performance-bottle-neck-analysing/
Keep at it! Coding For Dream!!
I never feared death or dying, I only fear never trying. –Fast & Furious