Commit 452f5bbf authored by liyuanhong's avatar liyuanhong

优化了界面

parents
Pipeline #343 failed with stages
File added
File added
#! /bin/bash
git fetch --all
git reset --hard origin/master
git pull
\ No newline at end of file
File added
#coding:utf-8
class Protocal_1078:
def __init__(self):
self.frameHeader1078 = "30316364" # JT/T 1078 头 固定为0x30 0x31 0x63 0x64,ASCII码为01cd
self.V = 2 << 6 # V 固定为2
self.P = 0 << 5 # P 固定为0
self.X = 0 << 4 # X RTP头是否需要扩展位,固定为0
self.CC = 1 # CC 固定为1
self.M = 1 << 7 # M 标志位,确定是否完整数据帧的边界,因为数据体的最大长度是950字节,而一个视频I帧通道要远远超过950字节,所以视频的一个帧通常会分包
self.PT = 98 # PT 98:表示H264 99:表示H265
self.sn = 0 # 包序号 初始为0,每发送一个RTP数据包,序列号加1
self.sim = "013146201117" # 终端设备SIM卡号
self.logicC = 1 # 逻辑通道
self.dataType = 0 << 4 # 数据类型 , 0000:数据I祯,0001:视频P帧,0010:视频B帧,0011:音频帧,0100:透传数据
self.pkgTag = 1 # 分包处理 ,0000:原子包,不可拆分等,0001:分包处理时的第一个包,0010:分包处理时的最后一个包,0011:分包处理时的中间包
self.time = 33 # 时间戳
self.lastKeyTime = 33 # Last I Frame Interval 该祯与上一个关键祯之间的时间间隔,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.lastTime = 33 # Last Frame Interval 该祯与上一个祯之间的时间时间,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.lenS = 950 # 数据长度
self.dataBody = "" # 数据体,要发送的帧数据
# self.host = "10.100.11.125"
self.host = "10.16.15.85"
self.port = 1078
self.BUF_SIZE = 2048
def setDataBody(self,data):
self.dataBody = data
def setM(self,data):
self.M = data << 7
def setSim(self,data):
while len(data) < 12:
data = "0" + data
self.sim = data
def setLogcC(self,data):
self.logicC = data
def setSn(self,data):
self.sn = data
def setTime(self,data):
self.time = data
def setLastKeyTime(self,data):
self.lastKeyTime = data
def setLastTime(self,data):
self.lastTime = data
#####################################################
# 从帧数据中获取帧类型
#####################################################
def getFrameType(self):
# 0x67 (0 11 00111) SPS 非常重要 type = 7
# 0x68 (0 11 01000) PPS 非常重要 type = 8
# 0x65 (0 11 00101) IDR帧 关键帧 非常重要 type = 5
# 0x61 (0 11 00001) I帧 重要 type=1 非IDR的I帧 不大常见
# 0x41 (0 10 00001) P帧 重要 type = 1
# 0x01 (0 00 00001) B帧 不重要 type = 1
# 0x06 (0 00 00110) SEI 不重要 type = 6
frameTypeHex = self.dataBody[8:10]
return frameTypeHex
#####################################################
# 数字转换为16进制字符串,通过传入字节数可自动补0
# 传入数据格式所占字节数
#####################################################
def int2hexStringByBytes(self, num,bytescount=1):
hexStr = hex(num)[2:]
while len(hexStr) < (bytescount * 2):
hexStr = "0" + hexStr
return hexStr
#####################################################
# 根据长度分割字符串
#####################################################
def splitStrByLen(self,strs,num):
result = []
if num > len(strs):
result.append(strs)
else:
result.append(strs[:num])
strs = strs[num:]
while len(strs) > num:
result.append(strs[:num])
strs = strs[num:]
if len(strs) > 0:
result.append(strs)
return result
#####################################################
# 生成单个数据包
#####################################################
def genPkg(self,data):
msg = ""
msg = self.frameHeader1078
msg = msg + self.int2hexStringByBytes(self.V + self.P + self.X + self.CC)
msg = msg + self.int2hexStringByBytes(self.M + self.PT)
msg = msg + self.int2hexStringByBytes(self.sn,2)
msg = msg + self.sim
msg = msg + self.int2hexStringByBytes(self.logicC)
msg = msg + self.int2hexStringByBytes(self.dataType + self.pkgTag)
msg = msg + self.int2hexStringByBytes(self.time,8)
msg = msg + self.int2hexStringByBytes(self.lastKeyTime,2)
msg = msg + self.int2hexStringByBytes(self.lastTime,2)
msg = msg + self.int2hexStringByBytes(int(len(data) / 2),2)
msg = msg + data
return msg
#####################################################
# 生成单个数据包
#####################################################
def genAudioPkg(self,data):
msg = ""
msg = self.frameHeader1078
msg = msg + self.int2hexStringByBytes(self.V + self.P + self.X + self.CC)
self.PT = 19
msg = msg + self.int2hexStringByBytes(self.M + self.PT)
msg = msg + self.int2hexStringByBytes(self.sn,2)
msg = msg + self.sim
msg = msg + self.int2hexStringByBytes(self.logicC)
msg = msg + self.int2hexStringByBytes(self.dataType + self.pkgTag)
msg = msg + self.int2hexStringByBytes(self.time,8)
msg = msg + self.int2hexStringByBytes(int(len(data) / 2),2)
msg = msg + data
return msg
#####################################################
# 根据帧数据,生成多个数据包(视频)
#####################################################
def genPkgsByFrame(self):
f_type = self.getFrameType()
if f_type == "65" or f_type == "61":
self.dataType = 0 << 4 # 0000
elif f_type == "41":
self.dataType = 1 << 4 # 0001
elif f_type == "01":
self.dataType = 2 << 4 # 0010
else:
self.dataType = 2 << 4 # 0010
datas = self.splitStrByLen(self.dataBody,1900)
pkgs = []
for i in range(0,len(datas)):
if len(datas) == 1:
self.pkgTag = 0 # 原子包 0000
else:
if i == 0:
self.pkgTag = 1 # 第一个包 0001
elif i < (len(datas) - 1):
self.pkgTag = 3 # 中间包 0011
else:
self.pkgTag = 2 # 最后一个包 0010
pkg = self.genPkg(datas[i])
pkgs.append(pkg)
if self.sn >= 65535:
self.sn = 0
self.sn = self.sn + 1
return pkgs
#####################################################
# 根据帧数据,生成多个数据包(音频频)
#####################################################
def genAudioPkgsByFrame(self):
self.dataType = 3 << 4
datas = self.splitStrByLen(self.dataBody,1900)
pkgs = []
for i in range(0,len(datas)):
if len(datas) == 1:
self.pkgTag = 0 # 原子包 0000
else:
if i == 0:
self.pkgTag = 1 # 第一个包 0001
elif i < (len(datas) - 1):
self.pkgTag = 3 # 中间包 0011
else:
self.pkgTag = 2 # 最后一个包 0010
pkg = self.genAudioPkg(datas[i])
pkgs.append(pkg)
if self.sn == 65535:
self.sn = 0
self.sn = self.sn + 1
return pkgs
if __name__ == "__main__":
pro = Protocal_1078()
# print(Protocal_1078().genPkg())
# Protocal_1078().readvideoFile()
# pro.getFrameType()
#coding: utf-8
import binascii
import socket
import time
import logging
from lib.socket.ClientSocket import ClientSocket
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
from lib.protocal.Protocal_1078 import Protocal_1078
class StreamH264():
def __init__(self):
self.readSize = 10240 # 每次读取文件的字节大小 (视频)
self.readSizeAudio = 1024 # 每次读取文件的字节大小 (音频频)
self.filePos = 0.001 # 文件的读取位置
self.mobile = "013146201117" # 手机号
self.channel = 1 # 频道号
# self.host = "10.100.11.125" # 开发环境
self.host = "10.100.12.3" # 测试环境
self.port = 1078
self.client = None
self.BUF_SIZE = 2048
self.sendDur = 0 # 消息发送间隔
self.AVChangeBySecond = 2 # 每秒音视频切换数
self.pos = 1 # 当前的帧数(视频)
self.posAudio = 1 # 当前的帧数(音频)
self.frameCount = 1 # 对发送的帧计数(视频),如果发送的帧刚搞到达帧率发送的帧,则需要加上补齐1秒的值 (例如1秒发送30帧,则发送了30帧后需要补齐 1毫秒,使时间刚好到1秒),对应repairTime
self.frameCountAudio = 1 # 对发送的帧计数(视频),用来补齐1秒的时间(同上)
self.videoCount = 0 # 用来使音视频同步的变量
self.audioCount = 0 # 用来使音视频同步的变量
self.repairTime = 1 # 需要补齐到1秒的时间,对应frameCount
self.time = 33 # 时间戳,视频 (帧的时间,每发送一帧自动增加)
self.timeAudio = 21 # 时间戳,音频 (帧的时间,每发送一帧自动增加)
self.singleFrameTime = 33 # 视频单帧的时间(帧率为30 fps,单帧的时间为33 毫秒)
self.singleFrameTimeAudio = 21 # 音频的单帧时间 (毫秒)
self.lastKeyTime = 33 # Last I Frame Interval 该祯与上一个关键祯之间的时间间隔,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.lastTime = 33 # Last Frame Interval 该祯与上一个祯之间的时间时间,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.videoPath = "../h264/bbb3.h264" # 要发送的视频地址
self.audioPath = "../aac/bbb3.aac" # 要发送的音频地址
self.sendFrameType = 0 # 要发送的帧类型, 0:视频帧 1:音频帧 (针对视频和音频他同时发送的方法)
self.aacFront_28 = "" # aac 音频前28个固定bit 的值
self.aacIsFront_28 = 0 # aac 是否已经取出了前28个字节
self.frameNum = 0 # 发送的包数量,用于测试
def setSendDur(self,data):
self.sendDur = data
def setAVChangeBySecond(self,data):
self.AVChangeBySecond = data
def setVideoPath(self,data):
self.videoPath = data
def setAudioPath(self,data):
self.audioPath = data
def setMobile(self,data):
self.mobile = data
def setChannel(self,data):
self.channel = data
def setHost(self,data):
self.host = data
def setPort(self,data):
self.port = data
def setIsLoop(self,data):
self.isLoop = data
####################################################
# 设置视频帧率
####################################################
def setFPSVideo(self,data = 0):
if(data == 0):
pass
else:
self.time = int(1000 / data)
self.singleFrameTime = int(1000 / data)
self.lastTime = self.singleFrameTime
####################################################
# 设置音频帧率
####################################################
def setFPSAudio(self,data = 0):
if (data == 0):
pass
else:
self.timeAudio = int(1000 / data)
self.singleFrameTimeAudio = int(1000 / data)
####################################################
# socket 连接
####################################################
def connectServer(self):
socketObj = ClientSocket()
socketObj.setHost(self.host)
socketObj.setPort(self.port)
self.client = socketObj.connect()
# self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
# self.client.connect((self.host, self.port))
####################################################
# 同时发送视频和音频
####################################################
def sendAVStream(self):
protocal_1078 = Protocal_1078()
protocal_1078.setSim(self.mobile)
protocal_1078.setLogcC(self.channel)
# 打开视频文件
fv2 = open(self.videoPath, 'rb')
con = fv2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getFramesFromStr_2(data)
for fra in frames:
self.frameNum = self.frameNum + 1
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
# print("发送视频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
frames = []
# 打开音频文件
fa2 = open(self.audioPath, 'rb')
conAudio = fa2.read(self.readSizeAudio)
dataAudio = conAudio.hex() # 16进制字节码转字符串
framesAudio, lastDataAudio = self.getAudioFramesFromStr_2(dataAudio)
for fraAudio in framesAudio:
self.frameNum = self.frameNum + 1
protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
self.doAudioFrameCount(protocal_1078)
pkgsAudio = protocal_1078.genAudioPkgsByFrame()
for msgAudio in pkgsAudio:
# print("发送音频消息:" + msgAudio)
self.client.send(binascii.a2b_hex(msgAudio))
time.sleep(self.sendDur)
framesAudio = []
while con or conAudio:
if self.sendFrameType == 0:
videoTempFrame = []
isVloop = 0
for fra in frames:
self.frameNum = self.frameNum + 1
if self.videoCount >= int(self.singleFrameTime / self.AVChangeBySecond):
self.sendFrameType = 1
self.videoCount = 0
if self.videoCount == 0:
videoTempFrame.append(fra)
isVloop = 1
continue
else:
if isVloop == 0:
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了视频问题帧,并自动修复...")
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
# print("发送视频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
else:
videoTempFrame.append(fra)
frames = videoTempFrame
if len(frames) != 0:
continue
con = fv2.read(self.readSize)
data = con.hex()
if con:
frames, lastData = self.getFramesFromStr_2(data, lastData)
for fra in frames:
self.frameNum = self.frameNum + 1
if self.videoCount >= int(self.singleFrameTime / self.AVChangeBySecond):
self.sendFrameType = 1
self.videoCount = 0
if self.videoCount == 0:
videoTempFrame.append(fra)
else:
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了视频问题帧,并自动修复...")
print(fra)
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
# print("发送视频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
frames = videoTempFrame
else:
self.sendFrameType = 1
elif self.sendFrameType == 1:
audioTempFrame = []
isALoop = 0
for fraAudio in framesAudio:
self.frameNum = self.frameNum + 1
if self.audioCount >= int(1000 / (self.singleFrameTimeAudio * 2)):
self.sendFrameType = 0
self.audioCount = 0
if self.audioCount == 0:
audioTempFrame.append(fraAudio)
isALoop = 1
continue
else:
if isALoop == 0:
if len(self.aacFront_28 + fraAudio) % 2 != 0:
print("出现了问题帧,并自动修复...")
print(fraAudio)
fraAudio = fraAudio[:len(fraAudio) - 1]
protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
self.doAudioFrameCount(protocal_1078)
pkgsAudio = protocal_1078.genAudioPkgsByFrame()
for msgAudio in pkgsAudio:
# print("发送音频消息:" + msgAudio)
self.client.send(binascii.a2b_hex(msgAudio))
time.sleep(self.sendDur)
else:
audioTempFrame.append(fraAudio)
framesAudio = audioTempFrame
if len(framesAudio) != 0:
continue
conAudio = fa2.read(self.readSizeAudio)
dataAudio = conAudio.hex()
if conAudio:
framesAudio, lastDataAudio = self.getAudioFramesFromStr_2(dataAudio, lastDataAudio)
for fraAudio in framesAudio:
self.frameNum = self.frameNum + 1
if self.audioCount >= int(1000 / (self.singleFrameTimeAudio * 2)):
self.sendFrameType = 0
self.audioCount = 0
if self.audioCount == 0:
audioTempFrame.append(fraAudio)
else:
if len(self.aacFront_28 + fraAudio) % 2 != 0:
print("出现了问题帧,并自动修复...")
print(fraAudio)
fraAudio = fraAudio[:len(fraAudio) - 1]
protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
self.doAudioFrameCount(protocal_1078)
pkgsAudio = protocal_1078.genAudioPkgsByFrame()
for msgAudio in pkgsAudio:
# print("发送音频消息:" + msgAudio)
self.client.send(binascii.a2b_hex(msgAudio))
time.sleep(self.sendDur)
framesAudio = audioTempFrame
else:
self.sendFrameType = 0
fv2.close()
fa2.close()
self.client.close()
# print("----------视频" + str(self.pos))
# print("----------音频" + str(self.posAudio))
# print("---------------------------------")
####################################################
# 读取h264文件并发送
####################################################
def readStreamFromFileAndSend(self,fileName):
protocal_1078 = Protocal_1078()
protocal_1078.setSim(self.mobile)
protocal_1078.setLogcC(self.channel)
with open(fileName, 'rb') as f2:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getFramesFromStr_2(data)
for fra in frames:
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
while con:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getFramesFromStr_2(data, lastData)
for fra in frames:
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了问题帧,并自动修复...")
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
# print(self.pos)
####################################################
# 读取aac文件并发送
####################################################
def readAacStreamFromFileAndSend(self,fileName):
protocal_1078 = Protocal_1078()
protocal_1078.setSim(self.mobile)
protocal_1078.setLogcC(self.channel)
with open(fileName, 'rb') as f2:
con = f2.read(self.readSizeAudio)
data = con.hex() # 16进制字节码转字符串
frames,lastData = self.getAudioFramesFromStr_2(data)
for fra in frames:
protocal_1078.setDataBody(self.aacFront_28 + fra)
self.doAudioFrameCount(protocal_1078)
pkgs = protocal_1078.genAudioPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
while con:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getAudioFramesFromStr_2(data, lastData)
for fra in frames:
if len(self.aacFront_28 + fra) % 2 != 0:
print("出现了问题帧,并自动修复...")
print(fra)
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody(self.aacFront_28 + fra)
self.doAudioFrameCount(protocal_1078)
pkgs = protocal_1078.genAudioPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
print(self.posAudio)
####################################################
# 读取h264文件(用来测试)
####################################################
def readStreamFromFile(self,fileName):
protocal_1078 = Protocal_1078()
with open(fileName, 'rb') as f2:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames,lastData = self.getFramesFromStr(data)
for fra in frames:
protocal_1078.setDataBody(fra)
frameType = protocal_1078.getFrameType()
if frameType == "67":
print(frameType)
self.pos = self.pos + 1
while con:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getFramesFromStr(data, lastData)
for fra in frames:
protocal_1078.setDataBody(fra)
frameType = protocal_1078.getFrameType()
if frameType == "67":
print(frameType)
self.pos = self.pos + 1
# print(self.pos)
print(self.pos)
####################################################
# 对帧时间进行计数(视频帧)
###################################################
def doFrameCount(self,frameType,obj_1078):
if frameType == "65" or frameType == "61": # 68 之后接00000001 便是关键帧
self.time = self.time + self.singleFrameTime
self.lastKeyTime = 0
self.frameCount = self.frameCount + 1
self.pos = self.pos + 1
# timeStamp = time.time()
# print(timeStamp)
# timeArray = time.localtime(int(timeStamp - 8 * 3600))
# dateTimeM = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
# print(dateTimeM)
elif frameType == "41": # P帧
self.time = self.time + self.singleFrameTime
self.lastKeyTime = self.lastKeyTime + self.singleFrameTime
self.frameCount = self.frameCount + 1
self.pos = self.pos + 1
elif frameType == "01": # B帧
self.time = self.time + self.singleFrameTime
self.lastKeyTime = self.lastKeyTime + self.singleFrameTime
self.frameCount = self.frameCount + 1
self.pos = self.pos + 1
if self.frameCount >= 30:
self.time = self.time + 1
self.frameCount = 1
self.videoCount = self.videoCount + 1
obj_1078.setTime(self.time)
obj_1078.setLastKeyTime(self.lastKeyTime)
obj_1078.setLastTime(self.lastTime)
####################################################
# 对帧时间进行计数(音频帧)
###################################################
def doAudioFrameCount(self,obj_1078):
self.timeAudio = self.timeAudio + self.singleFrameTimeAudio
if self.frameCountAudio >= int(1000 / self.singleFrameTimeAudio):
self.frameCountAudio = 0
self.timeAudio = self.timeAudio + 1000 % self.singleFrameTimeAudio
self.frameCountAudio = self.frameCountAudio + 1
self.audioCount = self.audioCount + 1
self.posAudio = self.posAudio + 1
obj_1078.setTime(self.timeAudio)
####################################################
# 读取h264文件(测试程序)
####################################################
def readTest(self,fileName):
with open(fileName, 'rb') as f2:
con = f2.read(204800)
data = con.hex() # 16进制字节码转字符串
data_H = data.split("00000001")
for i in range(1,len(data_H)):
print("00000001" + data_H[i])
####################################################
# 读取h264文件(测试程序)
####################################################
def readTest2(self,fileName):
with open(fileName, 'rb') as f2:
con = f2.read(200)
data = con.hex()
print(data)
con = f2.read(207)
data = con.hex()
print(data)
con = f2.read(212)
data = con.hex()
print(data)
####################################################
# 读取h264文件(测试程序),从指定位置开始读取
####################################################
def readTest3(self,fileName):
with open(fileName, 'rb') as f2:
f2.seek(4,0)
con = f2.read(1)
data = con.hex()
print(data)
#####################################################
# 从帧数据中获取帧类型
#####################################################
def getFrameType(self,frame):
# 0x67 (0 11 00111) SPS 非常重要 type = 7
# 0x68 (0 11 01000) PPS 非常重要 type = 8
# 0x65 (0 11 00101) IDR帧 关键帧 非常重要 type = 5
# 0x61 (0 11 00001) I帧 重要 type=1 非IDR的I帧 不大常见
# 0x41 (0 10 00001) P帧 重要 type = 1
# 0x01 (0 00 00001) B帧 不重要 type = 1
# 0x06 (0 00 00110) SEI 不重要 type = 6
frameTypeHex = frame[8:10]
return frameTypeHex
####################################################
# 16进制字符串分割为个多个帧(通过split方式分割)
####################################################
def getFramesFromStr(self,data,lastDataParam = ""):
data_H = data.split("00000001")
frames = []
if(len(data_H) == 1): #如果字符串中不包含 000001(说明内容都属于一帧的中间位置)
lastData = lastDataParam + data
return frames,lastData
else:
for i in range(0, len(data_H)):
if i != (len(data_H) - 1):
if data_H[i] != "":
if lastDataParam == "": #文件的开始
frame = "" + data_H[i]
frames.append(frame)
else:
if i == 0:
frame = "" + lastDataParam + data_H[i]
frames.append(frame)
else:
frame = "" + data_H[i]
frames.append(frame)
else:
if lastDataParam == "": #文件的开始
pass
else:
frame = "" + data_H[i] + lastDataParam
frames.append(frame)
else:
lastData = data_H[i]
tempFrames = []
for tmp in frames:
if self.getFrameType("00000001" + tmp) == "68" or self.getFrameType("00000001" + tmp) == "09":
fra = tmp.split("000001")
for fra2 in fra:
tempFrames.append(fra2)
else:
tempFrames.append(tmp)
frames = tempFrames
return frames,lastData
####################################################
# 16进制字符串分割为个多个帧(一个字节一个字节的读取)
####################################################
def getFramesFromStr_2(self,data,lastDataParam = ""):
dataStr = data
frame = ""
frames = []
frameSlice = dataStr[:2]
frame = lastDataParam + frame + frameSlice
dataStr = dataStr[2:]
while len(dataStr) > 0:
frameSlice = dataStr[:2]
dataStr = dataStr[2:]
if "00" == frameSlice:
frameTmp = frameSlice
frameTmp2 = dataStr[:2]
dataStr = dataStr[2:]
while frameTmp2 == "00":
frameTmp = frameTmp + frameTmp2
frameTmp2 = dataStr[:2]
dataStr = dataStr[2:]
frameTmp = frameTmp + frameTmp2
if frameTmp == "00000001":
frames.append(frame)
frame = ""
else:
frame = frame + frameTmp
else:
frame = frame + frameSlice
lastData = frame
tempFrames = []
for tmp in frames:
if self.getFrameType("00000001" + tmp) == "68" or self.getFrameType("00000001" + tmp) == "09":
fra = tmp.split("000001")
for fra2 in fra:
tempFrames.append(fra2)
else:
tempFrames.append(tmp)
frames = tempFrames
return frames,lastData
####################################################
# 获取音频帧(通过前面28位来截取)
####################################################
def getAudioFramesFromStr_2(self,data,lastDataParam = ""):
if self.aacIsFront_28 == 0:
self. aacFront_28 = data[:7]
self.aacIsFront_28 = 1
data_H = data.split(self.aacFront_28)
frames = []
if(len(data_H) == 1): #如果字符串中不包含 000001(说明内容都属于一帧的中间位置)
lastData = lastDataParam + data
return frames,lastData
else:
for i in range(0, len(data_H)):
if i != (len(data_H) - 1):
if data_H[i] != "":
if lastDataParam == "": #文件的开始
frame = "" + data_H[i]
frames.append(frame)
else:
if i == 0:
frame = "" + lastDataParam + data_H[i]
frames.append(frame)
else:
frame = "" + data_H[i]
frames.append(frame)
else:
if lastDataParam == "": #文件的开始
pass
else:
frame = "" + data_H[i] + lastDataParam
frames.append(frame)
else:
lastData = data_H[i]
# for i in frames:
# print(self.aacFront_28 + frame)
return frames,lastData
if __name__ == "__main__":
obj = StreamH264()
obj.connectServer()
obj.setFPSVideo(30)
obj.setFPSAudio(47)
obj.setSendDur(0.005)
obj.setAVChangeBySecond(2)
obj.setVideoPath("../../h264/aaa3.h264")
obj.setAudioPath("../../aac/aaa3.aac")
# obj.readStreamFromFileAndSend("../../h264/aaa3.h264") # 发送视频
# obj.readAacStreamFromFileAndSend("../../aac/aaa3.aac") # 发送音频
obj.sendAVStream() # 同时发送音视频
# StreamH264().readTest("../../h264/aaa.h264")
# StreamH264().readTest2("../../h264/bbb3.h264")
# StreamH264().readTest2("../../aac/aaa.aac")
# StreamH264().readStreamFromFile("../h264/aaa2.264")
#coding: utf-8
import binascii
import json
import re
import socket
import time
import requests
from lib.protocal.Protocal_1078 import Protocal_1078
from lib.socket.ClientSocket import ClientSocket
class StreamH264Flv():
def __init__(self):
self.isFileHead = 0 # 读取文件的时候,是否是文件的开头 0:不是 1:是
self.readSize = 1024 # 读取文件的大小
self.videoLastKeyTime = 0 # Last I Frame Interval 该祯与上一个关键祯之间的时间间隔,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.videoLastTime = 0 # Last Frame Interval 该祯与上一个祯之间的时间时间,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.sendDur = 0.003 # 每发送一帧的间隔时间
self.client = None
self.mobile = "013146201117" # 手机号
self.channel = 1 # 频道号
self.isSendAudio = 1 # 是否发送音频0:不发送 1:发送
# self.host = "10.100.11.125" # 开发环境
self.host = "10.100.12.3" # 测试环境
self.port = 1078
self.videoPath = "../../flv/bbb3.flv" # 要推流的视频路劲
self.getGetPlayUrl = "http://10.100.11.110:9999/video/streamInfo" # 获取推流播放地址的url
# self.getGetPlayUrl = "http://fb-test.vandyo.com/video/streamInfo" # 获取推流播放地址的url
self.isGetPlayUrl = 0 # 是否已经获取了playUrl 0:没有 1:获取了
self.playUrl = "" # 视频播放地址
def setMobile(self,data):
self.mobile = data
def setChannel(self,data):
self.channel = data
def setReadSize(self,data):
self.readSize = data
def setIsSendAudio(self,data):
self.isSendAudio = data
def setHost(self,data):
self.host = data
def setPort(self,data):
self.port = data
def setSendDur(self,data):
self.sendDur = data
def setVideoPath(self,data):
self.videoPath = data
####################################################
# socket 连接
####################################################
def connectServer(self):
socketObj = ClientSocket()
socketObj.setHost(self.host)
socketObj.setPort(self.port)
self.client = socketObj.connect()
# self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
# self.client.connect((self.host, self.port))
####################################################
# 读取flv文件,并发送 (h264编码,并且音频为aac编码格式)
####################################################
def readFlvAndSend(self):
with open(self.videoPath, 'rb') as fi:
con = fi.read(self.readSize)
data = con.hex()
if self.isFileHead == 0:
data = data[18:]
self.isFileHead = 1
else:
data = data
while con:
if len(data) < 30:
con = fi.read(self.readSize)
data = data + con.hex()
else:
tagSize = self.getAVTagSize(data)
if len(data) < (tagSize * 2 + 30):
con = fi.read(self.readSize)
data = data + con.hex()
else:
tag = data[:tagSize * 2 + 30] # flv tag
tagType = self.getTagType(tag) # flv tag 类型,是音频 还是视频
AVtag = tag[30:] # 音视频 tag
avTimeStamp = self.getAVTimeStamp(tag) #获取时间戳
if avTimeStamp > 100:
if self.isGetPlayUrl == 0:
self.getPlayUrl(self.mobile,self.channel)
self.isGetPlayUrl = 1
if tagType == "08": # 音频
AVdata = AVtag[4:] # 音频数据
frameInfo = self.getAudioFrame(AVtag)
for frame in frameInfo["frames"]:
self.sendAudioFrame(frame,avTimeStamp)
elif tagType == "09": # 视频
AVdata = AVtag[10:] # 视频数据
frameInfo = self.getVideoFrame(AVtag)
for frame in frameInfo["frames"]:
self.sendVideoFrame(frame,avTimeStamp)
elif tagType == "12": # 脚本
AVdata = AVtag # 脚本数据
else:
AVdata = AVtag
data = data[(tagSize * 2 + 30):]
self.client.close()
####################################################
# 获取视频播放地址
####################################################
def getPlayUrl(self,mobile,channel):
res = requests.post(self.getGetPlayUrl, data=json.dumps({'devId': mobile,'chan': str(channel)})).text
res = json.loads(res)
url = res["data"]["url"]
# url = self.replaceHost(res["data"]["url"],"video-test.vandyo.com:")
print("播放地址:" + url)
####################################################
# 替换url的host
####################################################
def replaceHost(self,url,replaceWord):
data = url
strinfo = re.compile('(\d+.\d+.\d+.\d+.:)')
result = strinfo.sub(replaceWord, data)
return result
####################################################
# 发送视频帧
####################################################
def sendVideoFrame(self,fra,timeStamp):
protocal_1078 = Protocal_1078()
protocal_1078.setSim(self.mobile)
protocal_1078.setLogcC(self.channel)
protocal_1078.setTime(timeStamp)
frameType = fra[8:10]
if frameType == "65":
self.videoLastKeyTime = timeStamp
protocal_1078.setLastKeyTime(timeStamp - self.videoLastKeyTime)
protocal_1078.setLastTime(timeStamp - self.videoLastTime)
self.videoLastTime = timeStamp
else:
protocal_1078.setLastKeyTime(timeStamp - self.videoLastKeyTime)
protocal_1078.setLastTime(timeStamp - self.videoLastTime)
self.videoLastTime = timeStamp
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了视频问题帧,并自动修复...")
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody(fra)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
# print("发送视频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
####################################################
# 发送音频帧
####################################################
def sendAudioFrame(self,fra,timeStamp):
protocal_1078 = Protocal_1078()
protocal_1078.setSim(self.mobile)
protocal_1078.setLogcC(self.channel)
protocal_1078.setTime(timeStamp)
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了视频问题帧,并自动修复...")
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody(fra)
pkgs = protocal_1078.genAudioPkgsByFrame()
for msg in pkgs:
# print("发送音频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
####################################################
# 获取TagType
# 0x08 音频
# 0x09 视频
# 0x12 脚本
####################################################
def getTagType(self,tag):
tagType = tag[8:10]
return tagType
####################################################
# 获取 flv tag 的数据长度,其实如图里 audio tag 头及其数据长度
####################################################
def getAVTagSize(self,tag):
tagSize = tag[10:16]
tagSize = int(tagSize,16)
return tagSize
####################################################
# 获取相对时间戳 (音视频相对时间)
####################################################
def getAVTimeStamp(self,tag):
timeStamp = tag[16:22]
timeStamp = int(timeStamp,16)
return timeStamp
####################################################
# 获取音视频数据 (音视频头 + 数据部分)
####################################################
def getAVData(self,tag,tagSize):
flvData = tag[34:(tagSize * 2)]
lastData = tag[(tagSize * 2):]
return flvData,lastData
####################################################
# 根据AVtag,获取视频帧数据
####################################################
def getVideoFrame(self,AVTag):
frameData = {}
frames = []
# frameData["frameType"] = int(AVTag[:2],16) >> 4 # 视频帧类型
frameData["codeId"] = int(AVTag[:2],16) & 7 # 编码id ,7 为h264编码 (程序里默认都是h264)
frameData["AVCPkgType"] = int(AVTag[2:4],16)
if frameData["AVCPkgType"] == 0:
spsLen = int(AVTag[22:26], 16)
sps = "00000001" + AVTag[26:26 + spsLen * 2]
ppsLen = int(AVTag[26 + spsLen * 2 + 2:26 + spsLen * 2 + 6],16)
pps = "00000001" + AVTag[26 + spsLen * 2 + 6:26 + spsLen *2 + 6 + ppsLen * 2]
frames.append(sps)
frames.append(pps)
frameData["frames"] = frames
elif frameData["AVCPkgType"] == 1:
dataLen = int(AVTag[10:18], 16)
frame = "00000001" + AVTag[18:18 + dataLen * 2]
frames.append(frame)
lastData = AVTag[18 + dataLen * 2:]
if len(lastData) != 0:
dataLen_65 = int(AVTag[18 + dataLen * 2:18 + dataLen * 2 + 8],16)
frame_65 = "00000001" + AVTag[18 + dataLen * 2 + 8:]
frames.append(frame_65)
else:
pass
frameData["frames"] = frames
elif frameData["AVCPkgType"] == 2:
frameData["frames"] = []
# print("2222222----------" + AVTag)
else:
frameData["frames"] = []
return frameData
####################################################
# 根据AVtag,获取音频帧数据
####################################################
def getAudioFrame(self,AVTag):
frameData = {}
frames = []
AVdata = AVTag[4:]
cp_id_bit = 0 << 27
cp_id_start = 0 << 26
aac_frame_length = int(len(AVdata) / 2 + 7) << 13
adts_buffer_funess = 2047 << 2
raw_data_frame = 0
infoAac = cp_id_bit + cp_id_start + aac_frame_length + adts_buffer_funess + raw_data_frame
infoAac = hex(infoAac)[2:]
while len(infoAac) < 7:
infoAac = "0" + infoAac
frame = "fff14c8" + infoAac + AVdata
frames.append(frame)
frameData["frames"] = frames
return frameData
if __name__ == "__main__":
obj = StreamH264Flv()
obj.setSendDur(0.001)
obj.setIsSendAudio(1)
# obj.setMobile("013146201117")
obj.setMobile("142043390091")
obj.setChannel(1)
obj.setHost("10.100.11.125")
# obj.setHost("video-test.vandyo.com")
obj.setPort(1078)
obj.setVideoPath("../../flv/yyy.flv")
obj.connectServer()
obj.readFlvAndSend()
#coding: utf-8
import binascii
import socket
import time
# http://10.100.11.125:8085/live?port=1985&app=vandyo&stream=013146201117-1
# http://10.16.15.85:8085/live?port=1985&app=vandyo&stream=013146201117-3
# http://localhost:3333/video/013146201117-3
from lib.protocal.Protocal_1078 import Protocal_1078
class StreamH264_bat():
def __init__(self):
self.readSize = 10240 # 每次读取文件的字节大小 (视频)
self.readSizeAudio = 1024 # 每次读取文件的字节大小 (音频频)
self.filePos = 0 # 文件的读取位置
# self.host = "10.100.11.125"
# self.host = "10.16.12.249"
self.host = "10.100.12.3"
# self.host = "localhost"
self.port = 1078
self.client = None
self.BUF_SIZE = 2048
self.sendDur = 0.001 # 消息发送间隔
self.pos = 1 # 当前的帧数(视频)
self.posAudio = 1 # 当前的帧数(音频)
self.frameCount = 1 # 对发送的帧计数(视频),如果发送的帧刚搞到达帧率发送的帧,则需要加上补齐1秒的值 (例如1秒发送30帧,则发送了30帧后需要补齐 1毫秒,使时间刚好到1秒),对应repairTime
self.frameCountAudio = 1 # 对发送的帧计数(视频),用来补齐1秒的时间(同上)
self.videoCount = 0 # 用来使音视频同步的变量
self.audioCount = 0 # 用来使音视频同步的变量
self.repairTime = 1 # 需要补齐到1秒的时间,对应frameCount
self.time = 33 # 时间戳,视频 (帧的时间,每发送一帧自动增加)
self.timeAudio = 21 # 时间戳,音频 (帧的时间,每发送一帧自动增加)
self.singleFrameTime = 33 # 视频单帧的时间(帧率为30 fps,单帧的时间为33 毫秒)
self.singleFrameTimeAudio = 21 # 音频的单帧时间 (毫秒)
self.lastKeyTime = 33 # Last I Frame Interval 该祯与上一个关键祯之间的时间间隔,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.lastTime = 33 # Last Frame Interval 该祯与上一个祯之间的时间时间,单位毫秒(ms),当数据类型为非视频祯时,则没有该字段
self.videoPath = "../h264/bbb3.h264" # 要发送的视频地址
self.audioPath = "../aac/bbb3.aac" # 要发送的音频地址
# self.videoPath = "../h264/aaa3.h264" # 要发送的视频地址
# self.audioPath = "../aac/aaa3.aac" # 要发送的音频地址
self.sendFrameType = 0 # 要发送的帧类型, 0:视频帧 1:音频帧 (针对视频和音频他同时发送的方法)
self.aacFront_28 = "" # aac 音频前28个固定bit 的值
self.aacIsFront_28 = 0 # aac 是否已经取出了前28个字节
####################################################
# 设置视频帧率
####################################################
def setFPSVideo(self,data = 0):
if(data == 0):
pass
else:
self.time = int(1000 / data)
self.singleFrameTime = int(1000 / data)
####################################################
# 设置音频帧率
####################################################
def setFPSAudio(self,data = 0):
if (data == 0):
pass
else:
self.timeAudio = int(1000 / data)
self.singleFrameTimeAudio = int(1000 / data)
####################################################
# socket 连接
####################################################
def connectServer(self):
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
self.client.connect((self.host, self.port))
####################################################
# 同时发送视频和音频
####################################################
def sendAVStream(self):
protocal_1078 = Protocal_1078()
# 打开视频文件
fv2 = open(self.videoPath, 'rb')
con = fv2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getFramesFromStr_2(data)
for fra in frames:
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
print("发送视频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
frames = []
# 打开音频文件
fa2 = open(self.audioPath, 'rb')
conAudio = fa2.read(self.readSizeAudio)
dataAudio = conAudio.hex() # 16进制字节码转字符串
framesAudio, lastDataAudio = self.getAudioFramesFromStr_2(dataAudio)
for fraAudio in framesAudio:
protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
self.doAudioFrameCount(protocal_1078)
pkgsAudio = protocal_1078.genAudioPkgsByFrame()
for msgAudio in pkgsAudio:
print("发送音频消息:" + msgAudio)
self.client.send(binascii.a2b_hex(msgAudio))
time.sleep(self.sendDur)
framesAudio = []
#####################################################################
# --------------------------------------------------------
while con or conAudio:
# print("----------视频" + str(self.pos))
# print("----------音频" + str(self.posAudio))
# print("---------------------------------")
if self.sendFrameType == 0:
con = fv2.read(self.readSize)
data = con.hex()
# print("视频" + str(self.videoCount))
if con:
if self.videoCount >= int(self.singleFrameTime / 2):
self.sendFrameType = 1
self.videoCount = 0
frames, lastData = self.getFramesFromStr_2(data, lastData)
for fra in frames:
print("视频" + str(self.videoCount))
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了视频问题帧,并自动修复...")
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
# print("发送视频消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
else:
self.sendFrameType = 1
elif self.sendFrameType == 1:
# print("音频" + str(self.audioCount))
conAudio = fa2.read(self.readSizeAudio)
dataAudio = conAudio.hex()
if conAudio:
if self.audioCount >= int(1000 / (self.singleFrameTimeAudio * 2)):
self.sendFrameType = 0
self.audioCount = 0
framesAudio, lastDataAudio = self.getAudioFramesFromStr_2(dataAudio, lastDataAudio)
for fraAudio in framesAudio:
print("音频" + str(self.audioCount))
if len(self.aacFront_28 + fraAudio) % 2 != 0:
print("出现了问题帧,并自动修复...")
print(fraAudio)
fraAudio = fraAudio[:len(fraAudio) - 1]
protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
self.doAudioFrameCount(protocal_1078)
pkgsAudio = protocal_1078.genAudioPkgsByFrame()
for msgAudio in pkgsAudio:
# print("发送音频消息:" + msgAudio)
self.client.send(binascii.a2b_hex(msgAudio))
time.sleep(self.sendDur)
else:
self.sendFrameType = 0
self.client.close()
# print("视频" + str(self.pos))
# print("音频" + str(self.posAudio))
#######################################################################################
# # --------------------------------------------------------
# while con or conAudio:
# # print("----------视频" + str(self.pos))
# # print("----------音频" + str(self.posAudio))
# # print("---------------------------------")
#
# if self.sendFrameType == 0:
# videoTempFrame = []
# for fra in frames:
# print("视频--" + str(self.videoCount))
# if len(fra) % 2 != 0: # 处理有问题的帧数据
# print("出现了视频问题帧,并自动修复...")
# fra = fra[:len(fra) - 1]
# protocal_1078.setDataBody("00000001" + fra)
# frameType = protocal_1078.getFrameType()
# self.doFrameCount(frameType, protocal_1078)
# pkgs = protocal_1078.genPkgsByFrame()
# for msg in pkgs:
# # print("发送视频消息:" + msg)
# self.client.send(binascii.a2b_hex(msg))
# time.sleep(self.sendDur)
# frames = []
#
# con = fv2.read(self.readSize)
# data = con.hex()
#
# if con:
# frames, lastData = self.getFramesFromStr_2(data, lastData)
# for fra in frames:
# if self.videoCount >= int(self.singleFrameTime / 2):
# self.sendFrameType = 1
# self.videoCount = 0
# if self.videoCount == 0:
# videoTempFrame.append(fra)
# else:
# print("视频" + str(self.videoCount))
# if len(fra) % 2 != 0: # 处理有问题的帧数据
# print("出现了视频问题帧,并自动修复...")
# fra = fra[:len(fra) - 1]
# protocal_1078.setDataBody("00000001" + fra)
# frameType = protocal_1078.getFrameType()
# self.doFrameCount(frameType, protocal_1078)
# pkgs = protocal_1078.genPkgsByFrame()
# for msg in pkgs:
# # print("发送视频消息:" + msg)
# self.client.send(binascii.a2b_hex(msg))
# time.sleep(self.sendDur)
# frames = videoTempFrame
# else:
# self.sendFrameType = 1
# elif self.sendFrameType == 1:
# audioTempFrame = []
# for fraAudio in framesAudio:
# print("音频--" + str(self.audioCount))
# if len(self.aacFront_28 + fraAudio) % 2 != 0:
# print("出现了问题帧,并自动修复...")
# print(fraAudio)
# fraAudio = fraAudio[:len(fraAudio) - 1]
# protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
# self.doAudioFrameCount(protocal_1078)
# pkgsAudio = protocal_1078.genAudioPkgsByFrame()
# for msgAudio in pkgsAudio:
# # print("发送音频消息:" + msgAudio)
# self.client.send(binascii.a2b_hex(msgAudio))
# time.sleep(self.sendDur)
# framesAudio = []
#
# conAudio = fa2.read(self.readSizeAudio)
# dataAudio = conAudio.hex()
# if conAudio:
# framesAudio, lastDataAudio = self.getAudioFramesFromStr_2(dataAudio, lastDataAudio)
# for fraAudio in framesAudio:
# if self.audioCount >= int(1000 / (self.singleFrameTimeAudio * 2)):
# self.sendFrameType = 0
# self.audioCount = 0
# if self.audioCount == 0:
# audioTempFrame.append(fraAudio)
# else:
# print("音频" + str(self.audioCount))
# if len(self.aacFront_28 + fraAudio) % 2 != 0:
# print("出现了问题帧,并自动修复...")
# print(fraAudio)
# fraAudio = fraAudio[:len(fraAudio) - 1]
# protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
# self.doAudioFrameCount(protocal_1078)
# pkgsAudio = protocal_1078.genAudioPkgsByFrame()
# for msgAudio in pkgsAudio:
# # print("发送音频消息:" + msgAudio)
# self.client.send(binascii.a2b_hex(msgAudio))
# time.sleep(self.sendDur)
# framesAudio = audioTempFrame
# else:
# self.sendFrameType = 0
# self.client.close()
#
# # print("视频" + str(self.pos))
# # print("音频" + str(self.posAudio))
############################################################################################
# # --------------------------------------------------------
# while con or conAudio:
# # print("----------视频" + str(self.pos))
# # print("----------音频" + str(self.posAudio))
# # print("---------------------------------")
#
# if self.sendFrameType == 0:
# videoTempFrame = []
# for fra in frames:
# if self.videoCount >= int(self.singleFrameTime / 2):
# self.sendFrameType = 1
# self.videoCount = 0
# if self.videoCount == 0:
# videoTempFrame.append(fra)
# continue
# else:
# # print("视频--" + str(self.videoCount))
# if len(fra) % 2 != 0: # 处理有问题的帧数据
# print("出现了视频问题帧,并自动修复...")
# fra = fra[:len(fra) - 1]
# protocal_1078.setDataBody("00000001" + fra)
# frameType = protocal_1078.getFrameType()
# self.doFrameCount(frameType, protocal_1078)
# pkgs = protocal_1078.genPkgsByFrame()
# for msg in pkgs:
# print("发送视频消息:" + msg)
# self.client.send(binascii.a2b_hex(msg))
# time.sleep(self.sendDur)
# frames = videoTempFrame
# if len(frames) != 0:
# continue
#
# con = fv2.read(self.readSize)
# data = con.hex()
#
# if con:
# frames, lastData = self.getFramesFromStr_2(data, lastData)
# for fra in frames:
# if self.videoCount >= int(self.singleFrameTime / 2):
# self.sendFrameType = 1
# self.videoCount = 0
# if self.videoCount == 0:
# videoTempFrame.append(fra)
# else:
# # print("视频" + str(self.videoCount))
# if len(fra) % 2 != 0: # 处理有问题的帧数据
# print("出现了视频问题帧,并自动修复...")
# fra = fra[:len(fra) - 1]
# protocal_1078.setDataBody("00000001" + fra)
# frameType = protocal_1078.getFrameType()
# self.doFrameCount(frameType, protocal_1078)
# pkgs = protocal_1078.genPkgsByFrame()
# for msg in pkgs:
# print("发送视频消息:" + msg)
# self.client.send(binascii.a2b_hex(msg))
# time.sleep(self.sendDur)
# frames = videoTempFrame
# else:
# self.sendFrameType = 1
# elif self.sendFrameType == 1:
# audioTempFrame = []
# for fraAudio in framesAudio:
# if self.audioCount >= int(1000 / (self.singleFrameTimeAudio * 2)):
# self.sendFrameType = 0
# self.audioCount = 0
# if self.audioCount == 0:
# audioTempFrame.append(fraAudio)
# continue
# else:
# # print("音频--" + str(self.audioCount))
# if len(self.aacFront_28 + fraAudio) % 2 != 0:
# print("出现了问题帧,并自动修复...")
# print(fraAudio)
# fraAudio = fraAudio[:len(fraAudio) - 1]
# protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
# self.doAudioFrameCount(protocal_1078)
# pkgsAudio = protocal_1078.genAudioPkgsByFrame()
# for msgAudio in pkgsAudio:
# # print("发送音频消息:" + msgAudio)
# self.client.send(binascii.a2b_hex(msgAudio))
# time.sleep(self.sendDur)
# framesAudio = audioTempFrame
# if len(framesAudio) != 0:
# continue
#
# conAudio = fa2.read(self.readSizeAudio)
# dataAudio = conAudio.hex()
# if conAudio:
# framesAudio, lastDataAudio = self.getAudioFramesFromStr_2(dataAudio, lastDataAudio)
# for fraAudio in framesAudio:
# if self.audioCount >= int(1000 / (self.singleFrameTimeAudio * 2)):
# self.sendFrameType = 0
# self.audioCount = 0
# if self.audioCount == 0:
# audioTempFrame.append(fraAudio)
# else:
# # print("音频" + str(self.audioCount))
# if len(self.aacFront_28 + fraAudio) % 2 != 0:
# print("出现了问题帧,并自动修复...")
# print(fraAudio)
# fraAudio = fraAudio[:len(fraAudio) - 1]
# protocal_1078.setDataBody(self.aacFront_28 + fraAudio)
# self.doAudioFrameCount(protocal_1078)
# pkgsAudio = protocal_1078.genAudioPkgsByFrame()
# for msgAudio in pkgsAudio:
# # print("发送音频消息:" + msgAudio)
# self.client.send(binascii.a2b_hex(msgAudio))
# time.sleep(self.sendDur)
# framesAudio = audioTempFrame
# else:
# self.sendFrameType = 0
# fv2.close()
# fa2.close()
# self.client.close()
#
# # print("视频" + str(self.pos))
# # print("音频" + str(self.posAudio))
####################################################
# 读取h264文件并发送
####################################################
def readStreamFromFileAndSend(self,fileName):
protocal_1078 = Protocal_1078()
with open(fileName, 'rb') as f2:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
# frames,lastData = self.getFramesFromStr(data)
frames, lastData = self.getFramesFromStr_2(data)
for fra in frames:
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
# self.pos = self.pos + 1
while con:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
# frames, lastData = self.getFramesFromStr(data, lastData)
frames, lastData = self.getFramesFromStr_2(data, lastData)
for fra in frames:
if len(fra) % 2 != 0: # 处理有问题的帧数据
print("出现了问题帧,并自动修复...")
fra = fra[:len(fra) - 1]
protocal_1078.setDataBody("00000001" + fra)
frameType = protocal_1078.getFrameType()
self.doFrameCount(frameType, protocal_1078)
pkgs = protocal_1078.genPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
# self.pos = self.pos + 1
# print(self.pos)
####################################################
# 读取aac文件并发送
####################################################
def readAacStreamFromFileAndSend(self,fileName):
protocal_1078 = Protocal_1078()
with open(fileName, 'rb') as f2:
con = f2.read(self.readSizeAudio)
data = con.hex() # 16进制字节码转字符串
# frames, lastData = self.getAudioFramesFromStr(data)
frames,lastData = self.getAudioFramesFromStr_2(data)
for fra in frames:
# protocal_1078.setDataBody("fff" + fra)
protocal_1078.setDataBody(self.aacFront_28 + fra)
self.doAudioFrameCount(protocal_1078)
pkgs = protocal_1078.genAudioPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
self.posAudio = self.posAudio + 1
while con:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
# frames, lastData = self.getAudioFramesFromStr(data, lastData)
frames, lastData = self.getAudioFramesFromStr_2(data, lastData)
for fra in frames:
if len(self.aacFront_28 + fra) % 2 != 0:
print("出现了问题帧,并自动修复...")
print(fra)
fra = fra[:len(fra) - 1]
# protocal_1078.setDataBody("fff" + fra)
protocal_1078.setDataBody(self.aacFront_28 + fra)
self.doAudioFrameCount(protocal_1078)
pkgs = protocal_1078.genAudioPkgsByFrame()
for msg in pkgs:
print("发送消息:" + msg)
self.client.send(binascii.a2b_hex(msg))
time.sleep(self.sendDur)
# self.posAudio = self.posAudio + 1
print(self.posAudio)
####################################################
# 读取h264文件(用来测试)
####################################################
def readStreamFromFile(self,fileName):
protocal_1078 = Protocal_1078()
with open(fileName, 'rb') as f2:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames,lastData = self.getFramesFromStr(data)
for fra in frames:
protocal_1078.setDataBody(fra)
frameType = protocal_1078.getFrameType()
if frameType == "67":
print(frameType)
self.pos = self.pos + 1
while con:
con = f2.read(self.readSize)
data = con.hex() # 16进制字节码转字符串
frames, lastData = self.getFramesFromStr(data, lastData)
for fra in frames:
protocal_1078.setDataBody(fra)
frameType = protocal_1078.getFrameType()
if frameType == "67":
print(frameType)
self.pos = self.pos + 1
# print(self.pos)
print(self.pos)
####################################################
# 对帧时间进行计数(视频帧)
###################################################
def doFrameCount(self,frameType,obj_1078):
if frameType == "65" or frameType == "61": # 68 之后接00000001 便是关键帧
self.time = self.time + self.singleFrameTime
self.lastKeyTime = 0
self.frameCount = self.frameCount + 1
self.videoCount = self.videoCount + 1
self.pos = self.pos + 1
elif frameType == "41": # P帧
self.time = self.time + self.singleFrameTime
self.lastKeyTime = self.lastKeyTime + self.singleFrameTime
self.frameCount = self.frameCount + 1
self.videoCount = self.videoCount + 1
self.pos = self.pos + 1
elif frameType == "01": # B帧
self.time = self.time + self.singleFrameTime
self.lastKeyTime = self.lastKeyTime + self.singleFrameTime
self.frameCount = self.frameCount + 1
self.videoCount = self.videoCount + 1
self.pos = self.pos + 1
else:
self.videoCount = self.videoCount + 1
if self.frameCount >= 30:
self.time = self.time + 1
self.frameCount = 1
obj_1078.setTime(self.time)
obj_1078.setLastKeyTime(self.lastKeyTime)
obj_1078.setLastTime(self.lastTime)
####################################################
# 对帧时间进行计数(音频帧)
###################################################
def doAudioFrameCount(self,obj_1078):
self.timeAudio = self.timeAudio + self.singleFrameTimeAudio
if self.frameCountAudio >= int(1000 / self.singleFrameTimeAudio):
self.frameCountAudio = 0
self.timeAudio = self.timeAudio + 1000 % self.singleFrameTimeAudio
self.frameCountAudio = self.frameCountAudio + 1
self.audioCount = self.audioCount + 1
self.posAudio = self.posAudio + 1
obj_1078.setTime(self.timeAudio)
####################################################
# 读取h264文件(测试程序)
####################################################
def readTest(self,fileName):
with open(fileName, 'rb') as f2:
con = f2.read(204800)
data = con.hex() # 16进制字节码转字符串
data_H = data.split("00000001")
for i in range(1,len(data_H)):
print("00000001" + data_H[i])
####################################################
# 读取h264文件(测试程序)
####################################################
def readTest2(self,fileName):
with open(fileName, 'rb') as f2:
con = f2.read(200)
data = con.hex()
print(data)
con = f2.read(207)
data = con.hex()
print(data)
con = f2.read(212)
data = con.hex()
print(data)
####################################################
# 读取h264文件(测试程序),从指定位置开始读取
####################################################
def readTest3(self,fileName):
with open(fileName, 'rb') as f2:
f2.seek(4,0)
con = f2.read(1)
data = con.hex()
print(data)
#####################################################
# 从帧数据中获取帧类型
#####################################################
def getFrameType(self,frame):
# 0x67 (0 11 00111) SPS 非常重要 type = 7
# 0x68 (0 11 01000) PPS 非常重要 type = 8
# 0x65 (0 11 00101) IDR帧 关键帧 非常重要 type = 5
# 0x61 (0 11 00001) I帧 重要 type=1 非IDR的I帧 不大常见
# 0x41 (0 10 00001) P帧 重要 type = 1
# 0x01 (0 00 00001) B帧 不重要 type = 1
# 0x06 (0 00 00110) SEI 不重要 type = 6
frameTypeHex = frame[8:10]
return frameTypeHex
####################################################
# 16进制字符串分割为个多个帧(通过split方式分割)
####################################################
def getFramesFromStr(self,data,lastDataParam = ""):
data_H = data.split("00000001")
frames = []
if(len(data_H) == 1): #如果字符串中不包含 000001(说明内容都属于一帧的中间位置)
lastData = lastDataParam + data
return frames,lastData
else:
for i in range(0, len(data_H)):
if i != (len(data_H) - 1):
if data_H[i] != "":
if lastDataParam == "": #文件的开始
frame = "" + data_H[i]
frames.append(frame)
else:
if i == 0:
frame = "" + lastDataParam + data_H[i]
frames.append(frame)
else:
frame = "" + data_H[i]
frames.append(frame)
else:
if lastDataParam == "": #文件的开始
pass
else:
frame = "" + data_H[i] + lastDataParam
frames.append(frame)
else:
lastData = data_H[i]
tempFrames = []
for tmp in frames:
if self.getFrameType("00000001" + tmp) == "68" or self.getFrameType("00000001" + tmp) == "09":
fra = tmp.split("000001")
for fra2 in fra:
tempFrames.append(fra2)
else:
tempFrames.append(tmp)
frames = tempFrames
return frames,lastData
####################################################
# 16进制字符串分割为个多个帧(一个字节一个字节的读取)
####################################################
def getFramesFromStr_2(self,data,lastDataParam = ""):
dataStr = data
frame = ""
frames = []
frameSlice = dataStr[:2]
frame = lastDataParam + frame + frameSlice
dataStr = dataStr[2:]
while len(dataStr) > 0:
frameSlice = dataStr[:2]
dataStr = dataStr[2:]
if "00" == frameSlice:
frameTmp = frameSlice
frameTmp2 = dataStr[:2]
dataStr = dataStr[2:]
while frameTmp2 == "00":
frameTmp = frameTmp + frameTmp2
frameTmp2 = dataStr[:2]
dataStr = dataStr[2:]
frameTmp = frameTmp + frameTmp2
if frameTmp == "00000001":
frames.append(frame)
frame = ""
else:
frame = frame + frameTmp
else:
frame = frame + frameSlice
lastData = frame
tempFrames = []
for tmp in frames:
if self.getFrameType("00000001" + tmp) == "68" or self.getFrameType("00000001" + tmp) == "09":
fra = tmp.split("000001")
for fra2 in fra:
tempFrames.append(fra2)
else:
tempFrames.append(tmp)
frames = tempFrames
return frames,lastData
####################################################
# 获取音频帧
####################################################
# def getAudioFramesFromStr_2(self,data,lastDataParam = ""):
# dataStr = data
# frames = []
# frameSlice = dataStr[:2]
# frame = lastDataParam + frameSlice
# dataStr = dataStr[2:]
# while len(dataStr) > 0:
# frameSlice = dataStr[:2]
# dataStr = dataStr[2:]
# if "ff" == frameSlice:
# frameTmp = frameSlice
# frameTmp2 = dataStr[:2]
# dataStr = dataStr[2:]
# if frameTmp2[:1] == "f":
# if len(frame) > 14:
# frames.append(frame[3:])
# frame = ""
# if len(dataStr) < 10:
# frame = frame + frameTmp + frameTmp2 + dataStr
# dataStr = ""
# else:
# frameSlice = dataStr[:10]
# dataStr = dataStr[10:]
# frame = frame + frameTmp + frameTmp2 + frameSlice
# else:
# frame = frame + frameTmp + frameTmp2
# else:
# frame = frame + frameTmp + frameTmp2
# else:
# frame = frame + frameSlice
# lastData = frame
# return frames, lastData
####################################################
# 获取音频帧(通过前面28个字节来截取)
####################################################
def getAudioFramesFromStr_2(self,data,lastDataParam = ""):
if self.aacIsFront_28 == 0:
self. aacFront_28 = data[:7]
self.aacIsFront_28 = 1
data_H = data.split(self.aacFront_28)
frames = []
if(len(data_H) == 1): #如果字符串中不包含 000001(说明内容都属于一帧的中间位置)
lastData = lastDataParam + data
return frames,lastData
else:
for i in range(0, len(data_H)):
if i != (len(data_H) - 1):
if data_H[i] != "":
if lastDataParam == "": #文件的开始
frame = "" + data_H[i]
frames.append(frame)
else:
if i == 0:
frame = "" + lastDataParam + data_H[i]
frames.append(frame)
else:
frame = "" + data_H[i]
frames.append(frame)
else:
if lastDataParam == "": #文件的开始
pass
else:
frame = "" + data_H[i] + lastDataParam
frames.append(frame)
else:
lastData = data_H[i]
return frames,lastData
if __name__ == "__main__":
obj = StreamH264()
obj.connectServer()
obj.setFPSVideo(30)
obj.setFPSAudio(47)
# obj.readStreamFromFileAndSend("../h264/bbb3.h264") # 发送视频
# obj.readAacStreamFromFileAndSend("../aac/bbb3.aac") # 发送音频
obj.sendAVStream()
# StreamH264().readTest("../h264/aaa.h264")
# StreamH264().readTest2("../h264/aaa2.264")
# StreamH264().readTest2("../aac/aaa.aac")
# StreamH264().readStreamFromFile("../h264/aaa2.264")
pass
# coding:utf-8
import socket
class ClientSocket():
def __init__(self):
self.host = "10.100.11.125"
self.port = 1078
self.timeOut = 1
self.client = None # 客户端socket对象
def setHost(self,data):
self.host = data
def setPort(self,data):
self.port = data
def setTimeOut(self,data):
self.timeOut = data
def connect(self):
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
self.client.connect((self.host, self.port))
return self.client
def send(self,data):
self.client.send(data)
def close(self):
self.client.shutdown(socket.SHUT_RDWR)
self.client.close()
\ No newline at end of file
#coding: utf-8
import json
import re
import threading
import time
import traceback
import cv2
import requests
from lib.service.StreamH264Flv import StreamH264Flv
class FlvPressureTest():
def __init__(self):
self.mobileStart = 100000000000 # 开始手机号
self.channel = 1 # 频道号
self.terNum = 2 # 要压力测试的线程数
self.flvPath = "../../flv/aaa3.flv" # 要推流的视频路劲
self.host = "10.100.12.3"
self.port = 1078
self.sendDur = 0.005 # socket消息发送间隔
self.isShowFrame = 0 # 拉流是否展示画面 0:不展示 1:展示
self.getGetPlayUrl = "http://10.100.11.110:9999/video/streamInfo" # 获取推流播放地址的url
# self.getGetPlayUrl = "http://fb-test.vandyo.com/video/streamInfo" # 获取推流播放地址的url
self.threadInfo = {} # 存放线程集
self.threadInfo["threadObj"] = {}
self.threadInfo["sucessT"] = {}
self.threadInfo["failT"] = {}
self.threadInfoPull = {} # 存放线程集(拉流线程)
self.threadInfoPull["threadObj"] = {}
self.threadInfoPull["sucessT"] = {}
self.threadInfoPull["failT"] = {}
self.isOpenPullStream = 1 # 是否开启视频拉流 0:不 1:是
def setTerNum(self, data):
self.terNum = data
def setMobileStart(self, data="10000000000"):
self.mobileStart = data
def setChannel(self, data):
self.channel = data
def setFlvPath(self, data):
self.flvPath = data
def setHost(self, data):
self.host = data
def setPort(self, data):
self.port = data
def setIsOpenPullStream(self, data):
self.isOpenPullStream = data
def setSendDur(self,data):
self.sendDur = data
def setIsShowFrame(self,data):
self.isShowFrame = data
def getThreadInfo(self):
return self.threadInfo
def getThreadInfoPull(self):
return self.threadInfoPull
def testServicePush(self, cloudMirror,threadName):
try:
cloudMirror.readFlvAndSend()
except:
traceback.print_exc()
self.threadInfo["threadObj"].pop(threadName)
return
self.threadInfo["threadObj"].pop(threadName)
def testServicePull(self, mobile,threadName):
pullUrl = self.getPlayUrl(mobile,self.channel)
# pullUrl = self.replaceHost(pullUrl, "video-test.vandyo.com:")
cap = cv2.VideoCapture(pullUrl)
try:
while (cap.isOpened()):
ret, frame = cap.read()
if ret == False:
break
if self.isShowFrame == 1:
cv2.imshow("video", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
print("拉流结束")
cap.release()
cv2.destroyAllWindows()
except:
traceback.print_exc()
self.threadInfoPull["threadObj"].pop(threadName)
return
self.threadInfoPull["threadObj"].pop(threadName)
################################################
# 启动推流线程
################################################
def runPush(self, mobile, threadId):
cloudMirror = StreamH264Flv()
cloudMirror.setHost(self.host)
cloudMirror.setPort(self.port)
cloudMirror.connectServer()
cloudMirror.setVideoPath(self.flvPath)
cloudMirror.setMobile(mobile)
cloudMirror.setChannel(self.channel)
cloudMirror.setSendDur(self.sendDur)
conThread = threading.Thread(target=self.testServicePush, args=(cloudMirror,threadId))
self.threadInfo["threadObj"][threadId] = conThread
conThread.start()
print(threadId + "启动了:" + mobile + "-" + str(self.channel))
################################################
# 启动拉流线程
################################################
def runPull(self, mobile, threadId):
time.sleep(1)
pullThread = threading.Thread(target=self.testServicePull, args=(mobile,threadId))
self.threadInfoPull["threadObj"][threadId] = pullThread
pullThread.start()
print(threadId + "启动了:" + mobile + "-" + str(self.channel))
################################################
# 启动压力测试服务
################################################
def start(self):
for i in range(0, self.terNum):
mobile = self.mobileStart + i
mobile = "0" + str(mobile)
threadId = "threadId__" + str(i)
self.runPush(mobile, threadId)
time.sleep(0.01)
if self.isOpenPullStream == 1:
for j in range(0, self.terNum):
mobile = self.mobileStart + j
mobile = "0" + str(mobile)
threadPullId = "threadPullId__" + str(j)
self.runPull(mobile, threadPullId)
time.sleep(0.01)
####################################################
# 获取视频播放地址
####################################################
def getPlayUrl(self,mobile,channel):
res = requests.post(self.getGetPlayUrl, data=json.dumps({'devId': mobile,'chan': str(channel)})).text
res = json.loads(res)
return res["data"]["url"]
####################################################
# 替换url的host
####################################################
def replaceHost(self,url,replaceWord):
data = url
strinfo = re.compile('(\d+.\d+.\d+.\d+.:)')
result = strinfo.sub(replaceWord, data)
return result
if __name__ == "__main__":
test = FlvPressureTest()
test.setHost("10.100.11.125")
# test.setHost("video-test.vandyo.com")
test.setPort(1078)
test.setMobileStart(10000000000)
test.setChannel(1)
test.setTerNum(1)
test.setIsOpenPullStream(1) # 设置是否开启拉流
test.setIsShowFrame(0) # 拉流是否显示预览画面
test.setSendDur(0.007)
test.setFlvPath("../../flv/aaa3.flv")
test.start()
\ No newline at end of file
# coding:utf-8
import threading
import time
import cv2
from lib.service.StreamH264 import StreamH264
'''
使用H264视频文件 和aac音频文件作为推流的压力测试
'''
class H264PresureTest():
def __init__(self):
self.mobileStart = 100000000000 # 开始手机号
self.channel = 1 # 频道号
self.terNum = 2 # 要压力测试的线程数
self.videoPath = "../../h264/bbb3.h264" # 要推流的视频路劲
self.audioPath = "../../h264/bbb3.aac" # 要推流的音频路劲
self.host = "10.100.12.3"
self.port = 1078
self.sendDur = 0.001
self.isShowFrame = 0 # 拉流是否展示画面 0:不展示 1:展示
self.threadInfo = {} # 存放线程集
self.threadInfo["threadObj"] = {}
self.threadInfo["sucessT"] = {}
self.threadInfo["failT"] = {}
self.threadInfoPull = {} # 存放线程集(拉流线程)
self.threadInfoPull["threadObj"] = {}
self.threadInfoPull["sucessT"] = {}
self.threadInfoPull["failT"] = {}
self.isOpenPullStream = 1 # 是否开启视频拉流 0:不 1:是
self.pullUrl = "http://10.100.11.125:8085/live?port=1985&app=vandyo&stream="
def setTerNum(self,data):
self.terNum = data
def setMobileStart(self,data="10000000000"):
self.mobileStart = data
def setChannel(self,data):
self.channel = data
def setVideoPath(self,data):
self.videoPath = data
def setAudioPath(self,data):
self.audioPath = data
def setHost(self,data):
self.host = data
def setPort(self,data):
self.port = data
def setIsOpenPullStream(self,data):
self.isOpenPullStream = data
def setSendDur(self,data):
self.sendDur = data
def setIsShowFrame(self,data):
self.isShowFrame = data
def testServicePush(self,cloudMirror):
cloudMirror.sendAVStream()
def testServicePull(self,mobile):
pullUrl = self.pullUrl + mobile +"-" + str(self.channel)
cap = cv2.VideoCapture(pullUrl)
while (cap.isOpened()):
ret, frame = cap.read()
if ret == False:
break
if self.isShowFrame == 1:
cv2.imshow("video", frame)
if cv2.waitKey(1)&0xFF==ord('q'):
break
print("拉流结束")
cap.release()
cv2.destroyAllWindows()
def runPush(self,mobile,threadId):
cloudMirror = StreamH264()
cloudMirror.setHost(self.host)
cloudMirror.setPort(self.port)
cloudMirror.setSendDur(self.sendDur)
cloudMirror.connectServer()
cloudMirror.setFPSVideo(30)
cloudMirror.setFPSAudio(47)
cloudMirror.setVideoPath(self.videoPath)
cloudMirror.setAudioPath(self.audioPath)
cloudMirror.setMobile(mobile)
cloudMirror.setChannel(self.channel)
conThread = threading.Thread(target=self.testServicePush,args=(cloudMirror,))
self.threadInfo["threadObj"][threadId] = conThread
conThread.start()
print(threadId + "启动了:" + mobile + "-" + str(self.channel))
def runPull(self,mobile,threadId):
pullThread = threading.Thread(target=self.testServicePull, args=(mobile,))
self.threadInfoPull["threadObj"][threadId] = pullThread
pullThread.start()
print(threadId + "启动了:" + mobile + "-" + str(self.channel))
def start(self):
for i in range(0,self.terNum):
mobile = self.mobileStart + i
mobile = "0" + str(mobile)
threadId = "threadId__" + str(i)
self.runPush(mobile,threadId)
time.sleep(0.01)
if self.isOpenPullStream == 1:
for j in range(0,self.terNum):
mobile = self.mobileStart + j
mobile = "0" + str(mobile)
threadPullId = "threadPullId__" + str(j)
self.runPull(mobile,threadPullId)
time.sleep(0.01)
if __name__ == "__main__":
test = H264PresureTest()
test.setHost("10.100.12.3")
test.setPort(1078)
test.setMobileStart(10000000000)
test.setChannel(1)
test.setTerNum(10)
test.setSendDur(0.001)
test.setIsOpenPullStream(0) # 设置是否开启拉流
test.setIsShowFrame(0)
test.setVideoPath("../../h264/bbb3.h264")
test.setAudioPath("../../aac/bbb3.aac")
test.start()
\ No newline at end of file
# coding: utf-8
import subprocess
import threading
import time
import os
VIDEO_PATH = "video" #自动推流视频目录
# PUSH_URL = "rtmp://192.168.0.108:1935/live/room"
PUSH_URL = "rtmp://10.100.12.50:1935/live/room" #推流服务地址
pushThread = {}
isLoop = 1 #是否循环推流:0:不循环 1:循环
#############################################
# 读取当前目录文件
#############################################
def readVideoFile(path=VIDEO_PATH):
videoFiles = os.listdir(VIDEO_PATH)
return videoFiles
#############################################
# 推流方法,被线程调用
#############################################
def pushStream(fi,url):
print(fi + "----推流开始 ")
m_command = ["ffmpeg.exe",
"-re",
"-i", fi,
"-vcodec", "copy",
"-f",
"flv", url]
print(m_command)
subprocess.run(m_command)
print(fi + "----推流结束 ")
if isLoop == 1:
time.sleep(0.5)
pushStream(fi,url)
else:
pushThread.pop(fi)
#############################################
# 开始推流启动多个线程去推流(使用VIDEO_PATH目录下的有所文件)
#############################################
def startPush():
v_files = readVideoFile()
i = 1
for fi in v_files:
pushFile = VIDEO_PATH + "/" + fi
pushUrl = PUSH_URL + str(i)
td = threading.Thread(target=pushStream,args=[pushFile,pushUrl])
td.start()
i = i + 1
pushThread[pushFile] = pushUrl
time.sleep(1)
while len(pushThread) > 0:
for key in pushThread:
print(key + " 的播放地址为:" + pushThread[key])
time.sleep(5)
print("\n")
print("所有推流全部结束")
#############################################
# 开始推流(使用写死的文件地址)
#############################################
def pushStreamFixedFile():
m_command = ["ffmpeg.exe",
"-re",
"-i","E:\视频\舞蹈\【かや】Doja Cat - Say So _ KAYA Ver.【踊ってみた】.mp4",
# "-i", "【かや】thank u, next _ Ariana Grande【踊ってみた _ KAYA Ver.】.mp4",
"-vcodec", "copy",
"-f",
"flv", "rtmp://10.100.12.50:1935/live/room1"]
subprocess.run(m_command)
if __name__ == "__main__":
# startPush()
pushStreamFixedFile()
#coding:utf-8
import binascii
import time
import socket
from xml.dom.minidom import parse
#http://10.100.11.125:8085/live?port=1985&app=vandyo&stream=013146201116-4
# host = "10.100.11.125"
# host = "10.16.15.85"
host = "localhost"
port = 1078
BUF_SIZE = 2048
isLoop = 0 #是否循环发送视频流)0:不 1:是
SIM = "013146201117"
channel = "03"
def readFile_M(fileName):
with open(fileName, 'r') as f1:
f_content = f1.readlines()
for con in f_content:
# print(con)
with open("temp/res.txt", 'a') as f2:
f2.write(con.replace("\n",""))
#####################################################
# 从帧数据中获取帧类型
#####################################################
def getFrameType(data):
# 0x67 (0 11 00111) SPS 非常重要 type = 7
# 0x68 (0 11 01000) PPS 非常重要 type = 8
# 0x65 (0 11 00101) IDR帧 关键帧 非常重要 type = 5
# 0x61 (0 11 00001) I帧 重要 type=1 非IDR的I帧 不大常见
# 0x41 (0 10 00001) P帧 重要 type = 1
# 0x01 (0 00 00001) B帧 不重要 type = 1
# 0x06 (0 00 00110) SEI 不重要 type = 6
frameTypeHex = data[68:70]
return frameTypeHex
def readFile_M2(fileName):
with open(fileName, 'r') as f1:
f_content = f1.read()
f_content = f_content.split("30316364")
# print(len(f_content))
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.connect((host, port))
for i in range(2,len(f_content)):
msg = "30316364" + f_content[i]
msg = msg[:16] + SIM + channel + msg[30:]
if getFrameType(msg) == "61":
print("关键帧")
client.send(binascii.a2b_hex(msg))
time.sleep(0.01)
print(msg)
print("\n\n\n\n 第一波推流完成 --------------------------------")
client.close()
time.sleep(5)
if isLoop == 1:
readFile_M2("temp/res.txt")
def paseXml():
domTree = parse("xml/frame.xml")
rootNode = domTree.documentElement
frameSizes = []
# 所有顾客
customers = rootNode.getElementsByTagName("frame")
for customer in customers:
frameSizes.append(int(customer.getAttribute("pkt_size")))
with open("aac/aaa.aac", 'rb') as f1:
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # 在客户端开启心跳
client.connect((host, port))
for i in frameSizes:
msg = f1.read(i)
msg = msg.hex()
print(msg)
client.send(binascii.a2b_hex(msg))
time.sleep(0.001)
client.close()
if __name__ == "__main__":
# readFile_M("temp/tmp1.txt")
# readFile_M2("temp/res.txt")
# readFile_M("temp/video.txt")
# readFile_M2("temp/res.txt")
paseXml()
\ No newline at end of file
#coding: utf-8
def readH():
with open("h264/aaa2.264", 'rb') as f2:
con = f2.read(2048)
data = con.hex()
data2 = data.split("00000001")
for i in data2:
print("00000001" + i)
print(data2)
with open("h264/tmp.bin", 'wb') as f3:
f3.write(con)
if __name__ == "__main__":
readH()
\ No newline at end of file
#coding: utf-8
import time
from lib.testCase.FlvPressureTest import FlvPressureTest
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
###########################################
# 推流h264 和 aac 文件压力测试
###########################################
from lib.testCase.H264PresureTest import H264PresureTest
def h264FilePushTest():
test = H264PresureTest()
test.setHost("10.100.12.3")
test.setPort(1078)
test.setMobileStart(10000000000)
test.setChannel(1)
test.setTerNum(10)
test.setSendDur(0.001)
test.setIsOpenPullStream(0) # 设置是否开启拉流
test.setIsShowFrame(0)
test.setVideoPath("h264/bbb3.h264")
test.setAudioPath("aac/bbb3.aac")
test.start()
def getPressTestObj():
test = FlvPressureTest()
test.setHost("10.100.11.125")
test.setPort(1078)
test.setMobileStart(10000000000) # 开始的设备号 (累加)
test.setChannel(1) # 设置频道号
test.setTerNum(10) # 要启动的推拉流线程
test.setIsOpenPullStream(1) # 设置是否开启拉流
test.setIsShowFrame(0) # 拉流是否显示预览画面 (压力测试都应该填:0)
test.setSendDur(0.007) # 设置socket 发送数据间隔
test.setFlvPath("flv/aaa3.flv") # 设置视频路劲
test.start()
return test
###########################################
# 推流flv文件压力测试
###########################################
def flvFilePushTest():
isLoop = 1 # 是否循环压力测试 0: 不循环 1:循环
loopTime = 1 * 60 * 60 # 循环压力测试的时长
test = getPressTestObj()
logger.info("-----------------------------启动新一轮压力测试-----------------------------")
threadInfo = test.getThreadInfo()
threadInfoPull = test.getThreadInfoPull()
logger.info("剩余推流线程:" + str(len(threadInfo["threadObj"])))
logger.info("剩余拉流线程:" + str(len(threadInfoPull["threadObj"])))
if isLoop == 0:
while len(threadInfo["threadObj"]) != 0 and len(threadInfoPull["threadObj"]) != 0:
time.sleep(5)
logger.info("剩余推流线程:" + str(len(threadInfo["threadObj"])))
logger.info("剩余拉流线程:" + str(len(threadInfoPull["threadObj"])))
threadInfo = test.getThreadInfo()
threadInfoPull = test.getThreadInfoPull()
else:
if len(threadInfo["threadObj"]) == 0 and len(threadInfoPull["threadObj"]) == 0:
test = getPressTestObj()
threadInfo = test.getThreadInfo()
threadInfoPull = test.getThreadInfoPull()
while len(threadInfo["threadObj"]) != 0 and len(threadInfoPull["threadObj"]) != 0:
time.sleep(5)
logger.info("剩余推流线程:" + str(len(threadInfo["threadObj"])))
logger.info("剩余拉流线程:" + str(len(threadInfoPull["threadObj"])))
threadInfo = test.getThreadInfo()
threadInfoPull = test.getThreadInfoPull()
time.sleep(30)
flvFilePushTest()
logger.info("压力测试结束!!!!!!")
if __name__ == "__main__":
flvFilePushTest()
# h264FilePushTest()
#coding: utf-8
import cv2
cap=cv2.VideoCapture("http://10.100.11.125:8085/live?port=1985&app=vandyo&stream=013146201117-1")
while (cap.isOpened()):
ret, frame = cap.read()
if ret == False:
break
cv2.imshow("video", frame)
if cv2.waitKey(1)&0xFF==ord('q'):
break
print("拉流结束")
cap.release()
cv2.destroyAllWindows()
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
#coding: utf-8
import os
import wx
class CameraArea():
def __init__(self,frame):
self.frame = frame
self.mainPanel = None
self.devId = "010000000000"
self.channel = 1
def setDevId(self,data):
self.devId = data
def setChannel(self,data):
self.channel = data
#################################################
# 创建一个pannel
#################################################
def create(self):
self.mainPanel = wx.Panel(self.frame)
boxSizer = wx.BoxSizer(wx.VERTICAL)
topPanel = wx.Panel(self.mainPanel)
bottomPanel = wx.Panel(self.mainPanel)
bottomPanel.SetBackgroundColour(wx.YELLOW)
boxSizer.Add(topPanel,1,flag=wx.EXPAND | wx.ALL)
boxSizer.Add(bottomPanel,2, flag=wx.EXPAND | wx.ALL)
self.mainPanel.SetSizer(boxSizer)
preView = wx.Panel(topPanel)
boxSizer_preView = wx.BoxSizer(wx.VERTICAL)
preView_staticText = wx.StaticText(preView, label='预览区域(未实现):', pos=(10, 10))
preView_content = wx.Panel(topPanel, style=wx.BORDER_SIMPLE)
boxSizer_preView.Add(preView_staticText,1,flag=wx.EXPAND | wx.ALL)
boxSizer_preView.Add(preView_content,11, flag=wx.EXPAND | wx.ALL)
preView.SetSizer(boxSizer_preView)
paramView = wx.Panel(topPanel)
wx.StaticText(paramView, label='本地视频:', pos=(10, 10))
videoPathText = wx.TextCtrl(paramView,pos=(70,5),size=wx.Size(300,-1)) # 要推流的视频地址
selectPathButton = wx.Button(paramView,label="选择视频文件",pos=(375,5))
self.frame.Bind(wx.EVT_BUTTON, lambda evt,textCtr=videoPathText : self.selectVideoFile(evt,textCtr), selectPathButton)
wx.StaticText(paramView, label='推流地址:', pos=(10, 40))
pushHostText = wx.TextCtrl(paramView, pos=(70, 35), size=wx.Size(150, -1), value="10.100.11.125") # 推流地址
wx.StaticText(paramView, label='推流端口:', pos=(235, 40))
pushPortText = wx.TextCtrl(paramView, pos=(300, 35), size=wx.Size(80, -1), value="1078") # 推流端口
wx.StaticText(paramView, label='设 备号:', pos=(10, 70))
pushHostText = wx.TextCtrl(paramView, pos=(70, 65), size=wx.Size(150, -1), value=self.devId) # 设备号
wx.StaticText(paramView, label='频 道号:', pos=(235, 70))
pushPortText = wx.TextCtrl(paramView, pos=(300, 65), size=wx.Size(80, -1), value=str(self.channel)) # 频道号
wx.StaticText(paramView, label='当前状态:', pos=(10, 100))
pushStatusText = wx.TextCtrl(paramView, pos=(70, 95), size=wx.Size(60, -1),value="未推流",style=wx.TE_READONLY) # 推流状态显示
pushStatusText.SetForegroundColour(wx.RED)
wx.StaticText(paramView, label='消息发送间隔(毫秒):', pos=(10, 130))
msgSendDurText = wx.TextCtrl(paramView, pos=(150, 125), size=wx.Size(40, -1), value="7") # 消息发送间隔
wx.StaticText(paramView, label='播放地址:', pos=(10, 190))
msgSendDurText = wx.TextCtrl(paramView, pos=(70, 185), size=wx.Size(400, -1)) # 视频播放地址
ctrView = wx.Panel(topPanel)
startPushButton = wx.Button(ctrView, label="开始推流", pos=(5, 5))
stopPushButton = wx.Button(ctrView, label="停止推流", pos=(90, 5))
ctrView.SetBackgroundColour(wx.RED)
boxSizer_1 = wx.BoxSizer(wx.HORIZONTAL)
boxSizer_1.Add(preView,1,flag=wx.EXPAND | wx.ALL)
boxSizer_1.Add(paramView, 2, flag=wx.EXPAND | wx.ALL)
boxSizer_1.Add(ctrView, 1, flag=wx.EXPAND | wx.ALL)
topPanel.SetSizer(boxSizer_1)
return self.mainPanel
def selectVideoFile(self,evt,textCtr):
wildcard = "Video file (*.flv)|*.flv"
dlg = wx.FileDialog(
self.frame, message="Choose a file",
defaultDir=os.getcwd(),
defaultFile="",
wildcard=wildcard,
style=wx.FD_OPEN |
wx.FD_CHANGE_DIR | wx.FD_FILE_MUST_EXIST |
wx.FD_PREVIEW
)
if dlg.ShowModal() == wx.ID_OK:
path = dlg.GetPath()
textCtr.SetValue(path)
dlg.Destroy()
\ No newline at end of file
#coding:utf-8
import wx
from ui.CameraArea import CameraArea
'''
定义主窗体
'''
class MainWindow():
def __init__(self):
self.app = None
self.frame = None
self.main()
def main(self):
self.app = wx.App()
self.frame = wx.Frame(None,-1, title='云镜模拟器',size = wx.Size(1100,700))
self.createMainPanel()
#####################################################
# 显示窗体
#####################################################
def show(self):
self.frame.Show()
# import wx.lib.inspection
# wx.lib.inspection.InspectionTool().Show()
self.app.MainLoop()
#####################################################
# 创建主窗体的布局面板
#####################################################
def createMainPanel(self):
mainPanel = wx.Panel(self.frame)
boxSizer = wx.BoxSizer(wx.VERTICAL)
nodeBook = wx.Notebook(mainPanel)
cameraArea1 = CameraArea(nodeBook)
page1 = cameraArea1.create()
nodeBook.AddPage(page1,"摄像头1")
cameraArea2 = CameraArea(nodeBook)
cameraArea2.setChannel(2)
page2 = cameraArea2.create()
nodeBook.AddPage(page2, "摄像头2")
cameraArea3 = CameraArea(nodeBook)
cameraArea3.setChannel(3)
page3 = cameraArea3.create()
nodeBook.AddPage(page3, "摄像头3")
page4 = wx.Panel(nodeBook)
nodeBook.AddPage(page4, "转码工具")
page5 = wx.Panel(nodeBook)
nodeBook.AddPage(page5, "告警与其他功能")
boxSizer.Add(nodeBook, 1, flag=wx.EXPAND | wx.ALL)
mainPanel.SetSizer(boxSizer)
if __name__ == "__main__":
MainWindow().show()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment