Compare commits

...

10 Commits

Author SHA1 Message Date
1c316594f5 feat(generate): 优化代码生成逻辑
- 新增公共字段配置,统一处理常见字段的生成规则
- 修复模板中的一些错误,如变量名、函数名等
- 优化代码结构,提高可读性和可维护性
2025-07-01 23:40:43 +08:00
bd13f1cfdc feat: 添加代码生成功能 2025-03-04 00:54:33 +08:00
c790233aee fix: 修复数据权限异常 2025-03-02 01:13:07 +08:00
141883424b fix: 修正角色权限分配异常问题 2025-02-28 16:47:48 +08:00
b59dba18f0 feat: 添加系统级管理专属页面权限 2025-02-26 22:56:15 +08:00
1dd9f7db43 feat: 添加注销功能 2025-02-25 18:20:41 +08:00
f0c678b8d0 fix: 修复普通用户也能获取下属部门的数据 2025-02-24 18:19:04 +08:00
df5f2977d4 fix: 修复注册异常,删除用户异常,调整用户信息存储时间 2025-02-23 03:58:38 +08:00
5be35d8231 feat: 缓存列表,监控,性能监控添加按钮级权限控制 2025-02-22 23:46:00 +08:00
75a163910d feat: 操作日志添加按钮级权限控制 2025-02-22 23:27:53 +08:00
37 changed files with 3539 additions and 89 deletions

154
.env Normal file
View File

@@ -0,0 +1,154 @@
# -------- 应用配置 --------
# 应用运行环境
APP_ENV = 'dev'
# 应用名称
APP_NAME = 'FastAPI-RBAC-System'
# 应用代理路径
APP_ROOT_PATH = ''
# 应用主机
APP_HOST = '0.0.0.0'
# 应用端口
APP_PORT = 9090
# 应用版本
APP_VERSION= '1.0.0'
# 应用是否开启热重载
APP_RELOAD = true
# 应用是否开启IP归属区域查询
APP_IP_LOCATION_QUERY = true
# 应用是否允许账号同时登录
APP_SAME_TIME_LOGIN = true
# -------JWT配置------------
# JWT 签名密钥
JWT_SECRET_KEY=b01c66dc2c58dc6a0aabfe2144256be36226de378bf87f72c0c795dda67f4d55
# JWT 签名算法
JWT_ALGORITHM=HS256
# JWT 盐值
JWT_SALT=jwt_salt
# JWT 令牌有效期(分钟)
JWT_EXPIRE_MINUTES=1440
# JWT 令牌在 Redis 中的缓存有效期(分钟)
JWT_REDIS_EXPIRE_MINUTES=30
# -------- 数据库配置 --------
# 数据库类型,默认为'mysql'
DB_TYPE = 'mysql'
# 数据库主机
DB_HOST = '127.0.0.1'
# 数据库端口
DB_PORT = 3306
# 数据库用户名
DB_USERNAME = 'root'
# 数据库密码
DB_PASSWORD = ''
# 数据库名称
DB_DATABASE = 'fastapi'
# 是否开启日志
DB_ECHO = true
# 数据库日志级别,默认为 10DEBUG
DB_LOG_LEVEL = 10
# 允许溢出连接池大小的最大连接数
DB_MAX_OVERFLOW = 10
# 连接池大小0表示连接数无限制
DB_POOL_SIZE = 50
# 连接回收时间(单位:秒)
DB_POOL_RECYCLE = 3600
# 连接池中没有线程可用时,最多等待的时间(单位:秒)
DB_POOL_TIMEOUT = 30
# -------- Redis配置 --------
# Redis主机
REDIS_HOST = '127.0.0.1'
# Redis端口
REDIS_PORT = 6379
# Redis用户名
REDIS_USERNAME = ''
# Redis密码
REDIS_PASSWORD = ''
# Redis数据库
REDIS_DATABASE = 2
# ======================
# 上传配置
# ======================
# 文件上传的 URL 前缀,默认为 '/profile'。
# 例如:`/profile/example.jpg`。
UPLOAD_PREFIX=/profile
# 文件上传的存储路径,默认为 'data/upload_path'。
# 上传的文件将存储在此目录中,如果目录不存在,会自动创建。
UPLOAD_PATH=data/upload_path
# 上传机器的标识,默认为 'A'。
# 用于区分不同的上传机器或节点,在多机部署时可以使用此字段。
UPLOAD_MACHINE=A
# 默认允许上传的文件扩展名列表,使用逗号分隔。
# 包含常见的图片、文档、压缩文件、视频和 PDF 格式。
# 可以根据需求扩展或修改此列表。
DEFAULT_ALLOWED_EXTENSION=bmp,gif,jpg,jpeg,png,doc,docx,xls,xlsx,ppt,pptx,html,htm,txt,rar,zip,gz,bz2,mp4,avi,rmvb,pdf
# 文件下载的存储路径,默认为 'data/download_path'。
# 下载的文件将存储在此目录中,如果目录不存在,会自动创建。
DOWNLOAD_PATH=data/download_path
# ======================
# 邮件配置
# ======================
# 邮件发送者的用户名,默认为空。
EMAIL_USERNAME=
# 邮件发送者的密码,默认为空。
EMAIL_PASSWORD=
# 邮件服务器地址,默认为 "smtp.qq.com"。
# 如果是其他邮件服务商,请修改为对应的 SMTP 服务器地址。
EMIAL_HOST=smtp.qq.com
# 邮件服务器端口,默认为 465。
# 如果是其他邮件服务商,请根据其要求修改端口号。
EMAIL_PORT=587
# ======================
# 地图配置
# ======================
# 百度地图的 AK 密钥,用于获取地图数据。
# 请在百度地图官网申请获取 AK 密钥。
AK=
# 百度地图sk密钥用于获取地图数据。
# 请在百度地图官网申请获取 SK 密钥。
SK=

View File

@@ -9,6 +9,7 @@ from functools import wraps
from fastapi import Request from fastapi import Request
from config.constant import RedisKeyConfig
from controller.login import LoginController from controller.login import LoginController
from exceptions.exception import PermissionException from exceptions.exception import PermissionException
@@ -40,3 +41,29 @@ class Auth:
raise PermissionException(message="该用户无此接口权限!") raise PermissionException(message="该用户无此接口权限!")
return wrapper return wrapper
async def hasAuth(request: Request, permission: str) -> bool:
"""
判断是有拥有某项权限
"""
token = request.headers.get('Authorization') # 直接使用 request 对象
current_user = await LoginController.get_current_user(request, token)
permissions = current_user.get('permissions')
if permission in permissions:
return True
else:
return False
async def hasAdmin(request: Request, department_id: str) -> bool:
"""
判断是否有管理员权限
"""
permissions = []
if ids := await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:permission_departments'):
permissions = eval(ids)
if department_id in permissions:
return True
else:
return False

View File

@@ -149,7 +149,7 @@ class Log:
# else: # else:
session_id = request.app.state.session_id session_id = request.app.state.session_id
status = 1 if request.app.state.login_status else 0 status = 1 if request.app.state.login_status else 0
current_user = await User.get_or_none(username=payload.get("username")) current_user = await User.get_or_none(username=payload.get("username"),del_flag=1)
await LoginLog.create( await LoginLog.create(
user_id=current_user.id, user_id=current_user.id,
login_ip=host, login_ip=host,

View File

@@ -9,6 +9,7 @@
from fastapi import APIRouter, Depends, Path, Request from fastapi import APIRouter, Depends, Path, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController from controller.login import LoginController
@@ -26,6 +27,7 @@ cacheAPI = APIRouter(
@cacheAPI.get("/monitor", response_class=JSONResponse, response_model=GetCacheMonitorResponse, @cacheAPI.get("/monitor", response_class=JSONResponse, response_model=GetCacheMonitorResponse,
summary="获取缓存监控信息") summary="获取缓存监控信息")
@Log(title="获取缓存监控信息", business_type=BusinessType.SELECT) @Log(title="获取缓存监控信息", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:infoList'])
async def get_cache_info(request: Request): async def get_cache_info(request: Request):
info = await request.app.state.redis.info() info = await request.app.state.redis.info()
db_size = await request.app.state.redis.dbsize() db_size = await request.app.state.redis.dbsize()
@@ -45,6 +47,7 @@ async def get_cache_info(request: Request):
@cacheAPI.get("/names", response_class=JSONResponse, response_model=GetCacheInfoResponse, @cacheAPI.get("/names", response_class=JSONResponse, response_model=GetCacheInfoResponse,
summary="获取缓存名称列表") summary="获取缓存名称列表")
@Log(title="获取缓存名称列表", business_type=BusinessType.SELECT) @Log(title="获取缓存名称列表", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:list'])
async def get_cache_names(request: Request): async def get_cache_names(request: Request):
name_list = [] name_list = []
for key_config in RedisKeyConfig: for key_config in RedisKeyConfig:
@@ -62,6 +65,7 @@ async def get_cache_names(request: Request):
@cacheAPI.get("/keys/{cacheName}", response_class=JSONResponse, response_model=GetCacheKeysListResponse, @cacheAPI.get("/keys/{cacheName}", response_class=JSONResponse, response_model=GetCacheKeysListResponse,
summary="获取缓存键名列表") summary="获取缓存键名列表")
@Log(title="获取缓存键名列表", business_type=BusinessType.SELECT) @Log(title="获取缓存键名列表", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:list'])
async def get_cache_keys(request: Request, cacheName: str = Path(description="缓存名称")): async def get_cache_keys(request: Request, cacheName: str = Path(description="缓存名称")):
cache_keys = await request.app.state.redis.keys(f'{cacheName}*') cache_keys = await request.app.state.redis.keys(f'{cacheName}*')
cache_key_list = [key.split(':', 1)[1] for key in cache_keys if key.startswith(f'{cacheName}:')] cache_key_list = [key.split(':', 1)[1] for key in cache_keys if key.startswith(f'{cacheName}:')]
@@ -71,6 +75,7 @@ async def get_cache_keys(request: Request, cacheName: str = Path(description="
@cacheAPI.get("/info/{cacheName}/{cacheKey}", response_class=JSONResponse, response_model=GetCacheInfoResponse, @cacheAPI.get("/info/{cacheName}/{cacheKey}", response_class=JSONResponse, response_model=GetCacheInfoResponse,
summary="获取缓存信息") summary="获取缓存信息")
@Log(title="获取缓存信息", business_type=BusinessType.SELECT) @Log(title="获取缓存信息", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:info'])
async def get_cache_info(request: Request, cacheName: str = Path(description="缓存名称"), async def get_cache_info(request: Request, cacheName: str = Path(description="缓存名称"),
cacheKey: str = Path(description="缓存键名")): cacheKey: str = Path(description="缓存键名")):
cache_value = await request.app.state.redis.get(f'{cacheName}:{cacheKey}') cache_value = await request.app.state.redis.get(f'{cacheName}:{cacheKey}')
@@ -88,6 +93,7 @@ async def get_cache_info(request: Request, cacheName: str = Path(description="
@cacheAPI.post("/cacheName/{name}", response_class=JSONResponse, response_model=BaseResponse, @cacheAPI.post("/cacheName/{name}", response_class=JSONResponse, response_model=BaseResponse,
summary="通过键名删除缓存") summary="通过键名删除缓存")
@Log(title="通过键名删除缓存", business_type=BusinessType.DELETE) @Log(title="通过键名删除缓存", business_type=BusinessType.DELETE)
@Auth(permission_list=['cache:btn:delete'])
async def delete_cache(request: Request, name: str = Path(description="缓存名称")): async def delete_cache(request: Request, name: str = Path(description="缓存名称")):
cache_keys = await request.app.state.redis.keys(f'{name}*') cache_keys = await request.app.state.redis.keys(f'{name}*')
if cache_keys: if cache_keys:
@@ -99,6 +105,7 @@ async def delete_cache(request: Request, name: str = Path(description="缓存名
summary="通过键值删除缓存") summary="通过键值删除缓存")
@cacheAPI.post("/cacheKey/{key}", response_class=JSONResponse, response_model=BaseResponse, summary="通过键值删除缓存") @cacheAPI.post("/cacheKey/{key}", response_class=JSONResponse, response_model=BaseResponse, summary="通过键值删除缓存")
@Log(title="通过键值删除缓存", business_type=BusinessType.DELETE) @Log(title="通过键值删除缓存", business_type=BusinessType.DELETE)
@Auth(permission_list=['cache:btn:delete'])
async def delete_cache_key(request: Request, key: str = Path(description="缓存键名")): async def delete_cache_key(request: Request, key: str = Path(description="缓存键名")):
cache_keys = await request.app.state.redis.keys(f'*{key}') cache_keys = await request.app.state.redis.keys(f'*{key}')
if cache_keys: if cache_keys:
@@ -109,6 +116,7 @@ async def delete_cache_key(request: Request, key: str = Path(description="缓存
@cacheAPI.delete("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存") @cacheAPI.delete("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存")
@cacheAPI.post("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存") @cacheAPI.post("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存")
@Log(title="删除所有缓存", business_type=BusinessType.DELETE) @Log(title="删除所有缓存", business_type=BusinessType.DELETE)
@Auth(permission_list=['cache:btn:delete'])
async def delete_all_cache(request: Request): async def delete_all_cache(request: Request):
cache_keys = await request.app.state.redis.keys() cache_keys = await request.app.state.redis.keys()
if cache_keys: if cache_keys:

View File

@@ -14,7 +14,7 @@ from annotation.auth import Auth
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController from controller.login import LoginController
from models import Department from models import Department, Role
from schemas.common import BaseResponse, DeleteListParams from schemas.common import BaseResponse, DeleteListParams
from schemas.department import AddDepartmentParams, GetDepartmentInfoResponse, \ from schemas.department import AddDepartmentParams, GetDepartmentInfoResponse, \
GetDepartmentListResponse GetDepartmentListResponse
@@ -79,8 +79,9 @@ async def delete_department_list(request: Request, params: DeleteListParams,
if department := await Department.get_or_none(id=item, del_flag=1): if department := await Department.get_or_none(id=item, del_flag=1):
if item in sub_departments: if item in sub_departments:
await delete_department_recursive(department_id=department.id) await delete_department_recursive(department_id=department.id)
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:*'): userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:*') if userInfos:
await request.app.state.redis.delete(*userInfos)
return Response.success(msg="删除成功!") return Response.success(msg="删除成功!")
@@ -116,6 +117,9 @@ async def update_department(request: Request, params: AddDepartmentParams, id: s
department.sort = params.sort department.sort = params.sort
department.status = params.status department.status = params.status
await department.save() await department.save()
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
return Response.success(msg="修改成功!") return Response.success(msg="修改成功!")
else: else:
return Response.error(msg="修改失败,部门不存在!") return Response.error(msg="修改失败,部门不存在!")
@@ -201,3 +205,36 @@ async def get_department_list(
"page": page, "page": page,
"pageSize": pageSize "pageSize": pageSize
}) })
@departmentAPI.get("/roleList/{id}", response_model=GetDepartmentListResponse, response_class=JSONResponse,
summary="用户获取部门角色列表")
@Log(title="获取部门角色列表", business_type=BusinessType.SELECT)
@Auth(["department:btn:list"])
async def get_department_role_list(
request: Request,
id: str = Path(..., description="部门ID"),
current_user: dict = Depends(LoginController.get_current_user)
):
sub_departments = current_user.get("sub_departments")
if id not in sub_departments:
return Response.error(msg="查询失败,无权限!")
data = await Role.filter(department__id=id).values(
id="id",
department_id="department__id",
department_name="department__name",
department_phone="department__phone",
department_principal="department__principal",
department_email="department__email",
role_name="name",
role_code="code",
role_id="id",
create_time="create_time",
update_time="update_time"
)
return Response.success(data={
"result": data,
"total": len(data),
"page": 1,
"pageSize": 9999
})

508
api/generate.py Normal file
View File

@@ -0,0 +1,508 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/28 17:28
# @UpdateTime : 2025/02/28 17:28
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Path, Query, Depends, Request
from fastapi.responses import JSONResponse
from jinja2 import Environment, FileSystemLoader
from tortoise import Tortoise
from annotation.auth import Auth
from annotation.log import Log
from config.constant import MYSQL_TO_PYTHON_TYPE, BusinessType
from controller.login import LoginController
from models.generate import GenerateInfo, GenerateColumn
from schemas.common import BaseResponse, DeleteListParams
from schemas.generate import GetTablesListResponse, AddGenerateInfoParams, UpdateGenerateInfoParams, \
GetGenerateInfoResponse, GetGenerateInfoListResponse
from utils.common import bytes2human
from utils.generate import Generate
from utils.response import Response
generateAPI = APIRouter(
prefix="/generate",
dependencies=[Depends(LoginController.get_current_user)]
)
@generateAPI.get("/tables", response_class=JSONResponse, response_model=GetTablesListResponse,
summary="获取数据库中的所有表信息")
@Log(title="获取数据库中的所有表信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:tables"])
async def get_database_tables(request: Request):
"""
获取当前数据库中的所有表信息,包括:
- 表名
- 表注释
- 记录行数
- 数据大小
- 索引大小
- 创建时间
- 最后更新时间(如果支持)
"""
sql = """SELECT TABLE_NAME,TABLE_COMMENT,TABLE_ROWS,DATA_LENGTH,INDEX_LENGTH,CREATE_TIME,UPDATE_TIME FROM information_schema.tables WHERE table_schema = DATABASE();"""
result = await Tortoise.get_connection("default").execute_query_dict(sql)
# 将结果中的键转换为小写
formatted_result = [{k.lower(): v for k, v in row.items()} for row in result]
for row in formatted_result:
for key, value in row.items():
if key == "data_length" or key == "index_length":
row[key] = bytes2human(value)
return Response.success(data={
"page": 1,
"total": len(formatted_result),
"pageSize": 99999,
"result": formatted_result
})
@generateAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="添加生成表信息")
@Log(title="添加生成表信息", business_type=BusinessType.INSERT)
@Auth(permission_list=["generate:btn:add"])
async def add_generate_info(request: Request, params: AddGenerateInfoParams):
if await GenerateInfo.get_or_none(table_name=params.table_name, del_flag=1):
return Response.error(msg="该表信息已存在!")
gen = await GenerateInfo.create(
author=params.author,
class_name=params.class_name,
table_comment=params.table_comment,
table_name=params.table_name,
prefix=params.prefix,
remark=params.remark,
permission_id=params.permission_id,
description=params.description
)
if gen:
sql = """SELECT * FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = %s;"""
result = await Tortoise.get_connection("default").execute_query_dict(sql, [params.table_name])
# 转换为小写键名
formatted_result = [{k.lower(): v for k, v in row.items()} for row in result]
formatted_result.sort(key=lambda x: x['ordinal_position'])
common_column = ["id", "del_flag", "create_by", "update_by", "create_time", "update_time"]
common_set = {
"id": {
"is_insert": False,
"is_edit": False,
"is_list": True,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"del_flag": {
"is_insert": False,
"is_edit": False,
"is_list": False,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"create_by": {
"is_insert": False,
"is_edit": False,
"is_list": False,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"update_by": {
"is_insert": False,
"is_edit": False,
"is_list": False,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"create_time": {
"is_insert": False,
"is_edit": False,
"is_list": True,
"is_query": False,
"is_required": False,
"is_hide": False,
},
"update_time": {
"is_insert": False,
"is_edit": False,
"is_list": True,
"is_query": False,
"is_required": False,
"is_hide": False,
}
}
for column in formatted_result:
if await GenerateColumn.get_or_none(table_id=gen.id, column_name=column["column_name"], del_flag=1):
continue
is_insert = True
is_edit = True
is_list = True
is_query = True
is_required = True
is_hide = False
if column["column_name"] in common_column:
is_insert = common_set[column["column_name"]]["is_insert"]
is_edit = common_set[column["column_name"]]["is_edit"]
is_list = common_set[column["column_name"]]["is_list"]
is_query = common_set[column["column_name"]]["is_query"]
is_required = common_set[column["column_name"]]["is_required"]
is_hide = common_set[column["column_name"]]["is_hide"]
await GenerateColumn.create(
table=gen,
index=column["ordinal_position"],
column_name=column["column_name"],
column_comment=column["column_comment"],
column_type=column["column_type"],
python_type=MYSQL_TO_PYTHON_TYPE.get(column["column_type"]) if MYSQL_TO_PYTHON_TYPE.get(
column["column_type"]) else MYSQL_TO_PYTHON_TYPE.get(column["data_type"], "str"),
python_name=column["column_name"].lower(),
is_insert=is_insert,
is_edit=is_edit,
is_list=is_list,
is_query=is_query,
is_required=is_required,
query_way="__icontains",
show_type="input",
is_hide=is_hide,
)
return Response.success(msg="添加成功!")
return Response.error(msg="添加失败!")
@generateAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除生成表信息")
@generateAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除生成表信息")
@Log(title="删除生成表信息", business_type=BusinessType.DELETE)
@Auth(permission_list=["generate:btn:delete"])
async def delete_generate_info(request: Request, id: str = Path(description="生成表信息ID")):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.del_flag = 0
await GenerateColumn.filter(table_id=table.id, del_flag=1).update(del_flag=0)
await table.save()
return Response.success(msg="删除成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除生成表信息")
@generateAPI.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除生成表信息")
@Log(title="批量删除生成表信息", business_type=BusinessType.DELETE)
@Auth(permission_list=["generate:btn:delete"])
async def delete_generate_info_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.del_flag = 0
await GenerateColumn.filter(table_id=table.id, del_flag=1).update(del_flag=0)
await table.save()
return Response.success(msg="删除成功!")
@generateAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新生成表信息")
@generateAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新生成表信息")
@Log(title="更新生成表信息", business_type=BusinessType.UPDATE)
@Auth(permission_list=["generate:btn:update"])
async def update_generate_info(
request: Request,
params: AddGenerateInfoParams,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.author = params.info.author
table.class_name = params.info.class_name
table.permission_id = params.info.permission_id
table.prefix = params.info.prefix
table.remark = params.info.remark
table.table_comment = params.info.table_comment
table.table_name = params.info.table_name
table.description = params.info.description
await table.save()
return Response.success(msg="更新成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetGenerateInfoResponse,
summary="获取生成表信息")
@Log(title="获取生成表信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:info"])
async def get_generate_info(request: Request, id: str = Path(description="生成表信息ID")):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table = {
"id": table.id,
"author": table.author,
"class_name": table.class_name,
"permission_id": table.permission_id,
"prefix": table.prefix,
"remark": table.remark,
"table_comment": table.table_comment,
"table_name": table.table_name,
"description": table.description,
"create_time": table.create_time,
"update_time": table.update_time,
}
columns = await GenerateColumn.filter(table_id=table["id"], del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
table["columns"] = columns
return Response.success(data=table)
return Response.error(msg="该生成表信息不存在!")
@generateAPI.put("/updateColumns/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="更新生成表列信息")
@generateAPI.post("/updateColumns/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="更新生成表列信息")
@Log(title="更新生成表列信息", business_type=BusinessType.UPDATE)
@Auth(permission_list=["generate:btn:updateColumns"])
async def update_generate_info_columns(
request: Request,
params: UpdateGenerateInfoParams,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
for column in params.columns:
if gen_column := await GenerateColumn.get_or_none(id=column.id, table_id=table.id, del_flag=1):
gen_column.column_comment = column.column_comment
gen_column.column_name = column.column_name
gen_column.column_type = column.column_type
gen_column.python_name = column.python_name
gen_column.python_type = column.python_type
gen_column.query_way = column.query_way
gen_column.show_type = column.show_type
gen_column.is_insert = column.is_insert
gen_column.is_edit = column.is_edit
gen_column.is_list = column.is_list
gen_column.is_query = column.is_query
gen_column.is_required = column.is_required
gen_column.is_hide = column.is_hide
await gen_column.save()
return Response.success(msg="更新成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.get("/list", response_class=JSONResponse, response_model=GetGenerateInfoListResponse,
summary="查询生成表信息列表")
@Log(title="查询生成表信息列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:list"])
async def get_generate_info_list(
request: Request,
page: int = Query(default=1, description="页码"),
pageSize: int = Query(default=10, description="每页数量"),
table_comment: Optional[str] = Query(default=None, description="表注释"),
permission_id: Optional[str] = Query(default=None, description="权限ID"),
):
filterArgs = {
f'{k}__contains': v for k, v in {
'table_comment': table_comment,
'permission_id': permission_id
}.items() if v
}
result = await GenerateInfo.filter(**filterArgs, del_flag=1).order_by("-create_time").offset(
(page - 1) * pageSize).limit(
pageSize).values(
id="id",
author="author",
class_name="class_name",
permission_id="permission_id",
prefix="prefix",
remark="remark",
table_comment="table_comment",
table_name="table_name",
description="description",
create_time="create_time",
update_time="update_time",
)
for item in result:
columns = await GenerateColumn.filter(table_id=item["id"], del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
item["columns"] = columns
return Response.success(data={
"result": result,
"total": await GenerateInfo.filter(**filterArgs, del_flag=1).count(),
"page": page,
"pageSize": pageSize,
})
@generateAPI.get("/code/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="生成代码")
@Log(title="生成代码", business_type=BusinessType.GENCODE)
@Auth(permission_list=["generate:btn:code"])
async def generate_code(
request: Request,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
info = {
"author": table.author,
"class_name": table.class_name,
"permission_id": table.permission_id,
"prefix": table.prefix,
"remark": table.remark,
"table_comment": table.table_comment,
"table_name": table.table_name,
"description": table.description,
}
columns = await GenerateColumn.filter(table_id=table.id, del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
def generate_uuid():
return str(uuid.uuid4())
def current_time():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
comment_env = Environment(loader=FileSystemLoader('templates'))
comment_env.globals["uuid4"] = generate_uuid # 注册到 Jinja2
comment_env.globals["now"] = current_time
data = Generate.prepare_template_data(info, columns)
model_template = comment_env.get_template('python/model.py.jinja')
model_code = model_template.render(data)
schemas_template = comment_env.get_template('python/schemas.py.jinja')
schemas_code = schemas_template.render(data)
api_py_template = comment_env.get_template('python/api.py.jinja')
api_py_code = api_py_template.render(data)
type_template = comment_env.get_template('typescript/type.d.ts.jinja')
type_code = type_template.render(data)
api_ts_template = comment_env.get_template('typescript/api.ts.jinja')
api_ts_code = api_ts_template.render(data)
hook_template = comment_env.get_template('typescript/hook.tsx.jinja')
hook_code = hook_template.render(data)
vue_env = Environment(
loader=FileSystemLoader('templates/vue'),
block_start_string="[%", # 替换 Jinja2 代码块的起始标记
block_end_string="%]", # 替换 Jinja2 代码块的结束标记
variable_start_string="[[",
variable_end_string="]]",
comment_start_string="[#",
comment_end_string="#]"
)
index_template = vue_env.get_template('index.vue.jinja')
index_code = index_template.render(data)
form_template = vue_env.get_template('form.vue.jinja')
form_code = form_template.render(data)
sql_template = comment_env.get_template('sql.jinja')
sql_code = sql_template.render(data)
data = {
"backend": [
{
"label": f"/models/{info['class_name'].lower()}.py",
"value": model_code,
"type": "python",
"path": f"/python/models/{info['class_name'].lower()}.py"
},
{
"label": f"/schemas/{info['class_name'].lower()}.py",
"value": schemas_code,
"type": "python",
"path": f"/python/schemas/{info['class_name'].lower()}.py"
},
{
"label": f"/api/{info['class_name'].lower()}.py",
"value": api_py_code,
"type": "python",
"path": f"/python/api/{info['class_name'].lower()}.py"
},
],
"frontend": [
{
"label": f"/types/{info['class_name'].lower()}.d.ts",
"value": type_code,
"type": "typescript",
"path": f"/types/{info['class_name'].lower()}.d.ts"
},
{
"label": f"/api/{info['class_name'].lower()}.ts",
"value": api_ts_code,
"type": "typescript",
"path": f"/api/{info['class_name'].lower()}.ts"
},
{
"label": f"/{info['class_name'].lower()}/utils/hook.tsx",
"value": hook_code,
"type": "typescript",
"path": f"/{info['class_name'].lower()}/utils/hook.tsx"
},
{
"label": f"/{info['class_name'].lower()}/index.vue",
"value": index_code,
"type": "vue",
"path": f"/{info['class_name'].lower()}/index.vue"
},
{
"label": f"/{info['class_name'].lower()}/components/form.vue",
"value": form_code,
"type": "vue",
"path": f"/{info['class_name'].lower()}/components/form.vue"
},
],
"sql": [
{
"label": f"{info['table_name']}.sql",
"value": sql_code,
"type": "sql",
"path": f"{info['table_name']}.sql"
}
]
}
return Response.success(data=data)
return Response.error(msg="生成失败!")

View File

@@ -274,11 +274,7 @@ async def get_i18n_list(request: Request,
@Auth(["i18n:btn:infoList"]) @Auth(["i18n:btn:infoList"])
async def get_i18n_info_list(request: Request, id: str = Path(description="国际化内容语言ID")): async def get_i18n_info_list(request: Request, id: str = Path(description="国际化内容语言ID")):
if locale := await Locale.get_or_none(id=id, del_flag=1): if locale := await Locale.get_or_none(id=id, del_flag=1):
result = await request.app.state.redis.get(f'{RedisKeyConfig.TRANSLATION_INFO.key}:{id}') data = await I18n.filter(locale_id=locale.id, del_flag=1).order_by("key").values(
if result:
result = eval(result)
return Response.success(data=result)
data = await I18n.filter(locale_id=locale.id, del_flag=1).values(
id="id", id="id",
key="key", key="key",
translation="translation", translation="translation",
@@ -292,13 +288,6 @@ async def get_i18n_info_list(request: Request, id: str = Path(description="国
result = {} result = {}
for i18n in data: for i18n in data:
result[f"{i18n['key']}"] = i18n["translation"] result[f"{i18n['key']}"] = i18n["translation"]
await request.app.state.redis.set(f'{RedisKeyConfig.TRANSLATION_INFO.key}:{id}',
str(jsonable_encoder({
"data": result,
"locale": locale.code,
"name": locale.name,
})),
ex=timedelta(minutes=60))
return Response.success(data={ return Response.success(data={
"data": result, "data": result,
"locale": locale.code, "locale": locale.code,

View File

@@ -5,13 +5,14 @@
# @File : log.py # @File : log.py
# @Software : PyCharm # @Software : PyCharm
# @Comment : 本程序 # @Comment : 本程序
from typing import Optional
from datetime import datetime from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from annotation.auth import Auth from annotation.auth import Auth, hasAuth
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController from controller.login import LoginController
@@ -40,14 +41,12 @@ async def get_login_log(request: Request,
current_user: dict = Depends(LoginController.get_current_user), current_user: dict = Depends(LoginController.get_current_user),
): ):
sub_departments = current_user.get("sub_departments") sub_departments = current_user.get("sub_departments")
user_id = current_user.get("id")
online_user_list = await LoginController.get_online_user(request, sub_departments) online_user_list = await LoginController.get_online_user(request, sub_departments)
online_user_list = list(
filter(lambda x: x["department_id"] in sub_departments, jsonable_encoder(online_user_list)))
filterArgs = { filterArgs = {
f'{k}__contains': v for k, v in { f'{k}__contains': v for k, v in {
'username': username, 'username': username,
'nickname': nickname, 'nickname': nickname,
'department_id': department_id,
}.items() if v }.items() if v
} }
if status is not None: if status is not None:
@@ -56,9 +55,21 @@ async def get_login_log(request: Request,
startTime = datetime.fromtimestamp(float(startTime) / 1000) startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000) endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['login_time__range'] = [startTime, endTime] filterArgs['login_time__range'] = [startTime, endTime]
if not department_id: if await hasAuth(request, "login:btn:admin"):
filterArgs['user__department__id__in'] = sub_departments online_user_list = list(
result = await LoginLog.filter(**filterArgs, del_flag=1).offset( filter(lambda x: x["department_id"] in sub_departments, jsonable_encoder(online_user_list)))
if not department_id:
filterArgs['user__department__id__in'] = sub_departments
else:
filterArgs['user__department__id'] = department_id
else:
online_user_list = list(
filter(lambda x: x["user_id"] == user_id, jsonable_encoder(online_user_list)))
if department_id:
filterArgs['user__department__id'] = department_id
else:
filterArgs["user__department__id"] = current_user.get("department_id")
result = await LoginLog.filter(**filterArgs, user__del_flag=1, del_flag=1).offset(
(page - 1) * pageSize).limit(pageSize).values( (page - 1) * pageSize).limit(pageSize).values(
id="id", id="id",
user_id="user__id", user_id="user__id",
@@ -82,7 +93,7 @@ async def get_login_log(request: Request,
if item["session_id"] == log["session_id"]: if item["session_id"] == log["session_id"]:
log["online"] = True log["online"] = True
return Response.success(data={ return Response.success(data={
"total": await LoginLog.filter(**filterArgs, del_flag=1, ).count(), "total": await LoginLog.filter(**filterArgs, del_flag=1, user__del_flag=1, ).count(),
"result": result, "result": result,
"page": page, "page": page,
}) })
@@ -154,13 +165,47 @@ async def delete_login_log(request: Request, params: DeleteListParams,
@logAPI.get("/operation", response_class=JSONResponse, response_model=GetOperationLogResponse, @logAPI.get("/operation", response_class=JSONResponse, response_model=GetOperationLogResponse,
summary="用户获取操作日志") summary="用户获取操作日志")
@Auth(permission_list=["operation:btn:list"])
async def get_operation_log(request: Request, async def get_operation_log(request: Request,
page: int = Query(default=1, description="页码"), page: int = Query(default=1, description="页码"),
name: Optional[str] = Query(default=None, description="操作名称"),
type: Optional[str] = Query(default=None, description="操作类型"),
pageSize: int = Query(default=10, description="每页数量"), pageSize: int = Query(default=10, description="每页数量"),
username: Optional[str] = Query(default=None, description="用户账号"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"),
status: Optional[str] = Query(default=None, description="登录状态"),
current_user: dict = Depends(LoginController.get_current_user), current_user: dict = Depends(LoginController.get_current_user),
): ):
sub_departments = current_user.get("sub_departments")
user_id = current_user.get("id") user_id = current_user.get("id")
result = await OperationLog.filter(operator_id=user_id, del_flag=1).offset((page - 1) * pageSize).limit( filterArgs = {
f'{k}__contains': v for k, v in {
'operation_name': name,
'operation_type': type,
'operator__username': username,
'operator__nickname': nickname,
}.items() if v
}
if status is not None:
filterArgs['status'] = status
if startTime and endTime:
startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['operation_time__range'] = [startTime, endTime]
if await hasAuth(request, "operation:btn:admin"):
if not department_id:
filterArgs['department__id__in'] = sub_departments
else:
filterArgs['department__id'] = department_id
else:
filterArgs['operator__id'] = user_id
if department_id:
filterArgs['department__id'] = department_id
result = await OperationLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).offset(
(page - 1) * pageSize).limit(
pageSize).values( pageSize).values(
id="id", id="id",
operation_name="operation_name", operation_name="operation_name",
@@ -184,9 +229,10 @@ async def get_operation_log(request: Request,
cost_time="cost_time" cost_time="cost_time"
) )
return Response.success(data={ return Response.success(data={
"total": await OperationLog.filter(operator_id=user_id).count(), "total": await OperationLog.filter(**filterArgs, del_flag=1, operator__del_flag=1).count(),
"result": result, "result": result,
"page": page, "page": page,
"pageSize": pageSize
}) })
@@ -196,30 +242,28 @@ async def get_operation_log(request: Request,
summary="用户删除操作日志") summary="用户删除操作日志")
@Log(title="用户删除操作日志", business_type=BusinessType.DELETE) @Log(title="用户删除操作日志", business_type=BusinessType.DELETE)
@Auth(permission_list=["operation:btn:delete"]) @Auth(permission_list=["operation:btn:delete"])
async def delete_operation_log(id: str = Path(..., description="操作日志id"), async def delete_operation_log(request: Request, id: str = Path(..., description="操作日志id"),
current_user: dict = Depends(LoginController.get_current_user)): current_user: dict = Depends(LoginController.get_current_user)):
if log := await OperationLog.get_or_none(id=id): sub_departments = current_user.get("sub_departments")
if log.operator == current_user.get("id"): if log := await OperationLog.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
log.del_flag = 0 log.del_flag = 0
await log.save() await log.save()
return Response.success(msg="删除成功") return Response.success(msg="删除成功")
else:
return Response.failure(msg="无权限删除")
else: else:
return Response.failure(msg="删除失败,操作日志不存在!") return Response.failure(msg="删除失败,操作日志不存在!")
@logAPI.delete("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse, @logAPI.delete("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse,
summary="用户删除操作日志") summary="用户批量删除操作日志")
@logAPI.post("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse, @logAPI.post("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse,
summary="用户删除操作日志") summary="用户批量删除操作日志")
@Log(title="用户批量删除操作日志", business_type=BusinessType.DELETE) @Log(title="用户批量删除操作日志", business_type=BusinessType.DELETE)
@Auth(permission_list=["operation:btn:delete"]) @Auth(permission_list=["operation:btn:delete"])
async def delete_operation_log(params: DeleteListParams, async def delete_operation_log(request: Request, params: DeleteListParams,
current_user: dict = Depends(LoginController.get_current_user)): current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
for id in set(params.ids): for id in set(params.ids):
if log := await OperationLog.get_or_none(id=id): if log := await OperationLog.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
if log.operator == current_user.get("id"): log.del_flag = 0
log.del_flag = 0 await log.save()
await log.save()
return Response.success(msg="删除成功") return Response.success(msg="删除成功")

View File

@@ -18,7 +18,7 @@ from config.constant import BusinessType
from config.constant import RedisKeyConfig from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController from controller.query import QueryController
from models import Department, User from models import Department, User, Role, UserRole, LoginLog, OperationLog
from schemas.common import BaseResponse from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \ from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams ResetPasswordParams
@@ -94,7 +94,7 @@ async def login(
async def register(request: Request, params: RegisterUserParams): async def register(request: Request, params: RegisterUserParams):
register_enabled = ( register_enabled = (
True True
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:register_enabled') if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_register_enabled')
== 'true' == 'true'
else False else False
) )
@@ -106,7 +106,14 @@ async def register(request: Request, params: RegisterUserParams):
if await QueryController.register_user_before(username=params.username, phone=params.phone, email=params.email): if await QueryController.register_user_before(username=params.username, phone=params.phone, email=params.email):
return Response.error(msg="注册失败,用户已存在!") return Response.error(msg="注册失败,用户已存在!")
params.password = await Password.get_password_hash(input_password=params.password) params.password = await Password.get_password_hash(input_password=params.password)
# 默认分配注册用户
userRole = await Role.get_or_none(department__name="注册用户", code="user", del_flag=1).values(
department_id="department__id", id="id")
if not params.department_id:
params.department_id = userRole.get("department_id", "")
department = await Department.get_or_none(id=params.department_id) department = await Department.get_or_none(id=params.department_id)
userRole = await Role.get_or_none(department__id=department.id, code="user", del_flag=1).values(id="id")
print(userRole)
user = await User.create( user = await User.create(
username=params.username, username=params.username,
password=params.password, password=params.password,
@@ -118,6 +125,11 @@ async def register(request: Request, params: RegisterUserParams):
status=params.status, status=params.status,
) )
if user: if user:
# 默认分配普通用户角色
await UserRole.create(
user_id=user.id,
role_id=userRole.get("id", ""),
)
userParams = LoginParams( userParams = LoginParams(
username=params.username, username=params.username,
password=params.password password=params.password
@@ -140,7 +152,7 @@ async def register(request: Request, params: RegisterUserParams):
result.pop("session_id") result.pop("session_id")
result.pop("userInfo") result.pop("userInfo")
return Response.success(msg="注册成功!", data=result) return Response.success(msg="注册成功!", data=result)
return Response.error(msg="注册成功!") return Response.success(msg="注册成功!")
else: else:
return Response.error(msg="注册失败!") return Response.error(msg="注册失败!")
@@ -221,7 +233,7 @@ async def info(
@loginAPI.get("/getRoutes", response_class=JSONResponse, summary="获取路由信息") @loginAPI.get("/getRoutes", response_class=JSONResponse, summary="获取路由信息")
# @Log(title="获取路由信息", business_type=BusinessType.SELECT) @Log(title="获取路由信息", business_type=BusinessType.SELECT)
async def get_routes(request: Request, current_user: dict = Depends(LoginController.get_current_user)): async def get_routes(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments") sub_departments = current_user.get("sub_departments")
routes = await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}') routes = await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}')
@@ -259,3 +271,31 @@ async def logout(request: Request, status: bool = Depends(LoginController.logout
if status: if status:
return Response.success(data="退出成功!") return Response.success(data="退出成功!")
return Response.error(data="登出失败!") return Response.error(data="登出失败!")
@loginAPI.post("/unsubscribe", response_class=JSONResponse, response_model=BaseResponse, summary="用户注销")
@Log(title="用户注销", business_type=BusinessType.FORCE)
async def unsubscribe(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
id = current_user.get("id")
session_id = current_user.get("session_id")
if user := await User.get_or_none(id=id, del_flag=1):
user.del_flag = 0
await user.save()
redis_token = await request.app.state.redis.get(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
if redis_token:
await request.app.state.redis.delete(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
# 移除用户角色
await UserRole.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户登录日志
await LoginLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户操作日志
await OperationLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 更新用户信息缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{id}')
# 更新用户路由缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_ROUTES.key}:{id}')
return Response.success(data="注销成功!")
else:
return Response.error(data="注销失败!")

View File

@@ -10,11 +10,11 @@ from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from annotation.auth import Auth from annotation.auth import Auth, hasAdmin
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController from controller.login import LoginController
from models import Permission from models import Permission,RolePermission
from schemas.common import BaseResponse from schemas.common import BaseResponse
from schemas.permission import AddPermissionParams, GetPermissionInfoResponse, GetPermissionListResponse from schemas.permission import AddPermissionParams, GetPermissionInfoResponse, GetPermissionListResponse
from utils.response import Response from utils.response import Response
@@ -51,8 +51,17 @@ async def add_permission(request: Request, params: AddPermissionParams):
leave_transition=params.leave_transition, leave_transition=params.leave_transition,
fixed_tag=params.fixed_tag, fixed_tag=params.fixed_tag,
hidden_tag=params.hidden_tag, hidden_tag=params.hidden_tag,
is_admin=params.is_admin
) )
if permission: if permission:
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="新增权限成功!") return Response.success(msg="新增权限成功!")
else: else:
return Response.error(msg="新增权限失败!") return Response.error(msg="新增权限失败!")
@@ -64,13 +73,34 @@ async def add_permission(request: Request, params: AddPermissionParams):
@Auth(permission_list=["permission:btn:delete"]) @Auth(permission_list=["permission:btn:delete"])
async def delete_permission(request: Request, id: str = Path(description="权限ID")): async def delete_permission(request: Request, id: str = Path(description="权限ID")):
if permission := await Permission.get_or_none(id=id, del_flag=1): if permission := await Permission.get_or_none(id=id, del_flag=1):
permission.del_flag = 0 # 移除角色权限
await permission.save() await delete_permission_recursive(permission_id=permission.id)
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="删除权限成功!") return Response.success(msg="删除权限成功!")
else: else:
return Response.error(msg="删除权限失败,权限不存在!") return Response.error(msg="删除权限失败,权限不存在!")
async def delete_permission_recursive(permission_id: str):
"""
递归删除权限及其附属权限
:param permission_id: 权限ID
:return:
"""
await Permission.filter(id=permission_id, del_flag=1).update(del_flag=0)
await RolePermission.filter(permission_id=permission_id, del_flag=1).update(del_flag=0)
sub_permissions = await Permission.filter(parent_id=permission_id, del_flag=1).all()
for sub_department in sub_permissions:
await delete_permission_recursive(sub_department.id)
return True
@permissionAPI.put("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限") @permissionAPI.put("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@permissionAPI.post("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限") @permissionAPI.post("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@Log(title="更新权限", business_type=BusinessType.UPDATE) @Log(title="更新权限", business_type=BusinessType.UPDATE)
@@ -98,7 +128,16 @@ async def update_permission(request: Request, params: AddPermissionParams, id: s
permission.leave_transition = params.leave_transition permission.leave_transition = params.leave_transition
permission.fixed_tag = params.fixed_tag permission.fixed_tag = params.fixed_tag
permission.hidden_tag = params.hidden_tag permission.hidden_tag = params.hidden_tag
permission.is_admin = params.is_admin
await permission.save() await permission.save()
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="更新权限成功!") return Response.success(msg="更新权限成功!")
else: else:
return Response.error(msg="更新权限失败,权限不存在!") return Response.error(msg="更新权限失败,权限不存在!")
@@ -137,6 +176,7 @@ async def get_permission(request: Request, id: str = Path(description="权限ID"
fixed_tag="fixed_tag", fixed_tag="fixed_tag",
show_link="show_link", show_link="show_link",
show_parent="show_parent", show_parent="show_parent",
is_admin="is_admin"
) )
return Response.success(msg="查询权限详情成功!", data=permission) return Response.success(msg="查询权限详情成功!", data=permission)
else: else:
@@ -171,7 +211,9 @@ async def get_permission_list(
enterTransition: Optional[str] = Query(default=None, description="进场动画"), enterTransition: Optional[str] = Query(default=None, description="进场动画"),
leaveTransition: Optional[str] = Query(default=None, description="离场动画"), leaveTransition: Optional[str] = Query(default=None, description="离场动画"),
fixedTag: Optional[bool] = Query(default=None, description="固定标签页"), fixedTag: Optional[bool] = Query(default=None, description="固定标签页"),
hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页") hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页"),
isAdmin: Optional[bool] = Query(default=None, description="是否为管理专属页面"),
current_user: dict = Depends(LoginController.get_current_user),
): ):
filterArgs = { filterArgs = {
f'{k}__contains': v for k, v in { f'{k}__contains': v for k, v in {
@@ -195,9 +237,13 @@ async def get_permission_list(
"enter_transition": enterTransition, "enter_transition": enterTransition,
"leave_transition": leaveTransition, "leave_transition": leaveTransition,
"fixed_tag": fixedTag, "fixed_tag": fixedTag,
"hidden_tag": hiddenTag "hidden_tag": hiddenTag,
"is_admin": isAdmin
}.items() if v }.items() if v
} }
department_id = current_user.get("department_id", "")
if not await hasAdmin(request, department_id):
filterArgs["is_admin"] = False
total = await Permission.filter(**filterArgs, del_flag=1).count() total = await Permission.filter(**filterArgs, del_flag=1).count()
result = await Permission.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).order_by( result = await Permission.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).order_by(
'rank').values( 'rank').values(
@@ -226,7 +272,8 @@ async def get_permission_list(
hidden_tag="hidden_tag", hidden_tag="hidden_tag",
fixed_tag="fixed_tag", fixed_tag="fixed_tag",
show_link="show_link", show_link="show_link",
show_parent="show_parent" show_parent="show_parent",
is_admin="is_admin"
) )
return Response.success(data={ return Response.success(data={
"total": total, "total": total,

View File

@@ -10,7 +10,7 @@ from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from annotation.auth import Auth from annotation.auth import Auth, hasAuth, hasAdmin
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController from controller.login import LoginController
@@ -31,7 +31,7 @@ roleAPI = APIRouter(
@Auth(permission_list=["role:btn:add"]) @Auth(permission_list=["role:btn:add"])
async def add_role(request: Request, params: AddRoleParams, async def add_role(request: Request, params: AddRoleParams,
current_user: dict = Depends(LoginController.get_current_user)): current_user: dict = Depends(LoginController.get_current_user)):
if await Role.get_or_none(code=params.role_code, department_id=params.department_id, del_flag=1): if await Role.get_or_none(code=params.code, department_id=params.department_id, del_flag=1):
return Response.error(msg="角色编码已存在!") return Response.error(msg="角色编码已存在!")
sub_departments = current_user.get("sub_departments") sub_departments = current_user.get("sub_departments")
if params.department_id not in sub_departments: if params.department_id not in sub_departments:
@@ -47,10 +47,10 @@ async def add_role(request: Request, params: AddRoleParams,
) )
else: else:
role = await Role.create( role = await Role.create(
code=params.role_code, code=params.code,
name=params.role_name, name=params.name,
status=params.status, status=params.status,
description=params.role_description, description=params.description,
department_id=None, department_id=None,
) )
if role: if role:
@@ -194,8 +194,16 @@ async def get_role_list(
"status": status "status": status
}.items() if v }.items() if v
} }
if not department_id: if await hasAuth(request, "role:btn:admin"):
filterArgs["department__id__in"] = current_user.get("sub_departments") if not department_id:
filterArgs["department__id__in"] = current_user.get("sub_departments")
else:
filterArgs["department__id"] = current_user.get("department_id")
else:
if department_id:
filterArgs["department__id"] = department_id
else:
filterArgs["department__id"] = current_user.get("department_id")
total = await Role.filter(**filterArgs, del_flag=1).count() total = await Role.filter(**filterArgs, del_flag=1).count()
data = await Role.filter(**filterArgs, del_flag=1).offset( data = await Role.filter(**filterArgs, del_flag=1).offset(
(page - 1) * pageSize).limit( (page - 1) * pageSize).limit(
@@ -231,6 +239,11 @@ async def add_role_permission(request: Request, params: AddRolePermissionParams,
id: str = Path(..., description="角色ID"), id: str = Path(..., description="角色ID"),
current_user: dict = Depends(LoginController.get_current_user)): current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments") sub_departments = current_user.get("sub_departments")
if await hasAdmin(request, current_user.get("department_id")):
department_permissions = await Permission.filter(del_flag=1).values("id")
else:
department_permissions = await Permission.filter(is_admin=False, del_flag=1).values("id")
department_permissions = filterKeyValues(department_permissions, "id")
if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments): if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments):
# 已有角色权限 # 已有角色权限
rolePermissions = await RolePermission.filter(role_id=id, del_flag=1).values("permission_id") rolePermissions = await RolePermission.filter(role_id=id, del_flag=1).values("permission_id")
@@ -239,6 +252,8 @@ async def add_role_permission(request: Request, params: AddRolePermissionParams,
add_list = set(params.permission_ids).difference(set(rolePermissions)) add_list = set(params.permission_ids).difference(set(rolePermissions))
# 循环添加角色权限 # 循环添加角色权限
for item in add_list: for item in add_list:
if item not in department_permissions:
continue
permission = await Permission.get_or_none(id=item, del_flag=1) permission = await Permission.get_or_none(id=item, del_flag=1)
if permission: if permission:
await RolePermission.create( await RolePermission.create(
@@ -290,10 +305,15 @@ async def update_role_permission(request: Request, params: AddRolePermissionPara
id: str = Path(..., description="角色ID"), id: str = Path(..., description="角色ID"),
current_user: dict = Depends(LoginController.get_current_user)): current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments") sub_departments = current_user.get("sub_departments")
if await hasAdmin(request, current_user.get("department_id")):
department_permissions = await Permission.filter(del_flag=1).values("id")
else:
department_permissions = await Permission.filter(is_admin=False, del_flag=1).values("id")
department_permissions = await filterKeyValues(department_permissions, key="id", convert_type=str)
if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments): if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments):
# 已有角色权限 # 已有角色权限
rolePermissions = await RolePermission.filter(role_id=role.id, del_flag=1).values("permission_id") rolePermissions = await RolePermission.filter(role_id=role.id, del_flag=1).values("permission_id")
rolePermissions = await filterKeyValues(rolePermissions, "permission_id") rolePermissions = await filterKeyValues(rolePermissions, key="permission_id", convert_type=str)
# 利用集合筛选出角色权限中不存在的权限 # 利用集合筛选出角色权限中不存在的权限
delete_list = set(rolePermissions).difference(set(params.permission_ids)) delete_list = set(rolePermissions).difference(set(params.permission_ids))
# 利用集合筛选出角色权限中新增的权限 # 利用集合筛选出角色权限中新增的权限
@@ -303,6 +323,8 @@ async def update_role_permission(request: Request, params: AddRolePermissionPara
await RolePermission.filter(role_id=id, permission_id=item, del_flag=1).update(del_flag=0) await RolePermission.filter(role_id=id, permission_id=item, del_flag=1).update(del_flag=0)
# 循环添加角色权限 # 循环添加角色权限
for item in add_list: for item in add_list:
if item not in department_permissions:
continue
await RolePermission.create(role_id=id, permission_id=item) await RolePermission.create(role_id=id, permission_id=item)
# 更新用户信息缓存 # 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*') userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')

View File

@@ -14,12 +14,14 @@ import psutil
from fastapi import APIRouter, Depends, Request from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log from annotation.log import Log
from config.constant import BusinessType from config.constant import BusinessType
from controller.login import LoginController from controller.login import LoginController
from schemas.server import GetServerInfoResponse, CpuInfo, MemoryInfo, SystemInfo, PythonInfo, SystemFiles, \ from schemas.server import GetServerInfoResponse, CpuInfo, MemoryInfo, SystemInfo, PythonInfo, SystemFiles, \
GetSystemInfoResult GetSystemInfoResult
from utils.common import bytes2human from utils.common import bytes2human
from utils.log import logger
from utils.response import Response from utils.response import Response
serverAPI = APIRouter( serverAPI = APIRouter(
@@ -30,6 +32,7 @@ serverAPI = APIRouter(
@serverAPI.get("", response_class=JSONResponse, response_model=GetServerInfoResponse, summary="获取服务器信息") @serverAPI.get("", response_class=JSONResponse, response_model=GetServerInfoResponse, summary="获取服务器信息")
@Log(title="获取服务器信息", business_type=BusinessType.SELECT) @Log(title="获取服务器信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["server:btn:info"])
async def get_server_info(request: Request): async def get_server_info(request: Request):
# CPU信息 # CPU信息
# 获取CPU总核心数 # 获取CPU总核心数
@@ -96,17 +99,21 @@ async def get_server_info(request: Request):
io = psutil.disk_partitions() io = psutil.disk_partitions()
sys_files = [] sys_files = []
for i in io: for i in io:
o = psutil.disk_usage(i.device) try:
disk_data = SystemFiles( o = psutil.disk_usage(i.device)
dirName=i.device, disk_data = SystemFiles(
sysTypeName=i.fstype, dirName=i.device,
typeName='本地固定磁盘(' + i.mountpoint.replace('\\', '') + '', sysTypeName=i.fstype,
total=bytes2human(o.total), typeName='本地固定磁盘(' + i.mountpoint.replace('\\', '') + '',
used=bytes2human(o.used), total=bytes2human(o.total),
free=bytes2human(o.free), used=bytes2human(o.used),
usage=f'{psutil.disk_usage(i.device).percent}%', free=bytes2human(o.free),
) usage=f'{psutil.disk_usage(i.device).percent}%',
sys_files.append(disk_data) )
sys_files.append(disk_data)
except Exception as e:
logger.error(f"获取磁盘信息失败:{e}")
continue
result = GetSystemInfoResult(cpu=cpu, memory=mem, system=sys, python=py, systemFiles=sys_files) result = GetSystemInfoResult(cpu=cpu, memory=mem, system=sys, python=py, systemFiles=sys_files)
return Response.success(data=result) return Response.success(data=result)

View File

@@ -9,7 +9,7 @@ import os
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request, Form
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from annotation.auth import Auth from annotation.auth import Auth
@@ -20,14 +20,14 @@ from controller.login import LoginController
from controller.query import QueryController from controller.query import QueryController
from exceptions.exception import ModelValidatorException from exceptions.exception import ModelValidatorException
from models import File as FileModel from models import File as FileModel
from models import Role, Department from models import Role, Department, OperationLog, LoginLog
from models.user import User, UserRole from models.user import User, UserRole
from schemas.common import BaseResponse, DeleteListParams from schemas.common import BaseResponse, DeleteListParams
from schemas.department import GetDepartmentListResponse from schemas.department import GetDepartmentListResponse
from schemas.file import UploadFileResponse from schemas.file import UploadFileResponse
from schemas.user import AddUserParams, GetUserListResponse, GetUserInfoResponse, UpdateUserParams, \ from schemas.user import AddUserParams, GetUserListResponse, GetUserInfoResponse, UpdateUserParams, \
AddUserRoleParams, GetUserRoleInfoResponse, UpdateUserRoleParams, GetUserPermissionListResponse, \ AddUserRoleParams, GetUserRoleInfoResponse, UpdateUserRoleParams, GetUserPermissionListResponse, \
ResetPasswordParams ResetPasswordParams, UpdateBaseUserInfoParams
from utils.common import filterKeyValues from utils.common import filterKeyValues
from utils.password import Password from utils.password import Password
from utils.response import Response from utils.response import Response
@@ -76,6 +76,18 @@ async def delete_user(
if user := await User.get_or_none(id=id, department__id__in=sub_departments, del_flag=1): if user := await User.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
user.del_flag = 0 user.del_flag = 0
await user.save() await user.save()
# 移除用户角色
await UserRole.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户登录日志
await LoginLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户操作日志
await OperationLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 更新用户信息缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{id}')
# 更新用户路由缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_ROUTES.key}:{id}')
return Response.success(msg="删除成功!") return Response.success(msg="删除成功!")
else: else:
return Response.error(msg="删除失败,用户不存在!") return Response.error(msg="删除失败,用户不存在!")
@@ -422,3 +434,82 @@ async def reset_user_password(request: Request, params: ResetPasswordParams, id:
await user.save() await user.save()
return Response.success(msg="重置密码成功!") return Response.success(msg="重置密码成功!")
return Response.failure(msg="用户不存在!") return Response.failure(msg="用户不存在!")
@userAPI.put("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
summary="更新基础个人信息")
@userAPI.post("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
summary="更新基础个人信息")
@Log(title="更新基础个人信息", business_type=BusinessType.UPDATE)
async def update_base_userinfo(params: UpdateBaseUserInfoParams, request: Request,
current_user: dict = Depends(LoginController.get_current_user)):
user = await User.get_or_none(id=current_user.get("id"), del_flag=1)
if user:
user.nickname = params.name
user.gender = params.gender
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@userAPI.post("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@Log(title="用户更新密码", business_type=BusinessType.UPDATE)
async def update_user_password(request: Request, oldPassword: str = Form(description="用户旧密码"),
newPassword: str = Form(description="用户新密码"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(oldPassword)
if user.password != password:
return Response.error(msg="旧密码错误!")
newPassword = await Password.get_password_hash(newPassword)
user.password = newPassword
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@userAPI.post("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@Log(title="用户更新手机号", business_type=BusinessType.UPDATE)
async def update_user_phone(request: Request, password: str = Form(description="用户密码"),
phone: str = Form(description="用户手机号"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(password)
if user.password != password:
return Response.error("更改失败,请正确输入旧密码")
phoneStatus = await User.filter(phone=phone, del_flag=1).count()
if phoneStatus:
return Response.error(f"更改失败,手机号:{phone}已绑定其他账号!")
user.phone = phone
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@userAPI.post("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@Log(title="用户更新邮箱", business_type=BusinessType.UPDATE)
async def update_user_email(request: Request, password: str = Form(description="用户密码"),
email: str = Form(description="用户邮箱"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(password)
if user.password != password:
return Response.error("更改失败,请正确输入旧密码")
emailStatus = await User.filter(email=email, del_flag=1).count()
if emailStatus:
return Response.error(f"更改失败,邮箱:{email}已绑定其他账号!")
user.email = email
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")

2
app.py
View File

@@ -23,6 +23,7 @@ from api.permission import permissionAPI
from api.role import roleAPI from api.role import roleAPI
from api.server import serverAPI from api.server import serverAPI
from api.user import userAPI from api.user import userAPI
from api.generate import generateAPI
from config.database import init_db, close_db from config.database import init_db, close_db
from config.env import AppConfig from config.env import AppConfig
from config.get_redis import Redis from config.get_redis import Redis
@@ -87,6 +88,7 @@ api_list = [
{'api': serverAPI, 'tags': ['服务器管理']}, {'api': serverAPI, 'tags': ['服务器管理']},
{'api': i18nAPI, 'tags': ['国际化管理']}, {'api': i18nAPI, 'tags': ['国际化管理']},
{'api': configApi, 'tags': ['配置管理']}, {'api': configApi, 'tags': ['配置管理']},
{'api': generateAPI, 'tags': ['代码生成管理']},
] ]
for api in api_list: for api in api_list:

View File

@@ -265,3 +265,72 @@ class RedisKeyConfig(Enum):
"""国际化类型,存储国际化类型及其配置信息。""" """国际化类型,存储国际化类型及其配置信息。"""
SYSTEM_CONFIG = {'key': 'system_config', 'remark': '系统配置信息'} SYSTEM_CONFIG = {'key': 'system_config', 'remark': '系统配置信息'}
"""系统配置信息,存储系统的配置信息。""" """系统配置信息,存储系统的配置信息。"""
# MYSQL类型和 Python类型的映射关系
MYSQL_TO_PYTHON_TYPE = {
"int": "int",
"bigint": "int",
"smallint": "int",
"tinyint": "int",
"tinyint(1)": "bool", # MySQL 的 tinyint(1) 通常用作布尔值
"float": "float",
"double": "float",
"decimal": "float",
"char": "str",
"varchar": "str",
"text": "str",
"longtext": "str",
"date": "date",
"datetime": "datetime",
"timestamp": "datetime",
"time": "time",
"json": "dict",
"binary": "bytes",
"varbinary": "bytes",
"blob": "bytes",
"tinyblob": "bytes",
"mediumblob": "bytes",
"longblob": "bytes"
}
MYSQL_TO_TORTOISE_TYPE = {
# 数值类型
"tinyint": "BooleanField", # 通常用于布尔值
"smallint": "IntField",
"mediumint": "IntField",
"int": "IntField",
"integer": "IntField",
"bigint": "BigIntField",
"decimal": "DecimalField",
"numeric": "DecimalField",
"float": "FloatField",
"double": "FloatField",
"real": "FloatField",
# 字符串类型
"char": "CharField",
"varchar": "CharField",
"tinytext": "TextField",
"text": "TextField",
"mediumtext": "TextField",
"longtext": "TextField",
# 日期和时间
"date": "DateField",
"datetime": "DatetimeField",
"timestamp": "DatetimeField",
"time": "TimeField",
"year": "IntField",
# 二进制数据
"binary": "BinaryField",
"varbinary": "BinaryField",
"blob": "BinaryField",
"tinyblob": "BinaryField",
"mediumblob": "BinaryField",
"longblob": "BinaryField",
# JSON
"json": "JSONField",
}

View File

@@ -5,9 +5,13 @@
# @File : database.py # @File : database.py
# @Software : PyCharm # @Software : PyCharm
# @Comment : 本程序 # @Comment : 本程序
import asyncio
import logging import logging
import subprocess
import sys import sys
from datetime import datetime
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
from pathlib import Path
from tortoise import Tortoise from tortoise import Tortoise
@@ -136,3 +140,41 @@ async def configure_tortoise_logging(enable_logging: bool = True, log_level: int
else: else:
# 如果禁用日志,设置日志级别为 WARNING 以抑制大部分输出 # 如果禁用日志,设置日志级别为 WARNING 以抑制大部分输出
tortoise_logger.setLevel(logging.WARNING) tortoise_logger.setLevel(logging.WARNING)
async def backup_database():
"""
备份数据库
"""
logger.info("开始备份数据库")
# 配置数据库连接信息
backup_dir = Path().cwd() / "sql" # 备份文件存储的目录
# 如果 migrations 目录不存在,则创建
backup_dir.mkdir(parents=True, exist_ok=True)
# 生成备份文件名,格式为 dbYYYYMMDDHHMMSS.sql
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
backup_filename = f"db{timestamp}.sql"
backup_filepath = backup_dir / backup_filename # 使用 Path 对象组合路径
# 构造 mysqldump 命令
command = [
"mysqldump",
"-u", DataBaseConfig.db_username,
f"-p{DataBaseConfig.db_password}", # 直接传递密码
DataBaseConfig.db_database,
"--result-file=" + str(backup_filepath) # 指定备份文件路径
]
# 使用 asyncio.to_thread 来在线程中执行阻塞操作
await asyncio.to_thread(run_mysqldump, command)
def run_mysqldump(command):
"""在阻塞线程中执行 mysqldump 命令"""
try:
subprocess.run(command, check=True)
logger.info(f"数据库备份已完成,文件保存为 {command[-1]}")
except subprocess.CalledProcessError as e:
logger.error(f"备份失败,错误: {e}")

View File

@@ -130,7 +130,7 @@ class LoginController:
userInfo = await QueryController.get_user_info(user_id=user_id) userInfo = await QueryController.get_user_info(user_id=user_id)
await request.app.state.redis.set(f'{RedisKeyConfig.USER_INFO.key}:{user_id}', await request.app.state.redis.set(f'{RedisKeyConfig.USER_INFO.key}:{user_id}',
str(jsonable_encoder(userInfo)), str(jsonable_encoder(userInfo)),
ex=timedelta(minutes=5)) ex=timedelta(minutes=2))
if not userInfo: if not userInfo:
logger.warning('用户token不合法') logger.warning('用户token不合法')
raise AuthException(data='', message='用户token不合法') raise AuthException(data='', message='用户token不合法')

View File

@@ -132,7 +132,8 @@ class QueryController:
keepAlive="permission__keep_alive", keepAlive="permission__keep_alive",
hiddenTag="permission__hidden_tag", hiddenTag="permission__hidden_tag",
showLink="permission__show_link", showLink="permission__show_link",
showParent="permission__show_parent" showParent="permission__show_parent",
isAdmin="permission__is_admin",
) )
permissions.extend(permission) permissions.extend(permission)
return permissions return permissions

View File

@@ -9,6 +9,7 @@
from models.config import Config from models.config import Config
from models.department import Department from models.department import Department
from models.file import File from models.file import File
from models.generate import GenerateInfo, GenerateColumn
from models.i18n import I18n, Locale from models.i18n import I18n, Locale
from models.log import LoginLog, OperationLog from models.log import LoginLog, OperationLog
from models.permission import Permission from models.permission import Permission
@@ -27,5 +28,7 @@ __all__ = [
'UserRole', 'UserRole',
'I18n', 'I18n',
'Locale', 'Locale',
'Config' 'Config',
'GenerateInfo',
'GenerateColumn'
] ]

54
models/generate.py Normal file
View File

@@ -0,0 +1,54 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/21 03:37
# @UpdateTime : 2025/02/21 03:37
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
from tortoise import fields
from models.common import BaseModel
class GenerateInfo(BaseModel):
"""
代码生成表模型
"""
table_name = fields.CharField(max_length=255, default="", description="表名称", source_field="table_name")
table_comment = fields.CharField(max_length=255, default="", description="表注释", source_field="table_comment")
class_name = fields.CharField(max_length=255, default="", description="类名", source_field="class_name")
author = fields.CharField(max_length=255, default="", description="作者", source_field="author")
remark = fields.TextField(default="", description="备注", null=True, source_field="remark")
permission_id = fields.CharField(max_length=255, default="", description="权限ID", source_field="permission_id")
prefix = fields.CharField(max_length=255, default="", description="api前缀", source_field="prefix")
description = fields.TextField(default="", description="描述", null=True, source_field="description")
class Meta:
table = "generate_info"
table_description = "代码生成表"
class GenerateColumn(BaseModel):
"""
代码生成列模型
"""
table = fields.ForeignKeyField("models.GenerateInfo", related_name="columns", description="",
source_field="table_id")
index = fields.IntField(default=0, description="索引", source_field="index")
column_name = fields.CharField(max_length=255, default="", description="字段名称", source_field="column_name")
column_comment = fields.CharField(max_length=255, default="", description="字段注释", source_field="column_comment")
column_type = fields.CharField(max_length=255, default="", description="字段类型", source_field="column_type")
python_type = fields.CharField(max_length=255, default="", description="python类型", source_field="python_type")
python_name = fields.CharField(max_length=255, default="", description="python名称", source_field="python_name")
is_insert = fields.BooleanField(default=True, description="是否插入", source_field="is_insert")
is_edit = fields.BooleanField(default=True, description="是否编辑", source_field="is_edit")
is_list = fields.BooleanField(default=True, description="是否列表", source_field="is_list")
is_query = fields.BooleanField(default=True, description="是否查询", source_field="is_query")
is_required = fields.BooleanField(default=False, description="是否必填", source_field="is_required")
is_hide = fields.BooleanField(default=False, description="是否隐藏", source_field="is_hide")
query_way = fields.CharField(max_length=255, default="", description="查询方式", source_field="query_way")
show_type = fields.CharField(max_length=255, default="", description="显示类型", source_field="show_type")
class Meta:
table = "generate_column"
table_description = "代码生成列"

View File

@@ -280,6 +280,18 @@ class Permission(BaseModel):
- 映射到数据库字段 show_parent。 - 映射到数据库字段 show_parent。
""" """
is_admin = fields.BooleanField(
default=False,
description="是否为管理专属页面",
source_field="is_admin" # 映射到数据库字段 is_admin
)
"""
是否为管理专属页面。
- 是否为管理专属页面,仅管理员可见。
- 默认为 False。
- 映射到数据库字段 is_admin。
"""
class Meta: class Meta:
table = "permission" # 数据库表名 table = "permission" # 数据库表名
table_description = "权限表" # 表描述 table_description = "权限表" # 表描述

View File

@@ -17,13 +17,11 @@ class User(BaseModel):
username = fields.CharField( username = fields.CharField(
max_length=255, max_length=255,
unique=True,
description="用户名", description="用户名",
source_field="username" # 映射到数据库字段 username source_field="username" # 映射到数据库字段 username
) )
""" """
用户名。 用户名。
- 必须唯一。
- 最大长度为 255 个字符。 - 最大长度为 255 个字符。
- 映射到数据库字段 username。 - 映射到数据库字段 username。
""" """

136
schemas/generate.py Normal file
View File

@@ -0,0 +1,136 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/28 18:00
# @UpdateTime : 2025/02/28 18:00
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
from typing import List
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_camel, to_snake
from schemas.common import BaseResponse, ListQueryResult
class TableInfo(BaseModel):
"""
数据表信息模型
"""
model_config = ConfigDict(alias_generator=to_snake)
id: str = Field(default="", description="主键")
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
table_rows: int = Field(default=0, description="表行数")
data_length: str = Field(default="", description="表大小")
index_length: str = Field(default="", description="索引大小")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GenerateTableInfo(BaseModel):
"""
生成表信息模型
"""
model_config = ConfigDict(alias_generator=to_snake)
id: str = Field(default="", description="主键")
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
author: str = Field(default="", description="作者")
prefix: str = Field(default="", description="api前缀")
class_name: str = Field(default="", description="类名")
remark: str = Field(default="", description="备注")
description: str = Field(default="", description="描述")
permission_id: str = Field(default="", description="权限ID")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GetTableListResult(ListQueryResult):
"""
获取数据表结果
"""
result: List[TableInfo] = Field(default=None, description="响应数据")
class GetTablesListResponse(BaseResponse):
"""
获取数据库表结果
"""
data: TableInfo = Field(default=None, description="响应数据")
class AddGenerateInfoParams(BaseModel):
"""
添加生成信息参数
"""
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
author: str = Field(default="", description="作者")
prefix: str = Field(default="", description="api前缀")
class_name: str = Field(default="", description="类名")
remark: str = Field(default="", description="备注")
permission_id: str = Field(default="", description="权限ID")
description: str = Field(default="", description="备注")
class GenerateColumnInfo(BaseModel):
"""
生成列信息
"""
model_config = ConfigDict(alias_generator=to_snake)
id: str = Field(default="", description="主键")
table_id: str = Field(default="", description="表ID")
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
column_name: str = Field(default="", description="字段名称")
column_comment: str = Field(default="", description="字段注释")
column_type: str = Field(default="", description="字段类型")
python_type: str = Field(default="", description="python类型")
python_name: str = Field(default="", description="python名称")
is_insert: bool = Field(default=True, description="是否插入")
is_edit: bool = Field(default=True, description="是否编辑")
is_list: bool = Field(default=True, description="是否列表")
is_query: bool = Field(default=True, description="是否查询")
query_way: str = Field(default="", description="查询方式")
show_type: str = Field(default="", description="显示类型")
is_required: bool = Field(default=False, description="是否必填")
is_hide: bool = Field(default=False, description="是否隐藏")
index: int = Field(default=0, description="索引")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class UpdateGenerateInfoParams(BaseModel):
"""
更新生成信息参数
"""
columns: List[GenerateColumnInfo] = Field(default=None, description="生成列信息")
class GetGenerateInfoResult(GenerateTableInfo):
"""
获取生成信息结果
"""
columns: List[GenerateColumnInfo] = Field(default=None, description="生成列信息")
class GetGenerateInfoListResult(ListQueryResult):
"""
获取生成信息结果
"""
result: List[GetGenerateInfoResult] = Field(default=[], description="响应数据")
class GetGenerateInfoResponse(BaseResponse):
"""
获取生成信息结果
"""
data: GetGenerateInfoResult = Field(default=None, description="响应数据")
class GetGenerateInfoListResponse(BaseResponse):
"""
获取生成信息结果
"""
data: GetGenerateInfoListResult = Field(default=[], description="响应数据")

View File

@@ -43,6 +43,7 @@ class PermissionInfo(BaseModel):
fixed_tag: bool = Field(default=False, description="固定标签页") fixed_tag: bool = Field(default=False, description="固定标签页")
show_link: bool = Field(default=True, description="显示菜单") show_link: bool = Field(default=True, description="显示菜单")
show_parent: bool = Field(default=True, description="显示父级菜单") show_parent: bool = Field(default=True, description="显示父级菜单")
is_admin: bool = Field(default=False, description="是否为管理专属页面")
class Config: class Config:
json_schema_extra = { json_schema_extra = {
@@ -72,7 +73,8 @@ class PermissionInfo(BaseModel):
"hidden_tag": False, "hidden_tag": False,
"fixed_tag": False, "fixed_tag": False,
"show_link": True, "show_link": True,
"show_parent": True "show_parent": True,
"is_admin": False
} }
} }
@@ -109,6 +111,7 @@ class AddPermissionParams(BaseModel):
show_parent: bool = Field(default=True, description="显示父级菜单") show_parent: bool = Field(default=True, description="显示父级菜单")
parent_id: str = Field(default="", max_length=36, description="父级菜单ID") parent_id: str = Field(default="", max_length=36, description="父级菜单ID")
menu_type: int = Field(default=0, description="菜单类型") menu_type: int = Field(default=0, description="菜单类型")
is_admin: bool = Field(default=False, description="是否为管理专属页面")
class Config: class Config:
json_schema_extra = { json_schema_extra = {
@@ -133,7 +136,8 @@ class AddPermissionParams(BaseModel):
"show_link": True, "show_link": True,
"show_parent": True, "show_parent": True,
"parent_id": "", "parent_id": "",
"menu_type": 0 "menu_type": 0,
"is_admin": False
} }
} }

View File

@@ -6,6 +6,7 @@
# @Software : PyCharm # @Software : PyCharm
# @Comment : 本程序 # @Comment : 本程序
from datetime import datetime from datetime import datetime
from enum import IntEnum
from typing import Optional, List from typing import Optional, List
from uuid import UUID from uuid import UUID
@@ -15,6 +16,11 @@ from pydantic_validation_decorator import Xss, NotBlank, Size, Network
from schemas.common import BaseResponse, ListQueryResult from schemas.common import BaseResponse, ListQueryResult
class Gender(IntEnum):
MAN = 0
WOMAN = 1
class UserBase(BaseModel): class UserBase(BaseModel):
""" """
用户表基础模型。 用户表基础模型。
@@ -387,3 +393,19 @@ class GetUserStatisticsResponse(BaseResponse):
获取用户统计信息响应模型。 获取用户统计信息响应模型。
""" """
data: GetUserStatisticsResult = Field(default=None, description="响应数据") data: GetUserStatisticsResult = Field(default=None, description="响应数据")
class UpdateBaseUserInfoParams(BaseModel):
"""修改基础信息参数"""
name: str
"""姓名"""
gender: Gender
"""性别"""
class Config:
json_schema_extra = {
"example": {
"name": "张三",
"gender": 1,
}
}

1075
sql/fastapi.sql Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,135 @@
# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于生成{{ table_comment }}增删改查接口
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from controller.login import LoginController
from models import {{ class_name }}
from schemas.common import BaseResponse, DeleteListParams
from schemas.{{ name }} import Add{{ class_name }}Params, Update{{ class_name }}Params, Get{{ class_name }}InfoResponse, Get{{ class_name }}ListResponse
from utils.response import Response
{{ table_name }}API= APIRouter(
prefix="{{ prefix }}",
dependencies=[Depends(LoginController.get_current_user)],
)
@{{ table_name }}API.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增{{ description }}")
@Log(title="新增{{ description }}", business_type=BusinessType.INSERT)
@Auth(permission_list=["{{ name }}:btn:add"])
async def add_{{ name }}(request: Request, params: Add{{ class_name }}Params):
if await {{ class_name }}.get_or_none(
{% for column in columns if column.is_insert %}
{{ column.python_name }} = params.{{ column.python_name }},
{% endfor %}
del_flag=1
):
return Response.error(msg="{{ description }}已存在!")
{{ name }} = await {{ class_name }}.create(
{% for column in columns if column.is_insert %}
{{ column.python_name }} = params.{{ column.python_name }},
{% endfor %}
)
if {{ name }}:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@{{ table_name }}API.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除{{ description }}")
@{{ table_name }}API.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除{{ description }}")
@Log(title="删除{{ description }}", business_type=BusinessType.DELETE)
@Auth(permission_list=["{{ name }}:btn:delete"])
async def delete_{{ name }}(request: Request, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{{ name }}.del_flag = 0
await {{ name }}.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="{{ description }}不存在!")
@{{ table_name }}API.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除{{ description }}")
@{{ table_name }}API.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除{{ description }}")
@Log(title="批量删除{{ description }}", business_type=BusinessType.DELETE)
@Auth(permission_list=["{{ name }}:btn:delete"])
async def delete_{{ name }}_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{{ name }}.del_flag = 0
await {{ name }}.save()
return Response.success(msg="删除成功")
@{{ table_name }}API.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改{{ description }}")
@{{ table_name }}API.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改{{ description }}")
@Log(title="修改{{ description }}", business_type=BusinessType.UPDATE)
@Auth(permission_list=["{{ name }}:btn:update"])
async def update_{{ name }}(request: Request, params: Update{{ class_name }}Params, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{% for column in columns if column.is_edit %}
{{ name }}.{{ column.python_name }} = params.{{ column.python_name }}
{% endfor %}
await {{ name }}.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="{{ description }}不存在")
@{{ table_name }}API.get("/info/{id}", response_class=JSONResponse, response_model=Get{{ class_name }}InfoResponse, summary="获取{{ description }}信息")
@Log(title="获取{{ description }}信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["{{ name }}:btn:info"])
async def get_{{ name }}_info(request: Request, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
data = {
{% for column in columns if column.is_list %}
"{{ column.python_name }}":{{ name }}.{{ column.python_name }},
{% endfor %}
}
return Response.success(data=data)
else:
return Response.error(msg="{{ description }}不存在")
@{{ table_name }}API.get("/list", response_class=JSONResponse, response_model=Get{{ class_name }}ListResponse, summary="获取{{ description }}列表")
@Log(title="获取{{ description }}列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["{{ name }}:btn:list"])
async def get_{{ name }}_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
{% for column in columns if column.is_query %}
{{ column.python_name }}: Optional[str] = Query(default=None, description="{{ column.column_comment }}"),
{% endfor %}
):
filterArgs={
{% for column in columns if column.is_query %}
"{{ column.python_name }}{{ column.query_way }}": {{ column.python_name }},
{% endfor %}
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await {{ class_name }}.filter(**filterArgs, del_flag=1).count()
data = await {{ class_name }}.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
{% for column in columns if column.is_list %}
{{ column.python_name }} = "{{ column.python_name }}",
{% endfor %}
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})

View File

@@ -0,0 +1,44 @@
# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于{{ table_comment }}模型
from tortoise import fields
from models.common import BaseModel
class {{ class_name }}(BaseModel):
"""
{{ table_comment }}模型
"""
{% for column in columns if column.is_common == false %}
{%- set params = [] %}
{%- if column.max_length is not none %}{% set params = params + ["max_length=" ~ column.max_length] %}{% endif %}
{%- if column.is_nullable %}{% set params = params + ["null=True"] %}{% endif %}
{%- if column.is_unique %}{% set params = params + ["unique=True"] %}{% endif %}
{%- if column.default is not none %}{% set params = params + ["default=" ~ column.default] %}{% endif %}
{%- if column.column_comment %}{% set params = params + ['description="' ~ column.column_comment ~ '"'] %}{% endif %}
{%- if column.column_name %}{% set params = params + ['source_field="' ~ column.column_name ~ '"'] %}{% endif %}
{{ column.python_name }} = fields.{{ column.field_type }}({{ params | join(", ") }})
"""
{{ column.column_comment }}。
{%- if column.max_length is not none %}
- 最大长度为 {{ column.max_length }} 个字符
{%- endif %}
- 映射到数据库字段 {{ column.column_name }}
{%- if column.is_nullable %}
- 可为空
{%- endif %}
{%- if column.default is not none %}
- 默认值:{{ column.default }}
{%- endif %}
"""
{% endfor %}
class Meta:
table = "{{ table_name }}"
table_description = "{{ table_comment }}"

View File

@@ -0,0 +1,51 @@
# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于生成{{ table_comment }}参数和响应模型
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_snake
from schemas.common import BaseResponse, ListQueryResult
class {{ class_name }}Info(BaseModel):
"""{{ description }}信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
{% for column in columns if column.is_list %}
{{ column.python_name }}: {% if not column.is_required %}Optional[{% endif %}{{ column.python_type }}{% if not column.is_required %}]{% endif %} = Field(
{% if column.default is not none %}default={{ column.default }}, {% endif %}title="{{ column.column_comment }}"
)
{% endfor %}
class Add{{ class_name }}Params(BaseModel):
"""新增{{ description }}参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
{% for column in columns if column.is_insert %}
{{ column.python_name }}: {% if not column.is_required %}Optional[{% endif %}{{ column.python_type }}{% if not column.is_required %}]{% endif %} = Field(
{% if column.default is not none %}default={{ column.default }}, {% endif %}title="{{ column.column_comment }}"
)
{% endfor %}
class Update{{ class_name }}Params(BaseModel):
"""更新{{ description }}参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
{% for column in columns if column.is_edit %}
{{ column.python_name }}: {% if not column.is_required %}Optional[{% endif %}{{ column.python_type }}{% if not column.is_required %}]{% endif %} = Field(
{% if column.default is not none %}default={{ column.default }}, {% endif %}title="{{ column.column_comment }}"
)
{% endfor %}
class Get{{ class_name }}InfoResponse(BaseResponse):
"""获取{{ description }}信息响应"""
data: {{ class_name }}Info = Field(None, title="{{ table_comment }}信息")
class Get{{ class_name }}InfoListResult(ListQueryResult):
"""获取{{ description }}信息列表响应结果"""
result: List[{{ class_name }}Info] = Field(None, title="{{ table_comment }}信息列表")
class Get{{ class_name }}ListResponse(BaseResponse):
"""获取{{ description }}信息列表响应"""
data: Get{{ class_name }}InfoListResult = Field(None, title="{{ table_comment }}信息列表")

14
templates/sql.jinja Normal file
View File

@@ -0,0 +1,14 @@
-- {{ description }}权限
-- 按钮权限
-- 添加
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Add', '', '', '', 1, '', '', '', '', '', '', '{{ name }}:btn:add', '', 1, 0, 0, 0, 1, 0, 0);
-- 删除
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Delete', '', '', '', 2, '', '', '', '', '', '', '{{ name }}:btn:delete', '', 1, 0, 0, 0, 1, 0, 0);
-- 修改
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Update', '', '', '', 3, '', '', '', '', '', '', '{{ name }}:btn:update', '', 1, 0, 0, 0, 1, 0, 0);
-- 详情
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Details', '', '', '', 4, '', '', '', '', '', '', '{{ name }}:btn:info', '', 1, 0, 0, 0, 1, 0, 0);
-- 数据列表
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:DataList', '', '', '', 5, '', '', '', '', '', '', '{{ name }}:btn:list', '', 1, 0, 0, 0, 1, 0, 0);

View File

@@ -0,0 +1,42 @@
import { http } from "@/utils/http";
import type {
{{ class_name }}Info,
Get{{ class_name }}ListParams,
Add{{ class_name }}Params,
Update{{ class_name }}Params,
} from "types/{{ name }}";
import { filterEmptyObject } from "./utils";
/** 添加{{ description }}数据 */
export const postAdd{{ class_name }}API = (data: Add{{ class_name }}Params) => {
return http.request<null>("post", "/api{{ prefix }}/add", { data });
};
/** 删除{{ description }}数据 */
export const delete{{ class_name }}API = (id: string) => {
return http.request<null>("delete", `/api{{ prefix }}/delete/${id}`);
};
/** 批量删除{{ description }}数据 */
export const delete{{ class_name }}ListAPI = (ids: string[]) => {
return http.request<null>("delete", "/api{{ prefix }}/delete", {
data: { ids },
});
};
/** 修改{{ description }}数据 */
export const putUpdate{{ class_name }}API = (data: Update{{ class_name }}Params, id: string) => {
return http.request<null>("put", `/api{{ prefix }}/update/${id}`, { data });
};
/** 获取{{ description }}信息 */
export const get{{ class_name }}InfoAPI = (id: string) => {
return http.request<{{ class_name }}Info>("get", `/api{{ prefix }}/info/${id}`);
};
/** 获取{{ description }}列表 */
export const get{{ class_name }}ListAPI = (params: Get{{ class_name }}ListParams) => {
return http.request<QueryListResult<{{ class_name }}Info>>("get", "/api{{ prefix }}/list", {
params: filterEmptyObject(params),
});
};

View File

@@ -0,0 +1,282 @@
import dayjs from "dayjs";
import editForm from "../components/form.vue";
import { message } from "@/utils/message";
import { type Ref, ref, reactive, onMounted, h, toRaw } from "vue";
import type { {{ class_name }}Info } from "types/{{ name }}";
import type { PaginationProps } from "@pureadmin/table";
import { addDialog } from "@/components/ReDialog";
import {
delete{{ class_name }}API,
delete{{ class_name }}ListAPI,
get{{ class_name }}ListAPI,
postAdd{{ class_name }}API,
putUpdate{{ class_name }}API
} from "@/api/{{ name }}";
import { getKeyList } from "@pureadmin/utils";
export const use{{ class_name }} = (tableRef: Ref) => {
/**
* 查询表单
*/
const form = reactive({
{% for column in columns if column.is_query %}
/** {{ column.column_comment }} */
{{ column.column_name }}: "",
{% endfor %}
});
/**
* 表单Ref
*/
const formRef = ref(null);
/**
* 数据列表
*/
const dataList = ref<{{ class_name }}Info[]>([]);
/**
* 加载状态
*/
const loading = ref(true);
/**
* 已选数量
*/
const selectedNum = ref<number>(0);
/**
* 分页参数
*/
const pagination = reactive<PaginationProps>({
total: 0,
pageSize: 10,
currentPage: 1,
background: true,
pageSizes: [10, 20, 30, 40, 50]
});
/**
* 表格列设置
*/
const columns: TableColumnList = [
{
label: "勾选列", // 如果需要表格多选此处label必须设置
type: "selection",
fixed: "left",
reserveSelection: true // 数据刷新后保留选项
},
{% for column in columns if column.is_list %}
{
label: "{{ column.column_comment }}",
prop: "{{ column.column_name }}",
hide: {{ "true" if column.is_hide else "false" }}{% if column.python_type == "datetime" %},
formatter: ({ {{ column.column_name }} }) =>
dayjs({{ column.column_name }}).format("YYYY-MM-DD HH:mm:ss"){% endif %}
},
{% endfor %}
{
label: "操作",
fixed: "right",
width: 220,
slot: "operation"
}
];
/**
* 初次查询
*/
const onSearch = async () => {
loading.value = true;
const res = await get{{ class_name }}ListAPI({
page: pagination.currentPage,
pageSize: pagination.pageSize,
...toRaw(form)
});
if (res.success) {
dataList.value = res.data.result;
pagination.total = res.data.total;
pagination.currentPage = res.data.page;
pagination.pageSize = res.data.pageSize;
}
message(res.msg, {
type: res.success ? "success" : "error"
});
loading.value = false;
};
/**
* 重置表单
* @param formEl 表单ref
* @returns
*/
const resetForm = async (formEl: any) => {
if (!formEl) return;
formEl.resetFields();
await onSearch();
};
/**
* 处理删除
* @param row
*/
const handleDelete = async (row: {{ class_name }}Info) => {
const res = await delete{{ class_name }}API(row.id);
if (res.success) {
onSearch();
}
message(res.msg, {
type: res.success ? "success" : "error"
});
};
/**
* 处理每页数量变化
*/
const handleSizeChange = async (val: number) => {
loading.value = true;
const res = await get{{ class_name }}ListAPI({
page: pagination.currentPage,
pageSize: val,
...toRaw(form)
});
if (res.success) {
dataList.value = res.data.result;
pagination.total = res.data.total;
pagination.currentPage = res.data.page;
pagination.pageSize = res.data.pageSize;
}
message(res.msg, {
type: res.success ? "success" : "error"
});
loading.value = false;
};
/**
* 处理页码变化
* @param val
*/
const handleCurrentChange = async (val: number) => {
loading.value = true;
const res = await get{{ class_name }}ListAPI({
page: val,
pageSize: pagination.pageSize,
...toRaw(form)
});
if (res.success) {
dataList.value = res.data.result;
pagination.total = res.data.total;
pagination.currentPage = res.data.page;
pagination.pageSize = res.data.pageSize;
}
message(res.msg, {
type: res.success ? "success" : "error"
});
loading.value = false;
};
/** 当CheckBox选择项发生变化时会触发该事件 */
const handleSelectionChange = async (val: any) => {
selectedNum.value = val.length;
// 重置表格高度
tableRef.value.setAdaptive();
};
/** 取消选择 */
const onSelectionCancel = async () => {
selectedNum.value = 0;
// 用于多选表格,清空用户的选择
tableRef.value.getTableRef().clearSelection();
};
/**
* 批量删除
*/
const onbatchDel = async () => {
// 返回当前选中的行
const curSelected = tableRef.value.getTableRef().getSelectionRows();
const res = await delete{{ class_name }}ListAPI(getKeyList(curSelected, "id"));
if (res.success) {
message(res.msg, {
type: "success"
});
tableRef.value.getTableRef().clearSelection();
onSearch();
} else {
message(res.msg, { type: "error", duration: 5000 });
}
};
const openDialog = async (title = "新增", row?: {{ class_name }}Info) => {
addDialog({
title: `${title}配置`,
props: {
formInline: {
/** 方式 */
title:title,
{% for column in columns if column.is_list %}
/** {{ column.column_comment }} */
{{ column.python_name }}: row?.{{ column.python_name }} ?? "",
{% endfor %}
}
},
width: "45%",
draggable: true,
fullscreenIcon: true,
closeOnClickModal: false,
contentRenderer: () =>
h(editForm, {
formInline: {
/** 方式 */
title:title,
{% for column in columns if column.is_list %}
/** {{ column.column_comment }} */
{{ column.python_name }}: row?.{{ column.python_name }} ?? "",
{% endfor %}
},
ref: formRef
}),
beforeSure: async (done, {}) => {
const FormData = formRef.value.newFormInline;
if (title === "新增") {
let addForm = {
{% for column in columns if column.is_insert %}
/** {{ column.column_comment }} */
{{ column.python_name }}: FormData.{{ column.python_name }} ?? "",
{% endfor %}
};
const res = await postAdd{{ class_name }}API(addForm);
if (res.success) {
done();
await onSearch();
}
message(res.msg, { type: res.success ? "success" : "error" });
} else {
let updateForm = {
{% for column in columns if column.is_edit %}
/** {{ column.column_comment }} */
{{ column.python_name }}: FormData.{{ column.python_name }} ?? "",
{% endfor %}
};
const res = await putUpdate{{ class_name }}API(updateForm, row.id);
if (res.success) {
done();
await onSearch();
}
message(res.msg, { type: res.success ? "success" : "error" });
}
}
});
};
/**
* 页面加载执行
*/
onMounted(async () => {
await onSearch();
});
return {
form,
dataList,
loading,
pagination,
columns,
selectedNum,
openDialog,
onSearch,
resetForm,
handleDelete,
handleSizeChange,
handleCurrentChange,
handleSelectionChange,
onSelectionCancel,
onbatchDel
};
};

View File

@@ -0,0 +1,37 @@
/** {{ description }}信息 */
export interface {{ class_name }}Info {
{% for column in columns if column.is_list %}
/** {{ column.column_comment }} */
{{ column.python_name }}: {{ column.typescript_type }};
{% endfor %}
}
/** 获取{{ description }}列表参数 */
export interface Get{{ class_name }}ListParams {
/** 当前页 */
page: number;
/** 每页数量 */
pageSize: number;
{% for column in columns if column.is_query %}
/** {{ column.column_comment }} */
{{ column.python_name }}?: string;
{% endfor %}
}
/** 添加{{ description }}数据参数 */
export interface Add{{ class_name }}Params {
{% for column in columns if column.is_insert %}
/** {{ column.column_comment }} */
{{ column.python_name }}{% if not column.is_required %}?{% endif %}: {{ column.typescript_type }};
{% endfor %}
}
/** 更新{{ description }}数据参数 */
export interface Update{{ class_name }}Params {
{% for column in columns if column.is_edit %}
/** {{ column.column_comment }} */
{{ column.python_name }}{% if not column.is_required %}?{% endif %}: {{ column.typescript_type }};
{% endfor %}
}

View File

@@ -0,0 +1,100 @@
<template>
<el-form
ref="ruleFormRef"
:model="newFormInline"
:rules="formRules"
label-width="82px"
>
<el-row :gutter="30">
[% for column in columns if column.is_insert or column.is_edit %]
<re-col :value="24" :xm="24" :sm="24">
<el-form-item label="[[ column.column_comment ]]" prop="[[ column.python_name ]]">
[% if column.show_type == "input" %]
<el-input
v-model="newFormInline.[[ column.python_name ]]"
placeholder="请输入[[ column.column_comment ]]~"
clearable
/>
[% elif column.show_type == "textarea" %]
<el-input
v-model="newFormInline.[[ column.python_name ]]"
type="textarea"
placeholder="请输入[[ column.column_comment ]]~"
clearable
/>
[% elif column.show_type == "select" %]
<el-select
v-model="newFormInline.[[ column.python_name ]]"
placeholder="请选择[[ column.column_comment ]]~"
filterable
clearable
>
<el-option
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.label"
:value="item.value" />
</el-select>
[% elif column.show_type == "radio" %]
<el-radio-group v-model="newFormInline.[[ column.python_name ]]">
<el-radio
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value"
>{{ item.label }}
</el-radio>
</el-radio-group>
[% elif column.show_type == "checkbox" %]
<el-checkbox-group v-model="newFormInline.[[ column.python_name ]]">
<el-checkbox
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value">
{{ item.label }}
</el-checkbox>
</el-checkbox-group>
[% elif column.show_type == "datetime" %]
<el-date-picker
v-model="newFormInline.[[ column.python_name ]]"
type="datetime"
placeholder="请选择[[ column.column_comment ]]~"
format="YYYY-MM-DD HH:mm:ss"
value-format="x" />
[% endif %]
</el-form-item>
</re-col>
[% endfor %]
</el-row>
</el-form>
</template>
<script setup lang="ts">
import { reactive, ref } from "vue";
import ReCol from "@/components/ReCol";
import type { FormRules } from "element-plus";
import { [[ class_name ]]Info } from "types/[[ name ]]";
interface FormItemProps {
[% for column in columns if column.is_list %]
/** [[ column.column_comment ]] */
[[ column.python_name ]]: [[ column.typescript_type ]];
[% endfor %]
}
interface FormProps {
formInline: FormItemProps;
}
const props = withDefaults(defineProps<FormProps>(), {
formInline: () => ({
[% for column in columns if column.is_list %]
/** [[ column.column_comment ]] */
[[ column.python_name ]]: "",
[% endfor %]
})
});
const newFormInline = ref<[[ class_name ]]Info>(props.formInline);
/** 自定义表单规则校验 */
const formRules = reactive<FormRules>({
[% for column in columns if column.is_insert or column.is_edit %]
[[ column.python_name ]]: [{ required: [[ 'true' if column.is_required else 'false' ]], message: "请输入[[ column.column_comment ]]~", trigger: "blur" }],
[% endfor %]
});
defineExpose({ newFormInline });
</script>

View File

@@ -0,0 +1,237 @@
<template>
<div class="main">
<el-form
ref="formRef"
:inline="true"
:model="form"
class="search-form bg-bg_color w-[99/100] pl-8 pt-[12px]"
>
[% for column in columns if column.is_query %]
<el-form-item label="[[ column.column_comment ]]" prop="[[ column.python_name ]]">
[% if column.show_type == "input" %]
<el-input
v-model="form.[[ column.python_name ]]"
placeholder="请输入[[ column.column_comment ]]~"
class="!w-[200px]"
clearable
/>
[% elif column.show_type == "textarea" %]
<el-input
v-model="form.[[ column.python_name ]]"
type="textarea"
placeholder="请输入[[ column.column_comment ]]~"
class="!w-[200px]"
clearable
/>
[% elif column.show_type == "select" %]
<el-select
v-model="form.[[ column.python_name ]]"
placeholder="请选择[[ column.column_comment ]]~"
class="!w-[200px]"
clearable
>
<el-option
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.label"
:value="item.value" />
</el-select>
[% elif column.show_type == "radio" %]
<el-radio-group v-model="form.[[ column.python_name ]]">
<el-radio
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value"
>{{ item.label }}
</el-radio>
</el-radio-group>
[% elif column.show_type == "checkbox" %]
<el-checkbox-group v-model="form.[[ column.python_name ]]">
<el-checkbox
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value">
{{ item.label }}
</el-checkbox>
</el-checkbox-group>
[% elif column.show_type == "datetime" %]
<el-date-picker
v-model="form.[[ column.python_name ]]"
type="datetime"
placeholder="请选择[[ column.column_comment ]]~"
format="YYYY-MM-DD HH:mm:ss"
value-format="x" />
[% endif %]
</el-form-item>
[% endfor %]
<el-form-item>
<el-button
type="primary"
:icon="useRenderIcon('ri:search-line')"
:loading="loading"
@click="onSearch"
>
{{ t("buttons:Search") }}
</el-button>
<el-button :icon="useRenderIcon(Refresh)" @click="resetForm(formRef)">
{{ t("buttons:Reset") }}
</el-button>
</el-form-item>
</el-form>
<PureTableBar title="[[ description ]]管理" :columns="columns" @refresh="onSearch">
<template #buttons>
<el-button
v-if="hasAuth('[[ name ]]:btn:add')"
type="primary"
:icon="useRenderIcon(AddFill)"
@click="openDialog('新增')"
>
{{ t("buttons:Add") }}
</el-button>
</template>
<template v-slot="{ size, dynamicColumns }">
<div
v-if="selectedNum > 0"
v-motion-fade
class="bg-[var(--el-fill-color-light)] w-full h-[46px] mb-2 pl-4 flex items-center"
>
<div class="flex-auto">
<span
style="font-size: var(--el-font-size-base)"
class="text-[rgba(42,46,54,0.5)] dark:text-[rgba(220,220,242,0.5)]"
>
已选 {{ selectedNum }} 项
</span>
<el-button type="primary" text @click="onSelectionCancel">
{{ t("buttons:Deselect") }}
</el-button>
</div>
<el-popconfirm
v-if="hasAuth('[[ name ]]:btn:delete')"
title="是否确认删除?"
@confirm="onbatchDel"
>
<template #reference>
<el-button type="danger" text class="mr-1">
{{ t("buttons:DeleteInBatches") }}
</el-button>
</template>
</el-popconfirm>
</div>
<pure-table
ref="tableRef"
row-key="id"
adaptive
border
stripe
:adaptiveConfig="{ offsetBottom: 45 }"
align-whole="center"
table-layout="auto"
:loading="loading"
:size="size"
:data="dataList"
:columns="dynamicColumns"
:pagination="pagination"
:paginationSmall="size === 'small' ? true : false"
:header-cell-style="{
background: 'var(--el-fill-color-light)',
color: 'var(--el-text-color-primary)'
}"
@selection-change="handleSelectionChange"
@page-size-change="handleSizeChange"
@page-current-change="handleCurrentChange"
>
<template #operation="{ row }">
<el-button
class="reset-margin"
link
type="primary"
:size="size"
:disabled="!hasAuth('[[ name ]]:btn:update')"
:icon="useRenderIcon(EditPen)"
@click="openDialog('修改', row)"
>
{{ t("buttons:Update") }}
</el-button>
<el-popconfirm
:title="`是否确认删除配置名为 ${row.name} 的这条数据`"
@confirm="handleDelete(row)"
>
<template #reference>
<el-button
class="reset-margin"
link
:disabled="!hasAuth('[[ name ]]:btn:delete')"
type="danger"
:size="size"
:icon="useRenderIcon(Delete)"
>
{{ t("buttons:Delete") }}
</el-button>
</template>
</el-popconfirm>
</template>
</pure-table>
</template>
</PureTableBar>
</div>
</template>
<script setup lang="ts">
defineOptions({
name: "[[ class_name ]]Index"
});
import { ref } from "vue";
import { use[[ class_name ]] } from "./utils/hook";
import { useI18n } from "vue-i18n";
import { PureTableBar } from "@/components/RePureTableBar";
import { useRenderIcon } from "@/components/ReIcon/src/hooks";
import Delete from "@iconify-icons/ep/delete";
import EditPen from "@iconify-icons/ep/edit-pen";
import Refresh from "@iconify-icons/ep/refresh";
import AddFill from "@iconify-icons/ri/add-circle-line";
const { t } = useI18n();
import { hasAuth } from "@/utils/auth";
/**
* 表格Ref
*/
const tableRef = ref();
const formRef = ref();
const {
form,
dataList,
loading,
pagination,
columns,
selectedNum,
onSearch,
openDialog,
resetForm,
handleDelete,
handleSizeChange,
handleCurrentChange,
handleSelectionChange,
onSelectionCancel,
onbatchDel
} = use[[ class_name ]](tableRef);
</script>
<style scoped lang="scss">
:deep(.el-dropdown-menu__item i) {
margin: 0;
}
:deep(.el-button:focus-visible) {
outline: none;
}
.main-content {
margin: 24px 24px 0 !important;
}
.search-form {
:deep(.el-form-item) {
margin-bottom: 12px;
}
}
</style>

View File

@@ -5,6 +5,8 @@
# @File : common.py # @File : common.py
# @Software : PyCharm # @Software : PyCharm
# @Comment : 本程序 # @Comment : 本程序
from typing import List, Any, Optional, Type
def bytes2human(n, format_str='%(value).1f%(symbol)s'): def bytes2human(n, format_str='%(value).1f%(symbol)s'):
"""Used by various scripts. See: """Used by various scripts. See:
@@ -26,11 +28,13 @@ def bytes2human(n, format_str='%(value).1f%(symbol)s'):
return format_str % dict(symbol=symbols[0], value=n) return format_str % dict(symbol=symbols[0], value=n)
async def filterKeyValues(dataList: list, key: str) -> list: async def filterKeyValues(dataList: List[dict], key: str, default: Any = None, convert_type: Optional[Type] = None) -> List[Any]:
""" """
获取列表字段数据 获取列表字段数据,并可选择进行类型转换。
:param dataList: 数据列表 :param dataList: 数据列表(列表中的元素是字典)
:param key: 关键字 :param key: 要提取的字段
:return: :param default: 如果字段不存在,返回的默认值
:param convert_type: 需要转换的类型(如 int、str、float 等),默认为 None 不转换
:return: 提取并转换后的值列表
""" """
return [item[key] for item in dataList] return [convert_type(item.get(key, default)) if convert_type else item.get(key, default) for item in dataList]

112
utils/generate.py Normal file
View File

@@ -0,0 +1,112 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/03 00:03
# @UpdateTime : 2025/03/03 00:03
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
from datetime import datetime
from config.constant import MYSQL_TO_TORTOISE_TYPE
class Generate:
"""
代码生成成工具
"""
@classmethod
def convert_mysql_type(cls, mysql_type: str):
"""
将 MySQL 字段类型转换为 Tortoise-ORM 的字段类型。
:param mysql_type: MySQL 的字段类型,如 "varchar(255)""int(11)"
:return: Tortoise-ORM 对应的字段类型,如 "CharField""IntField"
"""
# 提取字段的基本类型(去掉括号及长度信息)
base_type = mysql_type.split("(")[0].lower()
# 处理特殊情况tinyint(1) -> BooleanField
if base_type == "tinyint":
if "(1)" in mysql_type: # tinyint(1) 视为 Boolean
return "BooleanField", None
return "IntField", None # 其他 tinyint 作为 IntField 处理
max_length = None
if "(" in mysql_type and base_type in {"char", "varchar"}:
max_length = int(mysql_type.split("(")[1].split(")")[0]) # 提取最大长度
field_type = MYSQL_TO_TORTOISE_TYPE.get(base_type, "TextField")
return field_type, max_length
@classmethod
def prepare_template_data(cls, table_info, columns):
TYPE_DEFINE = {
"int": 0,
"str": "",
"bool": False,
"float": 0.0,
"datetime": "",
"list": [],
"dict": {},
"set": set(),
"tuple": (),
"bytes": b"",
"None": 'null',
}
PYTHON_TO_TS = {
"int": "number",
"float": "number",
"str": "string",
"bool": "boolean",
"datetime": "string",
"Optional[int]": "number | null",
"Optional[str]": "string | null",
"Optional[bool]": "boolean | null",
"Optional[float]": "number | null",
"Optional[datetime]": "string | null",
"List[int]": "number[]",
"List[str]": "string[]",
"List[bool]": "boolean[]",
"List[datetime]": "string[]",
"Dict[str, Any]": "Record<string, any>",
"Any": "any",
}
common_column = ["id", "del_flag", "create_by", "update_by", "create_time", "update_time"]
"""组织数据,供 Jinja2 渲染"""
return {
"author": table_info.get('author', ""),
"prefix": table_info["prefix"],
"table_name": table_info["table_name"],
"class_name": table_info["class_name"],
"table_comment": table_info.get("table_comment", ""),
"description": table_info.get("description", ""),
"name": table_info.get('class_name', "").lower(),
"permission_id": table_info.get('permission_id', ""),
"columns": [
{
"python_name": col["python_name"],
"field_type": cls.convert_mysql_type(col["column_type"])[0],
"max_length": cls.convert_mysql_type(col["column_type"])[1],
"is_common": col["python_name"] in common_column,
"is_nullable": not col["is_required"],
"is_unique": col.get("is_unique", False),
"default": col.get("default", None),
"column_comment": col.get("column_comment", ""),
"column_name": col["column_name"],
"is_required": col.get("is_required", False),
"is_edit": col.get("is_edit", False),
"is_list": col.get("is_list", False),
"is_query": col.get("is_query", False),
"is_insert": col.get("is_insert", False),
"is_hide": col.get("is_hide", False),
"query_way": col.get("query_way", "="),
"show_type": col.get("show_type", "input"),
"python_type": col.get("python_type", "str"),
"define": TYPE_DEFINE.get(col.get("python_type", "str"), None),
"typescript_type": PYTHON_TO_TS.get(col.get("python_type", "str"), "any"),
}
for col in columns
if col["python_name"] and col["column_type"]
],
"current_time": datetime.now().strftime("%Y/%m/%d %H:%M:%S")
}