霍格沃兹测试开发
功能点 | 描述 | 案例 |
---|---|---|
项目管理 | 对于项目的分类,通常以业务线为维度 | 商城、飞书、企业微信 |
测试类型 | 手工测试、自动化测试 | pytest、JUnit、HttpRunner、Postman |
测试用例 | 测试用例的信息 | 标题、步骤、预期结果、实际结果 |
测试套件 | 用例的集合 | 通常是用例从不同维度组合,冒烟测试用例集,某业务线用例集 |
测试装置 | 用例执行前后的准备工作,处理工作 | 用例依赖环境的安装,比如 python requirements |
测试 Runner | 测试用例执行器调度器 | 执行环境的管理:性能测试环境、兼容性环境 |
测试报告 | 测试结果的体现 | allure 报告、手工报告 |
公司收益(提效):
市场收益:
参考知识 xUnit:https://en.wikipedia.org/wiki/XUnit
前后端的录播全部看完,并且完成前后端的实战练习
https://course.hogwarts.ceshiren.com/courses/进阶班/课程大纲Python/#-python
如果一家饭店,从服务员->厨师->收银都是一个人,会成什么样子?
各司其职(高内聚),轻松协作(低耦合),就是分层思想的目标。
# 创建用例表
class TestcaseDo(db.Model):
# 表名
__tablename__ = "testcase"
# 用例ID 用例的唯 一标识
id = db.Column(Integer, primary_key=True)
# 用例的标题, 限定 80个字符 ,不为空,并且唯一
name = db.Column(String(80), nullable=False, unique=True)
# 用例的步骤
step = db.Column(String(120))
# 用例的自动化方法
method = db.Column(String(120))
# 备注
remark = db.Column(String(120))
# 创建用例表
class TestcaseDo(db.Model):
# 表名
__tablename__ = "testcase"
# 用例ID 用例的唯 一标识
id = db.Column(Integer, primary_key=True)
# 用例的标题, 限定 80个字符 ,不为空,并且唯一
name = db.Column(String(80), nullable=False, unique=True)
# 用例的步骤
step = db.Column(String(120))
# 用例的自动化方法
method = db.Column(String(120))
# 备注
remark = db.Column(String(120))
# 创建测试计划表
class PlanDo(db.Model):
# 表名
__tablename__ = "plan"
# 用例ID 用例的唯 一标识
id = db.Column(Integer, primary_key=True)
# 计划名称,限定 80个字符 ,不为空,并且唯一
name = db.Column(String(80), nullable=False, unique=True)
# 绑定的测试用例
# 参数1: 关联的另外一个业务表类名, 参数2: 中间表 , 参数3: 反射别名
testcases: List[TestcaseDo] = relationship("TestcaseDo", secondary=testcase_plan_rel, backref="plans")
# 创建构建记录表
class BuildDo(db.Model):
# 表名
__tablename__ = "build"
# 用例ID 用例的唯 一标识
id = db.Column(Integer, primary_key=True)
# 计划id,不为空,并且唯一,外键写在多的一方,比如 plan-> build 是一对多,那么外键就在build这方。
plan_id = db.Column(Integer, ForeignKey('plan.id', ondelete='CASCADE'))
# 报告地址,限定 80个字符 ,不为空,并且唯一
report = db.Column(String(80), nullable=False)
# 生成时间格式, 创建时间,通常不需要手动传入,在写入记录的时候自动生成
create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 参数1: 关联的另外一个业务表类名, 参数2: 反射别名
plan: PlanDo = relationship("PlanDo", backref="build")
# 中间表
# 中间表表名
# 测试用例的外键
# 计划的外键
testcase_plan_rel = db.Table(
'testcase_plan_rel',
# 参数一: 表名_id, 参数二:整型,参数3: 外键字符串('表名.id', 参数4: 是否为主键)
Column('testcase_id', Integer, ForeignKey('testcase.id', ondelete='CASCADE'), primary_key=True),
Column('plan_id', Integer, ForeignKey('plan.id', ondelete='CASCADE'), primary_key=True))
class UserDo(db.Model):
# 表名
__tablename__ = "user"
# 用例ID 用户的唯 一标识
id = db.Column(Integer, primary_key=True)
# 用户名, 限定 80个字符 ,不为空,并且唯一
username = db.Column(String(80), nullable=False, unique=True)
# 密码
password = db.Column(String(500), nullable=False)
# 生成时间格式, 创建时间,通常不需要手动传入,在写入记录的时候自动生成
create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
def __init__(self, *args, **kwargs):
self.username = kwargs.get('username')
self.password = generate_password_hash(kwargs.get('password'))
# 在登陆时候的验证操作
# 这里的参数是hash过的参数以及原始传入hash
def check_hash_password(self, raw_password):
password = check_password_hash(self.password, raw_password)
return password
def as_dict(self):
return {
"id": self.id,
"username": self.username,
"password": self.password,
"create_time": str(self.create_time)
}
from do.build_do import BuildDo
from do.plan_do import PlanDo
from do.testcase_do import TestcaseDo
from do.testcase_plan_rel import testcase_plan_rel
from do.user_do import UserDo
from server import db
if __name__ == '__main__':
db.create_all()
完成测试计划、测试用例、构建记录的表创建
# Dao 负责和数据库的交互
class TestcaseDao:
"""
TestcaseDao
"""
def get(self, testcase_id) -> TestcaseDo:
"""
根据id返回数据
:return:
"""
# TestcaseDo.query.filter()
return db_session.query(TestcaseDo).filter_by(id=testcase_id).first()
def list(self) -> List[TestcaseDo]:
"""
返回所有
:return:
"""
return db_session.query(TestcaseDo).all()
# 如果涉及到数据的改动,一定要添加commit
def save(self, testcase_do: TestcaseDo):
"""
保存数据
:return:
"""
# testcase = TestCase(**case_data)
db_session.add(testcase_do)
db_session.commit()
return testcase_do.id
def update(self, testcase_do: TestcaseDo):
"""
修改数据
:return:
"""
# add 底层支持 保存 或者更新 _save_or_update_state
db_session.query(TestcaseDo).filter_by(id=testcase_do.id).update(testcase_do.as_dict())
db_session.commit()
return testcase_do.id
def delete(self, testcase_id):
"""
删除数据
"""
db_session.query(TestcaseDo).filter_by(id=testcase_id).delete()
db_session.commit()
def get_by_name(self, name) -> TestcaseDo:
"""
通过用例name查询数据
:return:
"""
return db_session.query(TestcaseDo).filter_by(name=name).first()
# Dao 负责和数据库的交互
class PlanDao:
"""
PlanDao
"""
def get(self, plan_id) -> PlanDo:
"""
根据id返回数据
:return:
"""
return db_session.query(PlanDo).filter_by(id=plan_id).first()
def list(self) -> List[PlanDo]:
"""
返回所有
:return:
"""
return db_session.query(PlanDo).all()
def save(self, plan_do: PlanDo):
"""
保存数据
:return:
"""
db_session.add(plan_do)
db_session.commit()
return plan_do.id
def delete(self, plan_id):
"""
删除数据
"""
db_session.query(PlanDo).filter_by(id=plan_id).delete()
db_session.commit()
def get_by_name(self, name) -> PlanDo:
"""
通过name查询数据
:return:
"""
return db_session.query(PlanDo).filter_by(name=name).first()
# Dao 负责和数据库的交互
class BuildDao:
"""
BuildDao
"""
def get(self, build_id) -> BuildDo:
"""
根据id返回数据
:return:
"""
return db_session.query(BuildDo).filter_by(id=build_id).first()
def list(self) -> List[BuildDo]:
"""
返回所有
:return:
"""
return db_session.query(BuildDo).all()
def save(self, build_do):
"""
保存数据
:return:
"""
db_session.add(build_do)
db_session.commit()
return build_do.id
def delete(self, build_id):
"""
删除数据
"""
db_session.query(BuildDo).filter_by(id=build_id).delete()
db_session.commit()
class UserDao:
def get(self, user_id) -> UserDo:
# 根据id返回数据
return db_session.query(UserDo).filter_by(id=user_id).first()
def list(self) -> List[UserDo]:
# 返回所有
return db_session.query(UserDo).all()
def save(self, user_do: UserDo):
# 新增数据
db_session.add(user_do)
db_session.commit()
return user_do.id
def update(self, user_do: UserDo):
# 修改数据
db_session.query(UserDo).filter_by(id=user_do.id).update(user_do.as_dict())
db_session.commit()
return user_do.id
def delete(self, user_id):
# 删除数据
db_session.query(UserDo).filter_by(id=user_id).delete()
db_session.commit()
return user_id
def get_by_name(self, username) -> UserDo:
# 通过用例名称查询数据
return db_session.query(UserDo).filter_by(username=username).first()
完成测试计划、测试用例、构建记录的Dao层代码
class TestcaseService:
def get(self, testcase_id):
"""
通过用例id查询数据
:return:
"""
return testcase_dao.get(testcase_id)
def list(self):
"""
返回所有
:return:
"""
return testcase_dao.list()
def save(self, testcase_do: TestcaseDo):
"""
1. 查询数据是否存在
2. 如果存在返回失败
3. 如果不存在,新增
:param testcase_do:
:return:
"""
testcase = self.get_by_name(testcase_do.name)
if not testcase:
return testcase_dao.save(testcase_do)
else:
return False
def update(self, testcase_do: TestcaseDo):
"""
如果测试用例存在才更新,反之返回错误
:param testcase_do:
:return:
"""
testcase = self.get(testcase_do.id)
if not testcase:
return False
else:
# 传入被更新的测试用例的对象
return testcase_dao.update(testcase_do)
def delete(self, testcase_id):
"""
如果测试用例存在才删除,反之返回错误
:param testcase_id: 测试用例删除
:return:
"""
testcase = self.get(testcase_id)
if not testcase:
return False
else:
testcase_dao.delete(testcase_id)
return testcase_id
def get_by_name(self, name):
"""
通过用例name查询数据
:return:
"""
return testcase_dao.get_by_name(name)
class PlanService:
def list(self):
"""
查询所有的测试任务
:return:
"""
return plan_dao.list()
def get(self, plan_id):
"""
查询单个的测试任务
:param plan_id:
:return:
"""
return plan_dao.get(plan_id)
def save(self, testcase_id_list, plan_do: PlanDo):
"""
:return: 生成的测试任务的id
"""
testcase = self.get_by_name(plan_do.name)
if not testcase:
# testcase_id_list = [1,2,3] 转换为 [TestcaseDo1, TestcaseDo2]
# plan_do.testcases 要求 的输入格式为 [TestcaseDo1, TestcaseDo2]
testcase_obj_list = [testcase_service.get(testcase_id) for testcase_id in testcase_id_list]
# 多对多情况下建立关联关系,必须使用 实例.关联表属性 = [关联表对象]
plan_do.testcases = testcase_obj_list
return plan_dao.save(plan_do)
else:
return False
def delete(self, plan_id):
"""
存在才删除,反之返回错误
:param plan_id: 计划id
:return:
"""
plan = self.get(plan_id)
if not plan:
return False
else:
plan_dao.delete(plan_id)
return plan_id
def get_by_name(self, name):
"""
通过name查询数据
:return:
"""
return plan_dao.get_by_name(name)
class JenkinsUtils:
# Jenkins 服务
BASE_URL = "http://42.192.73.147:7080/"
# Jenkins 服务对应的用户名
USERNAME = "admin"
# Jenkins 服务对应的token
PASSWORD = "1141cbca1d1b9ac4781d0c255a5a478fce"
JOB = "tpf"
@classmethod
def invoke(cls, invoke_params):
"""
执行构建任务
:return:
"""
jenkins_hogwarts = Jenkins(cls.BASE_URL, cls.USERNAME, cls.PASSWORD)
# 获取Jenkins 的job 对象
job = jenkins_hogwarts.get_job(cls.JOB)
# 构建hogwarts job, 传入的值必须是字典, key 对应 jenkins 设置的参数名
# job.invoke(build_params={"task": "CalculatorProject/test/cases/test_div.py"})
job.invoke(build_params={"methods": invoke_params})
# 获取job 最后一次完成构建的编号
# http://47.92.149.0:8080/job/ck24/20/allure/
last_build_number = job.get_last_buildnumber()+1
# pytest 用例名称 指定报告生成地址
# pytest $task --alluredir=path
report = f"{cls.BASE_URL}job/{cls.JOB}/{last_build_number}/allure/"
return report
class BuildService:
def list(self):
"""
查询所有的测试任务
:return:
"""
return build_dao.list()
def get(self, build_id):
"""
根据id查询构建
:return:
"""
return build_dao.get(build_id)
def save(self, plan_id):
"""
保存构建任务
:return:
"""
# 根据测试计划id获取自动化方法名
plan_detail = plan_service.get(plan_id)
method_list = [testcase.method for testcase in plan_detail.testcases]
methods = " ".join(method_list)
# 执行测试用例
report = JenkinsUtils.invoke(methods)
# 构建记录添加report信息
logger.info(f"report<====== {report}")
build_do = BuildDo(plan_id=plan_id, report=report)
# 创建构建记录
return build_dao.save(build_do)
def delete(self, build_id):
"""
存在才删除,反之返回错误
:param build_id: build_id
:return:
"""
plan = self.get(build_id)
if not plan:
return False
else:
build_dao.delete(build_id)
return build_id
class UserService:
def get(self, user_id):
# 通过id查询数据
return user_dao.get(user_id)
def list(self):
# 返回所有
return user_dao.list()
def save(self, user_do: UserDo):
# 保存之前先查询数据是否存在
# 存在返回false,不存在则进行新增
user = self.get_by_name(user_do.username)
if not user:
return user_dao.save(user_do)
else:
return False
def update(self, user_do: UserDo):
# 用例存在则进行更新修改,否则返回错误
user = self.get(user_do.id)
if not user:
return False
else:
return user_dao.update(user_do)
def delete(self, user_id):
# 用户存在则进行删除,否则返回错误
user = self.get(user_id)
if not user:
return False
else:
return user_dao.delete(user_id)
def get_by_name(self, username):
return user_dao.get_by_name(username)
def create_access_token(self, user_do):
return create_access_token(identity=user_do)
@jwt.user_lookup_loader
def user_lookup_callback(self, _jwt_header, jwt_data):
# 获取 username
username = jwt_data["sub"]["username"]
# 返回通过 username 查询用户的结果
return self.get_by_name(username)
完成测试计划、测试用例、构建记录的Service层代码
case_ns = Namespace("case", description="用例管理")
testcase_service = TestcaseService()
@case_ns.route("")
class TestcaseController(Resource):
testcase_get_parser = api.parser()
testcase_get_parser.add_argument("id", type=int, location="args")
@case_ns.expect(testcase_get_parser)
def get(self):
"""
测试用例的查找
:return:
"""
id = request.args.get("id")
if id:
data = testcase_service.get(id)
if data:
datas = [testcase_service.get(id).as_dict()]
return {"code": 0, "msg": "data success get", "data": datas}
else:
return {"code": 40004, "msg": "data is not exists"}
else:
datas = [testcase.as_dict() for testcase in testcase_service.list()]
return {"code": 0, "msg": "data success get", "data": datas}
testcase_post_model = case_ns.model("testcase_post_model", {
"name": fields.String,
"step": fields.String,
"method": fields.String,
"remark": fields.String
})
@case_ns.expect(testcase_post_model)
def post(self):
"""
测试用例的新增
:return:
"""
data = request.json
# {a=1,b=2} a=1,b=2
testcase = TestcaseDo(**data)
testcase_id = testcase_service.save(testcase)
if testcase_id:
return {"code": 0, "msg": f"{testcase_id} success add"}
else:
return {"code": 40001, "msg": "case is exists"}
testcase_put_model = case_ns.model("testcase_put_model", {
"id": fields.Integer,
"name": fields.String,
"step": fields.String,
"method": fields.String,
"remark": fields.String
})
@case_ns.expect(testcase_put_model)
def put(self):
"""
测试用例的修改
:return:
"""
data = request.json
# {a=1,b=2} a=1,b=2
testcase = TestcaseDo(**data)
testcase_id = testcase_service.update(testcase)
if testcase_id:
return {"code": 0, "msg": f"{testcase_id} success update"}
else:
return {"code": 40001, "msg": "case is not exists"}
testcase_delete_parser = api.parser()
testcase_delete_parser.add_argument("id", type=int, location="json", required=True)
@case_ns.expect(testcase_delete_parser)
def delete(self):
"""
测试用例的删除
:return:
"""
data = request.json
case_id = data.get("id")
testcase_id = testcase_service.delete(case_id)
if testcase_id:
return {"code": 0, "msg": f"{testcase_id} success delete"}
else:
return {"code": 40002, "msg": "case is not exists"}
from flask import Flask
from flask_cors import CORS
from flask_jwt_extended import JWTManager
from flask_restx import Api
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import Session
# 问题: 跨域 No 'Access-Control-Allow-Origin' header is present on the requested resource.
# 解决:
app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)
# 用例的命名空间
username = "root"
password = 123456
server = "42.192.73.147:3307"
database = "tpf"
app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{username}:{password}@{server}/{database}?charset=utf8mb4"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config["JWT_SECRET_KEY"] = "super-secret"
# JWTManager 绑定 app
jwt = JWTManager(app)
# SQLAlchemy 绑定 app
db = SQLAlchemy(app)
# 为了有类型提示所做的声明
db_session: Session = db.session
def register_router():
# 如果出现循环导入,把导包语句放在方法内执行。并且调用此函数
from controller.testcase_controller import testcase_ns
from controller.plan_controller import plan_ns
from controller.build_controller import build_ns
from controller.user_controller import user_ns
api.add_namespace(testcase_ns, "/testcase")
api.add_namespace(plan_ns, "/plan")
api.add_namespace(build_ns, "/build")
api.add_namespace(user_ns, "/user")
if __name__ == '__main__':
register_router()
app.run(debug=True, port=5001)
plan_ns = Namespace("plan", description="测试计划管理")
plan_service = PlanService()
@plan_ns.route("")
class PlanController(Resource):
plan_get_parser = api.parser()
plan_get_parser.add_argument("id", type=int, location="args")
@plan_ns.expect(plan_get_parser)
def get(self):
"""
测试计划的查找
:return:
"""
id = request.args.get("id")
print(id)
if id:
data = plan_service.get(id)
if data:
datas = [plan_service.get(id).as_dict()]
return {"code": 0, "msg": "data success get", "data": datas}
else:
return {"code": 40004, "msg": "data is not exists"}
else:
datas = [plan.as_dict() for plan in plan_service.list()]
return {"code": 0, "msg": "data success get", "data": datas}
plan_post_model = plan_ns.model("plan_post_model", {
"name": fields.String,
"testcase_ids": fields.List(fields.Integer)
})
@plan_ns.expect(plan_post_model)
def post(self):
"""
测试计划的新增
:return:
"""
data = request.json
# {a=1,b=2} a=1,b=2
testcase_id_list = data.pop("testcase_ids")
plan = PlanDo(**data)
plan_id = plan_service.save(testcase_id_list, plan)
if plan_id:
return {"code": 0, "msg": f"{plan_id} success add"}
else:
return {"code": 40001, "msg": "case is exists"}
plan_delete_parser = api.parser()
plan_delete_parser.add_argument("id", type=int, location="json", required=True)
@plan_ns.expect(plan_delete_parser)
def delete(self):
"""
测试计划的删除
:return:
"""
data = request.json
plan_id = data.get("id")
id = plan_service.delete(plan_id)
if id:
return {"code": 0, "msg": f"{id} success delete"}
else:
return {"code": 40002, "msg": "case is not exists"}
build_ns = Namespace("build", description="构建记录管理")
build_service = BuildService()
@build_ns.route("")
class BuildController(Resource):
build_get_parser = api.parser()
build_get_parser.add_argument("id", type=int, location="args")
@build_ns.expect(build_get_parser)
def get(self):
"""
构建记录的查找
:return:
"""
id = request.args.get("id")
if id:
data = build_service.get(id)
if data:
datas = [build_service.get(id).as_dict()]
return {"code": 0, "msg": "data success get", "data": datas}
else:
return {"code": 40004, "msg": "data is not exists"}
else:
datas = [plan.as_dict() for plan in build_service.list()]
return {"code": 0, "msg": "data success get", "data": datas}
build_post_model = build_ns.model("build_post_model", {
"plan_id": fields.Integer
})
@build_ns.expect(build_post_model)
def post(self):
"""
构建记录的新增
:return:
"""
plan_id = request.json.get("plan_id")
build_id = build_service.save(plan_id)
if build_id:
return {"code": 0, "msg": f"{build_id} success add"}
else:
return {"code": 40001, "msg": "case is exists"}
build_delete_parser = api.parser()
build_delete_parser.add_argument("id", type=int, location="json", required=True)
@build_ns.expect(build_delete_parser)
def delete(self):
"""
构建记录的删除
:return:
"""
plan_id = request.json.get("id")
id = build_service.delete(plan_id)
if id:
return {"code": 0, "msg": f"{id} success delete"}
else:
return {"code": 40002, "msg": "case is not exists"}
user_ns = Namespace("user", description="用户管理")
user_service = UserService()
@user_ns.route("/login")
class LoginController(Resource):
login_post_model = user_ns.model("login_post_model", {
"username": fields.String,
"password": fields.String
})
@user_ns.expect(login_post_model)
def post(self):
"""
登录功能
:return:
"""
user_info = request.json
# 通过用户名和密码生成 user 对象
user = UserDo(**user_info)
# 通过用户名查询用户是否存在
user_result = user_service.get_by_name(user.username)
# 如果用户不存在,说明用户名未注册
if not user_result:
return {"code": 40013, "msg": "用户未注册"}
# 如果密码不匹配
if not user_result.check_hash_password(user_info.get("password")):
# 返回提示信息与未授权的响应状态码
return {"code": 40014, "msg": "密码错误"}
# 用户存在,生成 token
access_token = user_service.create_access_token(user_result.as_dict())
if access_token:
return {"code": 0, "msg": "登录成功", "data": {"token": access_token}}
else:
return {"code": 40021, "msg": "登录失败"}
@user_ns.route("/register")
class RegisterController(Resource):
register_post_model = user_ns.model("register_post_model", {
"username": fields.String,
"password": fields.String,
})
@user_ns.expect(register_post_model)
def post(self):
"""
注册功能
:return:
"""
data = request.json
user = UserDo(**data)
user_id = user_service.save(user)
if user_id:
return {"code": 0, "msg": f"{user_id} success add"}
else:
return {"code": 40001, "msg": "user is exists"}
完成测试计划、测试用例、构建记录的Controller层代码