Ultron使用手册

1. 开发背景

在一次与其他压测工具比较的过程中,我无意发现之前使用的Locust有致命的性能问题:对于极快的GET接口,其产生的压力比AB、JMeter等少了几个量级,再加上我本身不满因Locust使用Python开发只能利用单核而导致经常需要多实例部署的情况,我决定利用Golang原生的高并发能力来重写一个压测工具。

我希望它满足以下特性:

  1. 单实力高并发能力
  2. 支持分布式部署
  3. 支持接入多种协议
  4. 脚本层面借鉴Locust的方式,高度的定制化能力

但与成熟的压测工具相比,Ultron存在以下明显的劣势:

  1. 缺少GUI界面,门槛较高,需要一定的Golang编程能力
  2. 更像一个“压测执行框架”,而不是工具
  3. 目前仅接入http协议,插件也极少
  4. 虽然开源(https://github.com/jacexh/ultron),但未做推广,使用者极少

2 主要对象介绍

2.1 Attacker

type Attacker interface {
    Name() string
    Fire() error
}

Attacker是请求行为、事务的interface,结构体HTTPAttacker是该interface的一个实现,其作用是使用内置net/http包来处理http请求

其中Name()方法返回了请求行为的名称,在http压测中,可以理解为接口名

Fire()方法是请求的行为,当结果返回为nil即没有错误时,则认为请求成功,反之视为请求失败,并记录具体的错误信息

2.2 Runner

type Runner interface {
    WithConfig(*RunnerConfig)
    WithTask(*Task)
    GetConfig() *RunnerConfig
    GetStatus() Status
    Start()
    Done()
}

测试执行器,目前具体的实现有localRunnermasterRunnerslaveRunner,分别用于单机压测、分布式压测的控制台、分布式压测的节点

2.3 RunnerConfig

type RunnerConfig struct {
    Duration    time.Duration `json:"duration"`
    Requests    uint64        `json:"requests"`
    Concurrence int           `json:"concurrence"`
    HatchRate   int           `json:"hatch_rate"`
    MinWait     time.Duration `json:"min_wait"`
    MaxWait     time.Duration `json:"max_wait"`
}

一次压测任务的配置,分别解释下字段含义:

  • Duration:压测持续时间,当为0时,则不控制时长
  • Requests:压测请求总量,注意:该字段无法严格控制
  • Concurrenct:压测并发数
  • HatchRate:压测开始时,每秒启动的线程(goroutine)数
  • MinWaitMaxWait:在一个线程(goroutine)中,两次连续的请求中间的等待时间间隔

2.4 Task

Runner运行的对象,是一个包含了多个Attacker对象的集合,可以为不同的Attacker设置权重,从而影响其压测的比例

3. 内置的HTTPAttacker使用

一个简单的例子:

package main

import (
	"net/http"

	"github.com/jacexh/ultron"
)

const (
	api = "http://10.0.0.30/benchmark"
)

func main() {
	attacker := ultron.NewHTTPAttacker(
        "benchmark", 
        func() (*http.Request, error) { 
            return http.NewRequest(http.MethodGet, api, nil) }
    )
	task := ultron.NewTask()
	task.Add(attacker, 1)

	ultron.LocalRunner.WithTask(task)
	ultron.LocalRunner.Start()
}

内置的HTTPAttacker已经实现了Fire()方法,其负责将传入的*http.Request对象发送到目标地址,并且自行判断status_code等字段,并判断是否错误。

*http.Request从通过调用脚本层实现HTTPAttacker.Prepay()方法获取,如上面代码中的

return http.NewRequest(http.MethodGet, api, nil)

另外,给一个稍微复杂点的例子:

func newBSCRequest() (*http.Request, error) {
	r := request{
		TerminalSN:  terminalSN,
		ClientSN:    atomic.AddUint64(&csn, 1),
		TotalAmount: 1,
		Payway:      paywayAlipay,
		DynamicID:   "285208607915032042",
		Subject:     "wosai-inc",
		Operator:    "123456",
	}
	data, err := ultron.J.Marshal(r)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, upayURL+"/upay/v2/pay", bytes.NewBuffer(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

上面提到了HTTPAttacker.Fire会自行校验status_code,但往往这是不够的,好在HTTPAttacker也支持脚本层定义检验函数:func(*http.Response, []byte) error

如校验支付网关的返回结果:

func checkResponse(r *http.Response, body []byte) error {
	resp := new(response)
	err := ultron.J.Unmarshal(body, resp)
	if err != nil {
		return err
	}
	if resp.ResultCode != "200" {
		return errors.New(resp.ErrorCode + "::" + resp.ErrorMessage)
	}
	if resp.Biz != nil && !strings.Contains(resp.Biz.ResultCode, "SUCCESS") {
		if resp.Biz.Order != nil {
			return errors.New(resp.Biz.ResultCode + "@" + resp.Biz.Order.SN)
		}
		return errors.New(resp.Biz.ResultCode + "::" + resp.Biz.ErrorCode + "::" + resp.Biz.ErrorMessage)
	}
	return nil
}

4. 接入其他协议的Attacker

在上面介绍过了,理论上只要该协议能实现Attacer这个interface,即可被ultron支持,甚至还可以写出这样的Attacker https://github.com/jacexh/ultron/blob/master/example/benchmark/main.go

type (
	benchmark struct{}
)

func (b benchmark) Name() string {
	return "benchmark"
}

func (b benchmark) Fire() error {
	time.Sleep(time.Millisecond * 10)
	return nil
}

稍微实际一点的例子——玛雅云压测:

type MayaAttacker struct {
	payload []byte
	name    string
	pool    *sync.Pool
}

func (m *MayaAttacker) Fire() error {
	conn, err := net.DialTimeout("tcp", serverAddr, TimeoutConnect)
	if err != nil {
		return err
	}
	defer conn.Close()
	conn.SetWriteDeadline(time.Now().Add(TimeoutWrite))
	_, err = conn.Write(m.payload)
	if err != nil {
		return err
	}
	conn.SetReadDeadline(time.Now().Add(TimeoutRead))
	output := make([]byte, 512)
	i, err := conn.Read(output)
	if i == 0 && err == nil {
		return errors.New("invalid response")
	}
	return err
}

func (m *MayaAttacker) Name() string {
	return m.name
}

5. EventHook

目前具备LocalEventHookMasterEventHookSalveEventHook,并且支持对单次结果*Result以及每5秒一次的Result做扩展

type ResultHandleFunc func(*Result)
type ReportHandleFunc func(Report)

比如把这些数据实时写入influxdb (https://github.com/jacexh/ultron/blob/master/helper/influxdb.go)从而实现grafana的实时结果展示

6. 分布式执行

当目标是百万级QPS或者并发时,你也许需要分布式执行来扩展能力了,但一般情况没有必要使用这种方式,另外要注意:我并没有严格测试分布式执行,不确定是否足够健壮

分布式执行的架构可以参考该文章:

slave节点的脚本如下:

package main

import (
	"net/http"

	"github.com/jacexh/ultron"
	"google.golang.org/grpc"
)

func main() {
	task := ultron.NewTask()
	baidu := ultron.NewHTTPAttacker("nginx", func() (*http.Request, error) {
		req, err := http.NewRequest(http.MethodGet, "http://www.baidu.com/", nil)
		if err != nil {
			return nil, err
		}
		return req, nil
	})
	task.Add(baidu, 1)

	ultron.SlaveRunner.Connect("127.0.0.1:9500", grpc.WithInsecure())
	ultron.SlaveRunner.WithTask(task)
	ultron.SlaveRunner.Start()
}

RunnerConfig参数将会由master节点下发,无须在slave脚本层定义

通过调用master监听的http接口来POST /start触发任务的执行

7. TODO

  1. 加个可爱的web ui