使用 pprof 排查 Go 消费 RabbitMQ 引起的内存占用过高
有一天,压力测试组的主管发来了这样的一张图:
并附上了一句话: MQ消息堆积时,服务器内存占用99% ,210台设备,每5秒一条记录,运行3小时。
看到这样的消息,我心里想着,终于有机会碰上 RabbitMQ 的调优场景了,得赶紧试试
于是很快啊,登上服务器,查看 RabbitMQ 面板:
大失所望,图中可见,Memory 占 210 M 没有超过 High Watermark 的 6.4 GB,再结合第一张图里所示,任务管理器占用最高的程序是 main.exe
,二话不说,开启 pprof,准备查看更详细的情况。
目前还得知,消息体里会携带 2 张照片,一张大小约为 49.5 k。
先通过以下命令,简单抓取一份 svg 图片
go tool pprof -http=:8000 https+insecure://localhost:20000/debug/pprof/heap?seconds=30
由 graphviz 生成的 svg 图片,清晰的展示这,amqp 包里的 recvContent 方法分配了很多内存,而且消息体的准确大小是 104 kb,接下来就是上 pprof 三板斧:top
/ list
/ web
list recvContent
Total: 103.85GB
ROUTINE ======================== github.com/streadway/amqp.(*Channel).recvContent in C:\Users\Administrator\go\pkg\mod\github.com\streadway\amqp@v1.0.0\channel.go
39.49GB 39.61GB (flat, cum) 38.14% of Total
. . 400: // drop and reset
. . 401: return ch.transition((*Channel).recvMethod)
. . 402:
. . 403: case *bodyFrame:
. . 404: if cap(ch.body) == 0 {
39.49GB 39.49GB 405: ch.body = make([]byte, 0, ch.header.Size)
. . 406: }
. . 407: ch.body = append(ch.body, frame.Body...)
. . 408:
. . 409: if uint64(len(ch.body)) >= ch.header.Size {
. . 410: ch.message.setContent(ch.header.Properties, ch.body)
. 121.54MB 411: ch.dispatch(ch.message) // termination state
. . 412: return ch.transition((*Channel).recvMethod)
. . 413: }
. . 414:
. . 415: return ch.transition((*Channel).recvContent)
. . 416: }
感觉越来约接近真相了,通过 list recvContent
之后可以发现,程序似乎一直在消费大量的 rabbitMQ 消息,再结合最开始的图片可以看到,deliver/get 速率为:100+/s,而 ack 速率则在 15/s,这是不是可以猜测,Go 端的消费者,一秒可以拿到 100 条数据,缓存在内存中,而实际的 ack 能力则为 15 条每秒,带着这份疑问,准备尝试限制一下消费能力,在 Go 程序中,施加了一份魔法:
conn, err := amqp.Dial(r.AmqpUrl)
if err != nil {
failOnError(err, "amqp dial err")
return nil
}
ch, err := conn.Channel()
if err != nil {
failOnError(err, "openChannel err")
return nil
}
//消费限制
ch.Qos(1, 0, true)
修改完代码,重启服务,再进行压测,得到了如下数据:
内存数据,已经从原有的 90%+ 的占用,稳定到 15% 以内,且 ack 的能力并没有受到影响,由此便暂时告一段落~
有关 PerfetchCount 设置值,可以参考以下几篇文章:
How to Optimize the RabbitMQ Prefetch Count
Some queuing theory: throughput, latency and bandwidth
Next:Go Iris 11.1 框架 Websocket 错误处理优化