测试平台开发

霍格沃兹测试开发学社

ceshiren.com

直播前准备

专题课 形式 章节
测试平台 知识点 测试平台架构设计
测试平台 知识点 平台登录功能实战
测试平台 知识点 平台注册功能实战
测试平台 知识点 跨平台api对接
测试平台 知识点 测试用例自动录入

问题

  1. 为什么要做测试平台?为什么我们要学习平台开发的技术?怎么样去做测试平台才能达到价值最大化(个人、公司)
  2. 平台开发技术要学到什么程度?一定要非常深入吗?
  3. 平台开发需要掌握什么技术栈,如何学习可以提升投入产出比?

测试平台的价值

  • 个人价值:
    • 测试方案能力。(重要)
    • 测试工具产品设计能力。(重要)
    • 前后端的技术能力。
  • 公司收益(提效):
    1. 提效
    2. 提升质量
    3. 降低成本
  • 市场收益:
    1. 知名度与影响力。
    2. 成熟的商业模式,也有巨大的市场空间。带来直接的创收。
    3. 代表产品: Wetest、禅道等等。

测试平台的 “Timing”

  1. 需求: 公司需要测试能力服务化、平台化,比如整合内部的多套平台。
  2. 体系: 公司有成熟的测试体系:自动化测试、持续集成
  3. 人力: 有成熟的技术团队,包括前端、后端、测试开发三个角色

需求说明

功能点 描述 案例
测试用例 测试用例管理 测试用例的增删改查
测试计划 用例的集合 通常是用例从不同维度组合,冒烟测试用例集,某业务线用例集
测试记录 测试用例执行 通过调度平台完成测试用例执行
测试报告 测试结果的体现 allure 报告
用户管理 登录注册 用户密码加密,登录 token 生成和鉴权管理

实战思路

平台分层架构

  • 路由层(Controller)
  • 服务层(Service)
  • 数据层(Dao)
  • 实体类层(Model)

测试平台架构

模型设计

数据库表结构设计

  • 设置 SQLAlchemy
# server.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session, declarative_base


# SQLAlchemy 设置
Base = declarative_base()
# 定义数据库
db_host = "127.0.0.1"  # MySQL主机名
db_port = "3306"  # MySQL端口号,默认3306
db_name = "testplatform"  # 数据库名称
db_user = "root"  # 数据库用户名
db_pass = "hogwarts"  # 数据库密码
# 数据库类型+数据库引擎( pip install pymysql)
db_url = f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
# 创建引擎,连接到数据库
# engine = create_engine('sqlite:///data.db', echo=True)
engine = create_engine(db_url, echo=True)
# 创建session对象
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()

定义表结构 - 测试用例

# 测试用例模型
# model/testcase_model.py

from sqlalchemy import Column, Integer, String
from server import Base

class TestcaseModel(Base):

    __tablename__ = 'testcase'

    # 用例id,主键,唯一
    id = Column(Integer, primary_key=True)
    # 用例标题,不为空,并且唯一
    name = Column(String(80), nullable=False, unique=True)
    # 用例步骤
    step = Column(String(120))
    # 用例的自动化方法
    method = Column(String(120))
    # 备注
    remark = Column(String(120))


    def __repr__(self):
        # 数据库的 魔法方法 直观展示数据
        '''[<User "xxxx">,<User "yyyy">]'''
        return '<Testcase %r>' % self.name

    def as_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "step": self.step,
            "method": self.method,
            "remark": self.remark,
        }

定义表结构 - 测试计划与测试记录

# 测试计划模型
# model/plan_model.py

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship
from model.testcase_plan_rel import testcase_plan_rel
from server import Base


class PlanModel(Base):

    __tablename__ = "plan"

    # 测试计划 ID
    id = Column(Integer, primary_key=True)
    # 测试计划名称
    name = Column(String(80), nullable=False, unique=True)
    # 测试用例列表
    testcases = relationship("TestcaseModel", secondary=testcase_plan_rel, backref='plan')

    def __repr__(self):
        # 数据库的 魔法方法 直观展示数据
        '''[<User "xxxx">,<User "yyyy">]'''
        return '<Plan %r>' % self.name

    def as_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "testcases": [testcase.as_dict() for testcase in self.testcases],
        }
# 测试用例与测试计划中间表
# testcase_plan_rel.py

from sqlalchemy import Table, Column, Integer, ForeignKey
from server import Base

# 中间表
# 中间表表名
# 测试用例的外键
# 计划的外键

testcase_plan_rel = Table(
    "testcase_plan_rel", # 表名
    Base.metadata,   # 表继承的类
    # 参数一: 表名_id, 参数二:整型,参数3: 外键字符串('表名.id', 参数4: 是否为主键)
    Column('testcase_id', Integer, ForeignKey('testcase.id', ondelete='CASCADE'), primary_key=True),
    Column('plan_id', Integer, ForeignKey('plan.id', ondelete='CASCADE'), primary_key=True)
)
# 测试记录模型
# model/record_model.py

from datetime import datetime
from sqlalchemy import Column, Integer, ForeignKey, String, DateTime
from sqlalchemy.orm import relationship

from model.plan_model import PlanModel
from server import Base


class RecordModel(Base):
    __tablename__ = "record"

    # 在新系统下,使用带有显式注释的Mypy插件而在注释中不使用Mapped的SQLAlchemy应用程序会出现错误,
    # 因为在使用relationship()等结构时,这些注释会被标记为错误。
    # 将__allow_unmapped__添加到显式类型的ORM模型一节说明了如何暂时禁止为使用显式注释的遗留ORM模型引发这些错误。
    __allow_unmapped__ = True

    # 执行记录 ID
    id = Column(Integer, primary_key=True)
    # 测试计划 ID
    plan_id = Column(Integer, ForeignKey('plan.id', ondelete='CASCADE'))
    # 测试报告
    report = Column(String(80), nullable=False, unique=True)
    # 执行时间
    create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    # 参数1: 关联的另外一个业务表类名, 参数2: 反射别名
    plan: PlanModel = relationship("PlanModel", backref="record")

    def __repr__(self):
        # 数据库的 魔法方法 直观展示数据
        '''[<User "xxxx">,<User "yyyy">]'''
        return '<Record %r>' % self.id

    def as_dict(self):
        return {
            "id": self.id,
            "plan_id": self.plan_id,
            "report": self.report,
            "create_time": str(self.create_time)
        }

定义表结构 - 用户

# model/user_model.py

from datetime import datetime
from passlib.handlers.sha2_crypt import sha256_crypt
from sqlalchemy import Column, Integer, String, DateTime
from server import Base


class UserModel(Base):
    __tablename__ = "user"

    # 用户 ID, 用户的唯 一标识
    id = Column(Integer, primary_key=True)
    # 用户名, 限定 80个字符 ,不为空,并且唯一
    username = Column(String(80), nullable=False, unique=True)
    # 密码
    password = Column(String(500), nullable=False)
    # 创建时间,不需要手动传入,在写入记录的时候自动生成
    create_time = Column(DateTime, nullable=True, default=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

    def __init__(self, *args, **kwargs):
        # 密码进行自动加密
        self.username = kwargs.get('username')
        self.password = sha256_crypt.hash(kwargs.get('password'))

    def check_hash_password(self, raw_password):
        '''
        校验密码
        :param raw_password: 传入的密码
        :return: 校验结果 True or False
        '''
        return sha256_crypt.verify(raw_password, self.password)

    def as_dict(self):
        return {
            "id": self.id,
            "username": self.username,
            "password": self.password,
            "create_time": str(self.create_time)
        }

创建表

# model/init_db.py

"""
初始化数据库表文件
"""
from server import Base, engine
from model.test_model import TestcaseModel
from model.plan_model import PlanModel
from model.record_model import RecordModel
from model.user_model import UserModel



if __name__ == '__main__':
    # 删除所有数据
    # Base.metadata.drop_all(bind=engine)
    # 创建表,需要传入创建连接的对象
    Base.metadata.create_all(bind=engine)

接口设计 - 测试用例管理

接口设计 - 测试计划管理

接口设计 - 测试记录管理

接口设计 - 用户管理

Dao 层开发 - 测试用例

# 测试用例 Dao
# dao/testcase_dao.py

"""
和数据库交互
"""
from model.testcase_model import TestcaseModel
from server import db_session


class TestcaseDao:

    def get(self, testcase_id: int) -> TestcaseModel:
        """
        添加用例
        :param testcase_id: 用例id
        :return: TestcaseModel
        """
        return db_session.query(TestcaseModel).filter_by(id=testcase_id).first()

    def get_by_name(self, testcase_name: str) -> TestcaseModel:
        """
        根据测试用例名称查询
        """
        return db_session.query(TestcaseModel).filter_by(name=testcase_name).first()

    def list(self):
        """
        获取用例列表
        :return:
        """
        return db_session.query(TestcaseModel).all()

    def create(self, testcase_model: TestcaseModel) -> int:
        """
        创建用例
        :param testcase_model: testcase对象
        :return:
        """
        db_session.add(testcase_model)
        db_session.commit()
        return testcase_model.id

    def delete(self, testcase_id: int) -> int:
        """
        删除用例
        :param testcase_id: 用例id
        :return:
        """
        db_session.query(TestcaseModel).filter_by(id=testcase_id).delete()
        db_session.commit()
        return testcase_id

    def update(self, testcase_model: TestcaseModel) -> int:
        """
        更新用例
        :param testcase_model: testcase对象
        :param testcase_id: 用例id
        :return:
        """
        db_session.query(TestcaseModel).filter_by(id=testcase_model.id).update(testcase_model.as_dict())
        db_session.commit()
        return testcase_model.id

Dao 层开发 - 测试计划

# 测试计划 Dao
# dao/plan_dao.py

from model.plan_model import PlanModel
from server import db_session


class PlanDao:

    def get(self, plan_id) -> PlanModel:
        # 根据id返回数据
        return db_session.query(PlanModel).filter_by(id=plan_id).first()

    def get_by_name(self, name) -> PlanModel:
        # 根据name返回数据
        return db_session.query(PlanModel).filter_by(name=name).first()

    def list(self):
        # 返回所有数据
        return db_session.query(PlanModel).all()

    def create(self, plan_do: PlanModel):
        # 新增数据
        db_session.add(plan_do)
        db_session.commit()
        return plan_do.id

    def delete(self, plan_id):
        # 删除操作
        db_session.query(PlanModel).filter_by(id=plan_id).delete()
        db_session.commit()
        return plan_id

Dao 层开发 - 测试记录

# 测试记录 Dao
# dao/record_dao.py

from model.record_model import RecordModel
from server import db_session


class RecordDao:

    def list_by_plan_id(self, plan_id):
        # 根据id返回数据
        return db_session.query(RecordModel).filter_by(plan_id=plan_id).all()

    def list(self):
        # 返回所有数据
        return db_session.query(RecordModel).all()

    def create(self, build_do: RecordModel):
        # 新增数据
        db_session.add(build_do)
        db_session.commit()
        return build_do.id

Dao 层开发 - 用户

# 用户 Dao
# dao/user_dao.py

from model.user_model import UserModel
from server import db_session


class UserDao:

    def get(self, user_id) -> UserModel:
        '''
        根据 ID 查询用户
        '''
        return db_session.query(UserModel).filter_by(id=user_id).first()

    def get_by_name(self, user_name) -> UserModel:
        '''
        根据姓名查询用户
        '''
        return db_session.query(UserModel).filter_by(username=user_name).first()

    def create(self, user_model: UserModel):
        '''
        创建用户
        '''
        db_session.add(user_model)
        db_session.commit()
        return user_model.id

Service 开发 - 测试用例

# 测试用例 Service
# service/testcase_service.py

from typing import List
from dao.testcase_dao import TestcaseDao
from model.testcase_model import TestcaseModel

# 实例化测试用例实体类
testcase_dao = TestcaseDao()


class TestcaseService:
    def create(self, testcase_model: TestcaseModel) -> int:
        """
        创建用例
        """
        result = testcase_dao.get_by_name(testcase_model.name)
        if not result:
            return testcase_dao.create(testcase_model)

    def update(self, testcase_model: TestcaseModel) -> int:
        """
        更新用例
        """
        if testcase_dao.get_by_name(testcase_model.name):
            testcase_dao.update(testcase_model)
        return testcase_model.id

    def delete(self, testcase_id: int) -> int:
        """
        删除用例
        """
        if self.get(testcase_id):
            return testcase_dao.delete(testcase_id)

    def list(self) -> List[TestcaseModel]:
        """
        获取全部用例
        """
        return testcase_dao.list()

    def get(self, testcase_id: int) -> TestcaseModel:
        """
        获取某个测试用例
        """
        return testcase_dao.get(testcase_id)

Service 开发 - 测试计划

# 测试计划 Service
# service/plan_service.py

from dao.plan_dao import PlanDao
from model.plan_model import PlanModel
from service.testcase_service import TestcaseService

plan_dao = PlanDao()
testcase_service = TestcaseService()


class PlanService:

    def get(self, plan_id):
        '''
        通过 ID 查询测试计划
        '''
        return plan_dao.get(plan_id)

    def get_by_name(self, plan_name):
        '''
        通过名称查询测试计划
        '''
        return plan_dao.get_by_name(plan_name)

    def list(self):
        '''
        返回所有测试计划
        '''
        return plan_dao.list()

    def create(self, plan_model: PlanModel, testcase_id_list):
        '''
        创建测试计划
        '''
        # 创建之前先通过名称查询计划是否已经存在
        plan = self.get_by_name(plan_model.name)
        print(f"名称 {plan_model.name} 的查询结果为 {plan}")
        # 不存在则新增
        if not plan:
            # 根据测试用例 ID 查询获取测试用例对象列表
            testcase_list = [testcase_service.get(testcase_id) for testcase_id in testcase_id_list]
            # 构建测试计划对象
            plan_model.testcases = testcase_list
            # 创建测试计划
            return plan_dao.create(plan_model)
        # 存在则返回 False
        return False

    def delete(self, plan_id):
        # 删除操作
        # 删除之前先查询数据是否存在,存在则进行删除,不存在则返回false
        plan = self.get(plan_id)
        if not plan:
            return False
        else:
            return plan_dao.delete(plan_id)

Service 开发 - 测试记录

# 测试记录 Service
# service/record_service.py

from dao.record_dao import RecordDao
from model.record_model import RecordModel
from service.plan_service import PlanService
from utils.jenkins_util import JenkinsUtils

record_dao = RecordDao()
plan_service = PlanService()


class RecordService:

    def list_by_plan(self, plan_id):
        '''
        根据测试计划 ID 获取对应的测试记录
        '''
        return record_dao.list_by_plan_id(plan_id)

    def list(self):
        '''
        获取所有执行记录
        '''
        return record_dao.list()

    def create(self, plan_id):
        '''
        新增测试记录
        '''
        # 新增之前先查询要执行的测试计划是否存在
        plan = plan_service.get(plan_id)
        # 不存在则返回 False
        if not plan:
            return False
        # 存在则创建测试记录
        else:
            # 执行命令格式
            # pytest test_demo1 test_demo2 test_demo3
            # 获取测试记录中包含的测试用例的执行方式
            method_list = [testcase.method for testcase in plan.testcases]
            # 为测试执行方式列表去重
            methods = set(method_list)
            print(f"去重后的 method 列表为 {methods}")
            # 获取测试记录中包含的测试用例 step 中包含的 nodeid
            # test_demo1 ==> test_add_params.py
            nodeid_list = [testcase.step for testcase in plan.testcases]
            nodeid = " ".join(nodeid_list)
            print(f"获取到的用例 id 列表为 {nodeid}")
            # 执行测试用例
            invoke_params = {
                "methods": methods,
                "steps": nodeid
            }
            report = JenkinsUtils.invoke(invoke_params)
            # 构造测试记录对象
            record_model = RecordModel(plan_id=plan_id, report=report)
            # 新增测试记录
            return record_dao.create(record_model)
# 调用 jenkins
# utils/jenkins_util.py

from jenkinsapi.jenkins import Jenkins


class JenkinsUtils:
    # Jenkins 服务
    BASE_URL = "http://127.0.0.1:5003/"
    # Jenkins 服务对应的用户名
    USERNAME = "admin"
    # Jenkins 服务对应的token
    PASSWORD = "11cd6c401859ccdbb946b0ddb46c3bb76a"
    # Jenkins 要操作的 Job 名称
    JOB = "test_platform_job"

    @classmethod
    def invoke(cls, invoke_params):
        """
        执行构建任务
        :return:
        """
        # 获取 Jenkins 对象
        jenkins_hogwarts = Jenkins(cls.BASE_URL, cls.USERNAME, cls.PASSWORD)
        # 获取 Jenkins 的 job 对象
        job = jenkins_hogwarts.get_job(cls.JOB)
        # 构建 hogwarts job,传入的值必须是字典,key 对应 jenkins 设置的参数名
        # job.invoke(build_params={
        #     "step": "xx.py::TestXX::test_xx", 
        #     "methods": "pytest"
        # })
        job.invoke(build_params=invoke_params)
        # 获取job 最后一次完成构建的编号
        last_build_number = job.get_last_buildnumber() + 1
        # 执行方式为:pytest 用例名称 指定报告生成地址
        # 生成报告格式为
        # http://127.0.0.1:5003/job/test_platform_job/22/allure/
        # 获取本次构建的报告地址
        report = f"{cls.BASE_URL}job/{cls.JOB}/{last_build_number}/allure/"
        return report

Service 开发 - 用户

# 用户 Service
# service/user_service.py

from dao.user_dao import UserDao
from model.user_model import UserModel

user_dao = UserDao()


class UserService:

    def get(self, user_id) -> UserModel:
        '''
        通过 ID 查询用户
        '''
        return user_dao.get(user_id)

    def get_by_name(self, user_name):
        '''
        通过姓名查询用户
        '''
        return user_dao.get_by_name(user_name)

    def create(self, user_model: UserModel) -> int:
        '''
        创建用户
        '''
        # 新增前先查询用户是否存在
        user = user_dao.get_by_name(user_model.username)
        if not user:
            # 没有重名,创建用户
            user_dao.create(user_model)
            return user_model.id

    def create_access_token(self, user_model):
        '''
        根据用户信息生成 token
        '''
        # 使用 jwt 生成 token
        with app.app_context():
            # token = create_access_token(identity=user_model.username, expires_delta=timedelta(days=1))
            # identity 是生成 token 的依据,需要 json 格式的可序列化数据
            # expires_delta 可以配置 token 的过期时间
            token = create_access_token(
                identity=user_model.as_dict(),
                expires_delta=timedelta(days=1)
            )
            return token

    # 配置回调函数中验证数据条件
    @jwt.user_lookup_loader
    def user_lookup_callback(self, _jwt_header, jwt_data):
        # 因为 sub 字段下是请求 body 的内容,所以从 sub 中获取用户名用来查库
        # 获取 username
        username = jwt_data["sub"]["username"]
        # 返回通过 username 查询用户的结果
        return self.get_by_name(username)
# 初始化 JWT
# server.py

import argparse
from flask import Flask
from flask_cors import CORS
from flask_restx import Api
from flask_jwt_extended import JWTManager
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session, declarative_base

# 定义 app
app = Flask(__name__)
# 注册 restx,路由管理
api = Api(app)
# 解决跨域
CORS(app, supports_credentials=True)

# 注册 jwt
jwt = JWTManager(app)
# 配置服务端密钥
app.config["JWT_SECRET_KEY"] = "hogwarts_user_AHKFJJD5"
# 开启数据库跟踪模式
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# SQLAlchemy 设置
Base = declarative_base()
# MySQL 信息配置
db_host = "127.0.0.1"  # MySQL主机名
db_port = "3306"  # MySQL端口号,默认3306
db_name = "testplatform"  # 数据库名称
db_user = "root"  # 数据库用户名
db_pass = "hogwarts"  # 数据库密码
# 数据库类型+数据库引擎( pip install pymysql)
db_url = f"mysql+pymysql://{db_user}:{db_pass}@{db_host}:{db_port}/{db_name}"
# 创建引擎,连接到数据库
# engine = create_engine('sqlite:///data.db', echo=True)
engine = create_engine(db_url, echo=True)
# 创建session对象
DBSession = sessionmaker(bind=engine)
db_session: Session = DBSession()


if __name__ == '__main__':
    # ArgumentParser() 解析命令行参数并生成帮助文档
    parser = argparse.ArgumentParser()
    # add_argument() 添加具体的命令行参数和对应的帮助信息
    parser.add_argument("--port", type=int, default=5055, help="服务启动端口")
    # 解析命令行参数并返回一个 Namespace 对象,该对象包含了所有解析出来的参数
    args = parser.parse_args()
    # 启动服务
    app.run(host="0.0.0.0", debug=True, port=args.port)

Service 层单元测试

  • 快捷键生成单元测试用
    • Windows 系统快捷键为 Ctrl + n
    • MacOs 系统快捷键为 Command + n

Controller 层开发 - 用户

# 用户管理接口
# controller/user_controller.py

from flask import request
from flask_restx import Namespace, Resource, fields
from model.user_model import UserModel
from service.user_service import UserService

# 定义用户管理的命名空间
# 可以在 Swagger 页面中为把用户管理相关接口放到一个组中管理
user_ns = Namespace("user", description="用户管理")

user_service = UserService()


# 定义路由
@user_ns.route("/login")
class LoginController(Resource):

    # post 接口请求体注解,会展示在 Swagger 页面中
    login_post_model = user_ns.model("login_post_model", {
        "username": fields.String,
        "password": fields.String
    })

    # 为接口添加设置好的注解
    @user_ns.expect(login_post_model)
    def post(self):
        '''
        登录功能
        '''
        # 获取请求体
        data = request.json
        # 构建用户对象
        user = UserModel(**data)
        # 通过用户名查找用户是否存在
        user_result = user_service.get_by_name(user.username)
        print(user_result)
        print(data.get("password"))
        # 如果用户不存在,说明用户还未注册
        if not user_result:
            return {"code": 40013, "msg": "user is not register"}
        # 如果密码不匹配,说明密码错误
        if not user_result.check_hash_password(data.get("password")):
            return {"code": 40014, "msg": "password is wrong"}
        # 用户存在,且密码匹配,则生成 token
        access_token = user_service.create_access_token(user_result)
        print(access_token)
        if access_token:
            # 存在access_token,则证明登录成功了
            return {"code": 0, "msg": "login success", "data": {"token": access_token}}
        else:
            return {"code": 40021, "msg": "login fail"}


@user_ns.route("/register")
class RegisterController(Resource):
    register_post_model = user_ns.model("register_post_model", {
        "username": fields.String,
        "password": fields.String
    })

    @user_ns.expect(register_post_model)
    def post(self):
        '''
        注册功能
        '''
        # 获取请求体
        data = request.json
        # 构建用户对象
        user = UserModel(**data)
        print(user.password)
        if user:
            user_id = user_service.create(user)
            if user_id:
                # 存在id,则证明新增成功了
                return {"code": 0, "msg": f"register success"}
            else:
                return {"code": 40001, "msg": "register fail"}
        return {"code": 40001, "msg": "register fail"}

Controller 层开发 - 测试用例

# 测试用例管理接口
# controller/testcase_controller.py

from flask import request
from flask_jwt_extended import jwt_required
from flask_restx import Namespace, Resource, fields
from model.testcase_model import TestcaseModel
from server import api
from service.testcase_service import TestcaseService

# 定义测试用例的命名空间
# 可以在 Swagger 页面中为把测试用例管理相关接口放到一个组中管理
testcase_ns = Namespace("testcase", description="测试用例管理")

testcase_service = TestcaseService()


# 定义路由
@testcase_ns.route("")
class TestcaseController(Resource):
    # 鉴权操作
    decorators = [jwt_required()]

    # 测试用例管理 get 接口请求参数注解
    testcase_get_parser = api.parser()
    testcase_get_parser.add_argument("id", type=int, location="args")
    testcase_get_parser.add_argument('Authorization', type=str, location="headers")

    # 为 get 接口填写设置好的注解
    @testcase_ns.expect(testcase_get_parser)
    def get(self):
        '''
        测试用例查找
        '''
        # 获取请求参数
        data = request.args
        case_id = data.get("id")
        # 如果有id则进行数据查找
        if case_id:
            testcase = testcase_service.get(int(case_id))
            # 如果查询到结果
            if testcase:
                datas = [testcase.as_dict()]
                return {"code": 0, "msg": "get data success", "data": datas}
            else:
                # 如果没有数据,则返回数据已存在
                return {"code": 40004, "msg": "data is not exists"}
        else:
            # 如果没有id,则返回全部数据
            datas = [testcase.as_dict() for testcase in testcase_service.list()]
            return {"code": 0, "msg": "get data success", "data": datas}

    # 测试用例管理 post 接口请求体注解
    testcase_post_model = testcase_ns.model("testcase_post_model", {
        "name": fields.String,
        "step": fields.String,
        "method": fields.String,
        "remark": fields.String
    })
    # 测试用例管理 post 接口请求参数注解
    testcase_post_parser = api.parser()
    testcase_post_parser.add_argument('Authorization', type=str, location="headers")

    @testcase_ns.expect(testcase_post_model, testcase_post_parser)
    def post(self):
        '''
        新增测试用例
        '''
        # 获取请求体
        data = request.json
        # 构造测试用例对象
        testcase = TestcaseModel(**data)
        # 新增用例
        case_id = testcase_service.create(testcase)
        if case_id:
            # 存在测试用例id, 则证明用例新增成功了
            return {"code": 0, "msg": "add testcase success", "data": {"testcase_id": case_id}}
        else:
            return {"code": 40001, "msg": "testcase is exists"}

    # 测试用例管理 put 接口请求体注解
    testcase_put_model = testcase_ns.model("testcase_put_model", {
        "id": fields.Integer,
        "name": fields.String,
        "step": fields.String,
        "method": fields.String,
        "remark": fields.String
    })
    # 测试用例管理 put 接口请求参数注解
    testcase_put_parser = api.parser()
    testcase_put_parser.add_argument('Authorization', type=str, location="headers")

    @testcase_ns.expect(testcase_put_model, testcase_put_parser)
    def put(self):
        '''
        更新测试用例
        '''
        # 获取请求体
        data = request.json
        # 构造测试用例对象
        testcase = TestcaseModel(**data)
        # 修改测试用例
        case_id = testcase_service.update(testcase)
        if case_id:
            # 存在测试用例id, 则证明用例新增成功了
            return {"code": 0, "msg": "update testcase success", "data": {"testcase_id": case_id}}
        else:
            return {"code": 40001, "msg": "update testcas fail"}

    # 测试用例管理 delete 接口请求参数注解
    testcase_delete_parser = api.parser()
    testcase_delete_parser.add_argument("id", type=int, location="json", required=True)
    testcase_delete_parser.add_argument('Authorization', type=str, location="headers")

    @testcase_ns.expect(testcase_delete_parser)
    def delete(self):
        '''
        删除测试用例
        '''
        # 获取请求体
        data = request.json
        delete_case_id = data.get("id")
        if delete_case_id:
            case_id = testcase_service.delete(delete_case_id)
            if case_id:
                # 存在测试用例id,则证明用例修改成功了
                return {"code": 0, "msg": f"delete testcase success", "data": {"testcase_id": case_id}}
            else:
                return {"code": 40001, "msg": "delete case fail"}

Controller 层开发 - 测试计划

# 测试计划管理接口
# controller/plan_controller.py

from flask import request
from flask_jwt_extended import jwt_required
from flask_restx import Namespace, Resource, fields
from model.plan_model import PlanModel
from server import api
from service.plan_service import PlanService

plan_ns = Namespace("plan", description="测试计划管理")

plan_service = PlanService()


@plan_ns.route("")
class PlanController(Resource):
    # 鉴权操作
    decorators = [jwt_required()]

    plan_get_parser = api.parser()
    plan_get_parser.add_argument('id', type=int, location="args")
    plan_get_parser.add_argument('Authorization', type=str, location="headers")

    @plan_ns.expect(plan_get_parser)
    def get(self):
        '''
        查询测试计划
        '''
        # 获取请求参数
        id = request.args.get("id")
        # 如果请求参数中存在 id
        if id:
            # 根据 id 查询测试计划
            data = plan_service.get(id)
            # 如果测试计划存在
            if data:
                # 将查询结果返回
                datas = [data.as_dict()]
                return {"code": 0, "msg": "get plan success", "data": datas}
            else:
                # 如果测试计划不存在,返回提示信息
                return {"code": 40004, "msg": "plan is not exists"}
        else:
            # 如果参数中不包含 id,则返回全部测试计划
            datas = [p.as_dict() for p in plan_service.list()]
            return {"code": 0, "msg": "get plans success", "data": datas}

    plan_post_parser = api.parser()
    plan_post_parser.add_argument('Authorization', type=str, location="headers")
    plan_post_model = plan_ns.model("plan_post_model", {
        "name": fields.String,
        "testcase_ids": fields.List(fields.Integer)
    })

    @plan_ns.expect(plan_post_model, plan_post_parser)
    def post(self):
        '''
        新增测试计划
        '''
        data = request.json
        # data -> {name=1, testcase_ids=[2,3,4,5,6]}
        testcase_id_list = data.pop("testcase_ids")
        plan = PlanModel(**data)
        # 新增
        plan_id = plan_service.create(plan, testcase_id_list)
        if plan_id:
            # 存在id,则证明新增成功了
            return {"code": 0, "msg": f"add plan success", "data": {"plan_id": plan_id}}
        else:
            return {"code": 40001, "msg": "plan is exists"}

    plan_delete_parser = api.parser()
    plan_delete_parser.add_argument("id", type=int, location="json", required=True)
    plan_delete_parser.add_argument('Authorization', type=str, location="headers")

    @plan_ns.expect(plan_delete_parser)
    def delete(self):
        """
        测试计划的删除
        :return:
        """
        # data ==> {"id": 1}
        data = request.json
        # 删除
        plan_id = plan_service.delete(data.get("id"))
        if plan_id:
            # 存在测试用例id,则证明用例修改成功了
            return {"code": 0, "msg": f"plan delete success", "data":{"plan_id": plan_id}}
        else:
            return {"code": 40001, "msg": "delete plan fail"}

Controller 层开发 - 测试记录

# 测试记录管理接口
# controller/record_controller.py

from flask import request
from flask_jwt_extended import jwt_required
from flask_restx import Namespace, Resource, fields
from server import api
from service.record_service import RecordService

record_ns = Namespace("record", description="测试记录管理")

record_service = RecordService()


@record_ns.route("")
class RecordController(Resource):
    # 鉴权操作
    decorators = [jwt_required()]
    record_get_parser = api.parser()
    record_get_parser.add_argument("plan_id", type=int, location="args")
    record_get_parser.add_argument('Authorization', type=str, location="headers")

    @record_ns.expect(record_get_parser)
    # @jwt_required()
    def get(self):
        """
        测试记录的查找
        :return:
        """
        plan_id = request.args.get("plan_id")
        if plan_id:
            # 如有有id则进行数据查找
            data = record_service.list_by_plan(plan_id)
            if data:
                # 如果查到数据,则返回给前端
                datas = [_.as_dict() for _ in data]
                return {"code": 0, "msg": "get record success", "data": datas}
            else:
                # 如果没有数据,则返回数据已存在
                return {"code": 40004, "msg": "record is not exists"}
        else:
            # 如果没有id,则返回全部数据
            datas = [build.as_dict() for build in record_service.list()]
            return {"code": 0, "msg": "get records success", "data": datas}

    record_post_model = record_ns.model("build_post_model", {
        "plan_id": fields.Integer
    })
    record_post_parser = api.parser()
    record_post_parser.add_argument('Authorization', type=str, location="headers")

    @record_ns.expect(record_post_model, record_post_parser)
    def post(self):
        """
        测试记录的新增
        :return:
        """
        data = request.json
        # 新增
        record_id = record_service.create(data.get("plan_id"))
        if record_id:
            # 存在id,则证明新增成功了
            return {"code": 0, "msg": f"record add success", "data": {"record_id": record_id}}
        else:
            return {"code": 40001, "msg": "record is exists"}

注册路由

# server.py

def register_router():
    from controller.testcase_controller import testcase_ns
    from controller.plan_controller import plan_ns
    from controller.record_controller import record_ns
    from controller.user_controller import user_ns

    api.add_namespace(testcase_ns, "/testcase")
    api.add_namespace(plan_ns, "/plan")
    api.add_namespace(record_ns, "/record")
    api.add_namespace(user_ns, "/user")


if __name__ == '__main__':
    # 注册路由
    register_router()
    # ArgumentParser() 解析命令行参数并生成帮助文档
    parser = argparse.ArgumentParser()
    # add_argument() 添加具体的命令行参数和对应的帮助信息
    parser.add_argument("--port", type=int, default=5055, help="服务启动端口")
    # 解析命令行参数并返回一个 Namespace 对象,该对象包含了所有解析出来的参数
    args = parser.parse_args()
    # 启动服务
    app.run(host="0.0.0.0", debug=True, port=args.port)

接口测试

  • 启动后端服务。
  • 使用 requests 完成接口测试。

添加日志

  • Service 层和 Controller 层添加日志信息。
# utils/log_util.py

import logging
import os

from logging.handlers import RotatingFileHandler

# 绑定绑定句柄到logger对象
logger = logging.getLogger(__name__)
# 获取当前工具文件所在的路径
root_path = os.path.dirname(os.path.abspath(__file__))
# 拼接当前要输出日志的路径
log_dir_path = os.sep.join([root_path, f'/logs'])
if not os.path.isdir(log_dir_path):
    os.mkdir(log_dir_path)
# 创建日志记录器,指明日志保存路径,每个日志的大小,保存日志的上限
file_log_handler = RotatingFileHandler(os.sep.join([log_dir_path, 'log.log']), maxBytes=1024 * 1024, backupCount=10 , encoding="utf-8")
# 设置日志的格式
date_string = '%Y-%m-%d %H:%M:%S'
formatter = logging.Formatter(
    '[%(asctime)s] [%(levelname)s] [%(filename)s]/[line: %(lineno)d]/[%(funcName)s] %(message)s ', date_string)
# 日志输出到控制台的句柄
stream_handler = logging.StreamHandler()
# 将日志记录器指定日志的格式
file_log_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)
# 为全局的日志工具对象添加日志记录器
# 绑定绑定句柄到logger对象
logger.addHandler(stream_handler)
logger.addHandler(file_log_handler)
# 设置日志输出级别
logger.setLevel(level=logging.INFO)

解决连接 Session 失败问题

# server.py

# SQLAlchemy 设置

...

# 创建session对象
# DBSession = sessionmaker(bind=engine)
# db_session: Session = DBSession()
# 解决session的复用问题   不然会报使用的时候前一个session没有回滚
DBSession = scoped_session(sessionmaker(bind=engine))


@app.before_request
def before_request():
    # 在每个请求前执行的代码
    # 在请求开始的时候实例化DBsession
    DBSession()


@app.teardown_request
def teardown_request(exception=None):
    # 在每个请求后执行的代码
    if exception:
        DBSession.rollback()
    # 请求结束之后remove掉DBsession
    DBSession.remove()
# Dao 层各个文件需要修改导入的依赖。

from server import DBSession as db_session

自动导入自动化测试用例

# 自动化测试用例项目中设置
# conftest.py

import json
from datetime import datetime


def pytest_collection_modifyitems(
    session, config, items
) -> None:
    '''
    搜集全部用例
    :param items: 所有用例的列表
    :return:
    '''
    cases = []
    # 测试用例参数的编码格式改写
    for item in items:
        # item 就是一条用例对象
        # 修改每一条用例对象的 name 和 _nodeid 两个属性的编码
        item.name = item.name.encode('utf-8').decode('unicode-escape')
        item._nodeid = item.nodeid.encode('utf-8').decode('unicode-escape')
        case_path = str(item.fspath.strpath)
        path_list = case_path.split("/")
        name = f"{path_list[-2]}/{item.nodeid}"
        case_info = {
            "name": name,
            "step": item.nodeid,
            "method": "pytest",
            "remark": f"路径:{case_path};自动导入时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        }
        cases.append(case_info)

    # 用例信息写入 json 文件
    with open('cases_info.json', 'w', encoding='utf-8') as f:
        json.dump(cases, f, ensure_ascii=True)
# 平台项目 tests 中导入用例信息
# test_import_case_info.py

import json
import requests
from utils.log_util import logger

class TestImportCaseInfo:

    def setup_class(self):
        url = "http://127.0.0.1:5055/user/login"
        data = {
            "username": "lily",
            "password": "123456"
        }
        r = requests.post(url, json=data)
        self.token = r.json().get("data").get("token")
        logger.info(f"token is {self.token}")
        self.headers = {
            "Authorization": "Bearer " + self.token
        }


    def test_add_cases(self):
        # 读取 json 文件中的内容,转化为 python 对象
        with open('./case_data/cases_info.json', 'r') as f:
            data = json.load(f)
        logger.info(data)
        for case in data:
            url = "http://127.0.0.1:5055/testcase"
            r = requests.post(url, json=case, headers=self.headers)
            logger.info(r.text)

总结

  • 测试平台架构
  • 模型定义
  • 分层开发
  • 用户鉴权