霍格沃兹测试开发学社
├── apis
│ ├── __init__.py
│ ├── base_api.py
│ ├── department.py
│ └── wework.py
├── config
│ ├── __init__.py
│ ├── config.yaml
│ └── secrets.yaml
├── tests
│ ├── __init__.py
│ ├── schema.json
│ ├── test_departments.py
│ └── test_department_flow.py
└── utils
├── __init__.py
├── log_utils.py
└── utils.py
形式 | 章节 |
---|---|
知识点 | 接口测试框架搭建 |
知识点 | 整体结构响应断言 |
知识点 | 数据库操作与断言 |
请求方式:GET/POST(HTTPS)
请求地址:https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=ID&corpsecret=SECRET
请求包体:
...
参数说明:
...
权限说明:
...
返回结果:
...
参数说明:
...
class TestToken:
def test_get_token(self):
'''
获取 access_token
:return:
'''
# 定义凭证
corpid = "xxx"
corpsecret = "xxx"
url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
# 发get请求
r = requests.get(url=url)
# 打印响应
print(r.json())
class TestToken:
def test_get_token2(self):
'''
获取token的第二种方法
:return:
'''
# 定义凭证
corpid = "xxx"
corpsecret = "xxx"
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 定义param
params = {
"corpid": corpid,
"corpsecret": corpsecret
}
# 发get请求
r = requests.request(method="GET", url=url, params=params)
# 打印响应
print(r.json())
class TestWework:
def setup_class(self):
# 定义凭证
corpid = "xxx"
corpsecret = "xxx"
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 定义param
params = {
"corpid": corpid,
"corpsecret": corpsecret
}
# 发get请求
r = requests.request(method="GET", url=url, params=params)
self.token = r.json()["access_token"]
self.department_id = 3
def test_create_department(self):
url = "https://qyapi.weixin.qq.com/cgi-bin/department/create"
data = {
"name": "广州研发中心2",
"name_en": "RDGZ2",
"parentid": 1,
"order": 1,
"id": self.department_id
}
params = {
"access_token": self.token
}
r = requests.request(method="POST", url=url, params = params,json=data)
assert r.json()["errcode"] == 0
# 通过得到查询部门列表接口的返回值, 实现查看部门是否新建成功
list = self.test_get_departments()
assert list["department"][1]["name"] == "广州研发中心2"
class TestDepartments:
"""
单接口验证
"""
def setup_class(self):
# 定义凭证
corpid = "xxx"
corpsecret = "xxx"
url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 定义param
params = {
"corpid": corpid,
"corpsecret": corpsecret
}
# 发get请求
r = requests.request(method="GET", url=url, params=params)
self.token = r.json()["access_token"]
@pytest.mark.parametrize(
"name, name_en, parentid, order, depart_id, expect",
[
("技术部", "JISHU1", 1, 1, 2, 0),
("技术部1t6yujk9osjhynnj890lkmbg54321", "JISHU2", 1, 2, 3, 0),
("技术部23", "JISHU23", 1, 23, 1, 60123)
]
)
def test_create_department(self, name, name_en, parentid, order, depart_id, expect):
url = "https://qyapi.weixin.qq.com/cgi-bin/department/create"
data = {
"name": name,
"name_en": name_en,
"parentid": parentid,
"order": order,
"id": depart_id
}
params = {
"access_token": self.token
}
r = requests.request(method="POST", url=url, params=params, json=data)
print(r.json())
assert r.json()["errcode"] == expect
# 创建一个日志模块: log_util.py
import logging
import os
from logging.handlers import RotatingFileHandler
# 绑定绑定句柄到logger对象
logger = logging.getLogger(__name__)
# 获取当前工具文件所在的路径
root_path = os.path.dirname(os.path.abspath(__file__))
# 拼接当前要输出日志的路径
log_dir_path = os.sep.join([root_path, f'/logs'])
if not os.path.isdir(log_dir_path):
os.mkdir(log_dir_path)
# 创建日志记录器,指明日志保存路径,每个日志的大小,保存日志的上限
file_log_handler = RotatingFileHandler(os.sep.join([log_dir_path, 'log.log']), maxBytes=1024 * 1024, backupCount=10 , encoding="utf-8")
# 设置日志的格式
date_string = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] [%(filename)s]/[line: %(lineno)d]/[%(funcName)s] %(message)s ', date_string)
# 日志输出到控制台的句柄
stream_handler = logging.StreamHandler()
# 将日志记录器指定日志的格式
file_log_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# 为全局的日志工具对象添加日志记录器
# 绑定绑定句柄到logger对象
logger.addHandler(stream_handler)
logger.addHandler(file_log_handler)
# 设置日志输出级别
logger.setLevel(level=logging.INFO)
# 数据库
def query_db(cls, sql, database_info):
# 连接数据库
conn = pymysql.Connect(**database_info)
# 创建游标
cursor = conn.cursor()
# 执行 SQL 语句
cursor.execute(sql)
# 获取查询结果
datas = cursor.fetchall()
print("查询到的数据为:", datas) # 获取多条数据
# 关闭连接
cursor.close()
conn.close()
return datas
# 整体结构响应断言
class SchemaUtils:
@classmethod
def schema_validate_by_file(cls, obj,file_path):
schema_data = json.load(open(file_path))
return cls.schema_validate(obj, schema_data)
@classmethod
def schema_validate(cls, obj, schema):
'''
对比 python 对象与生成的 json schame 的结构是否一致
'''
try:
validate(instance=obj, schema=schema)
return True
except Exception as e:
log.error(f"schema 校验异常 =======> {e}")
return False
@allure.feature("部门管理")
class TestDepartmentFlow:
def setup_class(self):
# 实例化部门类
self.department = Department("hogwarts", "contacts")
# 准备测试数据
self.department_id = 210
self.create_data = {
"name": "广州研发中心",
"name_en": "RDGZ",
"parentid": 1,
"order": 1,
"id": self.department_id
}
self.update_data = {
"id": self.department_id,
"name": "广州研发中心-update"
}
# 清除部门数据
self.department.clear()
@allure.story("部门操作场景用例")
def test_department_flow(self):
'''
部门增删改查场景测试
'''
with allure.step("创建部门"):
# 创建部门
r = self.department.create(self.create_data)
assert r.status_code == 200
assert r.json().get("errcode") == 0
with allure.step("查询部门创建结果"):
# 查询是否创建成功
r = self.department.get()
# depart_ids = [depart.get("id") for depart in r.json().get("department_id")]
depart_ids = jsonpath(r.json(), "$..id")
assert self.department_id in depart_ids
with allure.step("更新部门信息"):
# 更新部门信息
r = self.department.update(self.update_data)
assert r.status_code == 200
assert r.json().get("errcode") == 0
# 执行测试,搜集测试结果
pytest --alluredir=./results --clean-alluredir
# 生成在线的测试报告
allure serve ./results