Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
new-socketemulator
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
李远洪
new-socketemulator
Commits
7dbe43dc
Commit
7dbe43dc
authored
Mar 19, 2020
by
liyuanhong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
剥离了压力测试脚本为一个新项目
parent
7b6a9de9
Changes
6
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
0 additions
and
910 deletions
+0
-910
lib/multiThread/Base.py
lib/multiThread/Base.py
+0
-12
lib/multiThread/SendMultMsgThread.py
lib/multiThread/SendMultMsgThread.py
+0
-143
lib/multiThread/SendMultMsgThread_m300.py
lib/multiThread/SendMultMsgThread_m300.py
+0
-366
lib/multiThread/SendMultMsgThread_m500.py
lib/multiThread/SendMultMsgThread_m500.py
+0
-382
lib/multiThread/ThreadBase.py
lib/multiThread/ThreadBase.py
+0
-7
lib/multiThread/__init__.py
lib/multiThread/__init__.py
+0
-0
No files found.
lib/multiThread/Base.py
deleted
100644 → 0
View file @
7b6a9de9
#coding:utf-8
#########################################################
#
# 定义基类,供所有类继承
#
#########################################################
import
threading
class
Base
(
threading
.
Thread
):
def
__init__
(
self
):
pass
lib/multiThread/SendMultMsgThread.py
deleted
100644 → 0
View file @
7b6a9de9
#coding:utf-8
import
binascii
import
json
import
socket
import
threading
import
time
from
lib.multiThread.ThreadBase
import
ThreadBase
from
lib.protocol.message.Location_msg
import
Location_msg
from
lib.protocol.message.TerminalRegister_msg
import
TerminalRegister_msg
class
SendMultMsgThread
():
def
__init__
(
self
,
host
=
"10.100.11.20"
,
port
=
9001
,
msg
=
""
):
self
.
host
=
host
self
.
port
=
port
self
.
msg
=
msg
self
.
timeOut
=
1
#socket超时时间
self
.
BUF_SIZE
=
1024
#接收消息缓存
self
.
threadCount
=
10000
#并发线程数
self
.
totalTime
=
0
#所有线程的运行总和
self
.
threadArr
=
{}
#保存每个线程的信息
self
.
failThreadCount
=
0
#失败线程数
pass
############################################
# 设置host
############################################
def
setHost
(
self
,
host
):
self
.
host
=
host
############################################
# 设置端口号
############################################
def
setPort
(
self
,
port
):
self
.
port
=
port
############################################
# 设置消息
############################################
def
setMsg
(
self
,
msg
):
self
.
msg
=
msg
############################################
# 设置并发线程数
############################################
def
setThreadCount
(
self
,
threadCount
):
self
.
threadCount
=
threadCount
############################################
# 发送一条消息
############################################
def
sendMsg
(
self
,
msg
,
threadName
):
msg
=
msg
client
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
client
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_KEEPALIVE
,
1
)
# 在客户端开启心跳
client
.
settimeout
(
self
.
timeOut
)
startTime
=
int
(
time
.
time
()
*
1000
)
try
:
client
.
connect
((
self
.
host
,
self
.
port
))
client
.
send
(
binascii
.
a2b_hex
(
msg
))
except
BaseException
as
e
:
client
.
close
()
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
print
(
"连接超时,socket断开"
)
return
try
:
data
=
client
.
recv
(
self
.
BUF_SIZE
)
# print(data)
except
BaseException
as
e
:
# traceback.print_exc()
client
.
close
()
# raise RuntimeError('socket 接收消息超时!')
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
print
(
'socket 接收消息超时!'
)
return
endTime
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
totalTime
=
self
.
totalTime
+
timeExpend
client
.
close
()
############################################
# 启动并发线程
############################################
def
startThread
(
self
):
timeStart
=
int
(
time
.
time
()
*
1000
)
for
i
in
range
(
0
,
self
.
threadCount
):
threadName
=
"thread"
+
str
(
i
)
# theThread = threading.Thread(target=self.sendMsg, args=("7e0002000001314620111800065b7e",threadName,)) # 数据写死,心跳
theThread
=
threading
.
Thread
(
target
=
self
.
sendMsg
,
args
=
(
"4040007000094d20191201000200120114030409123426d7fffff0000000000505000000143c00000bb80100000fa00000000a0000000000005e5f68e768e739331e100055320000001312001007d0001e0000000000000096000000280096ffff3e0001f40000003e00000000000000000000000f9a"
,
threadName
,))
# 数据写死
# theThread = threading.Thread(target=self.sendMsg, \
# args=(TerminalRegister_msg().generateMsg_random(), threadName,)) #终端注册
# theThread = threading.Thread(target=self.sendMsg, \
# args=(Location_msg().generateMsg_random(), threadName,)) #地理位置
threadInfo
=
{}
threadInfo
[
"name"
]
=
threadName
threadInfo
[
"status"
]
=
0
self
.
threadArr
[
threadName
]
=
threadInfo
theThread
.
start
()
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
timeEnd
-
timeStart
time
.
sleep
(
6
)
print
(
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒"
)
print
(
"并发数据每秒发送:"
+
str
(
int
(
self
.
threadCount
/
(
timeExpend
/
1000
))))
print
(
"平均响应时间:"
+
str
(
self
.
totalTime
/
self
.
threadCount
)
+
"毫秒"
)
print
(
"发送总数:"
+
str
(
self
.
threadCount
))
print
(
"响应失败数:"
+
str
(
self
.
failThreadCount
))
self
.
writeToFile
(
"../../data/threadDetails.json"
,
self
.
threadArr
)
# print(json.dumps( self.threadArr))
def
writeToFile
(
self
,
path
,
data
):
with
open
(
path
,
"w"
,
encoding
=
'utf-8'
)
as
fi
:
json
.
dump
(
data
,
fi
)
# fi.write(data)
if
__name__
==
"__main__"
:
t
=
SendMultMsgThread
()
t
.
setHost
(
"10.100.12.32"
)
t
.
setPort
(
9008
)
# t.setMsg("7e0002000001314620111800065b7e")
# t.setMsg("7e020001020131462011190001fffc7fff001c010401c0a6380659ad7a02090042003b200204185704310102EA6600010400000000000204001e7c1f0003050A0001f400000405020001d4c000050400057d0240000604000119400007040007530000100c0004006403f203f203f203f2001114ffffffffffffffffffff00200000000000000000001202002400130106001D0101EB7960C0020bb860D0013c62f00203216050014c60F0015860B001146330011c646001416490012060A00201146014010160100102610002022661100201f561F0020e746210040000119c6040012c60700200e660E00203206701010067020100670301016704024e20670502000067060200416707040000017d02097e")
t
.
startThread
()
lib/multiThread/SendMultMsgThread_m300.py
deleted
100644 → 0
View file @
7b6a9de9
#coding:utf-8
import
binascii
import
json
import
random
import
socket
import
threading
import
time
from
lib.multiThread.ThreadBase
import
ThreadBase
from
lib.protocol.message.Location_msg
import
Location_msg
from
lib.protocol.message.TerminalRegister_msg
import
TerminalRegister_msg
class
SendMultMsgThread
():
def
__init__
(
self
,
host
=
"10.100.11.20"
,
port
=
9001
,
msg
=
""
):
self
.
host
=
host
self
.
port
=
port
self
.
msg
=
msg
self
.
timeOut
=
30
#socket超时时间
self
.
BUF_SIZE
=
1024
#接收消息缓存
self
.
threadCount
=
10000
#并发线程数
self
.
totalTime
=
0
#所有线程的运行总和
self
.
threadArr
=
{}
#保存每个线程的信息
self
.
failThreadCount
=
0
#失败线程数
self
.
durThreads
=
[]
#持续发送线程数组,当数组为空,表示所有线程已经结束
dt
=
1
*
20
*
60
self
.
durTime
=
dt
#线程持续时间
self
.
connectTimeoutNum
=
0
#连接超时线程数
self
.
sendTimeoutNum
=
0
#发送超时线程数
self
.
reviceTimeoutNum
=
0
#接收超时线程数
self
.
sucessNum
=
0
#成功线程数
self
.
failThreadCount
=
0
#失败线程数
self
.
messageCon
=
[]
#用来统计每个线程所发的消息数
self
.
messageCons
=
0
# 用来统计每个线程所发的消息数
pass
############################################
# 设置host
############################################
def
setHost
(
self
,
host
):
self
.
host
=
host
############################################
# 设置端口号
############################################
def
setPort
(
self
,
port
):
self
.
port
=
port
############################################
# 设置消息
############################################
def
setMsg
(
self
,
msg
):
self
.
msg
=
msg
############################################
# 设置并发线程数
############################################
def
setThreadCount
(
self
,
threadCount
):
self
.
threadCount
=
threadCount
############################################
# 发送一条消息
############################################
def
sendMsg
(
self
,
msg
,
threadName
):
msg
=
msg
client
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
client
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_KEEPALIVE
,
1
)
# 在客户端开启心跳
client
.
settimeout
(
self
.
timeOut
)
startTime
=
int
(
time
.
time
()
*
1000
)
try
:
client
.
connect
((
self
.
host
,
self
.
port
))
client
.
send
(
binascii
.
a2b_hex
(
msg
))
except
BaseException
as
e
:
client
.
close
()
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
print
(
"连接超时,socket断开"
)
return
try
:
data
=
client
.
recv
(
self
.
BUF_SIZE
)
# print(data)
except
BaseException
as
e
:
# traceback.print_exc()
client
.
close
()
# raise RuntimeError('socket 接收消息超时!')
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
print
(
'socket 接收消息超时!'
)
return
endTime
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
totalTime
=
self
.
totalTime
+
timeExpend
client
.
close
()
############################################
# 持续发送消息
# dur: 持续发送时间,如果为0一直发送, 设置了事件为持续发送多少秒,默认为一分钟
############################################
def
sendMsgContinuous
(
self
,
carId
,
threadName
=
"thread0"
):
client
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
client
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_KEEPALIVE
,
1
)
# 在客户端开启心跳
client
.
settimeout
(
self
.
timeOut
)
startTime
=
int
(
time
.
time
())
endTime
=
int
(
time
.
time
())
msgCon
=
0
#统计线程发的消息数量
self
.
durThreads
.
append
(
threadName
)
try
:
client
.
connect
((
self
.
host
,
self
.
port
))
except
BaseException
as
e
:
client
.
close
()
self
.
durThreads
.
remove
(
threadName
)
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
connectTimeoutNum
=
self
.
connectTimeoutNum
+
1
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
print
(
threadName
+
":"
+
"连接超时,socket断开"
)
return
while
(
endTime
-
startTime
)
<
self
.
durTime
:
msg
=
self
.
getRandomMsg_M300
(
carId
)
try
:
client
.
send
(
binascii
.
a2b_hex
(
msg
))
msgCon
=
msgCon
+
1
self
.
messageCons
=
self
.
messageCons
+
1
except
BaseException
as
e
:
client
.
close
()
self
.
durThreads
.
remove
(
threadName
)
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
self
.
sendTimeoutNum
=
self
.
sendTimeoutNum
+
1
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
sendTimeoutNum
=
self
.
sendTimeoutNum
+
1
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
print
(
threadName
+
":"
+
"发送超时,socket断开"
)
return
try
:
data
=
client
.
recv
(
self
.
BUF_SIZE
)
except
BaseException
as
e
:
client
.
close
()
self
.
durThreads
.
remove
(
threadName
)
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
self
.
reviceTimeoutNum
=
self
.
reviceTimeoutNum
+
1
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
print
(
threadName
+
":"
+
'socket 接收消息超时!'
)
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
return
endTime
=
int
(
time
.
time
())
sleepTime
=
random
.
randint
(
1
,
5
)
time
.
sleep
(
sleepTime
)
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
client
.
close
()
self
.
sucessNum
=
self
.
sucessNum
+
1
self
.
durThreads
.
remove
(
threadName
)
############################################
# 启动并发线程
############################################
def
startThread
(
self
):
timeStart
=
int
(
time
.
time
()
*
1000
)
for
i
in
range
(
0
,
self
.
threadCount
):
threadName
=
"thread-"
+
str
(
i
)
theThread
=
threading
.
Thread
(
target
=
self
.
sendMsg
,
args
=
(
"7e0002000001314620111800065b7e"
,
threadName
,))
# 数据写死,心跳
threadInfo
=
{}
threadInfo
[
"name"
]
=
threadName
threadInfo
[
"status"
]
=
0
self
.
threadArr
[
threadName
]
=
threadInfo
theThread
.
start
()
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
timeEnd
-
timeStart
time
.
sleep
(
3
)
print
(
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒"
)
print
(
"并发数据每秒发送:"
+
str
(
int
(
self
.
threadCount
/
(
timeExpend
/
1000
))))
print
(
"平均响应时间:"
+
str
(
self
.
totalTime
/
self
.
threadCount
)
+
"毫秒"
)
print
(
"发送总数:"
+
str
(
self
.
threadCount
))
print
(
"响应失败数:"
+
str
(
self
.
failThreadCount
))
self
.
writeToFile
(
"../../data/threadDetails.json"
,
self
.
threadArr
)
############################################
# 启动持续并发线程
############################################
def
startThreadContinuous
(
self
):
timeStart
=
int
(
time
.
time
()
*
1000
)
for
i
in
range
(
0
,
self
.
threadCount
):
threadName
=
"thread-"
+
str
(
i
+
42000
)
print
(
threadName
)
carid
=
201912000000
+
i
+
42000
theThread
=
threading
.
Thread
(
target
=
self
.
sendMsgContinuous
,
args
=
(
carid
,
threadName
,))
# 数据写死,心跳
threadInfo
=
{}
threadInfo
[
"name"
]
=
threadName
threadInfo
[
"status"
]
=
0
self
.
threadArr
[
threadName
]
=
threadInfo
theThread
.
start
()
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
timeEnd
-
timeStart
print
(
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒产生了"
+
str
(
self
.
threadCount
)
+
"线程"
)
time
.
sleep
(
0.5
)
#防止启动的时候溜掉某些启动比较慢的线程
tmp
=
1
while
len
(
self
.
durThreads
)
!=
0
:
print
(
"剩余线程数_"
+
str
(
tmp
)
+
":"
+
str
(
len
(
self
.
durThreads
)))
tmp
=
tmp
+
1
timeArray
=
time
.
localtime
(
timeStart
/
1000
)
testStart
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
timeCur
=
int
(
time
.
time
()
*
1000
)
timeArray
=
time
.
localtime
(
timeCur
/
1000
)
testCur
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
info_0
=
"-------------------------- 统计信息(pre) --------------------------"
info_1
=
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒产生了"
+
str
(
self
.
threadCount
)
+
"线程"
info_2
=
"开始测试时间:"
+
testStart
info_4
=
"设置socket超时时间:"
+
str
(
self
.
timeOut
)
info_5
=
"设置线程持续时间:"
+
str
(
self
.
durTime
)
info_6
=
"剩余线程数:"
+
str
(
len
(
self
.
durThreads
))
info_8
=
"连接失败:"
+
str
(
self
.
connectTimeoutNum
)
info_9
=
"发送失败:"
+
str
(
self
.
sendTimeoutNum
)
info_10
=
"接收失败:"
+
str
(
self
.
reviceTimeoutNum
)
info_11
=
"当前写入时间:"
+
testCur
info_12
=
"当前发送消息总数:"
+
str
(
self
.
messageCons
)
result
=
info_0
+
"
\n
"
+
info_1
+
"
\n
"
+
info_2
+
"
\n
"
+
info_4
+
"
\n
"
+
info_5
+
"
\n
"
+
info_6
+
"
\n
"
result
=
result
+
info_8
+
"
\n
"
+
info_9
+
"
\n
"
+
info_10
+
"
\n
"
+
info_11
+
"
\n
"
+
info_12
+
"
\n
"
self
.
writeToFile
(
"./result_pre.txt"
,
result
)
time
.
sleep
(
5
)
timeArray
=
time
.
localtime
(
timeStart
/
1000
)
testStart
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeArray
=
time
.
localtime
(
timeEnd
/
1000
)
testestEnd
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
time
.
sleep
(
2
)
#防止线程慢的时候,某些线程被漏统计的情况
info_0
=
"-------------------------- 统计信息 --------------------------"
print
(
info_0
)
totalMsg
=
0
for
i
in
self
.
messageCon
:
totalMsg
=
totalMsg
+
i
info_1
=
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒产生了"
+
str
(
self
.
threadCount
)
+
"线程"
info_2
=
"开始测试时间:"
+
testStart
info_3
=
"结束测试时间:"
+
testestEnd
info_4
=
"设置socket超时时间:"
+
str
(
self
.
timeOut
)
info_5
=
"设置线程持续时间:"
+
str
(
self
.
durTime
)
info_6
=
"成功线程数:"
+
str
(
self
.
sucessNum
)
info_7
=
"消息总数:"
+
str
(
totalMsg
)
info_8
=
"连接失败:"
+
str
(
self
.
connectTimeoutNum
)
info_9
=
"发送失败:"
+
str
(
self
.
sendTimeoutNum
)
info_10
=
"接收失败:"
+
str
(
self
.
reviceTimeoutNum
)
print
(
info_1
)
print
(
info_2
)
print
(
info_3
)
print
(
info_4
)
print
(
info_5
)
print
(
info_6
)
print
(
info_7
)
print
(
info_8
)
print
(
info_9
)
print
(
info_10
)
self
.
writeToFile
(
"../../data/threadDetailsContinuous.json"
,
json
.
dumps
(
self
.
threadArr
))
info_11
=
self
.
getInfoFromResult
(
"../../data/threadDetailsContinuous.json"
)
result
=
info_0
+
"
\n
"
+
info_1
+
"
\n
"
+
info_2
+
"
\n
"
+
info_3
+
"
\n
"
+
info_4
+
"
\n
"
+
info_5
+
"
\n
"
+
info_6
+
"
\n
"
result
=
result
+
info_7
+
"
\n
"
+
info_8
+
"
\n
"
+
info_9
+
"
\n
"
+
info_10
+
"
\n
"
+
info_11
self
.
writeToFile
(
"./result.txt"
,
result
)
def
writeToFile
(
self
,
path
,
data
):
with
open
(
path
,
"w"
,
encoding
=
'utf-8'
)
as
fi
:
#json.dump(data, fi)
fi
.
write
(
data
)
#获取随机消息数据(M300车机)
def
getRandomMsg_M300
(
self
,
carId
):
# carId = 201912010002
wh
=
random
.
randint
(
0
,
2
)
msg
=
""
if
wh
==
0
:
hearbeat_msg
=
"7e000400e14d"
+
str
(
carId
)
+
"0000"
hearbeat_msg
=
hearbeat_msg
+
self
.
getCheckCode
(
hearbeat_msg
[
2
:])
+
"7e"
hearbeat_msg
=
"7e"
+
self
.
replace7e7d
(
hearbeat_msg
[
2
:][:
-
2
])
+
"7e"
msg
=
hearbeat_msg
elif
wh
==
1
:
GPS_msg
=
"7e002000fb4d"
+
str
(
carId
)
+
"002414031003351501c3422106588b8c0703200c520000000000007403200100000000000000"
GPS_msg
=
GPS_msg
+
self
.
getCheckCode
(
GPS_msg
[
2
:])
+
"7e"
GPS_msg
=
"7e"
+
self
.
replace7e7d
(
GPS_msg
[
2
:][:
-
2
])
+
"7e"
msg
=
GPS_msg
elif
wh
==
2
:
OBD_msg
=
"7e000300fa4d"
+
str
(
carId
)
+
"005d140310033513117f3f0effff3f30f9000001000000000000000000fffffe8000740200008e8800002ac2010003e8503232320000501403e80069006903840000000000640a03e802580000000000000384000000000000000000000000"
OBD_msg
=
OBD_msg
+
self
.
getCheckCode
(
OBD_msg
[
2
:])
+
"7e"
OBD_msg
=
"7e"
+
self
.
replace7e7d
(
OBD_msg
[
2
:][:
-
2
])
+
"7e"
msg
=
OBD_msg
return
msg
def
getInfoFromResult
(
self
,
path
):
with
open
(
path
,
"r"
,
encoding
=
'utf-8'
)
as
fi
:
d
=
fi
.
read
()
d
=
json
.
loads
(
d
)
msgC
=
0
failT
=
0
for
i
in
d
:
msgC
=
msgC
+
d
[
i
][
"msgCon"
]
if
d
[
i
][
"status"
]
==
1
:
failT
=
failT
+
1
info_1
=
"文件统计总消息数:"
+
str
(
msgC
)
info_2
=
"文件统计总失败数:"
+
str
(
failT
)
print
(
info_1
)
print
(
info_2
)
return
info_1
+
"
\n
"
+
info_2
+
"
\n
"
#######################################################
# 获取校验码(M300,新硬件)
#######################################################
def
getCheckCode
(
self
,
data
=
"aa"
):
if
len
(
data
)
%
2
==
1
:
raise
RuntimeError
(
'数据段错误!'
)
start
=
data
[
0
:
2
]
tmp
=
int
(
start
,
16
)
for
i
in
range
(
2
,
len
(
data
),
2
):
tmp
=
tmp
^
int
(
data
[
i
:
i
+
2
],
16
)
dataHex
=
self
.
int2hexStringByBytes
(
tmp
)
return
dataHex
#######################################################
# 替换消息中的7e7d字符
#######################################################
def
replace7e7d
(
self
,
data
):
tmpR
=
data
tmp
=
tmpR
[
0
:
2
]
tmpA
=
tmpR
[
0
:
2
]
tmpR
=
tmpR
[
2
:]
data
=
""
while
tmpA
!=
""
:
if
tmp
==
"7d"
:
tmp
=
"7d01"
elif
tmp
==
"7e"
:
tmp
=
"7d02"
data
=
data
+
tmp
tmp
=
tmpR
[
0
:
2
]
tmpA
=
tmpR
[
0
:
2
]
tmpR
=
tmpR
[
2
:]
return
data
#####################################################
# 数字转换为16进制字符串,通过传入字节数可自动补0
# 传入数据格式所占字节数
#####################################################
def
int2hexStringByBytes
(
self
,
num
,
bytescount
=
1
):
hexStr
=
hex
(
num
)[
2
:]
while
len
(
hexStr
)
<
(
bytescount
*
2
):
hexStr
=
"0"
+
hexStr
return
hexStr
if
__name__
==
"__main__"
:
t
=
SendMultMsgThread
()
# t.setHost("10.100.12.32")
t
.
setHost
(
"10.100.5.251"
)
# t.setPort(9008) #M500
t
.
setPort
(
9009
)
#M300
# t.startThread()
t
.
startThreadContinuous
()
# SendMultMsgThread().getInfoFromResult("../../data/threadDetailsContinuous.json")
\ No newline at end of file
lib/multiThread/SendMultMsgThread_m500.py
deleted
100644 → 0
View file @
7b6a9de9
#coding:utf-8
import
binascii
import
json
import
random
import
socket
import
threading
import
time
from
lib.multiThread.ThreadBase
import
ThreadBase
from
lib.protocol.message.Location_msg
import
Location_msg
from
lib.protocol.message.TerminalRegister_msg
import
TerminalRegister_msg
class
SendMultMsgThread
():
def
__init__
(
self
,
host
=
"10.100.11.20"
,
port
=
9001
,
msg
=
""
):
self
.
host
=
host
self
.
port
=
port
self
.
msg
=
msg
self
.
timeOut
=
30
#socket超时时间
self
.
BUF_SIZE
=
1024
#接收消息缓存
self
.
threadCount
=
1000
#并发线程数
self
.
totalTime
=
0
#所有线程的运行总和
self
.
threadArr
=
{}
#保存每个线程的信息
self
.
failThreadCount
=
0
#失败线程数
self
.
durThreads
=
[]
#持续发送线程数组,当数组为空,表示所有线程已经结束
dt
=
1
*
10
*
60
self
.
durTime
=
dt
#线程持续时间
self
.
connectTimeoutNum
=
0
#连接超时线程数
self
.
sendTimeoutNum
=
0
#发送超时线程数
self
.
reviceTimeoutNum
=
0
#接收超时线程数
self
.
sucessNum
=
0
#成功线程数
self
.
messageCon
=
[]
#用来统计每个线程所发的消息数
self
.
messageCons
=
0
# 用来统计每个线程所发的消息数
pass
############################################
# 设置host
############################################
def
setHost
(
self
,
host
):
self
.
host
=
host
############################################
# 设置端口号
############################################
def
setPort
(
self
,
port
):
self
.
port
=
port
############################################
# 设置消息
############################################
def
setMsg
(
self
,
msg
):
self
.
msg
=
msg
############################################
# 设置并发线程数
############################################
def
setThreadCount
(
self
,
threadCount
):
self
.
threadCount
=
threadCount
############################################
# 发送一条消息
############################################
def
sendMsg
(
self
,
msg
,
threadName
):
msg
=
msg
client
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
client
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_KEEPALIVE
,
1
)
# 在客户端开启心跳
client
.
settimeout
(
self
.
timeOut
)
startTime
=
int
(
time
.
time
()
*
1000
)
try
:
client
.
connect
((
self
.
host
,
self
.
port
))
client
.
send
(
binascii
.
a2b_hex
(
msg
))
except
BaseException
as
e
:
client
.
close
()
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
print
(
"连接超时,socket断开"
)
return
try
:
data
=
client
.
recv
(
self
.
BUF_SIZE
)
# print(data)
except
BaseException
as
e
:
# traceback.print_exc()
client
.
close
()
# raise RuntimeError('socket 接收消息超时!')
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
print
(
'socket 接收消息超时!'
)
return
endTime
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
totalTime
=
self
.
totalTime
+
timeExpend
client
.
close
()
############################################
# 持续发送消息
# dur: 持续发送时间,如果为0一直发送, 设置了事件为持续发送多少秒,默认为一分钟
############################################
def
sendMsgContinuous
(
self
,
carId
,
threadName
=
"thread0"
):
client
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
client
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_KEEPALIVE
,
1
)
# 在客户端开启心跳
client
.
settimeout
(
self
.
timeOut
)
startTime
=
int
(
time
.
time
())
endTime
=
int
(
time
.
time
())
msgCon
=
0
#统计线程发的消息数量
self
.
durThreads
.
append
(
threadName
)
try
:
client
.
connect
((
self
.
host
,
self
.
port
))
except
BaseException
as
e
:
client
.
close
()
self
.
durThreads
.
remove
(
threadName
)
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
connectTimeoutNum
=
self
.
connectTimeoutNum
+
1
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
print
(
threadName
+
":"
+
"连接超时,socket断开"
)
return
while
(
endTime
-
startTime
)
<
self
.
durTime
:
msg
=
self
.
getRandomMsg_M500
(
carId
)
try
:
client
.
send
(
binascii
.
a2b_hex
(
msg
))
msgCon
=
msgCon
+
1
self
.
messageCons
=
self
.
messageCons
+
1
except
BaseException
as
e
:
client
.
close
()
self
.
durThreads
.
remove
(
threadName
)
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
sendTimeoutNum
=
self
.
sendTimeoutNum
+
1
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
print
(
threadName
+
":"
+
"发送超时,socket断开"
)
return
try
:
data
=
client
.
recv
(
self
.
BUF_SIZE
)
except
BaseException
as
e
:
client
.
close
()
self
.
durThreads
.
remove
(
threadName
)
self
.
threadArr
[
threadName
][
"status"
]
=
1
self
.
failThreadCount
=
self
.
failThreadCount
+
1
self
.
reviceTimeoutNum
=
self
.
connectTimeoutNum
+
1
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
print
(
threadName
+
":"
+
'socket 接收消息超时!'
)
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
return
endTime
=
int
(
time
.
time
())
sleepTime
=
random
.
randint
(
1
,
5
)
time
.
sleep
(
sleepTime
)
endTime
=
int
(
time
.
time
())
timeExpend
=
endTime
-
startTime
self
.
threadArr
[
threadName
][
"timeExp"
]
=
timeExpend
self
.
messageCon
.
append
(
msgCon
)
self
.
threadArr
[
threadName
][
"msgCon"
]
=
msgCon
client
.
close
()
self
.
sucessNum
=
self
.
sucessNum
+
1
self
.
durThreads
.
remove
(
threadName
)
############################################
# 启动并发线程
############################################
def
startThread
(
self
):
timeStart
=
int
(
time
.
time
()
*
1000
)
for
i
in
range
(
0
,
self
.
threadCount
):
threadName
=
"thread-"
+
str
(
i
)
theThread
=
threading
.
Thread
(
target
=
self
.
sendMsg
,
args
=
(
"7e0002000001314620111800065b7e"
,
threadName
,))
# 数据写死,心跳
threadInfo
=
{}
threadInfo
[
"name"
]
=
threadName
threadInfo
[
"status"
]
=
0
self
.
threadArr
[
threadName
]
=
threadInfo
theThread
.
start
()
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
timeEnd
-
timeStart
time
.
sleep
(
3
)
print
(
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒"
)
print
(
"并发数据每秒发送:"
+
str
(
int
(
self
.
threadCount
/
(
timeExpend
/
1000
))))
print
(
"平均响应时间:"
+
str
(
self
.
totalTime
/
self
.
threadCount
)
+
"毫秒"
)
print
(
"发送总数:"
+
str
(
self
.
threadCount
))
print
(
"响应失败数:"
+
str
(
self
.
failThreadCount
))
self
.
writeToFile
(
"../../data/threadDetails.json"
,
self
.
threadArr
)
############################################
# 启动持续并发线程
############################################
def
startThreadContinuous
(
self
):
timeStart
=
int
(
time
.
time
()
*
1000
)
for
i
in
range
(
0
,
self
.
threadCount
):
threadName
=
"thread-"
+
str
(
31500
+
i
)
print
(
threadName
)
carid
=
201912000000
+
i
+
31500
theThread
=
threading
.
Thread
(
target
=
self
.
sendMsgContinuous
,
args
=
(
carid
,
threadName
,))
# 数据写死,心跳
threadInfo
=
{}
threadInfo
[
"name"
]
=
threadName
threadInfo
[
"status"
]
=
0
self
.
threadArr
[
threadName
]
=
threadInfo
theThread
.
start
()
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeExpend
=
timeEnd
-
timeStart
print
(
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒产生了"
+
str
(
self
.
threadCount
)
+
"线程"
)
time
.
sleep
(
0.5
)
#防止启动的时候溜掉某些启动比较慢的线程
tmp
=
1
while
len
(
self
.
durThreads
)
!=
0
:
print
(
"剩余线程数_"
+
str
(
tmp
)
+
":"
+
str
(
len
(
self
.
durThreads
)))
tmp
=
tmp
+
1
timeArray
=
time
.
localtime
(
timeStart
/
1000
)
testStart
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
timeCur
=
int
(
time
.
time
()
*
1000
)
timeArray
=
time
.
localtime
(
timeCur
/
1000
)
testCur
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
info_0
=
"-------------------------- 统计信息(pre) --------------------------"
info_1
=
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒产生了"
+
str
(
self
.
threadCount
)
+
"线程"
info_2
=
"开始测试时间:"
+
testStart
info_4
=
"设置socket超时时间:"
+
str
(
self
.
timeOut
)
info_5
=
"设置线程持续时间:"
+
str
(
self
.
durTime
)
info_6
=
"剩余线程数:"
+
str
(
len
(
self
.
durThreads
))
info_8
=
"连接失败:"
+
str
(
self
.
connectTimeoutNum
)
info_9
=
"发送失败:"
+
str
(
self
.
sendTimeoutNum
)
info_10
=
"接收失败:"
+
str
(
self
.
reviceTimeoutNum
)
info_11
=
"当前写入时间:"
+
testCur
info_12
=
"当前发送消息总数:"
+
str
(
self
.
messageCons
)
result
=
info_0
+
"
\n
"
+
info_1
+
"
\n
"
+
info_2
+
"
\n
"
+
info_4
+
"
\n
"
+
info_5
+
"
\n
"
+
info_6
+
"
\n
"
result
=
result
+
info_8
+
"
\n
"
+
info_9
+
"
\n
"
+
info_10
+
"
\n
"
+
info_11
+
"
\n
"
+
info_12
+
"
\n
"
self
.
writeToFile
(
"./result_pre.txt"
,
result
)
time
.
sleep
(
5
)
timeArray
=
time
.
localtime
(
timeStart
/
1000
)
testStart
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
timeEnd
=
int
(
time
.
time
()
*
1000
)
timeArray
=
time
.
localtime
(
timeEnd
/
1000
)
testestEnd
=
time
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
,
timeArray
)
time
.
sleep
(
2
)
#防止线程慢的时候,某些线程被漏统计的情况
info_0
=
"-------------------------- 统计信息 --------------------------"
print
(
info_0
)
totalMsg
=
0
for
i
in
self
.
messageCon
:
totalMsg
=
totalMsg
+
i
info_1
=
"耗时:"
+
str
(
timeExpend
)
+
" 毫秒产生了"
+
str
(
self
.
threadCount
)
+
"线程"
info_2
=
"开始测试时间:"
+
testStart
info_3
=
"结束测试时间:"
+
testestEnd
info_4
=
"设置socket超时时间:"
+
str
(
self
.
timeOut
)
info_5
=
"设置线程持续时间:"
+
str
(
self
.
durTime
)
info_6
=
"成功线程数:"
+
str
(
self
.
sucessNum
)
info_7
=
"消息总数:"
+
str
(
totalMsg
)
info_8
=
"连接失败:"
+
str
(
self
.
connectTimeoutNum
)
info_9
=
"发送失败:"
+
str
(
self
.
sendTimeoutNum
)
info_10
=
"接收失败:"
+
str
(
self
.
reviceTimeoutNum
)
print
(
info_1
)
print
(
info_2
)
print
(
info_3
)
print
(
info_4
)
print
(
info_5
)
print
(
info_6
)
print
(
info_7
)
print
(
info_8
)
print
(
info_9
)
print
(
info_10
)
self
.
writeToFile
(
"../../data/threadDetailsContinuous.json"
,
json
.
dumps
(
self
.
threadArr
))
info_11
=
self
.
getInfoFromResult
(
"../../data/threadDetailsContinuous.json"
)
result
=
info_0
+
"
\n
"
+
info_1
+
"
\n
"
+
info_2
+
"
\n
"
+
info_3
+
"
\n
"
+
info_4
+
"
\n
"
+
info_5
+
"
\n
"
+
info_6
+
"
\n
"
result
=
result
+
info_7
+
"
\n
"
+
info_8
+
"
\n
"
+
info_9
+
"
\n
"
+
info_10
+
"
\n
"
+
info_11
self
.
writeToFile
(
"./result.txt"
,
result
)
def
writeToFile
(
self
,
path
,
data
):
with
open
(
path
,
"w"
,
encoding
=
'utf-8'
)
as
fi
:
#json.dump(data, fi)
fi
.
write
(
data
)
#获取随机消息数据(M500车机)
def
getRandomMsg_M500
(
self
,
carId
):
# carId = 201912010002
wh
=
random
.
randint
(
0
,
2
)
msg
=
""
if
wh
==
0
:
hearbeat_msg
=
"4040000b00044d"
+
str
(
carId
)
+
"0003ffd4"
hearbeat_msg
=
hearbeat_msg
[:
-
4
]
+
self
.
crc16
(
hearbeat_msg
[:
-
4
])
msg
=
hearbeat_msg
elif
wh
==
1
:
GPS_msg
=
"4040003d00054d"
+
str
(
carId
)
+
"001001140305031e0301c329ed0659dec501f402e8000000b4050a0b0c9305050258001400000fa0000000005e606f115e60723be44b"
GPS_msg
=
GPS_msg
[:
-
4
]
+
self
.
crc16
(
GPS_msg
[:
-
4
])
msg
=
GPS_msg
elif
wh
==
2
:
OBD_msg
=
"4040007000064d"
+
str
(
carId
)
+
"00120114030503202d26d7fffff0000000000505000000143c00000bb80100000fa00000000a0000000000005e60723b723b39331e100055320000001312001007d0001e0000000000000096000000280096ffff3e0001f40000003e00000000000000000000007213"
OBD_msg
=
OBD_msg
[:
-
4
]
+
self
.
crc16
(
OBD_msg
[:
-
4
])
msg
=
OBD_msg
return
msg
# 获取随机消息数据(新硬件车机)
def
getRandomMsg_new
(
self
,
carId
):
# carId = 201912010002
wh
=
random
.
randint
(
0
,
2
)
msg
=
""
wh
=
0
if
wh
==
0
:
hearbeat_msg
=
"4040000e00044d"
+
str
(
carId
)
+
"8000000300cf91"
hearbeat_msg
=
hearbeat_msg
+
self
.
getCheckCode
(
hearbeat_msg
[
2
:])
+
"7e"
hearbeat_msg
=
self
.
replace7e7d
(
hearbeat_msg
)
msg
=
hearbeat_msg
elif
wh
==
1
:
GPS_msg
=
"4040003d00054d"
+
str
(
carId
)
+
"001001140305031e0301c329ed0659dec501f402e8000000b4050a0b0c9305050258001400000fa0000000005e606f115e60723be44b"
GPS_msg
=
GPS_msg
[:
-
4
]
+
self
.
crc16
(
GPS_msg
[:
-
4
])
msg
=
GPS_msg
elif
wh
==
2
:
OBD_msg
=
"4040007000064d"
+
str
(
carId
)
+
"00120114030503202d26d7fffff0000000000505000000143c00000bb80100000fa00000000a0000000000005e60723b723b39331e100055320000001312001007d0001e0000000000000096000000280096ffff3e0001f40000003e00000000000000000000007213"
OBD_msg
=
OBD_msg
[:
-
4
]
+
self
.
crc16
(
OBD_msg
[:
-
4
])
msg
=
OBD_msg
return
msg
####################################################
# 定义生成校验字段的函数(M500 校验方式)
# inputStr:需要传入一个已经转换为16进制的字符串
#####################################################
# add crc 16 check at the end of the string
def
crc16
(
self
,
inputStr
):
inputStrByte
=
bytes
.
fromhex
(
inputStr
)
crc
=
0xFFFF
for
i
in
range
(
0
,
len
(
inputStrByte
)):
for
j
in
range
(
0
,
8
):
c15
=
(
crc
>>
15
)
==
1
bit
=
((
inputStrByte
[
i
]
>>
(
7
-
j
))
&
1
)
==
1
crc
<<=
1
crc
&=
0xFFFF
if
c15
^
bit
:
crc
^=
0x1021
crc
=
str
(
hex
(
crc
))
crc
=
self
.
leftPad
(
crc
[
2
:],
4
)
# outputStr = inputStr + crc
outputStr
=
crc
return
outputStr
# pad zero to the left of the string if not long enough
def
leftPad
(
self
,
inputStr
,
strLen
):
if
(
strLen
>
len
(
inputStr
)):
outputStr
=
"0000000000000000000000000000000000000000"
+
inputStr
outputStr
=
outputStr
[
len
(
outputStr
)
-
strLen
:]
return
outputStr
else
:
return
inputStr
# pad zero to the right of the string if not long enough
def
rightPad
(
self
,
inputStr
,
strLen
):
if
(
strLen
>
len
(
inputStr
)):
outputStr
=
inputStr
+
"0000000000000000000000000000000000000000"
outputStr
=
outputStr
[:
strLen
]
return
outputStr
else
:
return
inputStr
def
getInfoFromResult
(
self
,
path
):
with
open
(
path
,
"r"
,
encoding
=
'utf-8'
)
as
fi
:
d
=
fi
.
read
()
d
=
json
.
loads
(
d
)
msgC
=
0
failT
=
0
for
i
in
d
:
msgC
=
msgC
+
d
[
i
][
"msgCon"
]
if
d
[
i
][
"status"
]
==
1
:
failT
=
failT
+
1
info_1
=
"文件统计总消息数:"
+
str
(
msgC
)
info_2
=
"文件统计总失败数:"
+
str
(
failT
)
print
(
info_1
)
print
(
info_2
)
return
info_1
+
"
\n
"
+
info_2
+
"
\n
"
if
__name__
==
"__main__"
:
t
=
SendMultMsgThread
()
# t.setHost("10.100.12.32")
t
.
setHost
(
"10.100.5.251"
)
t
.
setPort
(
9008
)
#M500
# t.setPort(9009) # M300
# t.startThread()
t
.
startThreadContinuous
()
# SendMultMsgThread().getInfoFromResult("../../data/threadDetailsContinuous.json")
\ No newline at end of file
lib/multiThread/ThreadBase.py
deleted
100644 → 0
View file @
7b6a9de9
#coding:utf-8
from
lib.multiThread.Base
import
Base
class
ThreadBase
(
Base
):
def
__init__
(
self
):
pass
\ No newline at end of file
lib/multiThread/__init__.py
deleted
100644 → 0
View file @
7b6a9de9
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment