Files
fastapi-project-template/api/generate.py

445 lines
19 KiB
Python

# _*_ coding : UTF-8 _*_
# @Time : 2025/02/28 17:28
# @UpdateTime : 2025/02/28 17:28
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Path, Query, Depends, Request
from fastapi.responses import JSONResponse
from jinja2 import Environment, FileSystemLoader
from tortoise import Tortoise
from annotation.auth import Auth
from annotation.log import Log
from config.constant import MYSQL_TO_PYTHON_TYPE, BusinessType
from controller.login import LoginController
from models.generate import GenerateInfo, GenerateColumn
from schemas.common import BaseResponse, DeleteListParams
from schemas.generate import GetTablesListResponse, AddGenerateInfoParams, UpdateGenerateInfoParams, \
GetGenerateInfoResponse, GetGenerateInfoListResponse
from utils.common import bytes2human
from utils.generate import Generate
from utils.response import Response
generateAPI = APIRouter(
prefix="/generate",
dependencies=[Depends(LoginController.get_current_user)]
)
@generateAPI.get("/tables", response_class=JSONResponse, response_model=GetTablesListResponse,
summary="获取数据库中的所有表信息")
@Log(title="获取数据库中的所有表信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:tables"])
async def get_database_tables(request: Request):
"""
获取当前数据库中的所有表信息,包括:
- 表名
- 表注释
- 记录行数
- 数据大小
- 索引大小
- 创建时间
- 最后更新时间(如果支持)
"""
sql = """SELECT TABLE_NAME,TABLE_COMMENT,TABLE_ROWS,DATA_LENGTH,INDEX_LENGTH,CREATE_TIME,UPDATE_TIME FROM information_schema.tables WHERE table_schema = DATABASE();"""
result = await Tortoise.get_connection("default").execute_query_dict(sql)
# 将结果中的键转换为小写
formatted_result = [{k.lower(): v for k, v in row.items()} for row in result]
for row in formatted_result:
for key, value in row.items():
if key == "data_length" or key == "index_length":
row[key] = bytes2human(value)
return Response.success(data={
"page": 1,
"total": len(formatted_result),
"pageSize": 99999,
"result": formatted_result
})
@generateAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="添加生成表信息")
@Log(title="添加生成表信息", business_type=BusinessType.INSERT)
@Auth(permission_list=["generate:btn:add"])
async def add_generate_info(request: Request, params: AddGenerateInfoParams):
if await GenerateInfo.get_or_none(table_name=params.table_name, del_flag=1):
return Response.error(msg="该表信息已存在!")
gen = await GenerateInfo.create(
author=params.author,
class_name=params.class_name,
table_comment=params.table_comment,
table_name=params.table_name,
prefix=params.prefix,
remark=params.remark,
permission_id=params.permission_id,
description=params.description
)
if gen:
sql = """SELECT * FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = %s;"""
result = await Tortoise.get_connection("default").execute_query_dict(sql, [params.table_name])
# 转换为小写键名
formatted_result = [{k.lower(): v for k, v in row.items()} for row in result]
formatted_result.sort(key=lambda x: x['ordinal_position'])
for column in formatted_result:
if await GenerateColumn.get_or_none(table_id=gen.id, column_name=column["column_name"], del_flag=1):
continue
await GenerateColumn.create(
table=gen,
index=column["ordinal_position"],
column_name=column["column_name"],
column_comment=column["column_comment"],
column_type=column["column_type"],
python_type=MYSQL_TO_PYTHON_TYPE.get(column["column_type"]) if MYSQL_TO_PYTHON_TYPE.get(
column["column_type"]) else MYSQL_TO_PYTHON_TYPE.get(column["data_type"], "str"),
python_name=column["column_name"].lower(),
is_insert=True,
is_edit=True,
is_list=True,
is_query=True,
is_required=False,
query_way="__icontains",
show_type="input",
is_hide=False,
)
return Response.success(msg="添加成功!")
return Response.error(msg="添加失败!")
@generateAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除生成表信息")
@generateAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除生成表信息")
@Log(title="删除生成表信息", business_type=BusinessType.DELETE)
@Auth(permission_list=["generate:btn:delete"])
async def delete_generate_info(request: Request, id: str = Path(description="生成表信息ID")):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.del_flag = 0
await GenerateColumn.filter(table_id=table.id, del_flag=1).update(del_flag=0)
await table.save()
return Response.success(msg="删除成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除生成表信息")
@generateAPI.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除生成表信息")
@Log(title="批量删除生成表信息", business_type=BusinessType.DELETE)
@Auth(permission_list=["generate:btn:delete"])
async def delete_generate_info_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.del_flag = 0
await GenerateColumn.filter(table_id=table.id, del_flag=1).update(del_flag=0)
await table.save()
return Response.success(msg="删除成功!")
@generateAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新生成表信息")
@generateAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新生成表信息")
@Log(title="更新生成表信息", business_type=BusinessType.UPDATE)
@Auth(permission_list=["generate:btn:update"])
async def update_generate_info(
request: Request,
params: AddGenerateInfoParams,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.author = params.info.author
table.class_name = params.info.class_name
table.permission_id = params.info.permission_id
table.prefix = params.info.prefix
table.remark = params.info.remark
table.table_comment = params.info.table_comment
table.table_name = params.info.table_name
table.description = params.info.description
await table.save()
return Response.success(msg="更新成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetGenerateInfoResponse,
summary="获取生成表信息")
@Log(title="获取生成表信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:info"])
async def get_generate_info(request: Request, id: str = Path(description="生成表信息ID")):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table = {
"id": table.id,
"author": table.author,
"class_name": table.class_name,
"permission_id": table.permission_id,
"prefix": table.prefix,
"remark": table.remark,
"table_comment": table.table_comment,
"table_name": table.table_name,
"description": table.description,
"create_time": table.create_time,
"update_time": table.update_time,
}
columns = await GenerateColumn.filter(table_id=table["id"], del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
table["columns"] = columns
return Response.success(data=table)
return Response.error(msg="该生成表信息不存在!")
@generateAPI.put("/updateColumns/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="更新生成表列信息")
@generateAPI.post("/updateColumns/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="更新生成表列信息")
@Log(title="更新生成表列信息", business_type=BusinessType.UPDATE)
@Auth(permission_list=["generate:btn:updateColumns"])
async def update_generate_info_columns(
request: Request,
params: UpdateGenerateInfoParams,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
for column in params.columns:
if gen_column := await GenerateColumn.get_or_none(id=column.id, table_id=table.id, del_flag=1):
gen_column.column_comment = column.column_comment
gen_column.column_name = column.column_name
gen_column.column_type = column.column_type
gen_column.python_name = column.python_name
gen_column.python_type = column.python_type
gen_column.query_way = column.query_way
gen_column.show_type = column.show_type
gen_column.is_insert = column.is_insert
gen_column.is_edit = column.is_edit
gen_column.is_list = column.is_list
gen_column.is_query = column.is_query
gen_column.is_required = column.is_required
gen_column.is_hide = column.is_hide
await gen_column.save()
return Response.success(msg="更新成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.get("/list", response_class=JSONResponse, response_model=GetGenerateInfoListResponse,
summary="查询生成表信息列表")
@Log(title="查询生成表信息列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:list"])
async def get_generate_info_list(
request: Request,
page: int = Query(default=1, description="页码"),
pageSize: int = Query(default=10, description="每页数量"),
table_comment: Optional[str] = Query(default=None, description="表注释"),
permission_id: Optional[str] = Query(default=None, description="权限ID"),
):
filterArgs = {
f'{k}__contains': v for k, v in {
'table_comment': table_comment,
'permission_id': permission_id
}.items() if v
}
result = await GenerateInfo.filter(**filterArgs, del_flag=1).order_by("-create_time").offset(
(page - 1) * pageSize).limit(
pageSize).values(
id="id",
author="author",
class_name="class_name",
permission_id="permission_id",
prefix="prefix",
remark="remark",
table_comment="table_comment",
table_name="table_name",
description="description",
create_time="create_time",
update_time="update_time",
)
for item in result:
columns = await GenerateColumn.filter(table_id=item["id"], del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
item["columns"] = columns
return Response.success(data={
"result": result,
"total": await GenerateInfo.filter(**filterArgs, del_flag=1).count(),
"page": page,
"pageSize": pageSize,
})
@generateAPI.get("/code/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="生成代码")
@Log(title="生成代码", business_type=BusinessType.GENCODE)
@Auth(permission_list=["generate:btn:code"])
async def generate_code(
request: Request,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
info = {
"author": table.author,
"class_name": table.class_name,
"permission_id": table.permission_id,
"prefix": table.prefix,
"remark": table.remark,
"table_comment": table.table_comment,
"table_name": table.table_name,
"description": table.description,
}
columns = await GenerateColumn.filter(table_id=table.id, del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
def generate_uuid():
return str(uuid.uuid4())
def current_time():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
comment_env = Environment(loader=FileSystemLoader('templates'))
comment_env.globals["uuid4"] = generate_uuid # 注册到 Jinja2
comment_env.globals["now"] = current_time
data = Generate.prepare_template_data(info, columns)
model_template = comment_env.get_template('python/model.py.jinja')
model_code = model_template.render(data)
schemas_template = comment_env.get_template('python/schemas.py.jinja')
schemas_code = schemas_template.render(data)
api_py_template = comment_env.get_template('python/api.py.jinja')
api_py_code = api_py_template.render(data)
type_template = comment_env.get_template('typescript/type.d.ts.jinja')
type_code = type_template.render(data)
api_ts_template = comment_env.get_template('typescript/api.ts.jinja')
api_ts_code = api_ts_template.render(data)
hook_template = comment_env.get_template('typescript/hook.tsx.jinja')
hook_code = hook_template.render(data)
vue_env = Environment(
loader=FileSystemLoader('templates/vue'),
block_start_string="[%", # 替换 Jinja2 代码块的起始标记
block_end_string="%]", # 替换 Jinja2 代码块的结束标记
variable_start_string="[[",
variable_end_string="]]",
comment_start_string="[#",
comment_end_string="#]"
)
index_template = vue_env.get_template('index.vue.jinja')
index_code = index_template.render(data)
form_template = vue_env.get_template('form.vue.jinja')
form_code = form_template.render(data)
sql_template = comment_env.get_template('sql.jinja')
sql_code = sql_template.render(data)
data = {
"backend": [
{
"label": f"/models/{info['class_name'].lower()}.py",
"value": model_code,
"type": "python",
"path": f"/python/models/{info['class_name'].lower()}.py"
},
{
"label": f"/schemas/{info['class_name'].lower()}.py",
"value": schemas_code,
"type": "python",
"path": f"/python/schemas/{info['class_name'].lower()}.py"
},
{
"label": f"/api/{info['class_name'].lower()}.py",
"value": api_py_code,
"type": "python",
"path": f"/python/api/{info['class_name'].lower()}.py"
},
],
"frontend": [
{
"label": f"/types/{info['class_name'].lower()}.d.ts",
"value": type_code,
"type": "typescript",
"path": f"/types/{info['class_name'].lower()}.d.ts"
},
{
"label": f"/api/{info['class_name'].lower()}.ts",
"value": api_ts_code,
"type": "typescript",
"path": f"/api/{info['class_name'].lower()}.ts"
},
{
"label": f"/{info['class_name'].lower()}/utils/hook.tsx",
"value": hook_code,
"type": "typescript",
"path": f"/{info['class_name'].lower()}/utils/hook.tsx"
},
{
"label": f"/{info['class_name'].lower()}/index.vue",
"value": index_code,
"type": "vue",
"path": f"/{info['class_name'].lower()}/index.vue"
},
{
"label": f"/{info['class_name'].lower()}/components/form.vue",
"value": form_code,
"type": "vue",
"path": f"/{info['class_name'].lower()}/components/form.vue"
},
],
"sql": [
{
"label": f"{info['table_name']}.sql",
"value": sql_code,
"type": "sql",
"path": f"{info['table_name']}.sql"
}
]
}
return Response.success(data=data)
return Response.error(msg="生成失败!")