Commit 218e606f authored by liyuanhong's avatar liyuanhong

第一次提交

parents
Pipeline #88 canceled with stages
.idea/*
venv/*
\ No newline at end of file
#coding:utf-8
#########################################################
#
# 定义基类,供所有类继承
#
#########################################################
import threading
class Base(threading.Thread):
def __init__(self):
pass
#coding:utf-8
import binascii
import json
import socket
import threading
import time
from lib.multiThread.ThreadBase import ThreadBase
from lib.protocol.message.Location_msg import Location_msg
from lib.protocol.message.TerminalRegister_msg import TerminalRegister_msg
class SendMultMsgThread():
def __init__(self,host="10.100.11.20",port=9001,msg=""):
self.host = host
self.port = port
self.msg = msg
self.timeOut = 1 #socket超时时间
self.BUF_SIZE = 1024 #接收消息缓存
self.threadCount = 10000 #并发线程数
self.totalTime = 0 #所有线程的运行总和
self.threadArr = {} #保存每个线程的信息
self.failThreadCount = 0 #失败线程数
pass
############################################
# 设置host
############################################
def setHost(self,host):
self.host = host
############################################
# 设置端口号
############################################
def setPort(self,port):
self.port = port
############################################
# 设置消息
############################################
def setMsg(self,msg):
self.msg = msg
############################################
# 设置并发线程数
############################################
def setThreadCount(self,threadCount):
self.threadCount = threadCount
############################################
# 发送一条消息
############################################
def sendMsg(self,msg,threadName):
msg = msg
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.settimeout(self.timeOut)
startTime = int(time.time() * 1000)
try:
client.connect((self.host, self.port))
client.send(binascii.a2b_hex(msg))
except BaseException as e:
client.close()
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
print("连接超时,socket断开")
return
try:
data = client.recv(self.BUF_SIZE)
# print(data)
except BaseException as e:
# traceback.print_exc()
client.close()
# raise RuntimeError('socket 接收消息超时!')
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
print('socket 接收消息超时!')
return
endTime = int(time.time() * 1000)
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.totalTime = self.totalTime + timeExpend
client.close()
############################################
# 启动并发线程
############################################
def startThread(self):
timeStart = int(time.time() * 1000)
for i in range(0,self.threadCount):
threadName = "thread" + str(i)
# theThread = threading.Thread(target=self.sendMsg, args=("7e0002000001314620111800065b7e",threadName,)) # 数据写死,心跳
theThread = threading.Thread(target=self.sendMsg,args=("4040007000094d20191201000200120114030409123426d7fffff0000000000505000000143c00000bb80100000fa00000000a0000000000005e5f68e768e739331e100055320000001312001007d0001e0000000000000096000000280096ffff3e0001f40000003e00000000000000000000000f9a", threadName,)) # 数据写死
# theThread = threading.Thread(target=self.sendMsg, \
# args=(TerminalRegister_msg().generateMsg_random(), threadName,)) #终端注册
# theThread = threading.Thread(target=self.sendMsg, \
# args=(Location_msg().generateMsg_random(), threadName,)) #地理位置
threadInfo = {}
threadInfo["name"] = threadName
threadInfo["status"] = 0
self.threadArr[threadName] = threadInfo
theThread.start()
timeEnd = int(time.time() * 1000)
timeExpend = timeEnd - timeStart
time.sleep(6)
print("耗时:" + str(timeExpend) + " 毫秒")
print("并发数据每秒发送:" + str(int(self.threadCount / (timeExpend / 1000))))
print("平均响应时间:" + str(self.totalTime / self.threadCount) + "毫秒")
print("发送总数:" + str(self.threadCount))
print("响应失败数:" + str(self.failThreadCount))
self.writeToFile("../../data/threadDetails.json",self.threadArr)
# print(json.dumps( self.threadArr))
def writeToFile(self,path,data):
with open(path, "w", encoding='utf-8') as fi:
json.dump(data, fi)
# fi.write(data)
if __name__ == "__main__":
t = SendMultMsgThread()
t.setHost("10.100.12.32")
t.setPort(9008)
# t.setMsg("7e0002000001314620111800065b7e")
# t.setMsg("7e020001020131462011190001fffc7fff001c010401c0a6380659ad7a02090042003b200204185704310102EA6600010400000000000204001e7c1f0003050A0001f400000405020001d4c000050400057d0240000604000119400007040007530000100c0004006403f203f203f203f2001114ffffffffffffffffffff00200000000000000000001202002400130106001D0101EB7960C0020bb860D0013c62f00203216050014c60F0015860B001146330011c646001416490012060A00201146014010160100102610002022661100201f561F0020e746210040000119c6040012c60700200e660E00203206701010067020100670301016704024e20670502000067060200416707040000017d02097e")
t.startThread()
#coding:utf-8
import binascii
import json
import random
import socket
import threading
import time
from lib.pressure.ThreadBase import ThreadBase
class SendMultMsgThread_m300(ThreadBase):
def __init__(self,host="10.100.11.20",port=9001,msg=""):
self.host = host
self.port = port
self.msg = msg
self.timeOut = 30 #socket超时时间
self.BUF_SIZE = 1024 #接收消息缓存
self.threadCount = 10000 #并发线程数
self.totalTime = 0 #所有线程的运行总和
self.threadArr = {} #保存每个线程的信息
self.failThreadCount = 0 #失败线程数
self.durThreads = [] #持续发送线程数组,当数组为空,表示所有线程已经结束
dt = 1 * 20 * 60
self.durTime = dt #线程持续时间
self.connectTimeoutNum = 0 #连接超时线程数
self.sendTimeoutNum = 0 #发送超时线程数
self.reviceTimeoutNum = 0 #接收超时线程数
self.sucessNum = 0 #成功线程数
self.failThreadCount = 0 #失败线程数
self.setStartCarNumber = 0 #开始车机号
self.messageCon = [] #用来统计每个线程所发的消息数
self.messageCons = 0 # 用来统计每个线程所发的消息数
pass
############################################
# 设置host
############################################
def setHost(self,host):
self.host = host
############################################
# 设置端口号
############################################
def setPort(self,port):
self.port = port
############################################
# 设置消息
############################################
def setMsg(self,msg):
self.msg = msg
############################################
# 设置并发线程数
############################################
def setThreadCount(self,threadCount):
self.threadCount = threadCount
############################################
# 设置持续时间
############################################
def setDurTime(self,durTime):
self.durTime = durTime
############################################
# 设置超时时间
############################################
def setTimeOut(self,timeOut):
self.timeOut = timeOut
############################################
# 设置开始车机号
############################################
def setSetStartCarNumber(self,setStartCarNumber):
self.setStartCarNumber = setStartCarNumber
############################################
# 发送一条消息
############################################
def sendMsg(self,msg,threadName):
msg = msg
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.settimeout(self.timeOut)
startTime = int(time.time() * 1000)
try:
client.connect((self.host, self.port))
client.send(binascii.a2b_hex(msg))
except BaseException as e:
client.close()
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
print("连接超时,socket断开")
return
try:
data = client.recv(self.BUF_SIZE)
# print(data)
except BaseException as e:
# traceback.print_exc()
client.close()
# raise RuntimeError('socket 接收消息超时!')
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
print('socket 接收消息超时!')
return
endTime = int(time.time() * 1000)
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.totalTime = self.totalTime + timeExpend
client.close()
############################################
# 持续发送消息
# dur: 持续发送时间,如果为0一直发送, 设置了事件为持续发送多少秒,默认为一分钟
############################################
def sendMsgContinuous(self,carId,threadName="thread0"):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.settimeout(self.timeOut)
startTime = int(time.time())
endTime = int(time.time())
msgCon = 0 #统计线程发的消息数量
self.durThreads.append(threadName)
try:
client.connect((self.host, self.port))
except BaseException as e:
client.close()
self.durThreads.remove(threadName)
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.connectTimeoutNum = self.connectTimeoutNum + 1
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
print(threadName + ":" + "连接超时,socket断开")
return
while (endTime - startTime) < self.durTime:
msg = self.getRandomMsg_M300(carId)
try:
client.send(binascii.a2b_hex(msg))
msgCon = msgCon + 1
self.messageCons = self.messageCons + 1
except BaseException as e:
client.close()
self.durThreads.remove(threadName)
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
self.sendTimeoutNum = self.sendTimeoutNum + 1
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.sendTimeoutNum = self.sendTimeoutNum + 1
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
print(threadName + ":" + "发送超时,socket断开")
return
try:
data = client.recv(self.BUF_SIZE)
except BaseException as e:
client.close()
self.durThreads.remove(threadName)
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
self.reviceTimeoutNum = self.reviceTimeoutNum + 1
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
print(threadName + ":" + 'socket 接收消息超时!')
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
return
endTime = int(time.time())
sleepTime = random.randint(1, 5)
time.sleep(sleepTime)
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
client.close()
self.sucessNum = self.sucessNum + 1
self.durThreads.remove(threadName)
############################################
# 启动并发线程
############################################
def startThread(self):
timeStart = int(time.time() * 1000)
for i in range(0,self.threadCount):
threadName = "thread-" + str(i)
theThread = threading.Thread(target=self.sendMsg, args=("7e0002000001314620111800065b7e",threadName,)) # 数据写死,心跳
threadInfo = {}
threadInfo["name"] = threadName
threadInfo["status"] = 0
self.threadArr[threadName] = threadInfo
theThread.start()
timeEnd = int(time.time() * 1000)
timeExpend = timeEnd - timeStart
time.sleep(3)
print("耗时:" + str(timeExpend) + " 毫秒")
print("并发数据每秒发送:" + str(int(self.threadCount / (timeExpend / 1000))))
print("平均响应时间:" + str(self.totalTime / self.threadCount) + "毫秒")
print("发送总数:" + str(self.threadCount))
print("响应失败数:" + str(self.failThreadCount))
self.writeToFile("./threadDetails.json",self.threadArr)
############################################
# 启动持续并发线程
############################################
def startThreadContinuous(self):
timeStart = int(time.time() * 1000)
for i in range(0,self.threadCount):
threadName = "thread-" + str(i)
print(threadName)
carid = 201912000000 + i + self.setStartCarNumber
theThread = threading.Thread(target=self.sendMsgContinuous, args=(carid,threadName,)) # 数据写死,心跳
threadInfo = {}
threadInfo["name"] = threadName
threadInfo["status"] = 0
self.threadArr[threadName] = threadInfo
theThread.start()
timeEnd = int(time.time() * 1000)
timeExpend = timeEnd - timeStart
print("耗时:" + str(timeExpend) + " 毫秒产生了" + str(self.threadCount) + "线程")
time.sleep(0.5) #防止启动的时候溜掉某些启动比较慢的线程
tmp = 1
while len(self.durThreads) != 0:
print("剩余线程数_" + str(tmp) + ":" + str(len(self.durThreads)))
tmp = tmp + 1
timeArray = time.localtime(timeStart / 1000)
testStart = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
timeCur = int(time.time() * 1000)
timeArray = time.localtime(timeCur / 1000)
testCur = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
info_0 = "-------------------------- 统计信息(pre) --------------------------"
info_1 = "耗时:" + str(timeExpend) + " 毫秒产生了" + str(self.threadCount) + "线程"
info_2 = "开始测试时间:" + testStart
info_4 = "设置socket超时时间:" + str(self.timeOut)
info_5 = "设置线程持续时间:" + str(self.durTime)
info_6 = "剩余线程数:" + str(len(self.durThreads))
info_8 = "连接失败:" + str(self.connectTimeoutNum)
info_9 = "发送失败:" + str(self.sendTimeoutNum)
info_10 = "接收失败:" + str(self.reviceTimeoutNum)
info_11 = "当前写入时间:" + testCur
info_12 = "当前发送消息总数:" + str(self.messageCons)
result = info_0 + "\n" + info_1 + "\n" + info_2 + "\n" + info_4 + "\n" + info_5 + "\n" + info_6 + "\n"
result = result + info_8 + "\n" + info_9 + "\n" + info_10 + "\n" + info_11 + "\n" + info_12 + "\n"
self.writeToFile("./result_pre.txt",result)
time.sleep(5)
timeArray = time.localtime(timeStart / 1000)
testStart = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
timeEnd = int(time.time() * 1000)
timeArray = time.localtime(timeEnd / 1000)
testestEnd = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
time.sleep(2) #防止线程慢的时候,某些线程被漏统计的情况
info_0 = "-------------------------- 统计信息 --------------------------"
print(info_0)
totalMsg = 0
for i in self.messageCon:
totalMsg = totalMsg + i
info_1 = "耗时:" + str(timeExpend) + " 毫秒产生了" + str(self.threadCount) + "线程"
info_2 = "开始测试时间:" + testStart
info_3 = "结束测试时间:" + testestEnd
info_4 = "设置socket超时时间:" + str(self.timeOut)
info_5 = "设置线程持续时间:" + str(self.durTime)
info_6 = "成功线程数:" + str(self.sucessNum)
info_7 = "消息总数:" + str(totalMsg)
info_8 = "连接失败:" + str(self.connectTimeoutNum)
info_9 = "发送失败:" + str(self.sendTimeoutNum)
info_10 = "接收失败:" + str(self.reviceTimeoutNum)
print(info_1)
print(info_2)
print(info_3)
print(info_4)
print(info_5)
print(info_6)
print(info_7)
print(info_8)
print(info_9)
print(info_10)
self.writeToFile("./threadDetailsContinuous.json",json.dumps(self.threadArr))
info_11 = self.getInfoFromResult("./threadDetailsContinuous.json")
result = info_0 + "\n" + info_1 + "\n" + info_2 + "\n" + info_3 + "\n" + info_4 + "\n" + info_5 + "\n" + info_6 + "\n"
result = result + info_7 + "\n" + info_8 + "\n" + info_9 + "\n" + info_10 + "\n" + info_11
self.writeToFile("./result.txt", result)
def writeToFile(self,path,data):
with open(path, "w", encoding='utf-8') as fi:
#json.dump(data, fi)
fi.write(data)
#获取随机消息数据(M300车机)
def getRandomMsg_M300(self,carId):
# carId = 201912010002
wh = random.randint(0,2)
msg = ""
if wh == 0:
hearbeat_msg = "7e000400e14d" + str(carId) + "0000"
hearbeat_msg = hearbeat_msg + self.getCheckCode(hearbeat_msg[2:]) + "7e"
hearbeat_msg = "7e" + self.replace7e7d(hearbeat_msg[2:][:-2]) + "7e"
msg = hearbeat_msg
elif wh == 1:
GPS_msg = "7e002000fb4d" + str(carId) + "002414031003351501c3422106588b8c0703200c520000000000007403200100000000000000"
GPS_msg = GPS_msg + self.getCheckCode(GPS_msg[2:]) + "7e"
GPS_msg = "7e" + self.replace7e7d(GPS_msg[2:][:-2]) + "7e"
msg = GPS_msg
elif wh == 2:
OBD_msg = "7e000300fa4d" + str(carId) + "005d140310033513117f3f0effff3f30f9000001000000000000000000fffffe8000740200008e8800002ac2010003e8503232320000501403e80069006903840000000000640a03e802580000000000000384000000000000000000000000"
OBD_msg = OBD_msg + self.getCheckCode(OBD_msg[2:]) + "7e"
OBD_msg = "7e" + self.replace7e7d(OBD_msg[2:][:-2]) + "7e"
msg = OBD_msg
return msg
def getInfoFromResult(self,path):
with open(path, "r", encoding='utf-8') as fi:
d = fi.read()
d = json.loads(d)
msgC = 0
failT = 0
for i in d:
msgC = msgC + d[i]["msgCon"]
if d[i]["status"] == 1:
failT = failT +1
info_1 = "文件统计总消息数:" + str(msgC)
info_2 = "文件统计总失败数:" + str(failT)
print(info_1)
print(info_2)
return info_1 + "\n" +info_2 + "\n"
#######################################################
# 获取校验码(M300,新硬件)
#######################################################
def getCheckCode(self,data="aa"):
if len(data) % 2 == 1:
raise RuntimeError('数据段错误!')
start = data[0:2]
tmp = int(start,16)
for i in range(2,len(data),2):
tmp = tmp ^ int(data[i:i + 2],16)
dataHex = self.int2hexStringByBytes(tmp)
return dataHex
#######################################################
# 替换消息中的7e7d字符
#######################################################
def replace7e7d(self,data):
tmpR = data
tmp = tmpR[0:2]
tmpA = tmpR[0:2]
tmpR = tmpR[2:]
data = ""
while tmpA != "":
if tmp == "7d":
tmp = "7d01"
elif tmp == "7e":
tmp = "7d02"
data = data + tmp
tmp = tmpR[0:2]
tmpA = tmpR[0:2]
tmpR = tmpR[2:]
return data
#####################################################
# 数字转换为16进制字符串,通过传入字节数可自动补0
# 传入数据格式所占字节数
#####################################################
def int2hexStringByBytes(self, num,bytescount=1):
hexStr = hex(num)[2:]
while len(hexStr) < (bytescount * 2):
hexStr = "0" + hexStr
return hexStr
if __name__ == "__main__":
t = SendMultMsgThread_m300()
# t.setHost("10.100.12.32")
t.setHost("10.100.5.251")
# t.setPort(9008) #M500
t.setPort(9009) #M300
# t.startThread()
t.startThreadContinuous()
# SendMultMsgThread().getInfoFromResult("../../data/threadDetailsContinuous.json")
\ No newline at end of file
#coding:utf-8
import binascii
import json
import random
import socket
import threading
import time
from lib.pressure.ThreadBase import ThreadBase
class SendMultMsgThread_m500(ThreadBase):
def __init__(self,host="10.100.11.20",port=9001,msg=""):
self.host = host
self.port = port
self.msg = msg
self.timeOut = 30 #socket超时时间
self.BUF_SIZE = 1024 #接收消息缓存
self.threadCount = 1000 #并发线程数
self.totalTime = 0 #所有线程的运行总和
self.threadArr = {} #保存每个线程的信息
self.failThreadCount = 0 #失败线程数
self.durThreads = [] #持续发送线程数组,当数组为空,表示所有线程已经结束
dt = 1 * 10 * 60
self.durTime = dt #线程持续时间
self.connectTimeoutNum = 0 #连接超时线程数
self.sendTimeoutNum = 0 #发送超时线程数
self.reviceTimeoutNum = 0 #接收超时线程数
self.sucessNum = 0 #成功线程数
self.setStartCarNumber = 0 #开始车机号
self.messageCon = [] #用来统计每个线程所发的消息数
self.messageCons = 0 # 用来统计每个线程所发的消息数
pass
############################################
# 设置host
############################################
def setHost(self,host):
self.host = host
############################################
# 设置端口号
############################################
def setPort(self,port):
self.port = port
############################################
# 设置消息
############################################
def setMsg(self,msg):
self.msg = msg
############################################
# 设置并发线程数
############################################
def setThreadCount(self,threadCount):
self.threadCount = threadCount
############################################
# 设置持续时间
############################################
def setDurTime(self,durTime):
self.durTime = durTime
############################################
# 设置超时时间
############################################
def setTimeOut(self,timeOut):
self.timeOut = timeOut
############################################
# 设置开始车机号
############################################
def setSetStartCarNumber(self,setStartCarNumber):
self.setStartCarNumber = setStartCarNumber
############################################
# 发送一条消息
############################################
def sendMsg(self,msg,threadName):
msg = msg
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.settimeout(self.timeOut)
startTime = int(time.time() * 1000)
try:
client.connect((self.host, self.port))
client.send(binascii.a2b_hex(msg))
except BaseException as e:
client.close()
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
print("连接超时,socket断开")
return
try:
data = client.recv(self.BUF_SIZE)
# print(data)
except BaseException as e:
# traceback.print_exc()
client.close()
# raise RuntimeError('socket 接收消息超时!')
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
print('socket 接收消息超时!')
return
endTime = int(time.time() * 1000)
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.totalTime = self.totalTime + timeExpend
client.close()
############################################
# 持续发送消息
# dur: 持续发送时间,如果为0一直发送, 设置了事件为持续发送多少秒,默认为一分钟
############################################
def sendMsgContinuous(self,carId,threadName="thread0"):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.settimeout(self.timeOut)
startTime = int(time.time())
endTime = int(time.time())
msgCon = 0 #统计线程发的消息数量
self.durThreads.append(threadName)
try:
client.connect((self.host, self.port))
except BaseException as e:
client.close()
self.durThreads.remove(threadName)
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.connectTimeoutNum = self.connectTimeoutNum + 1
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
print(threadName + ":" + "连接超时,socket断开")
return
while (endTime - startTime) < self.durTime:
msg = self.getRandomMsg_M500(carId)
try:
client.send(binascii.a2b_hex(msg))
msgCon = msgCon + 1
self.messageCons = self.messageCons + 1
except BaseException as e:
client.close()
self.durThreads.remove(threadName)
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.sendTimeoutNum = self.sendTimeoutNum + 1
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
print(threadName + ":" + "发送超时,socket断开")
return
try:
data = client.recv(self.BUF_SIZE)
except BaseException as e:
client.close()
self.durThreads.remove(threadName)
self.threadArr[threadName]["status"] = 1
self.failThreadCount = self.failThreadCount + 1
self.reviceTimeoutNum = self.connectTimeoutNum + 1
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
print(threadName + ":" + 'socket 接收消息超时!')
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
return
endTime = int(time.time())
sleepTime = random.randint(1, 5)
time.sleep(sleepTime)
endTime = int(time.time())
timeExpend = endTime - startTime
self.threadArr[threadName]["timeExp"] = timeExpend
self.messageCon.append(msgCon)
self.threadArr[threadName]["msgCon"] = msgCon
client.close()
self.sucessNum = self.sucessNum + 1
self.durThreads.remove(threadName)
############################################
# 启动并发线程
############################################
def startThread(self):
timeStart = int(time.time() * 1000)
for i in range(0,self.threadCount):
threadName = "thread-" + str(i)
theThread = threading.Thread(target=self.sendMsg, args=("7e0002000001314620111800065b7e",threadName,)) # 数据写死,心跳
threadInfo = {}
threadInfo["name"] = threadName
threadInfo["status"] = 0
self.threadArr[threadName] = threadInfo
theThread.start()
timeEnd = int(time.time() * 1000)
timeExpend = timeEnd - timeStart
time.sleep(3)
print("耗时:" + str(timeExpend) + " 毫秒")
print("并发数据每秒发送:" + str(int(self.threadCount / (timeExpend / 1000))))
print("平均响应时间:" + str(self.totalTime / self.threadCount) + "毫秒")
print("发送总数:" + str(self.threadCount))
print("响应失败数:" + str(self.failThreadCount))
self.writeToFile("./data/threadDetails.json",self.threadArr)
############################################
# 启动持续并发线程
############################################
def startThreadContinuous(self):
timeStart = int(time.time() * 1000)
for i in range(0,self.threadCount):
threadName = "thread-" + str(i)
print(threadName)
carid = 201912000000 + i + self.setStartCarNumber
theThread = threading.Thread(target=self.sendMsgContinuous, args=(carid,threadName,)) # 数据写死,心跳
threadInfo = {}
threadInfo["name"] = threadName
threadInfo["status"] = 0
self.threadArr[threadName] = threadInfo
theThread.start()
timeEnd = int(time.time() * 1000)
timeExpend = timeEnd - timeStart
print("耗时:" + str(timeExpend) + " 毫秒产生了" + str(self.threadCount) + "线程")
time.sleep(0.5) #防止启动的时候溜掉某些启动比较慢的线程
tmp = 1
while len(self.durThreads) != 0:
print("剩余线程数_" + str(tmp) + ":" + str(len(self.durThreads)))
tmp = tmp + 1
timeArray = time.localtime(timeStart / 1000)
testStart = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
timeCur = int(time.time() * 1000)
timeArray = time.localtime(timeCur / 1000)
testCur = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
info_0 = "-------------------------- 统计信息(pre) --------------------------"
info_1 = "耗时:" + str(timeExpend) + " 毫秒产生了" + str(self.threadCount) + "线程"
info_2 = "开始测试时间:" + testStart
info_4 = "设置socket超时时间:" + str(self.timeOut)
info_5 = "设置线程持续时间:" + str(self.durTime)
info_6 = "剩余线程数:" + str(len(self.durThreads))
info_8 = "连接失败:" + str(self.connectTimeoutNum)
info_9 = "发送失败:" + str(self.sendTimeoutNum)
info_10 = "接收失败:" + str(self.reviceTimeoutNum)
info_11 = "当前写入时间:" + testCur
info_12 = "当前发送消息总数:" + str(self.messageCons)
result = info_0 + "\n" + info_1 + "\n" + info_2 + "\n" + info_4 + "\n" + info_5 + "\n" + info_6 + "\n"
result = result + info_8 + "\n" + info_9 + "\n" + info_10 + "\n" + info_11 + "\n" + info_12 + "\n"
self.writeToFile("./result_pre.txt",result)
time.sleep(5)
timeArray = time.localtime(timeStart / 1000)
testStart = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
timeEnd = int(time.time() * 1000)
timeArray = time.localtime(timeEnd / 1000)
testestEnd = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
time.sleep(2) #防止线程慢的时候,某些线程被漏统计的情况
info_0 = "-------------------------- 统计信息 --------------------------"
print(info_0)
totalMsg = 0
for i in self.messageCon:
totalMsg = totalMsg + i
info_1 = "耗时:" + str(timeExpend) + " 毫秒产生了" + str(self.threadCount) + "线程"
info_2 = "开始测试时间:" + testStart
info_3 = "结束测试时间:" + testestEnd
info_4 = "设置socket超时时间:" + str(self.timeOut)
info_5 = "设置线程持续时间:" + str(self.durTime)
info_6 = "成功线程数:" + str(self.sucessNum)
info_7 = "消息总数:" + str(totalMsg)
info_8 = "连接失败:" + str(self.connectTimeoutNum)
info_9 = "发送失败:" + str(self.sendTimeoutNum)
info_10 = "接收失败:" + str(self.reviceTimeoutNum)
print(info_1)
print(info_2)
print(info_3)
print(info_4)
print(info_5)
print(info_6)
print(info_7)
print(info_8)
print(info_9)
print(info_10)
self.writeToFile("./threadDetailsContinuous.json",json.dumps(self.threadArr))
info_11 = self.getInfoFromResult("./threadDetailsContinuous.json")
result = info_0 + "\n" + info_1 + "\n" + info_2 + "\n" + info_3 + "\n" + info_4 + "\n" + info_5 + "\n" + info_6 + "\n"
result = result + info_7 + "\n" + info_8 + "\n" + info_9 + "\n" + info_10 + "\n" + info_11
self.writeToFile("./result.txt", result)
def writeToFile(self,path,data):
with open(path, "w", encoding='utf-8') as fi:
#json.dump(data, fi)
fi.write(data)
#获取随机消息数据(M500车机)
def getRandomMsg_M500(self,carId):
# carId = 201912010002
wh = random.randint(0,2)
msg = ""
if wh == 0:
hearbeat_msg = "4040000b00044d" + str(carId) + "0003ffd4"
hearbeat_msg = hearbeat_msg[:-4] + self.crc16(hearbeat_msg[:-4])
msg = hearbeat_msg
elif wh == 1:
GPS_msg = "4040003d00054d" + str(carId) + "001001140305031e0301c329ed0659dec501f402e8000000b4050a0b0c9305050258001400000fa0000000005e606f115e60723be44b"
GPS_msg = GPS_msg[:-4] + self.crc16(GPS_msg[:-4])
msg = GPS_msg
elif wh == 2:
OBD_msg = "4040007000064d" + str(carId) + "00120114030503202d26d7fffff0000000000505000000143c00000bb80100000fa00000000a0000000000005e60723b723b39331e100055320000001312001007d0001e0000000000000096000000280096ffff3e0001f40000003e00000000000000000000007213"
OBD_msg = OBD_msg[:-4] + self.crc16(OBD_msg[:-4])
msg = OBD_msg
return msg
# 获取随机消息数据(新硬件车机)
def getRandomMsg_new(self, carId):
# carId = 201912010002
wh = random.randint(0, 2)
msg = ""
wh = 0
if wh == 0:
hearbeat_msg = "4040000e00044d" + str(carId) + "8000000300cf91"
hearbeat_msg = hearbeat_msg + self.getCheckCode(hearbeat_msg[2:]) + "7e"
hearbeat_msg = self.replace7e7d(hearbeat_msg)
msg = hearbeat_msg
elif wh == 1:
GPS_msg = "4040003d00054d" + str(
carId) + "001001140305031e0301c329ed0659dec501f402e8000000b4050a0b0c9305050258001400000fa0000000005e606f115e60723be44b"
GPS_msg = GPS_msg[:-4] + self.crc16(GPS_msg[:-4])
msg = GPS_msg
elif wh == 2:
OBD_msg = "4040007000064d" + str(
carId) + "00120114030503202d26d7fffff0000000000505000000143c00000bb80100000fa00000000a0000000000005e60723b723b39331e100055320000001312001007d0001e0000000000000096000000280096ffff3e0001f40000003e00000000000000000000007213"
OBD_msg = OBD_msg[:-4] + self.crc16(OBD_msg[:-4])
msg = OBD_msg
return msg
####################################################
# 定义生成校验字段的函数(M500 校验方式)
# inputStr:需要传入一个已经转换为16进制的字符串
#####################################################
# add crc 16 check at the end of the string
def crc16(self,inputStr):
inputStrByte = bytes.fromhex(inputStr)
crc = 0xFFFF
for i in range(0, len(inputStrByte)):
for j in range(0, 8):
c15 = (crc >> 15) == 1
bit = ((inputStrByte[i] >> (7 - j)) & 1) == 1
crc <<= 1
crc &= 0xFFFF
if c15 ^ bit:
crc ^= 0x1021
crc = str(hex(crc))
crc = self.leftPad(crc[2:], 4)
# outputStr = inputStr + crc
outputStr = crc
return outputStr
# pad zero to the left of the string if not long enough
def leftPad(self,inputStr, strLen):
if (strLen > len(inputStr)):
outputStr = "0000000000000000000000000000000000000000" + inputStr
outputStr = outputStr[len(outputStr) - strLen:]
return outputStr
else:
return inputStr
# pad zero to the right of the string if not long enough
def rightPad(self,inputStr, strLen):
if (strLen > len(inputStr)):
outputStr = inputStr + "0000000000000000000000000000000000000000"
outputStr = outputStr[: strLen]
return outputStr
else:
return inputStr
def getInfoFromResult(self,path):
with open(path, "r", encoding='utf-8') as fi:
d = fi.read()
d = json.loads(d)
msgC = 0
failT = 0
for i in d:
msgC = msgC + d[i]["msgCon"]
if d[i]["status"] == 1:
failT = failT +1
info_1 = "文件统计总消息数:" + str(msgC)
info_2 = "文件统计总失败数:" + str(failT)
print(info_1)
print(info_2)
return info_1 + "\n" +info_2 + "\n"
if __name__ == "__main__":
t = SendMultMsgThread_m500()
# t.setHost("10.100.12.32")
t.setHost("10.100.5.251")
t.setPort(9008) #M500
# t.setPort(9009) # M300
# t.startThread()
t.startThreadContinuous()
# SendMultMsgThread().getInfoFromResult("../../data/threadDetailsContinuous.json")
\ No newline at end of file
#coding:utf-8
from lib.pressure.Base import Base
class ThreadBase(Base):
def __init__(self):
pass
\ No newline at end of file
-------------------------- 统计信息 --------------------------
耗时:2 毫秒产生了4线程
开始测试时间:2020-03-19 14:11:21
结束测试时间:2020-03-19 14:12:31
设置socket超时时间:30
设置线程持续时间:60
成功线程数:4
消息总数:79
连接失败:0
发送失败:0
接收失败:0
文件统计总消息数:79
文件统计总失败数:0
-------------------------- 统计信息(pre) --------------------------
耗时:2 毫秒产生了4线程
开始测试时间:2020-03-19 14:11:21
设置socket超时时间:30
设置线程持续时间:60
剩余线程数:2
连接失败:0
发送失败:0
接收失败:0
当前写入时间:2020-03-19 14:12:26
当前发送消息总数:79
#coding:utf-8
'''
启动压力测试脚本
'''
from lib.pressure.SendMultMsgThread_m300 import SendMultMsgThread_m300
from lib.pressure.SendMultMsgThread_m500 import SendMultMsgThread_m500
def startM300():
t = SendMultMsgThread_m300()
# t.setHost("10.100.12.32")
t.setHost("10.100.5.251")
t.setPort(9009) # M300
t.setThreadCount(3500)
t.setTimeOut(30)
t.setDurTime(1 * 20 * 60)
t.setSetStartCarNumber(0)
# t.startThread()
t.startThreadContinuous()
def startM500():
t = SendMultMsgThread_m500()
# t.setHost("10.100.12.32")
t.setHost("10.100.5.251")
t.setPort(9008) #M500
t.setThreadCount(4)
t.setTimeOut(30)
t.setDurTime(1 * 1 * 60)
t.setSetStartCarNumber(0)
# t.startThread()
t.startThreadContinuous()
if __name__ == "__main__":
startM500()
\ No newline at end of file
{"thread-0": {"name": "thread-0", "status": 0, "timeExp": 62, "msgCon": 18}, "thread-1": {"name": "thread-1", "status": 0, "timeExp": 66, "msgCon": 20}, "thread-2": {"name": "thread-2", "status": 0, "timeExp": 63, "msgCon": 20}, "thread-3": {"name": "thread-3", "status": 0, "timeExp": 66, "msgCon": 21}}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment