Commit 737ea123 authored by liyuanhong's avatar liyuanhong

首次提交

parents
Pipeline #169 failed with stages
.idea/*
venv/*
\ No newline at end of file
# coding: utf-8
# 用户名
USER_NAME = "13146201116"
# 密码
PASSWORD = "Lyh123456"
# 车机号
CAR_DIN = "M202003060518"
# 域名
HOST = "api-test.vandyo.com"
# 请求头公共参数
HEADER = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
SID = ""
UID = ""
# 车机id (区别于硬件的车机号)
CAR_ID = ""
# http url前缀
HTTP_PREFIX = "http://"
# https url前缀
HTTPS_PREFIX = "https://"
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#coding: utf-8
import time
from testCase.MainCase import MainCase
###########################################################
# 使用html 报告模板输出
###########################################################
def run():
caseObj = MainCase()
caseObj.startTest()
###########################################################
# 只在控制台输出测试结果
###########################################################
def txtRun():
caseObj = MainCase()
caseObj.startTxtTest()
###########################################################
# 通过服务方式启动拨测服务
###########################################################
def runService():
caseObj = MainCase()
ts = time.time()
timeArray = time.localtime(ts)
curTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
caseObj.set_title(curTime + " 拨测报告") # 设置邮件标题
caseObj.set_from_addr("optest@vandyo.com") # 设置发件箱
caseObj.set_password("123qweQWE!@#AaA") # 设置邮箱登录密码
caseObj.add_to_addr("liyuanhong@vandyo.com") # 添加收件箱
# caseObj.add_to_addr("yuzhanyong@vandyo.com") # 添加收件箱
# caseObj.add_to_addr("908963295@qq.com") # 添加收件箱
# caseObj.add_to_cc("jiaxiantao@vandyo.com") # 添加抄送
# caseObj.add_to_cc("langang@vandyo.com") # 添加抄送
caseObj.startTestService()
if __name__ == "__main__":
runService()
# run()
# txtRun()
\ No newline at end of file
#coding: utf-8
'''
汽车接口相关用例
'''
import json
import random
import time
import unittest
import requests
from config import config
from testCase.common import utils
class Car_interface_cases(unittest.TestCase):
##########################################################
# 登录,用户手机密码登录
# 单元测试的同时获取uid 和sid
# 必须是第一个执行的单元测试用例
##########################################################
def test_user_authorize_pwd_login(self):
url = config.HTTP_PREFIX + config.HOST + "/user/authorize/pwd/login"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["ts"] = ts
postData = {}
postData["mobile"] = config.USER_NAME
postData["password"] = utils.getMd5String(config.PASSWORD)
resObj = requests.post(url, headers=headers, params=params, data=json.dumps(postData))
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.SID = result["result"]["sid"]
config.UID = result["result"]["uid"]
self.baseAssert(resObj)
def to_unicode(string):
ret = ''
for v in string:
ret = ret + hex(ord(v)).upper().replace('0X', '\\u')
return ret
##########################################################
# 查询人车对话
##########################################################
def test_car_my_man_speak(self):
url = config.HTTP_PREFIX + config.HOST + "/car/man/speak"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
postData = {}
postData["cid"] = config.CAR_ID
postData["message"] = random.choice(['车辆位置', '即时速度', '发动机转速','剩余油量','电瓶电压'])
cs = utils.getSignature(params,json.dumps(postData))
params["cs"] = cs
params.pop("sid")
print(postData)
resObj = requests.post(url, headers=headers, params=params,data=json.dumps(postData))
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
self.baseAssert(resObj)
####################################### 车相关 #######################################
##########################################################
# 车品牌列表
##########################################################
def test_car_my_brand_list(self):
url = config.HTTP_PREFIX + config.HOST + "/car/brand/list"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
self.baseAssert(resObj)
##########################################################
# 车款年款列表
##########################################################
def test_car_model_list_year(self):
# todo
url = config.HTTP_PREFIX + config.HOST + "/car/model/list/year"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
self.baseAssert(resObj)
##########################################################
# 查询车列表
##########################################################
def test_car_my_car_list(self):
url = config.HTTP_PREFIX + config.HOST + "/car/my_car/list"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.CAR_ID = result["result"][0]["cid"]
self.baseAssert(resObj)
##########################################################
# 查询车信息
##########################################################
def test_car_my_car_info(self):
url = config.HTTP_PREFIX + config.HOST + "/car/my_car/info"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
params["cid"] = config.CAR_ID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
self.baseAssert(resObj)
##########################################################
# 接口基本断言,需要传入一个返回结果对象
##########################################################
def baseAssert(self,resObj):
statusCode = int(resObj.status_code) # 响应状态码
resTime = int(resObj.elapsed.total_seconds() * 1000) # 获取接口相应时间(毫秒)
result = json.loads(resObj.text)
error = result["error"]
self.assertEqual(statusCode, 200, "响应状态码为:" + str(statusCode))
self.assertTrue(resTime < 4000, "响应时间大于4秒,响应时间为:" + str(resTime) + "毫秒")
self.assertEqual(error, 0, "接口返回错误:" + str(error) + " content: " + resObj.text)
print("接口响应时间:" + str(resTime) + "毫秒")
def startTest(self):
suite = unittest.TestSuite()
suite.addTest(Car_interface_cases("test_user_authorize_pwd_login"))
suite.addTest(Car_interface_cases("test_car_my_brand_list")) # 车品牌列表
suite.addTest(Car_interface_cases("test_car_my_car_list")) # 查询车列表
suite.addTest(Car_interface_cases("test_car_my_car_info")) # 查询车信息
suite.addTest(Car_interface_cases("test_car_my_man_speak")) # 查询人车对话
runner = unittest.TextTestRunner()
runner.run(suite)
if __name__ == "__main__":
Car_interface_cases().startTest()
\ No newline at end of file
#coding: utf-8
'''
测试套件类,用来组织所有的测试用例
'''
import os
import platform
import time
import unittest
from testCase.Car_interface_cases import Car_interface_cases
from testCase.User_interface_cases import User_interface_cases
from testCase.common.EmailTool import EmailTool
from testCase.common.HTMLTestRunner import HTMLTestRunner
class MainCase():
def __init__(self):
self.sucessCount = 0
self.failureCount = 0
self.errorCount = 0
self.testReport = "" #测试报告名
self.testDate = "" #测试日期
self.from_addr = 'optest@vandyo.com' # 发件箱
self.password = '123qweQWE!@#AaA' # 发件箱密码
# self.to_addr = "liyuanhong@vandyo.com" # 收件箱
self.to_addr = "" # 收件箱
self.to_cc = "" # 抄送
self.msg = "邮件内容" # 邮件正文
self.smtp_server = "smtp.exmail.qq.com" # 发信服务器
self.title = "车安优拨测报告" # 邮件标题
def getSucessCount(self):
return self.sucessCount
def getFailCount(self):
return self.failureCount
def getErrorCount(self):
return self.errorCount
def getTestReport(self):
return self.testReport
def getTestDate(self):
return self.testDate
def set_from_addr(self,data):
self.from_addr = data
def set_password(self,data):
self.password = data
def set_to_addr(self,data):
self.to_addr = data
def set_title(self,data):
self.title = data
def set_msg(self,data):
self.msg = data
#########################################################
# 添加收件人
#########################################################
def add_to_addr(self,data):
if self.to_addr == "":
self.to_addr = data
else:
self.to_addr = self.to_addr + "," + data
#########################################################
# 添加抄送
#########################################################
def add_to_cc(self,data):
if self.to_cc == "":
self.to_cc = data
else:
self.to_cc = self.to_cc + "," + data
###########################################################
# 使用html 报告模板输出
###########################################################
def startTest(self):
suite = unittest.TestSuite()
suite.addTest(User_interface_cases("test_user_authorize_pwd_login")) #用户手机密码登录
suite.addTest(User_interface_cases("test_user_mall_goods_list")) #商品列表
suite.addTest(User_interface_cases("test_user_third_party_app_list")) #第三方应用列表
suite.addTest(User_interface_cases("test_user_business_cooperation_get")) #查询商务合作信息
suite.addTest(User_interface_cases("test_user_info_get")) #获取用户基本信息
suite.addTest(User_interface_cases("test_user_pop_list")) #获取弹框广告
suite.addTest(User_interface_cases("test_user_banner_list")) # banner图片列表
suite.addTest(User_interface_cases("test_user_flash_list")) # 获取闪屏广告
suite.addTest(User_interface_cases("test_user_car_msg_config_get")) # 获取车消息开关
suite.addTest(User_interface_cases("test_user_user_msg_config_get")) # 获取用户推送消息开关
suite.addTest(User_interface_cases("test_user_config_version_update_get")) # 版本更新
suite.addTest(User_interface_cases("test_user_menu_list")) #用户菜单列表
suite.addTest(Car_interface_cases("test_car_my_car_list")) #查询车列表
suite.addTest(Car_interface_cases("test_car_my_car_info")) #查询车信息
suite.addTest(Car_interface_cases("test_car_my_man_speak")) #查询人车对话
ts = time.time()
timeArray = time.localtime(ts)
curTime = ""
curDate = ""
sys = platform.system()
if sys == "Windows":
curTime = time.strftime("%Y-%m-%d %H_%M_%S", timeArray)
curDate = time.strftime("%Y-%m-%d", timeArray)
else:
curTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
curDate = time.strftime("%Y-%m-%d", timeArray)
fdir = "result/" + curDate
if not os.path.exists(fdir):
os.mkdir(fdir)
filePath = fdir + "/" + curTime + ".html"
self.testReport = curTime + ".html"
self.testDate = curDate
fp = open(filePath, 'wb')
# 生成报告的Title,描述
runner = HTMLTestRunner(stream=fp, title='车安优App接口测试', description='车安优App拨测程序测试报告...')
result = runner.run(suite)
self.sucessCount = result.success_count
self.failureCount = result.failure_count
self.errorCount = result.error_count
self.msg = "【" + curTime + "】 测试简要结果如下:\n"
self.msg = self.msg + "\n 用例总数:" + str(self.sucessCount + self.failureCount + self.errorCount)
self.msg = self.msg + "\n 成功个数:" + str(self.sucessCount)
self.msg = self.msg + "\n 失败个数:" + str(self.failureCount)
self.msg = self.msg + "\n 错误个数:" + str(self.errorCount)
self.msg = self.msg + "\n\n测试详情,详见附件..."
fp.close()
###########################################################
# 使用html 报告模板输出
###########################################################
def startTxtTest(self):
suite = unittest.TestSuite()
suite.addTest(User_interface_cases("test_user_authorize_pwd_login")) #用户手机密码登录
suite.addTest(User_interface_cases("test_user_mall_goods_list")) #商品列表
suite.addTest(User_interface_cases("test_user_third_party_app_list")) #第三方应用列表
suite.addTest(User_interface_cases("test_user_business_cooperation_get")) #查询商务合作信息
suite.addTest(User_interface_cases("test_user_info_get")) #获取用户基本信息
suite.addTest(User_interface_cases("test_user_pop_list")) #获取弹框广告
suite.addTest(User_interface_cases("test_user_banner_list")) # banner图片列表
suite.addTest(User_interface_cases("test_user_flash_list")) # 获取闪屏广告
suite.addTest(User_interface_cases("test_user_car_msg_config_get")) # 获取车消息开关
suite.addTest(User_interface_cases("test_user_user_msg_config_get")) # 获取用户推送消息开关
suite.addTest(User_interface_cases("test_user_config_version_update_get")) # 版本更新
suite.addTest(User_interface_cases("test_user_menu_list")) #用户菜单列表
suite.addTest(Car_interface_cases("test_car_my_car_list")) # 查询车列表
runner = unittest.TextTestRunner()
runner.run(suite)
###########################################################
# 定时测试服务,如果有失败用例,会自动发送邮件通知
###########################################################
def startTestService(self):
while True:
startTime = int(time.time())
endTime = int(time.time())
self.startTest()
if self.failureCount > 0 or self.errorCount > 0:
self.sendEmail()
###########################################################
# 发送邮件
###########################################################
def sendEmail(self):
email_obj = EmailTool()
email_obj.addContent(self.msg) # 设置邮件内容
email_obj.set_title(self.title) # 设置邮件标题
ts = time.time()
timeArray = time.localtime(ts)
curDate = time.strftime("%Y-%m-%d", timeArray)
time.sleep(2)
email_obj.set_attachment("result/" + curDate + "/" + self.testReport,self.testReport) # 设置附件
email_obj.set_to_addr(self.to_addr) # 设置抄送
email_obj.set_to_cc(self.to_cc) # 设置收信人
email_obj.send()
This diff is collapsed.
#coding: utf-8
import smtplib
from email import encoders
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
'''
发送邮件的类
'''
class EmailTool():
def __init__(self):
self.from_addr = 'optest@vandyo.com' # 发件箱
self.password = '123qweQWE!@#AaA' # 发件箱密码
self.to_addr = "liyuanhong@vandyo.com" # 收件箱
self.to_cc = "" # 抄送
self.smtp_server = "smtp.exmail.qq.com" # 发信服务器
self.title = "车安优拨测反馈" # 邮件标题
self.msg_obj = MIMEMultipart("mixed")
def set_from_addr(self,data):
self.from_addr = data
def set_password(self,data):
self.password = data
def set_to_addr(self,data):
self.to_addr = data
def set_to_cc(self,data):
self.to_cc = data
def set_smtp_server(self,data):
self.smtp_server = data
def set_title(self,data):
self.title = data
def set_email_type(self,data):
self.email_type = data
#########################################################
# 添加收件人
#########################################################
def add_to_addr(self,data):
if self.to_addr == "":
self.to_addr = data
else:
self.to_addr = self.to_addr + "," + data
#########################################################
# 添加抄送
#########################################################
def add_to_cc(self,data):
if self.to_cc == "":
self.to_cc = data
else:
self.to_cc = self.to_cc + "," + data
#########################################################
# 添加邮件内容: plain为纯文本,html:为html格式
#########################################################
def addContent(self,msg="",type="plain"):
mime = MIMEText(msg, type, 'utf-8')
self.msg_obj.attach(mime)
#########################################################
# 发送邮件
#########################################################
def send(self):
# 邮箱正文内容,第一个参数为内容,第二个参数为格式(plain 为纯文本),第三个参数为编码
self.msg_obj['From'] = Header(self.from_addr)
self.msg_obj['To'] = Header(self.to_addr) # 收信人
if self.to_cc != "":
self.msg_obj['Cc'] = Header(self.to_cc) # 添加抄送
self.msg_obj['Subject'] = Header(self.title)
server = smtplib.SMTP_SSL(host=self.smtp_server)
server.connect(self.smtp_server, 465)
server.login(self.from_addr, self.password)
if self.to_cc == "":
server.sendmail(self.from_addr, self.to_addr.split(',') ,self.msg_obj.as_string()) # 发送邮件
else:
server.sendmail(self.from_addr, self.to_addr.split(',') + self.to_cc.split(","), self.msg_obj.as_string()) # 发送邮件
server.quit() # 关闭服务器
#########################################################
# 设置附件内容
########################################################
def set_attachment(self,file_path,file_name):
txt = ""
with open(file_path, "r", encoding="utf-8") as fi:
content = fi.read()
txt = content
part = MIMEText(txt, "plain", 'utf-8')
part.add_header('Content-Disposition', 'attachment', filename=file_name)
self.msg_obj.attach(part)
#########################################################
# 读取并返回文件内容
#########################################################
def read_from_file(self,file_name):
txt = ""
with open(file_name, "r", encoding="utf-8") as fi:
content = fi.read()
txt = content
return txt
if __name__ == "__main__":
obj = EmailTool()
obj.set_to_addr("liyuanhong@vandyo.com")
obj.add_to_addr("908963295@qq.com")
obj.add_to_cc("langang@vandyo.com")
obj.addContent("这是一个测试邮件,请忽略...")
obj.set_attachment("../../result/2020-05-22/2020-05-22 13_50_21.html", "2020-05-22 13_50_21.html")
obj.send()
This diff is collapsed.
# coding:utf-8
'''
在此定义辅助函数
'''
import hashlib
#################################################
# 获取一个Md5的字符串
#################################################
def getMd5String(data):
m = hashlib.md5()
b = data.encode(encoding='utf-8')
m.update(b)
str_md5 = m.hexdigest()
return str_md5
#################################################
# 获取签名,传入一个map,
# (需要对传入的参数,按照key的升序排列)
#################################################
def getSignature(data,postData={}):
sMd5 = ""
if len(postData) == 0:
key = sorted(data.keys())
tem = {}
for item in key:
tem[item] = data[item]
data = tem
s = ""
for key in data.keys():
s = s + key + "[" + str(data[key]) + "];"
sMd5 = getMd5String(s)
else:
data["Data"] = postData
key = sorted(data.keys())
tem = {}
for item in key:
tem[item] = data[item]
data = tem
s = ""
for key in data.keys():
s = s + key + "[" + str(data[key]) + "];"
sMd5 = getMd5String(s)
return sMd5
if __name__ == "__main__":
print(getSignature({"a":123,"b":"abc"}))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment