测试平台训练营 1-设计与实现

霍格沃兹测试开发

ceshiren.com

训练营目标

  • 测试平台的价值体系
  • 测试平台的学习路线
  • 测试平台实战练习

问题

  1. 为什么要做测试平台?为什么我们要学习平台开发的技术?怎么样去做测试平台才能达到价值最大化(个人、公司)
  2. 平台开发技术要学到什么程度?一定要非常深入吗?
  3. 平台开发需要掌握什么技术栈,如何学习可以提升投入产出比?

测试平台-功能设计

功能点 描述 案例
项目管理 对于项目的分类,通常以业务线为维度 商城、飞书、企业微信
测试类型 手工测试、自动化测试 pytest、JUnit、HttpRunner、Postman
测试用例 测试用例的信息 标题、步骤、预期结果、实际结果
测试套件 用例的集合 通常是用例从不同维度组合,冒烟测试用例集,某业务线用例集
测试装置 用例执行前后的准备工作,处理工作 用例依赖环境的安装,比如 python requirements
测试 Runner 测试用例执行器调度器 执行环境的管理:性能测试环境、兼容性环境
测试报告 测试结果的体现 allure 报告、手工报告

测试平台-学院平台案例

  • 项目管模块
    • 项目搜索
    • 项目新建
    • 项目编辑
    • 项目配置
    • 团队管理
    • 角色管理

测试平台-学院平台案例

  • 支持自动化测试框架管理
    • Web 自动化测试
    • App 自动化测试
    • 接口自动化测试
    • 兼容性测试
  • 支持多样化测试能力管理
    • 性能测试
    • 安全测试
    • 大数据测试

测试平台-学院平台案例

  • 测试报告管理
    • 支持测试结果管理
    • 支持多种测试能力的自定义报告管理
    • 支持数据分析
    • 支持用例趋势图、Bug 趋势图等图表管理

其他案例

测试平台的个人价值

  • 个人价值:
    • 测试方案能力。(重要)
    • 测试工具产品设计能力。(重要)
    • 前后端的技术能力。

测试平台的产品价值

  • 公司收益(提效):

    1. 提效
    2. 提升质量
    3. 降低成本
  • 市场收益:

    1. 知名度与影响力
    2. 成熟的商业模式,也有巨大的市场空间。带来直接的创收。
    3. 代表产品: wetest、禅道、process on、figma

测试平台的“Timing”

  1. 需求: 公司需要测试能力服务化、平台化,比如整合内部的多套平台。
  2. 体系: 公司有成熟的测试体系:自动化测试、持续集成
  3. 人力: 有成熟的技术团队,包括前端、后端、测试开发三个角色

测试平台的学习路线

技术-前端

  • 界面展示
  • 良好的用户体验
  • 数据展示
  • 相关技术: VUE、React 等

技术-后端

  • 把控业务逻辑
  • 数据库交互
  • 相关技术:Flask、Django、SpringBoot

数据库

  • 数据的持久化存储
  • 数据操作
  • 相关技术: MySQL、Oracle

测试用例管理

  • 测试标题
  • 测试套件
  • 测试步骤
  • 测试装置
  • 测试断言
  • 测试结果
  • 测试执行器

参考知识 xUnit:https://en.wikipedia.org/wiki/XUnit

测试用例调度

  • 定时任务
  • 分布式
  • 触发机制
  • 拓展性配置
  • 相关技术: Jenkins

实战前提

前后端的录播全部看完,并且完成前后端的实战练习

https://course.hogwarts.ceshiren.com/courses/进阶班/课程大纲Python/#-python

实战路线

现有后端代码的问题

如果一家饭店,从服务员->厨师->收银都是一个人,会成什么样子?

  • 耦合性强
  • 可维护性差
  • 可测性差

代码分层的目的

各司其职(高内聚),轻松协作(低耦合),就是分层思想的目标。

  • 方便后续代码进行维护扩展。
  • 分层的效果需要让整个团队都接受
  • 各个层职责边界清晰

代码分层架构

  • 路由层(Controller)
  • 服务层(Service)
  • 数据层(Dao)
  • DO 层(实体类)

实战1:架构优化

  • 按照标准的设计,完成目录结构优化

实体类

  • DO实体类 的作用一般是和数据表做映射
  • 此对象与数据库表结构一一对应
# 创建用例表
class TestcaseDo(db.Model):
    # 表名
    __tablename__ = "testcase"
    # 用例ID 用例的唯 一标识
    id = db.Column(Integer, primary_key=True)
    # 用例的标题, 限定 80个字符 ,不为空,并且唯一
    name = db.Column(String(80), nullable=False, unique=True)
    # 用例的步骤
    step = db.Column(String(120))
    # 用例的自动化方法
    method = db.Column(String(120))
    # 备注
    remark = db.Column(String(120))

Do层创建-测试用例表

# 创建用例表
class TestcaseDo(db.Model):
    # 表名
    __tablename__ = "testcase"
    # 用例ID 用例的唯 一标识
    id = db.Column(Integer, primary_key=True)
    # 用例的标题, 限定 80个字符 ,不为空,并且唯一
    name = db.Column(String(80), nullable=False, unique=True)
    # 用例的步骤
    step = db.Column(String(120))
    # 用例的自动化方法
    method = db.Column(String(120))
    # 备注
    remark = db.Column(String(120))

Do层创建-测试计划表

# 创建测试计划表
class PlanDo(db.Model):
    # 表名
    __tablename__ = "plan"
    # 用例ID 用例的唯 一标识
    id = db.Column(Integer, primary_key=True)
    # 计划名称,限定 80个字符 ,不为空,并且唯一
    name = db.Column(String(80), nullable=False, unique=True)
    # 绑定的测试用例
    # 参数1: 关联的另外一个业务表类名, 参数2: 中间表 , 参数3: 反射别名
    testcases: List[TestcaseDo] = relationship("TestcaseDo", secondary=testcase_plan_rel, backref="plans")

Do层创建-构建记录表

# 创建构建记录表
class BuildDo(db.Model):
    # 表名
    __tablename__ = "build"
    # 用例ID 用例的唯 一标识
    id = db.Column(Integer, primary_key=True)
    # 计划id,不为空,并且唯一,外键写在多的一方,比如 plan-> build 是一对多,那么外键就在build这方。
    plan_id = db.Column(Integer, ForeignKey('plan.id', ondelete='CASCADE'))
    # 报告地址,限定 80个字符 ,不为空,并且唯一
    report = db.Column(String(80), nullable=False)
    # 生成时间格式, 创建时间,通常不需要手动传入,在写入记录的时候自动生成
    create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    # 参数1: 关联的另外一个业务表类名, 参数2: 反射别名
    plan: PlanDo = relationship("PlanDo", backref="build")

Do层创建-多对多(测试用例与测试计划)

# 中间表
# 中间表表名
# 测试用例的外键
# 计划的外键
testcase_plan_rel = db.Table(
    'testcase_plan_rel',
    # 参数一: 表名_id, 参数二:整型,参数3: 外键字符串('表名.id', 参数4: 是否为主键)
    Column('testcase_id', Integer, ForeignKey('testcase.id', ondelete='CASCADE'), primary_key=True),
    Column('plan_id', Integer, ForeignKey('plan.id', ondelete='CASCADE'), primary_key=True))

Do层创建-用户表

class UserDo(db.Model):
    # 表名
    __tablename__ = "user"
    # 用例ID 用户的唯 一标识
    id = db.Column(Integer, primary_key=True)
    # 用户名, 限定 80个字符 ,不为空,并且唯一
    username = db.Column(String(80), nullable=False, unique=True)
    # 密码
    password = db.Column(String(500), nullable=False)
    # 生成时间格式, 创建时间,通常不需要手动传入,在写入记录的时候自动生成
    create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    def __init__(self, *args, **kwargs):
        self.username = kwargs.get('username')
        self.password = generate_password_hash(kwargs.get('password'))

    # 在登陆时候的验证操作
    # 这里的参数是hash过的参数以及原始传入hash
    def check_hash_password(self, raw_password):
        password = check_password_hash(self.password, raw_password)
        return password

    def as_dict(self):
        return {
            "id": self.id,
            "username": self.username,
            "password": self.password,
            "create_time": str(self.create_time)
        }

Do层创建-初始化数据库

from do.build_do import BuildDo
from do.plan_do import PlanDo
from do.testcase_do import TestcaseDo
from do.testcase_plan_rel import testcase_plan_rel
from do.user_do import UserDo
from server import db

if __name__ == '__main__':
    db.create_all()

实战2:创建表结构

完成测试计划、测试用例、构建记录的表创建

Dao层实现

  • 数据库增删查改动作封装

Dao层创建-测试用例

# Dao 负责和数据库的交互
class TestcaseDao:
    """
    TestcaseDao
    """
    def get(self, testcase_id) -> TestcaseDo:
        """
        根据id返回数据
        :return:
        """
        # TestcaseDo.query.filter()
        return db_session.query(TestcaseDo).filter_by(id=testcase_id).first()

    def list(self) -> List[TestcaseDo]:
        """
        返回所有
        :return:
        """
        return db_session.query(TestcaseDo).all()

    # 如果涉及到数据的改动,一定要添加commit
    def save(self, testcase_do: TestcaseDo):
        """
        保存数据
        :return:
        """
        # testcase = TestCase(**case_data)
        db_session.add(testcase_do)
        db_session.commit()
        return testcase_do.id

    def update(self, testcase_do: TestcaseDo):
        """
        修改数据
        :return:
        """
        # add 底层支持 保存 或者更新 _save_or_update_state
        db_session.query(TestcaseDo).filter_by(id=testcase_do.id).update(testcase_do.as_dict())
        db_session.commit()
        return testcase_do.id

    def delete(self, testcase_id):
        """
        删除数据
        """
        db_session.query(TestcaseDo).filter_by(id=testcase_id).delete()
        db_session.commit()

    def get_by_name(self, name) -> TestcaseDo:
        """
        通过用例name查询数据
        :return:
        """
        return db_session.query(TestcaseDo).filter_by(name=name).first()

Dao层创建-测试计划

# Dao 负责和数据库的交互
class PlanDao:
    """
    PlanDao
    """
    def get(self, plan_id) -> PlanDo:
        """
        根据id返回数据
        :return:
        """
        return db_session.query(PlanDo).filter_by(id=plan_id).first()

    def list(self) -> List[PlanDo]:
        """
        返回所有
        :return:
        """
        return db_session.query(PlanDo).all()

    def save(self, plan_do: PlanDo):
        """
        保存数据
        :return:
        """
        db_session.add(plan_do)
        db_session.commit()
        return plan_do.id

    def delete(self, plan_id):
        """
        删除数据
        """
        db_session.query(PlanDo).filter_by(id=plan_id).delete()
        db_session.commit()

    def get_by_name(self, name) -> PlanDo:
        """
        通过name查询数据
        :return:
        """
        return db_session.query(PlanDo).filter_by(name=name).first()

Dao层创建-构建记录

# Dao 负责和数据库的交互
class BuildDao:
    """
    BuildDao
    """
    def get(self, build_id) -> BuildDo:
        """
        根据id返回数据
        :return:
        """
        return db_session.query(BuildDo).filter_by(id=build_id).first()

    def list(self) -> List[BuildDo]:
        """
        返回所有
        :return:
        """
        return db_session.query(BuildDo).all()

    def save(self, build_do):
        """
        保存数据
        :return:
        """
        db_session.add(build_do)
        db_session.commit()
        return build_do.id

    def delete(self, build_id):
        """
        删除数据
        """
        db_session.query(BuildDo).filter_by(id=build_id).delete()
        db_session.commit()

Dao层创建-用户

class UserDao:

    def get(self, user_id) -> UserDo:
        # 根据id返回数据
        return db_session.query(UserDo).filter_by(id=user_id).first()

    def list(self) -> List[UserDo]:
        # 返回所有
        return db_session.query(UserDo).all()

    def save(self, user_do: UserDo):
        # 新增数据
        db_session.add(user_do)
        db_session.commit()
        return user_do.id

    def update(self, user_do: UserDo):
        # 修改数据
        db_session.query(UserDo).filter_by(id=user_do.id).update(user_do.as_dict())
        db_session.commit()
        return user_do.id

    def delete(self, user_id):
        # 删除数据
        db_session.query(UserDo).filter_by(id=user_id).delete()
        db_session.commit()
        return user_id

    def get_by_name(self, username) -> UserDo:
        # 通过用例名称查询数据
        return db_session.query(UserDo).filter_by(username=username).first()

实战3:完成Dao层逻辑

完成测试计划、测试用例、构建记录的Dao层代码

Server层实现

  • 处理核心的业务逻辑

Server层创建-测试用例

class TestcaseService:
    def get(self, testcase_id):
        """
        通过用例id查询数据
        :return:
        """
        return testcase_dao.get(testcase_id)

    def list(self):
        """
        返回所有
        :return:
        """
        return testcase_dao.list()

    def save(self, testcase_do: TestcaseDo):
        """
        1. 查询数据是否存在
        2. 如果存在返回失败
        3. 如果不存在,新增
        :param testcase_do:
        :return:
        """
        testcase = self.get_by_name(testcase_do.name)
        if not testcase:
            return testcase_dao.save(testcase_do)
        else:
            return False

    def update(self, testcase_do: TestcaseDo):
        """
        如果测试用例存在才更新,反之返回错误
        :param testcase_do:
        :return:
        """
        testcase = self.get(testcase_do.id)
        if not testcase:
            return False
        else:
            # 传入被更新的测试用例的对象
            return testcase_dao.update(testcase_do)

    def delete(self, testcase_id):
        """
        如果测试用例存在才删除,反之返回错误
        :param testcase_id:  测试用例删除
        :return:
        """
        testcase = self.get(testcase_id)
        if not testcase:
            return False
        else:
            testcase_dao.delete(testcase_id)
            return testcase_id

    def get_by_name(self, name):
        """
        通过用例name查询数据
        :return:
        """
        return testcase_dao.get_by_name(name)

Server层创建-测试计划

class PlanService:
    def list(self):
        """
        查询所有的测试任务
        :return:
        """
        return plan_dao.list()

    def get(self, plan_id):
        """
        查询单个的测试任务
        :param plan_id:
        :return:
        """
        return plan_dao.get(plan_id)

    def save(self, testcase_id_list, plan_do: PlanDo):
        """
        :return: 生成的测试任务的id
        """
        testcase = self.get_by_name(plan_do.name)
        if not testcase:
            # testcase_id_list = [1,2,3] 转换为 [TestcaseDo1, TestcaseDo2]
            # plan_do.testcases 要求 的输入格式为 [TestcaseDo1, TestcaseDo2]
            testcase_obj_list = [testcase_service.get(testcase_id) for testcase_id in testcase_id_list]
            # 多对多情况下建立关联关系,必须使用 实例.关联表属性 = [关联表对象]
            plan_do.testcases = testcase_obj_list
            return plan_dao.save(plan_do)
        else:
            return False

    def delete(self, plan_id):
        """
        存在才删除,反之返回错误
        :param plan_id:  计划id
        :return:
        """
        plan = self.get(plan_id)
        if not plan:
            return False
        else:
            plan_dao.delete(plan_id)
            return plan_id

    def get_by_name(self, name):
        """
        通过name查询数据
        :return:
        """
        return plan_dao.get_by_name(name)

测试用例调度

class JenkinsUtils:
    # Jenkins 服务
    BASE_URL = "http://42.192.73.147:7080/"
    # Jenkins 服务对应的用户名
    USERNAME = "admin"
    # Jenkins 服务对应的token
    PASSWORD = "1141cbca1d1b9ac4781d0c255a5a478fce"
    JOB = "tpf"

    @classmethod
    def invoke(cls, invoke_params):
        """
        执行构建任务
        :return:
        """
        jenkins_hogwarts = Jenkins(cls.BASE_URL, cls.USERNAME, cls.PASSWORD)
        # 获取Jenkins 的job 对象
        job = jenkins_hogwarts.get_job(cls.JOB)
        # 构建hogwarts job, 传入的值必须是字典, key 对应 jenkins 设置的参数名
        # job.invoke(build_params={"task": "CalculatorProject/test/cases/test_div.py"})
        job.invoke(build_params={"methods": invoke_params})
        # 获取job 最后一次完成构建的编号
        # http://47.92.149.0:8080/job/ck24/20/allure/
        last_build_number = job.get_last_buildnumber()+1
        # pytest 用例名称 指定报告生成地址
        # pytest $task --alluredir=path
        report = f"{cls.BASE_URL}job/{cls.JOB}/{last_build_number}/allure/"
        return report

Server层创建-构建记录

class BuildService:
    def list(self):
        """
        查询所有的测试任务
        :return:
        """
        return build_dao.list()

    def get(self, build_id):
        """
        根据id查询构建
        :return:
        """
        return build_dao.get(build_id)

    def save(self, plan_id):
        """
        保存构建任务
        :return:
        """
        # 根据测试计划id获取自动化方法名
        plan_detail = plan_service.get(plan_id)
        method_list = [testcase.method for testcase in plan_detail.testcases]
        methods = " ".join(method_list)
        # 执行测试用例
        report = JenkinsUtils.invoke(methods)
        # 构建记录添加report信息
        logger.info(f"report<====== {report}")
        build_do = BuildDo(plan_id=plan_id, report=report)
        # 创建构建记录
        return build_dao.save(build_do)

    def delete(self, build_id):
        """
        存在才删除,反之返回错误
        :param build_id:  build_id
        :return:
        """
        plan = self.get(build_id)
        if not plan:
            return False
        else:
            build_dao.delete(build_id)
            return build_id

Server层创建-用户

class UserService:

    def get(self, user_id):
        # 通过id查询数据
        return user_dao.get(user_id)

    def list(self):
        # 返回所有
        return user_dao.list()

    def save(self, user_do: UserDo):
        # 保存之前先查询数据是否存在
        # 存在返回false,不存在则进行新增
        user = self.get_by_name(user_do.username)
        if not user:
            return user_dao.save(user_do)
        else:
            return False

    def update(self, user_do: UserDo):
        # 用例存在则进行更新修改,否则返回错误
        user = self.get(user_do.id)
        if not user:
            return False
        else:
            return user_dao.update(user_do)

    def delete(self, user_id):
        # 用户存在则进行删除,否则返回错误
        user = self.get(user_id)
        if not user:
            return False
        else:
            return user_dao.delete(user_id)

    def get_by_name(self, username):
        return user_dao.get_by_name(username)

    def create_access_token(self, user_do):
        return create_access_token(identity=user_do)

    @jwt.user_lookup_loader
    def user_lookup_callback(self, _jwt_header, jwt_data):
        # 获取 username
        username = jwt_data["sub"]["username"]
        # 返回通过 username 查询用户的结果
        return self.get_by_name(username)

实战4:测试用例与服务层的实现

完成测试计划、测试用例、构建记录的Service层代码

Controller层实现

  • 处理接口数据、接口文档

Controller层创建-测试用例

case_ns = Namespace("case", description="用例管理")

testcase_service = TestcaseService()


@case_ns.route("")
class TestcaseController(Resource):
    testcase_get_parser = api.parser()
    testcase_get_parser.add_argument("id", type=int, location="args")

    @case_ns.expect(testcase_get_parser)
    def get(self):
        """
        测试用例的查找
        :return:
        """
        id = request.args.get("id")
        if id:
            data = testcase_service.get(id)
            if data:
                datas = [testcase_service.get(id).as_dict()]
                return {"code": 0, "msg": "data success get", "data": datas}
            else:
                return {"code": 40004, "msg": "data is not exists"}
        else:
            datas = [testcase.as_dict() for testcase in testcase_service.list()]
            return {"code": 0, "msg": "data success get", "data": datas}

    testcase_post_model = case_ns.model("testcase_post_model", {
        "name": fields.String,
        "step": fields.String,
        "method": fields.String,
        "remark": fields.String
    })

    @case_ns.expect(testcase_post_model)
    def post(self):
        """
        测试用例的新增
        :return:
        """
        data = request.json
        # {a=1,b=2}   a=1,b=2
        testcase = TestcaseDo(**data)
        testcase_id = testcase_service.save(testcase)
        if testcase_id:
            return {"code": 0, "msg": f"{testcase_id} success add"}
        else:
            return {"code": 40001, "msg": "case is exists"}

    testcase_put_model = case_ns.model("testcase_put_model", {
        "id": fields.Integer,
        "name": fields.String,
        "step": fields.String,
        "method": fields.String,
        "remark": fields.String
    })

    @case_ns.expect(testcase_put_model)
    def put(self):
        """
        测试用例的修改
        :return:
        """
        data = request.json
        # {a=1,b=2}   a=1,b=2
        testcase = TestcaseDo(**data)
        testcase_id = testcase_service.update(testcase)
        if testcase_id:
            return {"code": 0, "msg": f"{testcase_id} success update"}
        else:
            return {"code": 40001, "msg": "case is not exists"}

    testcase_delete_parser = api.parser()
    testcase_delete_parser.add_argument("id", type=int, location="json", required=True)

    @case_ns.expect(testcase_delete_parser)
    def delete(self):
        """
        测试用例的删除
        :return:
        """
        data = request.json
        case_id = data.get("id")
        testcase_id = testcase_service.delete(case_id)
        if testcase_id:
            return {"code": 0, "msg": f"{testcase_id} success delete"}
        else:
            return {"code": 40002, "msg": "case is not exists"}

启动模块server.py

from flask import Flask
from flask_cors import CORS
from flask_jwt_extended import JWTManager
from flask_restx import Api
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm import Session

# 问题: 跨域 No 'Access-Control-Allow-Origin' header is present on the requested resource.
# 解决:
app = Flask(__name__)
api = Api(app)
CORS(app, supports_credentials=True)

# 用例的命名空间
username = "root"
password = 123456
server = "42.192.73.147:3307"
database = "tpf"
app.config['SQLALCHEMY_DATABASE_URI'] = f"mysql+pymysql://{username}:{password}@{server}/{database}?charset=utf8mb4"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.config["JWT_SECRET_KEY"] = "super-secret"
# JWTManager 绑定 app
jwt = JWTManager(app)
# SQLAlchemy 绑定 app
db = SQLAlchemy(app)
# 为了有类型提示所做的声明
db_session: Session = db.session


def register_router():
    # 如果出现循环导入,把导包语句放在方法内执行。并且调用此函数
    from controller.testcase_controller import testcase_ns
    from controller.plan_controller import plan_ns
    from controller.build_controller import build_ns
    from controller.user_controller import user_ns
    api.add_namespace(testcase_ns, "/testcase")
    api.add_namespace(plan_ns, "/plan")
    api.add_namespace(build_ns, "/build")
    api.add_namespace(user_ns, "/user")


if __name__ == '__main__':
    register_router()
    app.run(debug=True, port=5001)

Controller层创建-测试计划

plan_ns = Namespace("plan", description="测试计划管理")

plan_service = PlanService()


@plan_ns.route("")
class PlanController(Resource):
    plan_get_parser = api.parser()
    plan_get_parser.add_argument("id", type=int, location="args")

    @plan_ns.expect(plan_get_parser)
    def get(self):
        """
        测试计划的查找
        :return:
        """
        id = request.args.get("id")
        print(id)
        if id:
            data = plan_service.get(id)
            if data:
                datas = [plan_service.get(id).as_dict()]
                return {"code": 0, "msg": "data success get", "data": datas}
            else:
                return {"code": 40004, "msg": "data is not exists"}

        else:
            datas = [plan.as_dict() for plan in plan_service.list()]
            return {"code": 0, "msg": "data success get", "data": datas}

    plan_post_model = plan_ns.model("plan_post_model", {
        "name": fields.String,
        "testcase_ids": fields.List(fields.Integer)
    })

    @plan_ns.expect(plan_post_model)
    def post(self):
        """
        测试计划的新增
        :return:
        """
        data = request.json
        # {a=1,b=2}   a=1,b=2
        testcase_id_list = data.pop("testcase_ids")
        plan = PlanDo(**data)
        plan_id = plan_service.save(testcase_id_list, plan)
        if plan_id:
            return {"code": 0, "msg": f"{plan_id} success add"}
        else:
            return {"code": 40001, "msg": "case is exists"}

    plan_delete_parser = api.parser()
    plan_delete_parser.add_argument("id", type=int, location="json", required=True)

    @plan_ns.expect(plan_delete_parser)
    def delete(self):
        """
        测试计划的删除
        :return:
        """
        data = request.json
        plan_id = data.get("id")
        id = plan_service.delete(plan_id)
        if id:
            return {"code": 0, "msg": f"{id} success delete"}
        else:
            return {"code": 40002, "msg": "case is not exists"}

Controller层创建-构建记录

build_ns = Namespace("build", description="构建记录管理")

build_service = BuildService()


@build_ns.route("")
class BuildController(Resource):
    build_get_parser = api.parser()
    build_get_parser.add_argument("id", type=int, location="args")

    @build_ns.expect(build_get_parser)
    def get(self):
        """
        构建记录的查找
        :return:
        """
        id = request.args.get("id")
        if id:
            data = build_service.get(id)
            if data:
                datas = [build_service.get(id).as_dict()]
                return {"code": 0, "msg": "data success get", "data": datas}
            else:
                return {"code": 40004, "msg": "data is not exists"}
        else:
            datas = [plan.as_dict() for plan in build_service.list()]
            return {"code": 0, "msg": "data success get", "data": datas}

    build_post_model = build_ns.model("build_post_model", {
        "plan_id": fields.Integer
    })

    @build_ns.expect(build_post_model)
    def post(self):
        """
        构建记录的新增
        :return:
        """
        plan_id = request.json.get("plan_id")
        build_id = build_service.save(plan_id)
        if build_id:
            return {"code": 0, "msg": f"{build_id} success add"}
        else:
            return {"code": 40001, "msg": "case is exists"}

    build_delete_parser = api.parser()
    build_delete_parser.add_argument("id", type=int, location="json", required=True)

    @build_ns.expect(build_delete_parser)
    def delete(self):
        """
        构建记录的删除
        :return:
        """
        plan_id = request.json.get("id")
        id = build_service.delete(plan_id)
        if id:
            return {"code": 0, "msg": f"{id} success delete"}
        else:
            return {"code": 40002, "msg": "case is not exists"}

Controller层创建-用户

user_ns = Namespace("user", description="用户管理")

user_service = UserService()


@user_ns.route("/login")
class LoginController(Resource):
    login_post_model = user_ns.model("login_post_model", {
        "username": fields.String,
        "password": fields.String
    })

    @user_ns.expect(login_post_model)
    def post(self):
        """
        登录功能
        :return:
        """
        user_info = request.json
        # 通过用户名和密码生成 user 对象
        user = UserDo(**user_info)
        # 通过用户名查询用户是否存在
        user_result = user_service.get_by_name(user.username)
        # 如果用户不存在,说明用户名未注册
        if not user_result:
            return {"code": 40013, "msg": "用户未注册"}
        # 如果密码不匹配
        if not user_result.check_hash_password(user_info.get("password")):
            # 返回提示信息与未授权的响应状态码
            return {"code": 40014, "msg": "密码错误"}
        # 用户存在,生成 token
        access_token = user_service.create_access_token(user_result.as_dict())
        if access_token:
            return {"code": 0, "msg": "登录成功", "data": {"token": access_token}}
        else:
            return {"code": 40021, "msg": "登录失败"}


@user_ns.route("/register")
class RegisterController(Resource):
    register_post_model = user_ns.model("register_post_model", {
        "username": fields.String,
        "password": fields.String,
    })

    @user_ns.expect(register_post_model)
    def post(self):
        """
        注册功能
        :return:
        """
        data = request.json
        user = UserDo(**data)
        user_id = user_service.save(user)
        if user_id:
            return {"code": 0, "msg": f"{user_id} success add"}
        else:
            return {"code": 40001, "msg": "user is exists"}

实战5: 实现接口层逻辑

完成测试计划、测试用例、构建记录的Controller层代码

最终实现效果