Commit 87b2af09 authored by liyuanhong's avatar liyuanhong

开发了一些用例,拨测程序整体功能基本完成

parent 737ea123
## 车安优app拨测程序
#### 一、使用说明
1、安装了python3 和 pip 包管理工具
2、使用 :pip install -r requirements.txt 安装依赖库
3、编辑config/config.py 配置好用户名和密码以及车机号
4、编辑run.py,编辑runService方法里面的启动参数
5、python3 run.py即可启动模拟服务(window下双击即可启动)
\ No newline at end of file
# coding: utf-8
# 用户名
USER_NAME = "13146201116"
# 密码
PASSWORD = "Lyh123456"
# 车机号
CAR_DIN = "M202003060518"
# 域名
HOST = "api-test.vandyo.com"
#################### 全局参数设置 ##################
SSL_VERIFY = False # 是否开启SSL验证,默认设置为True;需要抓包调试的时候,改为False
#################### 测试用户参数设置 ##################
USER_NAME = "13146201116" # 用户名
PASSWORD = "Lyh123456" # 密码
CAR_DIN = "M202003060518" # 车机号
HOST = "api-test.vandyo.com" # 域名
# 请求头公共参数
HEADER = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
SID = ""
UID = ""
# 车机id (区别于硬件的车机号)
CAR_ID = ""
# http url前缀
HTTP_PREFIX = "http://"
# https url前缀
HTTPS_PREFIX = "https://"
CAR_ID = "" # 车机id (区别于硬件的车机号)
HTTP_PREFIX = "http://" # http url前缀
#################### 体验模式参数设置 ##################
EX_HOST = "api.vandyo.com" # 体验模式域名
# 请求头公共参数
EX_HEADER = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
EX_USER_NAME = "guest" # 用户名
EX_PASSWORD = "fcf41657f02f88137a1bcf068a32c0a3" # 密码
EX_SID = ""
EX_UID = ""
EX_HTTP_PREFIX = "https://" # http url前缀
EX_CAR_ID = "" # 车机id (区别于硬件的车机号)
......@@ -34,6 +34,7 @@ def runService():
# caseObj.add_to_addr("908963295@qq.com") # 添加收件箱
# caseObj.add_to_cc("jiaxiantao@vandyo.com") # 添加抄送
# caseObj.add_to_cc("langang@vandyo.com") # 添加抄送
caseObj.setDurTime(1 * 1 * 60) # 设置多久执行一次拨测程序
caseObj.startTestService()
......
#coding: utf-8
'''
车动态接口相关用例
'''
import json
import time
import unittest
import requests
from config import config
from testCase.common import utils
class CarDynamic_interface_cases(unittest.TestCase):
##########################################################
# 登录,用户手机密码登录
# 单元测试的同时获取uid 和sid
# 必须是第一个执行的单元测试用例
##########################################################
def test_user_authorize_pwd_login(self):
url = config.HTTP_PREFIX + config.HOST + "/user/authorize/pwd/login"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["ts"] = ts
postData = {}
postData["mobile"] = config.USER_NAME
postData["password"] = utils.getMd5String(config.PASSWORD)
resObj = requests.post(url, headers=headers, params=params, data=json.dumps(postData),verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.SID = result["result"]["sid"]
config.UID = result["result"]["uid"]
self.baseAssert(resObj)
#获取car_id
url = config.HTTP_PREFIX + config.HOST + "/car/my_car/list"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.CAR_ID = result["result"][0]["cid"]
##########################################################
# 车辆动态信息
##########################################################
def test_carDynamic_api_status_car_state_get(self):
url = config.HTTP_PREFIX + config.HOST + "/carDynamic/api/status/car/state/get"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["start"] = ts
params["uid"] = config.UID
params["cid"] = config.CAR_ID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
self.baseAssert(resObj)
##########################################################
# 接口基本断言,需要传入一个返回结果对象
##########################################################
def baseAssert(self,resObj):
statusCode = int(resObj.status_code) # 响应状态码
resTime = int(resObj.elapsed.total_seconds() * 1000) # 获取接口相应时间(毫秒)
result = json.loads(resObj.text)
error = result["error"]
self.assertEqual(statusCode, 200, "响应状态码为:" + str(statusCode))
self.assertTrue(resTime < 4000, "响应时间大于4秒,响应时间为:" + str(resTime) + "毫秒")
# self.assertEqual(error, 0, "接口返回错误:" + str(error) + " content: " + resObj.text)
self.assertEqual(error, 0, "接口返回错误")
print("接口响应时间:" + str(resTime) + "毫秒")
def startTest(self):
suite = unittest.TestSuite()
suite.addTest(CarDynamic_interface_cases("test_user_authorize_pwd_login"))
suite.addTest(CarDynamic_interface_cases("test_carDynamic_api_status_car_state_get")) # 车辆动态信息
runner = unittest.TextTestRunner()
runner.run(suite)
if __name__ == "__main__":
CarDynamic_interface_cases().startTest()
\ No newline at end of file
This diff is collapsed.
#coding: utf-8
'''
车模拟程序测试用例
'''
import json
import random
import time
import unittest
import requests
from config import config
from testCase.common import utils
class DemoCar_case(unittest.TestCase):
##########################################################
# demo车登录接口
##########################################################
def test_user_authorize_pwd_login(self):
url = config.EX_HTTP_PREFIX + config.EX_HOST + "/user/authorize/pwd/login"
ts = int(time.time())
headers = config.EX_HEADER
params = {}
params["ts"] = ts
postData = {}
postData["mobile"] = config.EX_USER_NAME
postData["password"] = config.EX_PASSWORD
resObj = requests.post(url, headers=headers, params=params, data=json.dumps(postData),verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.EX_SID = result["result"]["sid"]
config.EX_UID = result["result"]["uid"]
self.baseAssert(resObj)
##########################################################
# demo车,查询车列表
##########################################################
def test_car_my_car_list(self):
url = config.EX_HTTP_PREFIX + config.EX_HOST + "/car/my_car/list"
ts = int(time.time())
headers = config.EX_HEADER
params = {}
params["sid"] = config.EX_SID
params["ts"] = ts
params["uid"] = config.EX_UID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print(result)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.EX_CAR_ID = result["result"][0]["cid"]
self.baseAssert(resObj)
##########################################################
# demo车,辆动态信息
##########################################################
def test_carDynamic_api_status_car_state_get(self):
url = config.EX_HTTP_PREFIX + config.EX_HOST + "/carDynamic/api/status/car/state/get"
ts = int(time.time())
headers = config.EX_HEADER
params = {}
params["sid"] = config.EX_SID
params["ts"] = ts
params["start"] = ts
params["uid"] = config.EX_UID
params["cid"] = config.EX_CAR_ID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print(type(result))
print(result)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
# self.assertTrue(durTime < 11000, "demo车机已经有超过3小时未启动,请检查是否出现问题")
# self.baseAssert(resObj)
##########################################################
# 判断 demo车是否正常行驶
##########################################################
def test_car_api_locus_list_page(self):
url = config.EX_HTTP_PREFIX + config.EX_HOST + "/car/api/locus/list/page"
ts = int(time.time())
headers = config.EX_HEADER
params = {}
params["sid"] = config.EX_SID
params["ts"] = ts
params["start"] = ts
params["uid"] = config.EX_UID
params["cid"] = config.EX_CAR_ID
params["size"] = 10
params["sort"] = 1
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print(type(result))
print(result)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
lastTime = int(result["result"][0]["sti"]) #车机最近一次轨迹的开始时间
durTime = ts - lastTime
self.assertTrue(durTime < 11000, "demo车机已经有超过3小时未启动,请检查是否出现问题")
# self.assertTrue(durTime < 3000, "demo车机已经有超过3小时未启动,请检查是否出现问题")
self.baseAssert(resObj)
##########################################################
# 接口基本断言,需要传入一个返回结果对象
##########################################################
def baseAssert(self,resObj):
statusCode = int(resObj.status_code) # 响应状态码
resTime = int(resObj.elapsed.total_seconds() * 1000) # 获取接口相应时间(毫秒)
result = json.loads(resObj.text)
error = result["error"]
self.assertEqual(statusCode, 200, "响应状态码为:" + str(statusCode))
self.assertTrue(resTime < 4000, "响应时间大于4秒,响应时间为:" + str(resTime) + "毫秒")
# self.assertEqual(error, 0, "接口返回错误:" + str(error) + " content: " + resObj.text)
self.assertEqual(error, 0, "接口返回错误")
print("接口响应时间:" + str(resTime) + "毫秒")
def startTest(self):
suite = unittest.TestSuite()
suite.addTest(DemoCar_case("test_user_authorize_pwd_login")) # demo车登录接口
suite.addTest(DemoCar_case("test_car_my_car_list")) # demo车,查询车列表
suite.addTest(DemoCar_case("test_carDynamic_api_status_car_state_get")) # demo车,辆动态信息
suite.addTest(DemoCar_case("test_car_api_locus_list_page")) # 判断 demo车是否正常行驶
runner = unittest.TextTestRunner()
runner.run(suite)
if __name__ == "__main__":
DemoCar_case().startTest()
\ No newline at end of file
......@@ -8,7 +8,9 @@ import platform
import time
import unittest
from testCase.CarDynamic_interface_cases import CarDynamic_interface_cases
from testCase.Car_interface_cases import Car_interface_cases
from testCase.DemoCar_case import DemoCar_case
from testCase.User_interface_cases import User_interface_cases
from testCase.common.EmailTool import EmailTool
from testCase.common.HTMLTestRunner import HTMLTestRunner
......@@ -29,6 +31,7 @@ class MainCase():
self.msg = "邮件内容" # 邮件正文
self.smtp_server = "smtp.exmail.qq.com" # 发信服务器
self.title = "车安优拨测报告" # 邮件标题
self.durTime = 1 * 5 * 60 # 多久执行一次拨测程序(秒)
def getSucessCount(self):
return self.sucessCount
......@@ -52,6 +55,8 @@ class MainCase():
self.title = data
def set_msg(self,data):
self.msg = data
def setDurTime(self,data):
self.durTime = data
#########################################################
# 添加收件人
......@@ -75,22 +80,40 @@ class MainCase():
###########################################################
def startTest(self):
suite = unittest.TestSuite()
suite.addTest(User_interface_cases("test_user_authorize_pwd_login")) #用户手机密码登录
suite.addTest(User_interface_cases("test_user_mall_goods_list")) #商品列表
suite.addTest(User_interface_cases("test_user_third_party_app_list")) #第三方应用列表
suite.addTest(User_interface_cases("test_user_business_cooperation_get")) #查询商务合作信息
suite.addTest(User_interface_cases("test_user_info_get")) #获取用户基本信息
suite.addTest(User_interface_cases("test_user_pop_list")) #获取弹框广告
suite.addTest(User_interface_cases("test_user_authorize_pwd_login")) # 用户手机密码登录
suite.addTest(User_interface_cases("test_user_mall_goods_list")) # 商品列表
suite.addTest(User_interface_cases("test_user_third_party_app_list")) # 第三方应用列表
suite.addTest(User_interface_cases("test_user_business_cooperation_get")) # 查询商务合作信息
suite.addTest(User_interface_cases("test_user_info_get")) # 获取用户基本信息
suite.addTest(User_interface_cases("test_user_pop_list")) # 获取弹框广告
suite.addTest(User_interface_cases("test_user_banner_list")) # banner图片列表
suite.addTest(User_interface_cases("test_user_flash_list")) # 获取闪屏广告
suite.addTest(User_interface_cases("test_user_car_msg_config_get")) # 获取车消息开关
suite.addTest(User_interface_cases("test_user_user_msg_config_get")) # 获取用户推送消息开关
suite.addTest(User_interface_cases("test_user_config_version_update_get")) # 版本更新
suite.addTest(User_interface_cases("test_user_menu_list")) #用户菜单列表
suite.addTest(Car_interface_cases("test_car_my_car_list")) #查询车列表
suite.addTest(Car_interface_cases("test_car_my_car_info")) #查询车信息
suite.addTest(Car_interface_cases("test_car_my_man_speak")) #查询人车对话
suite.addTest(User_interface_cases("test_user_menu_list")) # 用户菜单列表
suite.addTest(Car_interface_cases("test_car_brand_list")) # 车品牌列表
suite.addTest(Car_interface_cases("test_car_my_car_list")) # 查询车列表
suite.addTest(Car_interface_cases("test_car_my_car_info")) # 查询车信息
suite.addTest(Car_interface_cases("test_car_man_speak")) # 查询人车对话
suite.addTest(Car_interface_cases("test_car_api_reprt_reportDay_get")) # 查询日报告
suite.addTest(Car_interface_cases("test_car_api_locus_list_page")) # 查询历史轨迹列表
suite.addTest(Car_interface_cases("test_car_api_locus_points_list")) # 查询轨迹点
suite.addTest(Car_interface_cases("test_car_car_health")) # 车辆健康检测
suite.addTest(Car_interface_cases("test_car_car_health_battery")) # 车辆健康检测-电瓶检测结果
suite.addTest(Car_interface_cases("test_car_car_health_brake")) # 车辆健康检测-刹车系统检测结果
suite.addTest(Car_interface_cases("test_car_car_health_engine")) # 车辆健康检测-发动机检测结果
suite.addTest(Car_interface_cases("test_car_car_health_gearbox")) # 车辆健康检测-变速箱检测结果
suite.addTest(Car_interface_cases("test_car_car_health_oilbox")) # 车辆健康检测-油箱检测结果
suite.addTest(Car_interface_cases("test_car_car_health_total_user")) # 车辆健康检测总用户数
suite.addTest(DemoCar_case("test_user_authorize_pwd_login")) # demo车登录接口
suite.addTest(DemoCar_case("test_car_my_car_list")) # demo车,查询车列表
suite.addTest(DemoCar_case("test_carDynamic_api_status_car_state_get")) # demo车,辆动态信息
suite.addTest(DemoCar_case("test_car_api_locus_list_page")) # 判断 demo车是否正常行驶
suite.addTest(CarDynamic_interface_cases("test_carDynamic_api_status_car_state_get")) # 车辆动态信息
ts = time.time()
timeArray = time.localtime(ts)
......@@ -122,7 +145,7 @@ class MainCase():
self.msg = self.msg + "\n 成功个数:" + str(self.sucessCount)
self.msg = self.msg + "\n 失败个数:" + str(self.failureCount)
self.msg = self.msg + "\n 错误个数:" + str(self.errorCount)
self.msg = self.msg + "\n\n测试详情,详见附件..."
self.msg = self.msg + "\n\n测试报告详情,详见附件..."
fp.close()
......@@ -152,12 +175,19 @@ class MainCase():
# 定时测试服务,如果有失败用例,会自动发送邮件通知
###########################################################
def startTestService(self):
self.startTest()
if self.failureCount > 0 or self.errorCount > 0:
self.sendEmail()
startTime = int(time.time())
while True:
startTime = int(time.time())
endTime = int(time.time())
self.startTest()
if self.failureCount > 0 or self.errorCount > 0:
self.sendEmail()
if endTime - startTime >= self.durTime:
self.startTest()
if self.failureCount > 0 or self.errorCount > 0:
self.sendEmail()
startTime = int(time.time())
else:
time.sleep(1)
###########################################################
# 发送邮件
......
......@@ -28,7 +28,7 @@ class User_interface_cases(unittest.TestCase):
postData = {}
postData["mobile"] = config.USER_NAME
postData["password"] = utils.getMd5String(config.PASSWORD)
resObj = requests.post(url, headers=headers, params=params, data=json.dumps(postData))
resObj = requests.post(url, headers=headers, params=params, data=json.dumps(postData),verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
......@@ -36,6 +36,23 @@ class User_interface_cases(unittest.TestCase):
config.SID = result["result"]["sid"]
config.UID = result["result"]["uid"]
self.baseAssert(resObj)
#获取car_id
url = config.HTTP_PREFIX + config.HOST + "/car/my_car/list"
ts = int(time.time())
headers = {"Accept-Encoding": "gzip", "Content-Type": "application/json; charset=UTF-8"}
params = {}
params["sid"] = config.SID
params["ts"] = ts
params["uid"] = config.UID
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
result = json.loads(resObj.text)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
config.CAR_ID = result["result"][0]["cid"]
##########################################################
# 商品列表
......@@ -52,7 +69,7 @@ class User_interface_cases(unittest.TestCase):
params["cs"] = cs
params.pop("sid")
postData = {}
resObj = requests.post(url, headers=headers, params=params,data=json.dumps(postData))
resObj = requests.post(url, headers=headers, params=params,data=json.dumps(postData),verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -72,7 +89,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -92,7 +109,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -112,7 +129,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -133,7 +150,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -156,7 +173,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(temp)
params["cs"] = cs
params.pop("sid")
resObj = requests.post(url, headers=headers, params=params,data=json.dumps(postData))
resObj = requests.post(url, headers=headers, params=params,data=json.dumps(postData),verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -176,7 +193,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -197,7 +214,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -217,7 +234,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -239,7 +256,7 @@ class User_interface_cases(unittest.TestCase):
params["os"] = 2
cs = utils.getSignature(params)
params["cs"] = cs
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -259,7 +276,7 @@ class User_interface_cases(unittest.TestCase):
cs = utils.getSignature(params)
params["cs"] = cs
params.pop("sid")
resObj = requests.get(url, headers=headers, params=params)
resObj = requests.get(url, headers=headers, params=params,verify=config.SSL_VERIFY)
print("\n请求地址:" + url)
print("-------------------------返回结果:-------------------------")
print(resObj.text)
......@@ -275,7 +292,8 @@ class User_interface_cases(unittest.TestCase):
error = result["error"]
self.assertEqual(statusCode, 200, "响应状态码为:" + str(statusCode))
self.assertTrue(resTime < 4000, "响应时间大于4秒,响应时间为:" + str(resTime) + "毫秒")
self.assertEqual(error, 0, "接口返回错误:" + str(error) + " content: " + resObj.text)
# self.assertEqual(error, 0, "接口返回错误:" + str(error) + " content: " + resObj.text)
self.assertEqual(error, 0, "接口返回错误")
print("接口响应时间:" + str(resTime) + "毫秒")
def startTest(self):
......
#coding: utf-8
import smtplib
from email import encoders
from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment