Commit ca5e9955 authored by 李远洪's avatar 李远洪

首次提交

parents
Pipeline #66 failed with stages
.idea/*
venv/*
\ No newline at end of file
[socket]
host = 10.100.12.32
port = 9008
This diff is collapsed.
这是一个测试数据文件
\ No newline at end of file
#coding:utf-8
'''
定义基类,供所有协议类继承
'''
class Base():
def __init__(self):
pass
#####################################################
# 生成协议头
#####################################################
def getProtocalHeader(self):
# print("生成协议头方法")
pass
#encoding:utf-8
from lib.protocol.Base import Base
'''
定义协议类的基类
'''
class ProtocolBase(Base):
def __init__(self):
pass
#####################################################
# 数字转换为16进制字符串
#####################################################
def int2hexString(self, num):
hexStr = hex(num)[2:]
if (len(hexStr) % 2) == 1:
hexStr = "0" + hexStr
return hexStr
#####################################################
# 数字转换为16进制字符串,通过传入字节数可自动补0
# 传入数据格式所占字节数
#####################################################
def int2hexStringByBytes(self, num,bytescount=1):
hexStr = hex(num)[2:]
while len(hexStr) < (bytescount * 2):
hexStr = "0" + hexStr
return hexStr
#####################################################
# 设备id转换为16进制字符串
#####################################################
def devid2hexString(self, id):
# 获取第一个字符的ASCII值
ascii = ord(id[0:1])
# 将10进制的ASCII值转换为16进制
ascii = self.int2hexString(int(ascii))
devid = str(ascii) + id[1:]
return devid
####################################################
# 获取消息体长度
#####################################################
def getMsgLength(self,num):
hexData = self.int2hexString(num)
while len(hexData) < 4:
hexData = "0" + hexData
return hexData
#####################################################
# 获取流水号
#####################################################
def getWaterCode(self,num):
hexData = self.int2hexString(num)
while len(hexData) < 4:
hexData = "0" + hexData
return hexData
#####################################################
# 获取校验码
#####################################################
def getCheckCode(self,data):
return self.crc16(data)
#####################################################
# 定义生成校验字段的函数(自己翻译的函数,简化了很多步骤)
# 通过我实现的方式
#####################################################
def myCrc16(self,msg):
msg = self.str2Ascsii(msg)
crc = 0xFFFF
for i in range(0, len(msg)):
for j in range(0, 8):
cl5 = ((crc >> 15 & 1) == 1)
bit = ((msg[i] >> (7 - j) & 1) == 1)
crc <<= 1
# 通过与0xFFFF(即二进制:1111111111111111)做了一个或运算,将其转换为一个有符号的数
crc &= 0xFFFF
if (cl5 ^ bit):
crc ^= 0x1021;
crc = hex(crc) # 将10进制的crc转换为16进制
crc = str(crc)[2:] # 将16进制转换为字符串,并去掉前面的0x
return crc
#####################################################
# 将字符串转换为对应的ascii值数组
#####################################################
def str2Ascsii(s):
asciiArr = []
for i in range(0, len(s)):
asciiValue = ord(s[i])
asciiArr.append(asciiValue)
return asciiArr
#####################################################
# 将字符串转换为对应的ascii字母对应的16进制字符串
#####################################################
def str2Hex(self,s):
sHex = ""
tem = ""
for i in s:
tem = ord(i)
sHex += hex(tem)[2:]
return sHex
####################################################
# 定义生成校验字段的函数
# inputStr:需要传入一个已经转换为16进制的字符串
#####################################################
# add crc 16 check at the end of the string
def crc16(self,inputStr):
inputStrByte = bytes.fromhex(inputStr)
crc = 0xFFFF
for i in range(0, len(inputStrByte)):
for j in range(0, 8):
c15 = (crc >> 15) == 1
bit = ((inputStrByte[i] >> (7 - j)) & 1) == 1
crc <<= 1
crc &= 0xFFFF
if c15 ^ bit:
crc ^= 0x1021
crc = str(hex(crc))
crc = self.leftPad(crc[2:], 4)
# outputStr = inputStr + crc
outputStr = crc
return outputStr
# pad zero to the left of the string if not long enough
def leftPad(self,inputStr, strLen):
if (strLen > len(inputStr)):
outputStr = "0000000000000000000000000000000000000000" + inputStr
outputStr = outputStr[len(outputStr) - strLen:]
return outputStr
else:
return inputStr
# pad zero to the right of the string if not long enough
def rightPad(self,inputStr, strLen):
if (strLen > len(inputStr)):
outputStr = inputStr + "0000000000000000000000000000000000000000"
outputStr = outputStr[: strLen]
return outputStr
else:
return inputStr
#####################################################
# 将UTC时间转换为16进制,
# 例如:2020-01-02 20:30:00 (年取后面2字节)则将20,01,02,20,30,00 转换为对应的6个字节
# theTime:传入一个类似:2020-01-03 13:05:13的一个字符串
#####################################################
def getUTCTimeHex(self, theTime):
# 获取当前时间,时间格式为:2020-01-03 13:05:13
# now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 将2020-01-03 13:05:13时间格式转换为一个数组
# timeStr = "2020-01-03 13:05:13"
timeStr = theTime
timeArr = []
timeArr.append(timeStr[2:4])
timeArr.append(timeStr[5:7])
timeArr.append(timeStr[8:11])
timeArr.append(timeStr[11:13])
timeArr.append(timeStr[14:16])
timeArr.append(timeStr[17:19])
UTCTime = ""
for i in range(0, len(timeArr)):
UTCTime += self.int2hexString(int(timeArr[i]))
return UTCTime
####################################################
# 将整数转换为有符号的整数
#####################################################
def num2signedNum(self,num):
return num & 0xff
if __name__ == "__main__":
# print(ProtocolBase().str2Hex("a"))
# print(ProtocolBase().int2hexStringByBytes(1,6))
# print(ProtocolBase().num2signedNum(-5))
print(ProtocolBase().getUTCTimeHex("2020-01-03 13:05:13"))
#encoding:utf-8
from lib.protocol.Base import Base
'''
定义消息协议类的基类
'''
class MessageBase(Base):
def __init__(self):
self.IDENTIFY = "7e" #标识位
pass
#######################################################
# 获取消息头
#######################################################
def getMsgHeader(self):
msgID = self.int2hexStringByBytes(102,2) #消息id
msgBodyProperty = self.getMsgBodyProperty() #消息体属性
phoneNum = self.int2BCD(13146201118) #终端手机号
msgWaterCode = self.int2BCD(1,2) #消息流水号
subPkgContent = "" #消息包封装项
data = msgID + msgBodyProperty + phoneNum + msgWaterCode + subPkgContent
return data
#获取消息体属性
def getMsgBodyProperty(self,msgBodyLen=128,encryptionType=1024,subPkg=8192):
if msgBodyLen >= 512:
raise RuntimeError('消息体长度超长!')
msgBodyLen = msgBodyLen #消息体长度
encryptionType = encryptionType #加密方式
subPkg = subPkg #分包
retain = 0 #保留位
data = msgBodyLen + encryptionType + subPkg + retain
dataHex = self.int2hexStringByBytes(data,2)
return dataHex
#######################################################
# 获取消息体
#######################################################
def getMsgBody(self):
pass
#######################################################
# 获取校验码
#######################################################
def getCheckCode(self,data):
if len(data) % 2 == 1:
raise RuntimeError('数据段错误!')
start = data[0:2]
tmp = int(start,16)
for i in range(2,len(data),2):
tmp = tmp ^ int(data[i:i + 2],16)
dataHex = self.int2hexStringByBytes(tmp)
return dataHex
#######################################################
# 字符串转16进制
#######################################################
def str2Hex(self,data):
dataHex = ""
tem = ""
for i in data:
tem = ord(i)
dataHex += hex(tem)[2:]
return dataHex
#####################################################
# 将字符串转换为对应的ascii值数组
#####################################################
def str2Ascsii(self,data):
asciiArr = []
for i in range(0, len(data)):
asciiValue = ord(data[i])
asciiArr.append(asciiValue)
return asciiArr
####################################################
# 将整数转换为有符号的整数
#####################################################
def num2signedNum(self,num):
return num & 0xff
#####################################################
# 数字转换为16进制字符串,通过传入字节数可自动补0
# 传入数据格式所占字节数
#####################################################
def int2hexStringByBytes(self, num,bytescount=1):
hexStr = hex(num)[2:]
while len(hexStr) < (bytescount * 2):
hexStr = "0" + hexStr
return hexStr
####################################################
# 将10进制转换位8421码
#####################################################
def int2BCD(self,data):
data = str(data)
while len(data) < 12:
data = "0" + data
return data
if __name__ == "__main__":
# print(MessageBase().str2Hex("uvwxyz"))
# print(MessageBase().str2Ascsii("uvwxyz"))
# print(MessageBase().int2hexStringByBytes(22,2))
# print(MessageBase().getMsgBodyProperty())
# print(MessageBase().int2BCD(13146201117))
print(MessageBase().getCheckCode("abcde0"))
\ No newline at end of file
#encoding:utf-8
from lib.protocol.message import MessageBase
'''
定义终端通用应答
'''
class TerminalCommonMsgRes(MessageBase):
def __init__(self):
pass
\ No newline at end of file
This diff is collapsed.
#coding:utf-8
import datetime
from lib.protocol.ProtocolBase import ProtocolBase
from lib.protocol.appendix.EventClass import EventClass
from lib.protocol.report.SecurityStatusReport_protocol import SecurityStatusReport_protocol
'''
终端上事件数据包
'''
class EventReport_protocol(ProtocolBase):
def __init__(self,msgCount = 1,WATER_CODE = 26,DEV_ID = "M121501010001",locationType=1,eventType="000F"):
super().__init__()
self.msgCount = msgCount #数据包个数
self.WATER_CODE = int(WATER_CODE); #消息流水号
self.DEV_ID = DEV_ID #设备id
self.locationType = int(locationType) # 定位类型
#self.GPSPkg = GPSpkg & BaseStationPkg # GPS包或者基站包
self.GPSPkg = "1401091213260265b86206ed8c70026103280000752f03030405af017102610bb800003200000186a0001ed2a25e16fe3a"
self.BaseStationPkg = "1401140a0c050207e407e607e807ea07ec4eea4eec4eee4ef04efc4efe4f004f024f040024025e07d00007a125000927c60000ea610100"
self.eventType = eventType #事件类别
#####################################################
# 生成事件信息消息
#####################################################
def generateEventMsg(self):
self.getProtocalHeader()
info = ""
#消息头
HEADER = "4040"
#消息流水号
WATER_CODE = self.getWaterCode(self.WATER_CODE)
#设备id
DEV_ID = self.devid2hexString(self.DEV_ID)
# 功能id(GPS功能id)
FUN_ID = "0021"
#数据段
data = ""
for i in range(0,self.msgCount):
data += self.generateEventPkg(self.msgCount,self.generateEventData())
# 消息长度
LENGTH = self.getMsgLength(int(len(WATER_CODE + DEV_ID + FUN_ID + data)/2))
info += HEADER
info += LENGTH
info += WATER_CODE
info += DEV_ID
info += FUN_ID
info += data
# 校验字段
CHECK_CODE = self.getCheckCode(info)
info += CHECK_CODE
return info
#####################################################
# 创建GPS数据包,包含包个数
# data:传入GPS数据包的多条数据段
#####################################################
def generateEventPkg(self,pkgNum,data):
pkgNumHex = self.int2hexString(int(pkgNum))
pkg = pkgNumHex + data
return pkg
#####################################################
# 创建事件信息数据段
#####################################################
def generateEventData(self):
data = ""
locationType = self.int2hexString(self.locationType) #定位类型
locationData = "" #GPS包或基站包
if self.locationType == 1:
locationData = self.GPSPkg
elif self.locationType == 2:
locationData = self.BaseStationPkg
eventType = self.eventType
extraInfo = self.getExtraData(eventType) #附带信息
extraInfoLen = self.int2hexStringByBytes(int((len(extraInfo) / 2)),2) #附带信息长度
data = locationType + locationData + eventType + extraInfoLen + extraInfo
return data
#####################################################
# 获取附带信息
#####################################################
def getExtraData(self,eventType):
if eventType == "0001": #终端插入报警
return EventClass().terminalInsertionAlarmExtraInfo()
elif eventType == "0002": #终端拔出报警
return EventClass().terminalPulloutAlarmExtraInfo()
elif eventType == "0003": #汽车电瓶低电压报警
return ""
elif eventType == "0004": #终端主电断电报警
return ""
elif eventType == "0005": #GPS模块故障报
return ""
elif eventType == "0006": #GPS天线开路报警
return ""
elif eventType == "0007": #GPS天线短路报警
return ""
elif eventType == "0008": #GPS定位时间过长报警
return ""
elif eventType == "0009": #FLASH故障报警
return ""
elif eventType == "000A": #CAN模块故障报警
return ""
elif eventType == "000B": #3D传感器故障报警
return ""
elif eventType == "000C": #RTC模块故障报警
return ""
elif eventType == "000D": #TF卡故障报警
return ""
elif eventType == "000E": #刷卡器故障报警
return ""
elif eventType == "000F": #汽车预点火上报
return EventClass().preFiringExtraInfo()
elif eventType == "0010": #汽车点火上报
return EventClass().fireExtraInfo()
elif eventType == "0011": #汽车熄火上报
return EventClass().misFireExtraInfo()
elif eventType == "0012": #汽车设防上报
return EventClass().setUpDefencesExtraInfo()
elif eventType == "0013": #汽车撤防上报
return EventClass().setDownDefencesExtraInfo()
elif eventType == "0014": #锁车未成功提醒
return EventClass().lockCarFaillExtraInfo()
elif eventType == "0015": #超时未设防提醒
return EventClass().noDefencesWithTimeoutExtraInfo()
elif eventType == "0016": #设防玻璃未关提醒
return EventClass().defencesGlassNoCloseExtraInfo()
elif eventType == "0017": #设防非法开门报警
return EventClass().defencesIllegalCloseDoorExtraInfo()
This diff is collapsed.
#coding:utf-8
'''
定义一个心跳协议的类
'''
import datetime
import time
from lib.protocol.ProtocolBase import ProtocolBase
'''
终端心跳协议数据包
'''
class HeartBeatReport_protocol(ProtocolBase):
def __init__(self,WATER_CODE = "0003",DEV_ID = "M121501010001"):
super().__init__()
self.WATER_CODE = int(WATER_CODE); # 设置默认消息流水号
self.DEV_ID = DEV_ID # 设置默认设备id
#####################################################
# 生成心跳消息
#####################################################
def generateHeartBeatMsg(self):
self.getProtocalHeader()
info = ""
HEADER = "4040" # 消息头
WATER_CODE = self.getWaterCode(self.WATER_CODE) # 消息流水号
DEV_ID = self.devid2hexString(self.DEV_ID) # 设备id
FUN_ID = "0003" # 功能id(心跳功能id)
data = "" # 数据段
LENGTH = self.getMsgLength(int(len(WATER_CODE + DEV_ID + FUN_ID + data) / 2)) # 消息长度
info += HEADER
info += LENGTH
info += WATER_CODE
info += DEV_ID
info += FUN_ID
info += data
CHECK_CODE = self.getCheckCode(info) # 校验字段
info += CHECK_CODE
return info
if __name__ == "__main__":
pass
\ No newline at end of file
#coding:utf-8
'''
定义一个终端登录协议的类
'''
import datetime
import time
from lib.protocol.ProtocolBase import ProtocolBase
class LoginReport_protocol(ProtocolBase):
def __init__(self,WATER_CODE = "0002",DEV_ID = "M121501010001",cpuId="CPU-ID001122334455667788",imsi="IMSI13145678902",ccid="CCID1122334455667788",imei="IMEI12233445566"):
super().__init__()
self.WATER_CODE = int(WATER_CODE); # 设置默认消息流水号
self.DEV_ID = DEV_ID # 设置默认设备id
self.cpuId = cpuId #设置默认cupId值
self.imsi = imsi #设置默认imsi值
self.ccid = ccid #设置默认ccid值
self.imei = imei #设置默认imei值
#####################################################
# 生成终端登录消息
#####################################################
def generateLoginMsg(self):
self.getProtocalHeader()
info = ""
HEADER = "4040" # 消息头
WATER_CODE = self.getWaterCode(self.WATER_CODE) # 消息流水号
DEV_ID = self.devid2hexString(self.DEV_ID) # 设备id
FUN_ID = "0003" # 功能id(终端登录功能id)
data = self.generateLoginData() # 数据段
LENGTH = self.getMsgLength(int(len(WATER_CODE + DEV_ID + FUN_ID + data) / 2)) # 消息长度
info += HEADER
info += LENGTH
info += WATER_CODE
info += DEV_ID
info += FUN_ID
info += data
CHECK_CODE = self.getCheckCode(info) # 校验字段
info += CHECK_CODE
return info
#####################################################
# 创建终端登录数据段
#####################################################
def generateLoginData(self):
data = ""
CPU_ID = self.getCPU_IDHex(self.cpuId) #CPU-ID
IMSI = self.getIMSIHex(self.imsi) #IMSI
CCID = self.getCCIDHex(self.ccid) #CCID
IMEI = self.getIMEIHex(self.imei) #IMEI
data = data + CPU_ID + IMSI +CCID +IMEI
return data
#####################################################
# 获取CPU_ID对应的16进制数据
#####################################################
def getCPU_IDHex(self,data):
return self.str2Hex(data)
#####################################################
# 获取IMSI对应的16进制数据
#####################################################
def getIMSIHex(self,data):
return self.str2Hex(data)
#####################################################
# 获取CCID对应的16进制数据
#####################################################
def getCCIDHex(self,data):
return self.str2Hex(data)
#####################################################
# 获取IMDE对应的16进制数据
#####################################################
def getIMEIHex(self,data):
return self.str2Hex(data)
if __name__ == "__main__":
print(LoginReport_protocol().generateLoginData())
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#coding:utf-8
from lib.protocol.ProtocolBase import ProtocolBase
'''
终端上报故障码数据包
'''
class TroubleReport_protocol(ProtocolBase):
def __init__(self,WATER_CODE = 26,DEV_ID = "M121501010001",infoTime="2020-01-20 11:55:58",troubleCount=1,byte0="02",byte1=2,byte2=3,byte3=1,MILStatus=1):
super().__init__()
self.WATER_CODE = int(WATER_CODE); # 设置默认消息流水号
self.DEV_ID = DEV_ID # 设置默认设备id
self.infoTime = infoTime #时间信息
self.troubleCount = int(troubleCount) #故障码个数
self.byte0 = byte0 #系统id
self.byte1 = int(byte1) #故障码内容
self.byte2 = int(byte2) #故障码内容
self.byte3 = int(byte3) #故障码状态
self.MILStatus = int(MILStatus) #MIL状态
#####################################################
# 生成故障码消息
#####################################################
def generateTroubleMsg(self):
self.getProtocalHeader()
info = ""
HEADER = "4040" # 消息头
WATER_CODE = self.getWaterCode(self.WATER_CODE) # 消息流水号
DEV_ID = self.devid2hexString(self.DEV_ID) # 设备id
FUN_ID = "001A" # 功能id(GPS功能id)
data = self.generateTroubleData(self.troubleCount) # 数据段
# 消息长度
LENGTH = self.getMsgLength(int(len(WATER_CODE + DEV_ID + FUN_ID + data) / 2))
info += HEADER
info += LENGTH
info += WATER_CODE
info += DEV_ID
info += FUN_ID
info += data
CHECK_CODE = self.getCheckCode(info)
info += CHECK_CODE # 校验字段
return info
#####################################################
# 创建故障码数据段
#####################################################
def generateTroubleData(self,troubleCount):
data = ""
infoTime = self.getUTCTimeHex(self.infoTime) #时间信息
data += infoTime
data += self.int2hexString(troubleCount)
for i in range(0,troubleCount):
data = data + self.generateTroubleCode()
data = data + self.int2hexString(self.MILStatus)
return data
#####################################################
# 生成一个故障码数据
#####################################################
def generateTroubleCode(self):
byte0 = self.byte0 #系统ID
byte1 = self.getByte1Hex(self.byte1) #故障码内容
byte2 = self.getByte2Hex(self.byte2) #故障码内容
byte3 = self.getByte3Hex(self.byte3) #故障码状态
data = byte0 + byte1 +byte2 + byte3
return data
#####################################################
# 获取系统ID的16进制数据
# 0x00: 发动机故障码
# 0x01: 变速箱故障码
# 0x02: ABS故障码
# 0x03: 安全气囊故障码
# 其它预留
#####################################################
def getByte0Hex(self, data):
return "02"
#####################################################
# 获取故障码内容的16进制数据
#####################################################
def getByte1Hex(self,data):
hexData = self.int2hexString(data)
return hexData
#####################################################
# 获取故障码内容的16进制数据
#####################################################
def getByte2Hex(self, data):
hexData = self.int2hexString(data)
return hexData
#####################################################
# 获取故障码状态的16进制数据
#####################################################
def getByte3Hex(self, data):
hexData = self.int2hexString(data)
return hexData
#coding:utf-8
#########################################################
#
# 定义基类,供所有类继承
#
#########################################################
class Base():
def __init__(self):
pass
#coding:utf-8
import socket
from time import sleep
from lib.socket.SocketBase import SocketBase
import binascii
import random
import string
import traceback
'''
定义客户端socket类
'''
class ClientSocket(SocketBase):
def __init__(self, host="127.0.0.1", port=5000):
#socke连接的标识,生成一个长度为8的随机字符串
self.socketId = ''.join(random.sample(string.ascii_letters + string.digits, 8))
self.port = port
self.host = host
# 0代表未连接 , 1代表socket处于连接状态
self.status = 0
self.BUF_SIZE = 1024
#####################################################
# 连接服务器
#####################################################
def connect(self):
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
self.client.connect((self.host, self.port))
self.status = 1
#####################################################
# 发送消息
#####################################################
def send(self, msg):
# self.client.send(msg.encode())
# client.send(bytes.fromhex(msg))
self.client.send(binascii.a2b_hex(msg))
def receive(self):
data = ""
try:
# 设置接收消息的超时时间
self.client.settimeout(1)
data = self.client.recv(self.BUF_SIZE)
except BaseException as e:
# traceback.print_exc()
print("连接超时,socket断开")
self.client.close()
self.status = 0
raise RuntimeError('socket 接收消息超时!')
return data
#####################################################
# 断开socket
#####################################################
def close(self):
# self.client.send("_end".encode()) //发送一个socket断开的命令
self.client.close()
self.status = 0
#设置服务器域名
def setHost(self,host):
self.host = host
#设置要连接的服务器端口
def setPort(self,port):
self.port = port
#获取该连接的socket ID号
def getSocketId(self):
return self.socketId
def getSocketStatus(self):
return self.status
if __name__ == "__main__":
client = ClientSocket()
client.connect()
for i in range(0,10):
client.send("world" + str(i))
sleep(1)
print(i)
client.close()
\ No newline at end of file
#coding: utf-8
import logging
import threading
from time import sleep
from lib.socket.SocketBase import SocketBase
from lib.socket.websocket_server import WebsocketServer
class MyWebsocket_server(SocketBase):
def __init__(self,host="127.0.0.1",port=5005):
self.connectTimeout = 5 #设置socket服务启动之后的连接超时时间
self.connectTimeoutstatus = 0 #设置socket服务启动之后是否有连接,0为为收到连接,1:为已经收到了连接
self.msgTimeout = 12 #设置12秒没有收到任何消息之后,就关闭连接
self.host = host
self.port = port
def startWebsocketServer(self):
server = WebsocketServer(self.port, host=self.host, loglevel=logging.INFO)
server.set_fn_new_client(self.new_client)
server.set_fn_message_received(self.mysend)
self.connectTimeoutThreading(server) #启动超时连接线程
self.msgTimeoutThreading(server) #启动接收消息超时连接线程
server.run_forever()
server.server_close()
#有客户端连接成功之后,回复一条消息
def new_client(self,client, server):
server.send_message_to_all("连接成功......")
self.connectTimeout = 1
#收到消息之后的恢复消息方法
def mysend(self,client,server,msg):
print(msg)
self.msgTimeout = 12
server.send_message(client,"哈哈哈哈哈")
if(msg == "_end"): #收到_end 控制码的时候,断开连接
server.server_close()
print("连接断开...")
###################################################
# 超过时间没有客户端连接,自动断开连接线程
###################################################
def connectTimeoutThreading(self,server):
t1 = threading.Thread(target=self.doConnectTomeout, args=(server,))
t1.start()
###################################################
# 超过时间没有客户端连接,自动断开连接方法
###################################################
def doConnectTomeout(self,server):
while self.connectTimeout > 0:
self.connectTimeout -= 1
sleep(1)
if self.connectTimeoutstatus == 0:
server.server_close()
print("连接断开...")
else:
pass
###################################################
# 超过时间没有收到任何消息,自动断开连接线程
###################################################
def msgTimeoutThreading(self, server):
t1 = threading.Thread(target=self.doMsgTimeout, args=(server,))
t1.start()
###################################################
# 超过时间没有收到任何消息,自动断开连接方法
###################################################
def doMsgTimeout(self, server):
while self.msgTimeout > 0:
self.msgTimeout -= 1
sleep(1)
if self.connectTimeout == 1:
server.server_close()
print("连接断开...")
#coding:utf-8
#########################################################
#
# 定义服务端socket类
#
#########################################################
import socket
from time import sleep
from lib.socket.SocketBase import SocketBase
class ServerSocket(SocketBase):
def __init__(self,host="127.0.0.1",port=5000):
self.host = host
self.port = port
self.status = 0 #0代表socket停止服务,1代表socket服务运行
self.BUF_SIZE = 1024
#####################################################
# 创建一个服务端套接字
#####################################################
def createServerSocket(self):
serverSocket = socket.socket() #默认为ipv4,tcp连接
#下面创建socketyu8上面创建的没有区别(AF_INET:ipv4,SOCK_STREAM代表使用TCP连接)
# serverSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverSocket.bind((self.host,self.port))
return serverSocket
#####################################################
# 启动socket服务
#####################################################
def startServer(self):
self.status = 1
server = self.createServerSocket()
# 设置接收的连接数为1
server.listen(1)
server.settimeout(10)
client, address = server.accept()
while self.status == 1: # 循环收发数据包,长连接
data = client.recv(self.BUF_SIZE)
text = data.decode()
sleep(1)
if text != "":
print(text) # python3 要使用decode
client.send("world".encode())
# client.close() #连接不断开,长连接
#接收到断开连接的信息后,关闭socket;应为python的socket没有提供获取服务端自身socket的状态或者客户端是否断开了连接
if text == "_end":
self.close()
def close(self):
self.status = 0
def getHost(self):
return self.host
def getPort(self):
return self.port
def setHost(self,host):
self.host = host
def setPort(self,port):
self.port = port
if __name__ == "__main__":
server = ServerSocket("localhost", 5000)
server.createServerSocket()
server.startServer()
\ No newline at end of file
#encoding:utf-8
#########################################################
#
# 定义socket的基类
#
#########################################################
from lib.socket.Base import Base
class SocketBase(Base):
def __init__(self):
pass
#coding:utf-8
import binascii
import socket
from lib.protocol.report.GPSReport_protocol import GPSReport_protocol
from lib.protocol.report.OBDReport_CAN_protocol import OBDReport_CAN_protocol
from lib.protocol.report.OBDReport_CAN_protocol import OBDReport_CAN_protocol
from lib.protocol.report.HeartBeatReport_protocol import HeartBeatReport_protocol
from lib.protocol.report.LoginReport_protocol import LoginReport_protocol
from lib.protocol.report.SecurityStatusReport_protocol import SecurityStatusReport_protocol
from lib.protocol.report.BaseStationReport_protocol import BaseStationReport_protocol
from lib.protocol.report.TroubleReport_protocol import TroubleReport_protocol
from lib.protocol.report.EventReport_protocol import EventReport_protocol
# host = "10.100.12.34"
# port = 8712
host = "10.100.12.32"
port = 9008
# msg = GPSReport_protocol().generateGpsMsg() #GPS消息数据
# msg = OBDReport_protocol().generateOBDReportMsg() #OBD终端上报数据
# msg = OBDReport_CAN_protocol().generateOBDReportCANMsg() #OBD终端上报CAN数据
# msg = HeartBeatReport_protocol().generateHeartBeatMsg() #终端上报心跳协议
# msg = LoginReport_protocol().generateLoginMsg() #终端上报登录协议
# msg = SecurityStatusReport_protocol().generateSecurityStatusMsg() #终端上报安防状态协议
# msg = BaseStationReport_protocol().generateBaseStationMsg() #终端上报基站定位协议
# msg = TroubleReport_protocol().generateTroubleMsg() #终端上报故障码数据包
msg = EventReport_protocol().generateEventMsg()
print(msg)
BUF_SIZE = 1024
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.connect((host, port))
client.send(binascii.a2b_hex(msg))
# client.send(bytes.fromhex(msg))
data = client.recv(BUF_SIZE)
print(data)
client.close()
This diff is collapsed.
#coding:utf-8
#########################################################
#
# 定义图形界面基类,供所有类继承
#
#########################################################
class Base():
def __init__(self):
pass
\ No newline at end of file
#encoding:utf-8
#########################################################
#
# 定义图形界面的基类
#
#########################################################
from lib.ui.Base import Base
class InterfaceBase(Base):
def __init__(self):
pass
\ No newline at end of file
#coding:utf-8
import wx
from lib.ui.InterfaceBase import InterfaceBase
from lib.ui.MyMenubar import MyMenubar
from lib.ui.MyToolsbar import MyToolsbar
'''
定义主窗体
'''
class MainWindow(InterfaceBase):
def __init__(self):
self.app = None
self.frame = None
self.main()
def main(self):
self.app = wx.App()
self.frame = wx.Frame(None,-1, title='socketTools',size = wx.Size(800,700))
#添加设置菜单栏
self.frame.SetMenuBar(MyMenubar(self.frame).createMenuBar())
#添加设置工具栏
self.toolsBar = MyToolsbar(self.frame).createToolsBar()
#创建主窗体布局面板,将主窗体分为几大块
self.createPanel()
#####################################################
# 显示窗体
#####################################################
def show(self):
self.frame.Show()
self.app.MainLoop()
#####################################################
# 创建主窗体的布局面板
#####################################################
def createPanel(self):
# 创建面板
splitter = wx.SplitterWindow(self.frame, wx.ID_ANY)
panel1 = wx.Panel(splitter, wx.ID_ANY)
treeList = wx.TreeCtrl(panel1, wx.ID_ANY, size=(wx.EXPAND, wx.EXPAND))
item1 = treeList.AddRoot("工具汇总")
item11 = treeList.AppendItem(item1, "终端上报报文")
treeList.AppendItem(item1, "远程查询报文")
treeList.AppendItem(item1, "车辆行为模拟")
treeList.AppendItem(item1, "系统设置")
treeList.AppendItem(item11, "GPS上报报文")
treeList.AppendItem(item11, "OBD_CAN上报报文")
treeList.AppendItem(item11, "其他报文")
panel2 = wx.Panel(splitter, wx.ID_ANY)
panel2.SetBackgroundColour("pink")
splitter.SplitVertically(panel1, panel2)
splitter.SetSashPosition(200) #设置水平分割的位置
if __name__ == "__main__":
MainWindow().show()
\ No newline at end of file
#coding:utf-8
import wx
from lib.ui.InterfaceBase import InterfaceBase
'''
定义菜单栏
'''
class MyMenubar(InterfaceBase):
def __init__(self,frame):
self.frame = frame
#####################################################
# wxpython定义菜单栏
#####################################################
def createMenuBar(self):
menuBar = wx.MenuBar()
#《------------------ 开始菜单编码start ---------------------》
#first_ 代表一级菜单,即menubar上显示的菜单
first_menuStart = wx.Menu()
menuItemOpen = first_menuStart.Append(wx.ID_ANY, u"打开 ")
menuItemExit = first_menuStart.Append(wx.ID_ANY, u"退出 ")
self.frame.Bind(wx.EVT_MENU, self.myExit, menuItemExit)
# 《------------------ 开始菜单编码end ---------------------》
menuBar.Append(first_menuStart, u"文件")
return menuBar
def myExit(self,evt):
print(evt)
wx.Exit()
\ No newline at end of file
#coding:utf-8
import wx
from lib.ui.InterfaceBase import InterfaceBase
'''
定义工具条
'''
class MyToolsbar(InterfaceBase):
def __init__(self,frame):
self.frame = frame
#####################################################
# 创建一个工具栏,并返回该对象
#####################################################
# @property
def createToolsBar(self):
toolBar = self.frame.CreateToolBar()
openTool = toolBar.AddTool(wx.ID_ANY, "open",
wx.Bitmap(wx.Image("../../img/folder.png", "image/png").Scale(25, 25),
wx.BITMAP_SCREEN_DEPTH))
startTool = toolBar.AddTool(wx.ID_ANY, "start",
wx.Bitmap(wx.Image("../../img/start.png", "image/png").Scale(25, 25),
wx.BITMAP_SCREEN_DEPTH))
stopTool = toolBar.AddTool(wx.ID_ANY, "stop",
wx.Bitmap(wx.Image("../../img/stop.png", "image/png").Scale(25, 25),
wx.BITMAP_SCREEN_DEPTH))
toolBar.AddSeparator()
infoTool = toolBar.AddTool(wx.ID_ANY, "info",
wx.Bitmap(wx.Image("../../img/info.png", "image/png").Scale(25, 25),
wx.BITMAP_SCREEN_DEPTH))
helpTool = toolBar.AddTool(wx.ID_ANY, "help",
wx.Bitmap(wx.Image("../../img/help.png", "image/png").Scale(25, 25),
wx.BITMAP_SCREEN_DEPTH))
toolBar.Realize()
return toolBar
\ No newline at end of file
#coding:utf-8
'''
定义一个处理数据包的辅助方法集合
'''
from binascii import *
from crcmod import *
import struct
#####################################################
# 定义生成校验字段的函数(自己翻译的函数,简化了很多步骤)
#####################################################
def myCrc16(msg):
msg = str2Ascsii(msg)
crc = 0xFFFF
for i in range(0, len(msg)):
for j in range(0, 8):
cl5 = ((crc >> 15 & 1) == 1)
bit = ((msg[i] >> (7 - j) & 1) == 1)
crc <<= 1
# 通过与0xFFFF(即二进制:1111111111111111)做了一个或运算,将其转换为一个有符号的数
crc &= 0xFFFF
if (cl5 ^ bit):
crc ^= 0x1021;
crc = hex(crc) # 将10进制的crc转换为16进制
crc = str(crc)[2:] # 将16进制转换为字符串,并去掉前面的0x
return crc
#####################################################
# 定义生成校验字段的函数
# inputStr:需要传入一个已经转换为16进制的字符串
#####################################################
# add crc 16 check at the end of the string
def crc16(inputStr):
inputStrByte = bytes.fromhex(inputStr)
crc = 0xFFFF
for i in range(0, len(inputStrByte)):
for j in range(0, 8):
c15 = (crc >> 15) == 1
bit = ((inputStrByte[i] >> (7 - j)) & 1) == 1
crc <<= 1
crc &= 0xFFFF
if c15 ^ bit:
crc ^= 0x1021
crc = str(hex(crc))
crc = leftPad(crc[2:], 4)
# outputStr = inputStr + crc
outputStr = crc
return outputStr
# pad zero to the left of the string if not long enough
def leftPad(inputStr, strLen):
if (strLen > len(inputStr)):
outputStr = "0000000000000000000000000000000000000000" + inputStr
outputStr = outputStr[len(outputStr) - strLen:]
return outputStr
else:
return inputStr
# pad zero to the right of the string if not long enough
def rightPad(inputStr, strLen):
if (strLen > len(inputStr)):
outputStr = inputStr + "0000000000000000000000000000000000000000"
outputStr = outputStr[: strLen]
return outputStr
else:
return inputStr
#####################################################
# 将字符串转换为16进制的数字字符串
#####################################################
def str2Hex(s):
s_hex=""
for i in range(len(s)):
s_hex=s_hex+hex(ord(s[i]))[2:]+" "
return s_hex
#####################################################
# 将字符串转换为16进制的数字字符串,并去掉空格
#####################################################
def str2HexStrip(s):
s = str2Hex(s)
s2 = s.replace(" ","")
return s2
#####################################################
# 将字符串转换为对应的ascii值数组
#####################################################
def str2Ascsii(s):
asciiArr = []
for i in range(0,len(s)):
asciiValue = ord(s[i])
asciiArr.append(asciiValue)
return asciiArr
if __name__ == "__main__":
print(crc16(str2Hex("aa")))
print(str2Hex("aa"))
print(str2HexStrip("aa"))
print(str2Ascsii("aa"))
print(myCrc16("aa"))
print(crc16(str2HexStrip("aa")))
#coding:utf-8
'''
定义一个获取经纬度的辅助方法集合
'''
import json
#####################################################
# 获取文件的金纬度数据,返回一个经纬度数组
# path:传入文件路径
#####################################################
def getLocationInfo(path):
with open(path,"r",encoding='utf-8') as fi:
loc_data = fi.readlines()
strData = ""
for lineD in loc_data:
strData += lineD
json_data = json.loads(strData)
return json_data["locationInfo"]
if __name__ == "__main__":
print(getLocationInfo("../../data/protocolTools/GPSLine_1.json"))
\ No newline at end of file
这是一个测试日志文件
\ No newline at end of file
wxPython==4.0.7.post2
Flask==1.0.2
crcmod==1.7
from flask import Flask, redirect
from views.protocolTools.setting_view import setting_view
from views.protocolTools.setting_process import setting_process
from views.protocolTools.protocolReport_view import protocolReport_view
from views.protocolTools.protocolReport_process import protocolReport_process
from views.protocolTools.test_view import test_view
from views.tab1.style import style
from views.tab1.icon import icon
from views.tab2.login import login
from views.tab2.example import example
from views.other import other
app = Flask(__name__)
app.register_blueprint(setting_view,url_prefix = "/protocolTools/setting_view")
app.register_blueprint(setting_process,url_prefix = "/protocolTools/setting_process")
app.register_blueprint(protocolReport_view,url_prefix = "/protocolTools/protocolReport_view")
app.register_blueprint(protocolReport_process,url_prefix = "/protocolTools/protocolReport_process")
app.register_blueprint(test_view,url_prefix = "/protocolTools/test_view")
app.register_blueprint(style,url_prefix = "/tab1/style")
app.register_blueprint(icon,url_prefix = "/tab1/icon")
app.register_blueprint(login,url_prefix = "/tab2/login")
app.register_blueprint(example,url_prefix = "/tab2/example")
app.register_blueprint(other,url_prefix = "/other")
@app.route('/')
def hello():
return redirect('/tab1/style')
if __name__ == '__main__':
app.run(debug=True,host='0.0.0.0')
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This source diff could not be displayed because it is too large. You can view the blob instead.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
// This file is autogenerated via the `commonjs` Grunt task. You can require() this file in a CommonJS environment.
require('../../js/transition.js')
require('../../js/alert.js')
require('../../js/button.js')
require('../../js/carousel.js')
require('../../js/collapse.js')
require('../../js/dropdown.js')
require('../../js/modal.js')
require('../../js/tooltip.js')
require('../../js/popover.js')
require('../../js/scrollspy.js')
require('../../js/tab.js')
require('../../js/affix.js')
\ No newline at end of file
This diff is collapsed.
/**
*页面顶部的tab切换
*/
function swichTap(e){
var id = $(e).attr("id")
if(id == "protocolTools"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/protocolReport_view/GPS_protocol_page");
}else if(id == "tab1"){
$(location).attr('href', "http://" + window.location.host + "/tab1/style");
}else if(id == "tab2"){
$(location).attr('href', "http://" + window.location.host + "/tab2/login");
}else if(id == "otherTab"){
$(location).attr('href', "http://" + window.location.host + "/other");
}
}
/**
*protocols 页面顶部的协议切换
*/
function protocolManTab(e){
var url = window.location.href;
var id = $(e).attr("id");
if(id == "heartBeat_protocol"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/protocolReport_view/heartBeat_protocol_page");
}else if(id == "login_protocol"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/protocolReport_view/login_protocol_page");
}else if(id == "GPS_protocol"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/protocolReport_view/GPS_protocol_page");
}else if(id == "OBD_CAN_protocol"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/protocolReport_view/OBD_CAN_protocol_page");
}else if(id == "securityStatus_protocol"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/protocolReport_view/securityStatus_protocol_page");
}else{
alert(id)
}
}
\ No newline at end of file
/**
*setting 页面顶部的协议切换
*/
function settingManTab(e){
var url = window.location.href;
var id = $(e).attr("id");
if(id == "socketSetting"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/setting_view/socketSetting_page");
}else if(id == "style_labels1"){
$(location).attr('href', "http://" + window.location.host + "/tab1/style/labels");
}else{
alert(id)
}
}
\ No newline at end of file
/**
*test 页面顶部的Tab且换
*/
function testManTab(e){
var url = window.location.href;
var id = $(e).attr("id");
if(id == "test"){
$(location).attr('href', "http://" + window.location.host + "/protocolTools/test_view/test_page");
}else{
alert(id)
}
}
\ No newline at end of file
/**
*tab1基本控件顶部tab且换
*/
function iconManTab(e){
var url = window.location.href;
var id = $(e).attr("id");
if(id == "style_index"){
$(location).attr('href', "http://" + window.location.host + "/tab1/icon");
}else{
alert(id)
}
}
/**
*tab1基本控件顶部tab且换
*/
function styleManTab(e){
var url = window.location.href;
var id = $(e).attr("id");
if(id == "style_buttons"){
$(location).attr('href', "http://" + window.location.host + "/tab1/style");
}else if(id == "style_labels"){
$(location).attr('href', "http://" + window.location.host + "/tab1/style/labels");
}else if(id == "style_tables"){
$(location).attr('href', "http://" + window.location.host + "/tab1/style/tables");
}else{
alert(id)
}
}
/*
辅助函数js文件
*/
//定义一个获取当前时间的函数
function getCurTime(){
var myDate = new Date;
var year = myDate.getFullYear(); //获取当前年
var mon = myDate.getMonth() + 1; //获取当前月
if(String(mon).length < 2) mon = "0" + String(mon)
var date = myDate.getDate(); //获取当前日
if(String(date).length < 2) date = "0" + String(date)
var h = myDate.getHours();//获取当前小时数(0-23)
if(String(h).length < 2) h = "0" + String(h)
var m = myDate.getMinutes();//获取当前分钟数(0-59)
if(String(m).length < 2) m = "0" + String(m)
var s = myDate.getSeconds();//获取当前秒
if(String(s).length < 2) s = "0" + String(s)
var week = myDate.getDay();
var now = year + "-" + mon + "-" + date + " " + h + ":" + m + ":" + s;
return now;
}
//定义一个获取当前时间戳的函数
function getCutTimestamp(){
var timestamp = Date.parse(new Date());
// var timestamp = (new Date()).valueOf()
// var timestamp=new Date().getTime()
timestamp = String(timestamp);
timestamp = timestamp.slice(0,10);
return timestamp;
}
\ No newline at end of file
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<link rel="shortcut icon" href="../../static/img/victory.ico" type="image/x-icon"/>
<title>{% block title %}{% endblock %}</title>
<link href="../../static/bootstrap-3.3.7-dist/css/bootstrap.min.css" rel="stylesheet">
<link href="../../static/bootstrap-3.3.7-dist/css/bootstrap-theme.min.css" rel="stylesheet">
<link href="../../static/bootstrap-3.3.7-dist/css/other/docs.css" rel="stylesheet">
<script src="../../static/jquery/jquery-3.4.1.js"></script>
<script src="../../static/bootstrap-3.3.7-dist/js/bootstrap.min.js"></script>
<script src="../../static/js/util.js"></script>
<script src="../../static/js/index.js"></script>
</head>
<body>
{% block top %}
<div class="top_nav">
<div style="background:#222222;height:50px;"></div>
<div style="width: 70%;margin: 0 auto">
<ul class="nav nav-tabs" style="font-size:18px;">
<li role="presentation" {% if arg.path[0]=="protocolTools" %} class="active" {% endif %}><a onclick="swichTap(this)" id="protocolTools">协议工具</a></li>
<li role="presentation" {% if arg.path[0]=="tab1" %} class="active" {% endif %}><a onclick="swichTap(this)" id="tab1">案例</a></li>
<li role="presentation" {% if arg.path[0]=="tab2" %} class="active" {% endif %}><a onclick="swichTap(this)" id="tab2">演示页面</a></li>
<li role="presentation" {% if arg.path[0]=="other" %} class="active" {% endif %}><a onclick="swichTap(this)" id="otherTab">其他</a></li>
</ul>
</div>
</div>
{% endblock %}
{% block content %}
<div style="width:100%;min-height:750px;background-color:gray;">
</div>
{% endblock %}
{% block bottom %}
<div style="width:100%;height:80px;background-color:grey;float:left;margin-top:10px;">
{% endblock %}
</div>
</div>
</body>
</html>
\ No newline at end of file
{% extends "base.html" %}
{% block title %}other{% endblock %}
{% block content %}
<div style="width:70%;height:700px;margin:auto;">
<h1 style="margin-top:100px;">什么也没有哦~</h1>
</div>
{% endblock %}
{% block bottom %}
<div style="width:100%;height:80px;background-color:grey;float:left;">
{% endblock %}
</div>
</body>
</html>
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment