python 压力测试脚本

CSDN 2024-08-16 13:07:04 阅读 85

需求:

生成一个12位不重复的随机数将随机数赋值给Json 串中的 orderCode字段将Json用ECB 指定 key为bJXQezYtR4ZSNK4p进行加密并作为值传给{

“data”: “”

}设置每秒30个并发持续1分钟调用接口接口输出测试测试报告

代码示例

<code>import json

import random

import string

from Crypto.Cipher import AES

from Crypto.Util.Padding import pad

from Crypto.Random import get_random_bytes # 实际上对于ECB模式,这个随机字节不是必需的

from base64 import b64encode

import requests

from concurrent.futures import ThreadPoolExecutor

# 加密函数

def encrypt_data(data, key):

key = key.encode('utf-8')

key = key[:16] # AES-128

cipher = AES.new(key, AES.MODE_ECB)

ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))

# 注意:对于ECB模式,通常不添加IV,但如果你需要发送IV(尽管它不会影响解密),可以这样做

# iv = get_random_bytes(AES.block_size) # 对于ECB,这不是必需的

# return b64encode(iv + ct_bytes).decode('utf-8')

return b64encode(ct_bytes).decode('utf-8')

# 生成不重复的随机数(简化版)

def generate_unique_random_string(length=12):

characters = string.ascii_letters + string.digits

result = ''.join(random.choice(characters) for i in range(length))

return result

# 构造请求数据

def prepare_request_data():

data = {

"head": {

"method": "batch_create_order",

"version": "1.0.0",

"requestTime": "2022-07-20 10:06:31"

},

"uaccount": "xxxx",

"body": [{

"mailCode": "TEST201072000MAILCODE",

"orderCode": "TE_" + generate_unique_random_string(10),

"length": 20.3,

"width": 18.1,

"height": 6.3,

"weight": 0.53,

"receiverCompName": "AMZ-tt",

"receiverName": "Kaila Gilam 123456",

"receiverProv": "Atlantico",

"receiverCity": "Barranquilla",

"receiverArea": "Fayetteville",

"receiverHouseNo": "15",

"receiverAddress": "138 Lowery Dr",

"receiverAddress2": "apto 2B",

"receiverMobile": "",

"receiverTel": "3013357358",

"receiverEmail": "yh267lps4r1207d@us.shipping.temuemail.com",

"receiverPostCode": "110121",

"senderCompName": "CAN109",

"senderName": "CAN101",

"senderEngProv": "guangdong",

"senderEngCity": "guangzhou",

"senderAddress": "baiyun helong",

"senderPostCode": "80000",

"senderTel": "123456456",

"senderMobile": "147258369",

"countryCode": "CO",

"countryName": "哥伦比亚",

"productCode": "COL-X",

"tariffType": "ddp",

"vatNumber": "5236523",

"taxNo": "3923982135",

"remark": "",

"referenceNumber": "BG-" + generate_unique_random_string(10),

"declValue": "50",

"declCurrency": "RMB",

"cardType": "2",

"cardNo": "48452153625425361252",

"inner": [{

"gName": "加厚压缩袋",

"gEngName": "compression bag",

"innerContext": "",

"innerQty": 5,

"innerWeight": 0.22,

"innerPrice": 3.5,

"currencyCode": "USD",

"madePlace": "CN",

"hscode": "3923290000",

"model": "无",

"gBrand": "无",

"originCountry": "",

"remark": "",

"sku": "100010",

"material": "塑料",

"purpose": "物品",

"taxNo": "3923",

"salesAddress": "https://suijimimashengcheng.bmcx.com/",

"distribution": "compression",

"electricityFlag": "",

"unitCode": "11",

"secUnitCode": "",

"innerDeclValue": "25",

"innerDeclCurrency": "RMB"

}, {

"gName": "浴裙",

"gEngName": "Bath skirt",

"innerContext": "",

"innerQty": 5,

"innerWeight": 0.472,

"innerPrice": 5.73,

"currencyCode": "USD",

"madePlace": "CN",

"hscode": "9807200000",

"model": "无",

"gBrand": "无",

"originCountry": "",

"remark": "",

"sku": "100010",

"material": "塑料",

"purpose": "物品",

"taxNo": "3923",

"salesAddress": "https://suijimimashengcheng.bmcx.com/",

"distribution": "compression",

"electricityFlag": "",

"unitCode": "11",

"secUnitCode": "",

"innerDeclValue": "25",

"innerDeclCurrency": "RMB"

}

]

}

]

}

return data

# 发送HTTP请求

def send_request(data, key, url):

encrypted_data = encrypt_data(json.dumps(data), key)

response = requests.post(url, json={ "data": encrypted_data})

return response

# 主函数

def main():

url = "http://xx.xx.xx:8017/api/GDPostFeiYouTemu/NotMailTemuOrderPush"

key = "bJXQezYtR4ZSNK4p"

# 设置每秒30个并发请求

with ThreadPoolExecutor(max_workers=30) as executor:

futures = [executor.submit(send_request, prepare_request_data(), key, url) for _ in range(30)]

# 输出测试结果

results = [future.result() for future in futures]

test_report = "\n".join(

[f"Request { i + 1}: Status Code { result.status_code}" for i, result in enumerate(results)])

print(test_report)

if __name__ == "__main__":

main()

返回示例

D:\PYTHON-学习\邮政接口压力测试脚本\pythonProject1\.venv\Scripts\python.exe D:\PYTHON-学习\邮政接口压力测试脚本\pythonProject1\非邮下单接口压力测试脚本.py

Request 1: Status Code 200

Request 2: Status Code 200

Request 3: Status Code 200

Request 4: Status Code 200

Request 5: Status Code 200

Request 6: Status Code 200

Request 7: Status Code 200

Request 8: Status Code 200

Request 9: Status Code 200

Request 10: Status Code 200

Request 11: Status Code 200

Request 12: Status Code 200

Request 13: Status Code 200

Request 14: Status Code 200

Request 15: Status Code 200

Request 16: Status Code 200

Request 17: Status Code 200

Request 18: Status Code 200

Request 19: Status Code 200

Request 20: Status Code 200

Request 21: Status Code 200

Request 22: Status Code 200

Request 23: Status Code 200

Request 24: Status Code 200

Request 25: Status Code 200

Request 26: Status Code 200

Request 27: Status Code 200

Request 28: Status Code 200

Request 29: Status Code 200

Request 30: Status Code 200

进程已结束,退出代码为 0



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。