快捷搜索:  汽车  科技

旗下免费压力测试平台如何使用(压力测试工具-locust使用)

旗下免费压力测试平台如何使用(压力测试工具-locust使用)4、权重3、思考时间由于许多设置和清除操作是相互依赖的,因此以下是它们的执行顺序:import hashlib import json import logging from urllib import parse from locust import TaskSet task between from locust.contrib.fasthttp import FastHttpUser logging.basicConfig(level=logging.INFO # filename="info.log" filemode="w " format="%(asctime)s %(filename)s %(funcName)s %(levelname)s Line:%(lineno)s :%(mes

locust简介

Locust直译是“蝗虫”的意思,意在压测时产生的压力就像是漫天蝗虫一样,铺天盖地。

Locust是用Python实现的开源性能测试框架,不同于其他压测工具基于进程/线程产生压力,Locust是完全基于事件,支持分布式,一个Locust节点可以在一个进程中轻松支持上千并发用户。

locust特点
  • 基于协程 低成本实现更多并发;
  • 脚本增强(“测试即代码”),不支持脚本录制,通过编写代码,提供更大的灵活性;
  • HttpUser类使用了requests发送http请求,FastHttpUser类基于geventhttpclient实现,性能更高;
  • 分布式压测更加简单,可以在同一服务器上使用分布式,也可以在多台服务器上进行分布式压测;
  • 使用Flask 提供WebUI,同时提供无界面压测;
  • 有第三方插件、 易于扩展,只要有相关协议的包,可以随机扩压测压。
locust安装

pip install locust # 查看安装版本 locust -v

locust使用帮助:locust --help

旗下免费压力测试平台如何使用(压力测试工具-locust使用)(1)

测试脚本执行事件顺序

由于许多设置和清除操作是相互依赖的,因此以下是它们的执行顺序:

  1. Locust setup (一次)
  2. TaskSet setup (一次)
  3. TaskSet on_start (每个locust一次)
  4. TaskSet tasks…
  5. TaskSet on_stop (每个locust一次)
  6. TaskSet teardown (一次)
  7. Locust teardown (一次) 通常,setup和teardown方法应该是互补的。
http协议示例
  1. 创建测试用户类,用于继承HttpUser或者FastHttpUser;
  2. 创建测试类,继承TaskSet类;
  3. __init__()方法进行数据初始化;
  4. on_start方法进行测试用例执行前准备,如执行登录;
  5. on_end方法测试结束后执行;
  6. 测试用例编写,待测用例方法前加@locust.task(weight: int=1)装饰器
  7. 使用self.client进行get/post/put/delete等请求

import hashlib import json import logging from urllib import parse from locust import TaskSet task between from locust.contrib.fasthttp import FastHttpUser logging.basicConfig(level=logging.INFO # filename="info.log" filemode="w " format="%(asctime)s %(filename)s %(funcName)s %(levelname)s Line:%(lineno)s :%(message)s") logger = logging.getLogger(__name__) HOST = "10.209.1.12" PORT = "8081" def get_hash_md5(text): # 返回MD5值 text = text.encode('utf-8') if isinstance(text str) else text # 转换bytes try: # m = hashlib.md5(b'credit') m = hashlib.md5() m.update(text) return m.hexdigest() except Exception as e: return False def dict_to_params_str(di): s = "" for k v in di.items(): if isinstance(v dict): v = str(v) s = "{}={}&".format(k v) return s[:-1] def get_data_result(data): data = data.decode('utf-8') if isinstance(data bytes) else data data = json.loads(data) if isinstance(data str) else data if int(data.get("errno")) == 0: return True return False class UserTask(TaskSet): def __init__(self parent): super().__init__(parent) self._cookies = "" self._headers = headers def on_start(self): logger.info("测试开始...") headers = { "Accept": "application/json text/plain */*" "Accept-Encoding": "gzip deflate" "Accept-Language": "zh-CN zh;q=0.9" "Connection": "keep-alive" "Cookie": "sidebarStatus=1" "Host": "{}:{}".format(HOST PORT) "Origin": "http://{}:{}".format(HOST PORT) "Referer": "http://{}:{}/dist/".format(HOST PORT) "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8" "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/80.0.3987.100 Safari/537.36" } data = { "username": "admin" "password": "62638c576e044bffcbe34ff77554d43e"} resp = self.client.post("/login" data=dict_to_params_str(data) name='登录' headers=headers catch_response=True) if resp.status_code == 200: res = get_data_result(resp.content.decode('utf-8')) if res: self._cookies = resp.headers['Set-Cookie'] logger.info(self._cookies) self._headers["Cookie"] = self._cookies logger.info("登录成功!") else: logger.warning("登录失败:{}".format(resp.content.decode('utf-8'))) return def on_stop(self): logger.info("测试结束!") @task def test_host_getlist_terminal(self): params = { "limit": "10" "page": "1" "prodcombo": "360exthost/pcinfo" "filter": '{"id": "-1"}' } with self.client.get("/host/getlist" name='终端概况' data=dict_to_params_str(params) headers=self._headers catch_response=True) as resp: if resp.status_code == 200: if get_data_result(resp.content.decode('utf-8')): resp.success() else: resp.failure("resp data error:{}".format(resp.content.decode('utf-8'))) else: resp.failure('request status error:{}'.format(resp.status_code)) class WebsiteUser(FastHttpUser): # HttpUser host = 'http://10.209.1.12:8081' wait_time = between(0 0.01) # 每个请求间隔/秒 tasks = [UserTask] if __name__ == '__main__': # os.system("locust -f web_api.py --host=http://10.173.220.46:8081 --csv=req --logfile=info.log") os.system("locust -f web_api.py --loglevel=INFO")Websocket协议示例

class WebSocketClient(InstanceClass): def __init__(self): headers = ["Accept-Encoding:gzip deflate br" "Accept-Language:zh-CN zh;q=0.9" "Cache-Control:no-cache" "Connection:Upgrade" "Host:{}:36060".format(HOST) "Origin:file://" "Pragma:no-cache" "Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits" "Sec-WebSocket-Key:{}".format(get_websocket_key()) "Sec-WebSocket-Version:13" "Upgrade:websocket" "User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML like Gecko) Chrome/80.0.3987.100 Safari/537.366" ] self.ws = create_connection(WS_URL header=headers sslopt={"cert_reqs": ssl.CERT_NONE}) def on_message(self message): message = message.decode('utf-8') if isinstance(message bytes) else message logger.info("接收数据: {}".format(message)) def on_error(self error): logger.info("错误信息: {}".format(error)) def on_close(self): logger.info("### closed ###") def on_open(self): logger.info("####on openen####") class WebSocketLocust(User): abstract = True def __init__(self parent): super(WebSocketLocust self).__init__(parent) self.s_client = WebSocketClient() class UserBehavior(TaskSet): def __init__(self parent): super().__init__(parent) self.wss = self.parent.s_client.ws def get_m2_info(self): try: param = self.parent.q.get() # self.parent.q.put_nowait(param) # 获取完数据重新加入队列 return param except queue.Empty: logger.error("queue is Empty!!!") exit(0) def on_start(self): logger.info(f"发送AUTH数据...") self.client_auth() logger.info(f"资产上报...") self.test_asset_push() # self.wss.ping() self.wss.pong() threading.Thread(target=self.recv).start() # self.recv() def on_stop(self): logger.info('---------- task stop ------------') self.wss.close() def recv(self): logger.info("进入接收数据方法...") while True: res = self.wss.recv() try: data = json.loads(res) logger.info("接收数据receive:{}".format(data)) except websocket.WebSocketTimeoutException as e: logger.exception("接收数据异常:{}".format(str(e))) except websocket.WebSocketConnectionClosedException as e: logger.warning("websocket close!!!") def assert_result(self name result start_time msg=""): if result: events.request_success.fire( request_type=name name=name "_succ" response_time=int(time.time()) - start_time response_length=0) else: events.request_failure.fire(request_type=name name=name "_fail" response_time=int(time.time()) - start_time exception=msg response_length=1) def sends(self msg name): # logger.info("发送类型:{} 数据类型:{} 数据: {}!".format(name type(msg) msg)) start_time = int(time.time()) try: self.wss.send(json.dumps(msg)) self.assert_result(name True start_time) except Exception as e: logger.error(f"发送错误: {str(e)}") self.assert_result(name False start_time msg=str(e)) def client_auth(self): # 客户端认证 req = {"msgType": 0 "epGuid": self.uuid "authData": get_sha256(self.uuid ".eppmc")} logger.info("客户端认证,发送AUTH数据: {}".format(req)) self.sends(req "auth_conn") @task def policy_type_task(self): # 3. 获取策略 POLICY_REQUEST: 101 _policy_type = random.choice(policy_type) name = sys._getframe().f_code.co_name req = {"epGuid": self.uuid "_os": 1 "msgType": 104 "_guid": self.uuid "policyHash": get_str_md5(self.uuid) "policyType": "SetSecurityReinforce"} self.sends(req name=name)NSQ协议示例

import queue import gnsq from locust import between task User TaskSet events tag class InstanceClass(object): _instance = None def __new__(cls *args **kwargs): if not cls._instance: cls._instance = super(InstanceClass cls).__new__(cls *args **kwargs) return cls._instance class WebNsqclient(InstanceClass): """nsq连接""" def __init__(self): super(WebNsqclient self).__init__() """建立nsq连接""" self.producer_list = [] self.topic = "CloudSafeLine.EEPP" # self.cwpp_topic = "CloudSafeLine.CCPP" # _nsq_addr = f"{random.choice(nsq_host_list)}:{nsq_port}" self.producer = gnsq.Producer(nsq_addr) self.producer.start() @property def producers(self): return random.choice(self.producer_list) def on_message(self data): """nsq消息发送,发送成功返回OK""" zip_data = data_compress(data) res = self.producer.publish(self.topic zip_data) # 报文发送 return res def on_end(self): """nsq连接关闭""" self.producer.close() class WebNsqLocust(User): """自定义locust User类""" abstract = True def __init__(self parent): super(WebNsqLocust self).__init__(parent) self.client = WebNsqclient() class UserTask(TaskSet): def __init__(self parent): super().__init__(parent) monitor_name = "monitor" def get_m2_info(self): try: param = self.parent.q.get() # 参数化 # self.parent.q.put_nowait(param) # 获取完数据重新加入队列 return param except queue.Empty: logger.error("queue is Empty!!!") exit(0) def on_start(self): logger.debug("开始测试...") # threading.Thread(target=monitor_redis).start() global START_FLAG if not START_FLAG: START_FLAG = True threading.Thread(target=monitor_redis).start() # redis监控 monitor_name = "monitor" monitor_time = 5 # 分钟 minutes easynmon_monitor_start(easynmon_ip_port name=monitor_name monitor_time=monitor_time) # easynmon监控 def on_end(self): logger.debug("测试结束!!!") self.client.on_end() easynmon_monitor_stop(easynmon_ip_port) def assert_result(self name result start_time): if result.decode() == "OK": events.request_success.fire( request_type=name name=name "_succ" response_time=int((time.time() - start_time) * 1000) response_length=0) else: events.request_failure.fire(request_type=name name=name "_fail" response_time=int((time.time() - start_time) * 1000) exception=result response_length=1) @task @tag("pluginmgr" "test") def pluginmgr_task(self): name = sys._getframe().f_code.co_name param = self.get_m2_info() data = pluginmgr_info(param) start_time = time.time() res = self.client.on_message(data) self.assert_result(name res start_time)locust扩展增强

1、参数化

  • 引入队列的概念 queue ,实现方式是将参数推入队列,测试时依次取出,全部取完后 locust 会自动停止。若是使用参数循环压测,需要将取出的参数再推入队尾。

2、检查点

  • 使用self.client提供的catch_response=True`参数, 添加locust提供的ResponseContextManager类的上下文方法手动设置检查点。
  • ResponseContextManager里面的有两个方法来声明成功和失败,分别是success和failure。其中failure方法需要我们传入一个参数,内容就是失败的原因。

3、思考时间

  • wait_time = between(0 0.01)
  • wait_time = constant(1)
  • wait_time = constant_pacing/1)

4、权重

  • 类中测试方法执行比例通过@task(int)进行控制,默认1;
  • 一个文件多个类中可以通过weight=1进行控制;

5、集合点:模拟一定数量的用户,同时并发请求。

from locust import HttpUser TaskSet task between events from gevent._semaphore import Semaphore all_locusts_spawned = Semaphore() # 创建集合点 all_locusts_spawned.acquire() # 阻塞线程 # 注册事件 @events.spawning_complete.add_listener def on_hatch_complete(**kwargs): # Select_task类的钩子方法 # 挂在到locust钩子函数(所有的Locust实例产生完成时触发 all_locusts_spawned.release() num = 0 class UserTask(TaskSet): def login(self): global num num = 1 print("%s个虚拟用户开始启动,并登录"%n) self.client.post("/login" data={"username":"wantest001gwu" "password":"test00123"}) def logout(self): print("退出登录") def on_start(self): self.login() all_locusts_spawned.wait() @task(4) def test1(self): param = {"limit":1 "offset":0} with self.client.get("/list" params=param headers={} catch_response = True) as response: print("用户浏览首页商品列表")

6、分布式

locust -f locust.py --master --web-host=0.0.0.0 --loglevel=ERROR

locust -f locust.py --worker --master-host=10.20.14.24 --loglevel=ERROR

7、无界面测试

旗下免费压力测试平台如何使用(压力测试工具-locust使用)(2)

总结

locust不支持录制,使用python进行脚本开发,虽然需要一定编码基础,但是灵活性更大;可以通过代码组合方式实现更多场景!

通过代码增强,测试脚本中日常用到的检查点、参数化、集合、思考时间等都可以实现,而且分布式压测特别方便!

缺点是压测过程中,对服务器的检测需要借助其他工具进行!

旗下免费压力测试平台如何使用(压力测试工具-locust使用)(3)

#学习##python##性能测试#

猜您喜欢: