Files
fastapi-project-template/templates/python/api.py.jinja
皓月归尘 1c316594f5 feat(generate): 优化代码生成逻辑
- 新增公共字段配置,统一处理常见字段的生成规则
- 修复模板中的一些错误,如变量名、函数名等
- 优化代码结构,提高可读性和可维护性
2025-07-01 23:40:43 +08:00

136 lines
6.3 KiB
Django/Jinja

# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于生成{{ table_comment }}增删改查接口
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from controller.login import LoginController
from models import {{ class_name }}
from schemas.common import BaseResponse, DeleteListParams
from schemas.{{ name }} import Add{{ class_name }}Params, Update{{ class_name }}Params, Get{{ class_name }}InfoResponse, Get{{ class_name }}ListResponse
from utils.response import Response
{{ table_name }}API= APIRouter(
prefix="{{ prefix }}",
dependencies=[Depends(LoginController.get_current_user)],
)
@{{ table_name }}API.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增{{ description }}")
@Log(title="新增{{ description }}", business_type=BusinessType.INSERT)
@Auth(permission_list=["{{ name }}:btn:add"])
async def add_{{ name }}(request: Request, params: Add{{ class_name }}Params):
if await {{ class_name }}.get_or_none(
{% for column in columns if column.is_insert %}
{{ column.python_name }} = params.{{ column.python_name }},
{% endfor %}
del_flag=1
):
return Response.error(msg="{{ description }}已存在!")
{{ name }} = await {{ class_name }}.create(
{% for column in columns if column.is_insert %}
{{ column.python_name }} = params.{{ column.python_name }},
{% endfor %}
)
if {{ name }}:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@{{ table_name }}API.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除{{ description }}")
@{{ table_name }}API.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除{{ description }}")
@Log(title="删除{{ description }}", business_type=BusinessType.DELETE)
@Auth(permission_list=["{{ name }}:btn:delete"])
async def delete_{{ name }}(request: Request, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{{ name }}.del_flag = 0
await {{ name }}.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="{{ description }}不存在!")
@{{ table_name }}API.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除{{ description }}")
@{{ table_name }}API.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除{{ description }}")
@Log(title="批量删除{{ description }}", business_type=BusinessType.DELETE)
@Auth(permission_list=["{{ name }}:btn:delete"])
async def delete_{{ name }}_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{{ name }}.del_flag = 0
await {{ name }}.save()
return Response.success(msg="删除成功")
@{{ table_name }}API.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改{{ description }}")
@{{ table_name }}API.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改{{ description }}")
@Log(title="修改{{ description }}", business_type=BusinessType.UPDATE)
@Auth(permission_list=["{{ name }}:btn:update"])
async def update_{{ name }}(request: Request, params: Update{{ class_name }}Params, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{% for column in columns if column.is_edit %}
{{ name }}.{{ column.python_name }} = params.{{ column.python_name }}
{% endfor %}
await {{ name }}.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="{{ description }}不存在")
@{{ table_name }}API.get("/info/{id}", response_class=JSONResponse, response_model=Get{{ class_name }}InfoResponse, summary="获取{{ description }}信息")
@Log(title="获取{{ description }}信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["{{ name }}:btn:info"])
async def get_{{ name }}_info(request: Request, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
data = {
{% for column in columns if column.is_list %}
"{{ column.python_name }}":{{ name }}.{{ column.python_name }},
{% endfor %}
}
return Response.success(data=data)
else:
return Response.error(msg="{{ description }}不存在")
@{{ table_name }}API.get("/list", response_class=JSONResponse, response_model=Get{{ class_name }}ListResponse, summary="获取{{ description }}列表")
@Log(title="获取{{ description }}列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["{{ name }}:btn:list"])
async def get_{{ name }}_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
{% for column in columns if column.is_query %}
{{ column.python_name }}: Optional[str] = Query(default=None, description="{{ column.column_comment }}"),
{% endfor %}
):
filterArgs={
{% for column in columns if column.is_query %}
"{{ column.python_name }}{{ column.query_way }}": {{ column.python_name }},
{% endfor %}
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await {{ class_name }}.filter(**filterArgs, del_flag=1).count()
data = await {{ class_name }}.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
{% for column in columns if column.is_list %}
{{ column.python_name }} = "{{ column.python_name }}",
{% endfor %}
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})