Compare commits

..

11 Commits

Author SHA1 Message Date
41bbdabf5b docs(README): 添加 FastAPI项目安装和配置指南
- 更新系统和安装依赖
- 安装 Python 3.11- 配置 Python 虚拟环境
- 安装和配置 MySQL
- 导入 fastapi.sql 数据文件
- 配置 Nginx 反向代理
- 启动和运行 FastAPI 项目
2025-07-01 23:48:27 +08:00
1c316594f5 feat(generate): 优化代码生成逻辑
- 新增公共字段配置,统一处理常见字段的生成规则
- 修复模板中的一些错误,如变量名、函数名等
- 优化代码结构,提高可读性和可维护性
2025-07-01 23:40:43 +08:00
bd13f1cfdc feat: 添加代码生成功能 2025-03-04 00:54:33 +08:00
c790233aee fix: 修复数据权限异常 2025-03-02 01:13:07 +08:00
141883424b fix: 修正角色权限分配异常问题 2025-02-28 16:47:48 +08:00
b59dba18f0 feat: 添加系统级管理专属页面权限 2025-02-26 22:56:15 +08:00
1dd9f7db43 feat: 添加注销功能 2025-02-25 18:20:41 +08:00
f0c678b8d0 fix: 修复普通用户也能获取下属部门的数据 2025-02-24 18:19:04 +08:00
df5f2977d4 fix: 修复注册异常,删除用户异常,调整用户信息存储时间 2025-02-23 03:58:38 +08:00
5be35d8231 feat: 缓存列表,监控,性能监控添加按钮级权限控制 2025-02-22 23:46:00 +08:00
75a163910d feat: 操作日志添加按钮级权限控制 2025-02-22 23:27:53 +08:00
38 changed files with 4007 additions and 89 deletions

154
.env Normal file
View File

@@ -0,0 +1,154 @@
# -------- 应用配置 --------
# 应用运行环境
APP_ENV = 'dev'
# 应用名称
APP_NAME = 'FastAPI-RBAC-System'
# 应用代理路径
APP_ROOT_PATH = ''
# 应用主机
APP_HOST = '0.0.0.0'
# 应用端口
APP_PORT = 9090
# 应用版本
APP_VERSION= '1.0.0'
# 应用是否开启热重载
APP_RELOAD = true
# 应用是否开启IP归属区域查询
APP_IP_LOCATION_QUERY = true
# 应用是否允许账号同时登录
APP_SAME_TIME_LOGIN = true
# -------JWT配置------------
# JWT 签名密钥
JWT_SECRET_KEY=b01c66dc2c58dc6a0aabfe2144256be36226de378bf87f72c0c795dda67f4d55
# JWT 签名算法
JWT_ALGORITHM=HS256
# JWT 盐值
JWT_SALT=jwt_salt
# JWT 令牌有效期(分钟)
JWT_EXPIRE_MINUTES=1440
# JWT 令牌在 Redis 中的缓存有效期(分钟)
JWT_REDIS_EXPIRE_MINUTES=30
# -------- 数据库配置 --------
# 数据库类型,默认为'mysql'
DB_TYPE = 'mysql'
# 数据库主机
DB_HOST = '127.0.0.1'
# 数据库端口
DB_PORT = 3306
# 数据库用户名
DB_USERNAME = 'root'
# 数据库密码
DB_PASSWORD = ''
# 数据库名称
DB_DATABASE = 'fastapi'
# 是否开启日志
DB_ECHO = true
# 数据库日志级别,默认为 10DEBUG
DB_LOG_LEVEL = 10
# 允许溢出连接池大小的最大连接数
DB_MAX_OVERFLOW = 10
# 连接池大小0表示连接数无限制
DB_POOL_SIZE = 50
# 连接回收时间(单位:秒)
DB_POOL_RECYCLE = 3600
# 连接池中没有线程可用时,最多等待的时间(单位:秒)
DB_POOL_TIMEOUT = 30
# -------- Redis配置 --------
# Redis主机
REDIS_HOST = '127.0.0.1'
# Redis端口
REDIS_PORT = 6379
# Redis用户名
REDIS_USERNAME = ''
# Redis密码
REDIS_PASSWORD = ''
# Redis数据库
REDIS_DATABASE = 2
# ======================
# 上传配置
# ======================
# 文件上传的 URL 前缀,默认为 '/profile'。
# 例如:`/profile/example.jpg`。
UPLOAD_PREFIX=/profile
# 文件上传的存储路径,默认为 'data/upload_path'。
# 上传的文件将存储在此目录中,如果目录不存在,会自动创建。
UPLOAD_PATH=data/upload_path
# 上传机器的标识,默认为 'A'。
# 用于区分不同的上传机器或节点,在多机部署时可以使用此字段。
UPLOAD_MACHINE=A
# 默认允许上传的文件扩展名列表,使用逗号分隔。
# 包含常见的图片、文档、压缩文件、视频和 PDF 格式。
# 可以根据需求扩展或修改此列表。
DEFAULT_ALLOWED_EXTENSION=bmp,gif,jpg,jpeg,png,doc,docx,xls,xlsx,ppt,pptx,html,htm,txt,rar,zip,gz,bz2,mp4,avi,rmvb,pdf
# 文件下载的存储路径,默认为 'data/download_path'。
# 下载的文件将存储在此目录中,如果目录不存在,会自动创建。
DOWNLOAD_PATH=data/download_path
# ======================
# 邮件配置
# ======================
# 邮件发送者的用户名,默认为空。
EMAIL_USERNAME=
# 邮件发送者的密码,默认为空。
EMAIL_PASSWORD=
# 邮件服务器地址,默认为 "smtp.qq.com"。
# 如果是其他邮件服务商,请修改为对应的 SMTP 服务器地址。
EMIAL_HOST=smtp.qq.com
# 邮件服务器端口,默认为 465。
# 如果是其他邮件服务商,请根据其要求修改端口号。
EMAIL_PORT=587
# ======================
# 地图配置
# ======================
# 百度地图的 AK 密钥,用于获取地图数据。
# 请在百度地图官网申请获取 AK 密钥。
AK=
# 百度地图sk密钥用于获取地图数据。
# 请在百度地图官网申请获取 SK 密钥。
SK=

468
README.md Normal file
View File

@@ -0,0 +1,468 @@
## 安装 `python3.11`
### 一、Linux 安装 Python 3.11
#### 1. 更新系统和安装依赖
```bash
sudo apt update
sudo apt install -y software-properties-common
```
#### 2. 添加 Python 3.11 PPA 源Ubuntu
```bash
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt update
```
#### 3. 安装 Python 3.11
```bash
sudo apt install -y python3.11 python3.11-venv python3.11-dev
```
#### 4. 确认安装版本
```bash
python3.11 --version
```
#### 5. 设置 Python 3.11 为默认版本(可选)
> 如果需要将 Python 3.11 设置为系统默认 Python3 版本:
```bash
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1
```
### 二、Windows 安装 Python 3.11
#### 1. 下载 Python 3.11
- 前往 [Python 官方下载页面](https://www.python.org/downloads/release/python-3110/)。
- 选择 Windows 安装程序Installer并下载适合的版本32位或64位
#### 2. 运行安装程序
- 双击下载的 `.exe` 文件,启动安装程序。
- 勾选 **“Add Python 3.11 to PATH”**,以便系统能够识别 Python 命令。
- 选择 **“Customize installation”** 以进行自定义安装。
- 点击 **“Install”** 开始安装。
#### 3. 确认安装
- 在命令提示符Command Prompt或 PowerShell 中输入以下命令检查安装是否成功:
```cmd
python --version
```
---
## 配置 Python 虚拟环境
### 1. Linux 环境
- **创建虚拟环境**
```shell
python3 -m venv 虚拟环境名称
```
- **启动虚拟环境**
```shell
source 虚拟环境名称/bin/activate
```
- **安装 Python 依赖**
```shell
pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple -r ./requirements.txt
```
### 2. Windows 环境
- **创建虚拟环境**
```shell
python -m venv 虚拟环境名称
```
- **启动虚拟环境**
```shell
.\虚拟环境名称\Scripts\activate
```
- **安装 Python 依赖**
```shell
pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple -r ./requirements.txt
```
## MySQL 的安装和配置说明。
### 1. 在 Linux 上安装 MySQL
- **步骤 1**: 更新包列表并安装 MySQL 服务器
```shell
sudo apt update
sudo apt install mysql-server -y
```
- **步骤 2**: 启动 MySQL 服务并设置为开机启动
```shell
sudo systemctl start mysql
sudo systemctl enable mysql
```
- **步骤 3**: 安全配置 MySQL
```shell
sudo mysql_secure_installation
```
按照提示配置 root 密码,移除匿名用户,禁止 root 远程登录,删除测试数据库等。
- **步骤 4**: 登录 MySQL 并创建数据库和用户
```shell
sudo mysql -u root -p
```
使用以下命令创建数据库和用户并赋予权限:
```sql
CREATE DATABASE 数据库名称 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER '用户名'@'%' IDENTIFIED BY '用户密码';
GRANT ALL PRIVILEGES ON 数据库名称.* TO '用户名'@'%';
FLUSH PRIVILEGES;
```
- **步骤 5**: 配置 MySQL 远程访问(如有需要)
修改 MySQL 配置文件 `/etc/mysql/mysql.conf.d/mysqld.cnf`,将 `bind-address` 设置为 `0.0.0.0`
```ini
bind-address = 0.0.0.0
```
保存并重启 MySQL 服务:
```shell
sudo systemctl restart mysql
```
### 2. 在 Windows 上安装 MySQL
- **步骤 1**: 下载并安装 MySQL
- 前往 [MySQL 官网下载](https://dev.mysql.com/downloads/installer/),选择适合的版本进行下载。
- 安装时可选择默认安装路径,并勾选 MySQL Server 和 MySQL Workbench 等工具。
- **步骤 2**: 配置 MySQL
- 在安装向导中,设置 root 用户的密码,并选择 MySQL 配置(建议选择开发者配置)。
- 设置端口(默认 3306和字符集为 `utf8mb4`。
- **步骤 3**: 配置环境变量(可选)
- 打开 Windows 环境变量设置,将 MySQL 安装目录的 `bin` 文件夹路径(如 `C:\Program Files\MySQL\MySQL Server 8.0\bin`)添加到 `Path` 中,以便在命令行直接使用 `mysql` 命令。
- **步骤 4**: 登录 MySQL 并创建数据库和用户
使用命令行工具登录 MySQL并按照以下命令创建数据库和用户
```shell
mysql -u root -p
```
```sql
CREATE DATABASE 数据库名称 CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER '用户名'@'localhost' IDENTIFIED BY '用户密码';
GRANT ALL PRIVILEGES ON 数据库名称.* TO '用户名'@'localhost';
FLUSH PRIVILEGES;
```
```
### 4. 验证数据库连接
- **配置完成后,可以在 FastAPI 项目中测试 MySQL 数据库连接,确保连接配置正确。**
---
## 导入 `fastapi.sql` 数据文件。
- 在 MySQL 配置完成后,可以将项目的初始数据通过 `fastapi.sql` 文件导入到数据库中。
### 1. 在 Linux 上导入 `fastapi.sql` 数据
- **步骤 1**: 登录到 MySQL
```shell
mysql -u 用户名 -p
```
系统会提示输入密码,输入后进入 MySQL 命令行。
- **步骤 2**: 使用 `source` 命令导入数据
在 MySQL 命令行中,选择目标数据库并导入 `fastapi.sql` 文件:
```sql
USE 数据库名称;
SOURCE /path/to/fastapi.sql;
```
确保 `fastapi.sql` 文件路径正确,数据将被导入指定的数据库。
- **步骤 3**: 验证导入
导入完成后,可以执行一些简单查询来验证数据是否导入成功:
```sql
SELECT * FROM 表名称 LIMIT 5;
```
### 2. 在 Windows 上导入 `fastapi.sql` 数据
- **步骤 1**: 打开命令提示符并登录到 MySQL
打开命令提示符CMD并运行以下命令
```shell
mysql -u 用户名 -p
```
输入密码后进入 MySQL 命令行。
- **步骤 2**: 使用 `source` 命令导入数据
在 MySQL 命令行中,先选择数据库,然后导入 `fastapi.sql` 文件:
```sql
USE 数据库名称;
SOURCE C:\\path\\to\\fastapi.sql;
```
注意Windows 系统中路径使用双反斜杠 `\\`。
- **步骤 3**: 验证导入
导入完成后,可以使用简单的查询语句检查数据是否导入成功:
```sql
SELECT * FROM 表名称 LIMIT 5;
```
### 3. 使用命令直接导入 `fastapi.sql` 文件
- 对于一些简单情况,可以直接在命令行中使用以下方式导入,无需进入 MySQL 命令行。
- **通用方法**
```shell
mysql -u 用户名 -p 数据库名称 < /path/to/fastapi.sql
```
这种方法适用于 Linux 和 Windows 系统,路径需根据操作系统设置正确的格式。
### 注意事项
- 确保 `fastapi.sql` 文件中的表结构与目标数据库一致,以避免导入错误。
- 如果数据库中已有相同数据,请谨慎操作,避免数据冲突或重复。
---
## Nginx配置
### Linux 版本
1. **安装 Nginx**
在 Linux 上,使用以下命令安装 Nginx
```shell
sudo apt update
sudo apt install nginx
```
2. **创建 FastAPI 反向代理配置**
打开或创建 Nginx 配置文件,例如 `/etc/nginx/sites-available/fastapi.conf`
```nginx
server {
listen 80;
server_name example.com; # 替换为您的域名或服务器 IP
# 日志路径
access_log /var/log/nginx/fastapi_access.log;
error_log /var/log/nginx/fastapi_error.log;
# 文件上传大小限制为 1024MB
client_max_body_size 1024M;
location / {
root /var/www/html/; # 替换为您的项目根目录
index index.html index.htm;
try_files $uri $uri/ /index.html;
}
location /api/ {
# 如果后端在本地比如127.0.0.1或者localhost请解开下面的rewrite注释即可
rewrite ^.+api/?(.*)$ /$1 break;
# 这里填写后端地址(后面一定不要忘记添加 /
proxy_pass http://127.0.0.1:9090/;
proxy_set_header Host $host;
proxy_set_header Cookie $http_cookie;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_redirect default;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Headers *;
add_header Access-Control-Allow-Methods *;
}
}
```
3. **启用配置**
- 将配置文件链接到 `sites-enabled`
```shell
sudo ln -s /etc/nginx/sites-available/fastapi.conf /etc/nginx/sites-enabled/
```
- 检查配置文件是否有错误:
```shell
sudo nginx -t
```
- 重启 Nginx 以应用配置:
```shell
sudo systemctl reload nginx
```
4. **验证配置**
在浏览器中访问 `http://example.com`,确保 FastAPI 应用能够正常访问并支持 1GB 文件上传。
### Windows 版本
1. **安装 Nginx**
- 下载 Nginx Windows 版本:[Nginx 官方网站](https://nginx.org/en/download.html)。
- 解压后,将 Nginx 文件夹放置在您选择的目录(例如 `C:\nginx`)。
2. **创建 FastAPI 配置**
打开 `C:\nginx\conf\nginx.conf` 文件,在 `http` 块中添加以下配置:
```nginx
http {
include mime.types;
default_type application/octet-stream;
# 日志路径
access_log logs/fastapi_access.log;
error_log logs/fastapi_error.log;
# 客户端最大文件上传大小限制为 1024MB
client_max_body_size 1024M;
server {
listen 80;
server_name localhost; # 或替换为您的服务器 IP
location / {
root /var/www/html/; # 替换为您的项目根目录
index index.html index.htm;
try_files $uri $uri/ /index.html;
}
location /api/ {
# 如果后端在本地比如127.0.0.1或者localhost请解开下面的rewrite注释即可
rewrite ^.+api/?(.*)$ /$1 break;
# 这里填写后端地址(后面一定不要忘记添加 /
proxy_pass http://127.0.0.1:9090/;
proxy_set_header Host $host;
proxy_set_header Cookie $http_cookie;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_redirect default;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Headers *;
add_header Access-Control-Allow-Methods *;
}
}
}
```
3. **启动 Nginx**
- 打开命令提示符,进入 Nginx 目录并启动服务:
```shell
cd C:\nginx
nginx.exe
```
4. **验证配置**
在浏览器中访问 `http://localhost`,检查 FastAPI 应用是否可用并支持 1GB 文件上传。
### 可选的 SSL 配置
- 若需要在 Linux 系统上启用 HTTPS 和 SSL 证书,请参考 [Lets Encrypt](https://letsencrypt.org/) 或其他 SSL 提供商的文档,使用类似以下的配置:
```nginx
server {
listen 80;
server_name example.com;
return 301 https://$host$request_uri;
}
server {
listen 443 ssl;
server_name example.com;
ssl_certificate /etc/letsencrypt/live/example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/example.com/privkey.pem;
client_max_body_size 1024M;
location / {
root /var/www/html/; # 替换为您的项目根目录
index index.html index.htm;
try_files $uri $uri/ /index.html;
}
location /api/ {
# 如果后端在本地比如127.0.0.1或者localhost请解开下面的rewrite注释即可
rewrite ^.+api/?(.*)$ /$1 break;
# 这里填写后端地址(后面一定不要忘记添加 /
proxy_pass http://127.0.0.1:9090/;
proxy_set_header Host $host;
proxy_set_header Cookie $http_cookie;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_redirect default;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Headers *;
add_header Access-Control-Allow-Methods *;
}
}
```
---
## 启动运行
### 1. 进入项目目录并激活虚拟环境
#### Linux
```shell
# 进入项目目录
cd /path/to/your/project
# 激活虚拟环境
source venv/bin/activate
```
#### Windows
```shell
# 进入项目目录
cd \path\to\your\project
# 激活虚拟环境
.\venv\Scripts\activate
```
### 2. 启动 FastAPI 项目
```shell
python app.py
```
### 3. 停止项目
在命令行中按下 `Ctrl + C` 以停止运行中的 FastAPI 项目。
### 备注
- 运行项目时,请确保虚拟环境已正确激活,以避免 Python 库路径错误。
- 如果 Nginx 已配置为反向代理,则可以直接访问 Nginx 设置的域名或 IP。

View File

@@ -9,6 +9,7 @@ from functools import wraps
from fastapi import Request
from config.constant import RedisKeyConfig
from controller.login import LoginController
from exceptions.exception import PermissionException
@@ -40,3 +41,29 @@ class Auth:
raise PermissionException(message="该用户无此接口权限!")
return wrapper
async def hasAuth(request: Request, permission: str) -> bool:
"""
判断是有拥有某项权限
"""
token = request.headers.get('Authorization') # 直接使用 request 对象
current_user = await LoginController.get_current_user(request, token)
permissions = current_user.get('permissions')
if permission in permissions:
return True
else:
return False
async def hasAdmin(request: Request, department_id: str) -> bool:
"""
判断是否有管理员权限
"""
permissions = []
if ids := await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:permission_departments'):
permissions = eval(ids)
if department_id in permissions:
return True
else:
return False

View File

@@ -149,7 +149,7 @@ class Log:
# else:
session_id = request.app.state.session_id
status = 1 if request.app.state.login_status else 0
current_user = await User.get_or_none(username=payload.get("username"))
current_user = await User.get_or_none(username=payload.get("username"),del_flag=1)
await LoginLog.create(
user_id=current_user.id,
login_ip=host,

View File

@@ -9,6 +9,7 @@
from fastapi import APIRouter, Depends, Path, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
@@ -26,6 +27,7 @@ cacheAPI = APIRouter(
@cacheAPI.get("/monitor", response_class=JSONResponse, response_model=GetCacheMonitorResponse,
summary="获取缓存监控信息")
@Log(title="获取缓存监控信息", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:infoList'])
async def get_cache_info(request: Request):
info = await request.app.state.redis.info()
db_size = await request.app.state.redis.dbsize()
@@ -45,6 +47,7 @@ async def get_cache_info(request: Request):
@cacheAPI.get("/names", response_class=JSONResponse, response_model=GetCacheInfoResponse,
summary="获取缓存名称列表")
@Log(title="获取缓存名称列表", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:list'])
async def get_cache_names(request: Request):
name_list = []
for key_config in RedisKeyConfig:
@@ -62,6 +65,7 @@ async def get_cache_names(request: Request):
@cacheAPI.get("/keys/{cacheName}", response_class=JSONResponse, response_model=GetCacheKeysListResponse,
summary="获取缓存键名列表")
@Log(title="获取缓存键名列表", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:list'])
async def get_cache_keys(request: Request, cacheName: str = Path(description="缓存名称")):
cache_keys = await request.app.state.redis.keys(f'{cacheName}*')
cache_key_list = [key.split(':', 1)[1] for key in cache_keys if key.startswith(f'{cacheName}:')]
@@ -71,6 +75,7 @@ async def get_cache_keys(request: Request, cacheName: str = Path(description="
@cacheAPI.get("/info/{cacheName}/{cacheKey}", response_class=JSONResponse, response_model=GetCacheInfoResponse,
summary="获取缓存信息")
@Log(title="获取缓存信息", business_type=BusinessType.SELECT)
@Auth(permission_list=['cache:btn:info'])
async def get_cache_info(request: Request, cacheName: str = Path(description="缓存名称"),
cacheKey: str = Path(description="缓存键名")):
cache_value = await request.app.state.redis.get(f'{cacheName}:{cacheKey}')
@@ -88,6 +93,7 @@ async def get_cache_info(request: Request, cacheName: str = Path(description="
@cacheAPI.post("/cacheName/{name}", response_class=JSONResponse, response_model=BaseResponse,
summary="通过键名删除缓存")
@Log(title="通过键名删除缓存", business_type=BusinessType.DELETE)
@Auth(permission_list=['cache:btn:delete'])
async def delete_cache(request: Request, name: str = Path(description="缓存名称")):
cache_keys = await request.app.state.redis.keys(f'{name}*')
if cache_keys:
@@ -99,6 +105,7 @@ async def delete_cache(request: Request, name: str = Path(description="缓存名
summary="通过键值删除缓存")
@cacheAPI.post("/cacheKey/{key}", response_class=JSONResponse, response_model=BaseResponse, summary="通过键值删除缓存")
@Log(title="通过键值删除缓存", business_type=BusinessType.DELETE)
@Auth(permission_list=['cache:btn:delete'])
async def delete_cache_key(request: Request, key: str = Path(description="缓存键名")):
cache_keys = await request.app.state.redis.keys(f'*{key}')
if cache_keys:
@@ -109,6 +116,7 @@ async def delete_cache_key(request: Request, key: str = Path(description="缓存
@cacheAPI.delete("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存")
@cacheAPI.post("/clearAll", response_class=JSONResponse, response_model=BaseResponse, summary="删除所有缓存")
@Log(title="删除所有缓存", business_type=BusinessType.DELETE)
@Auth(permission_list=['cache:btn:delete'])
async def delete_all_cache(request: Request):
cache_keys = await request.app.state.redis.keys()
if cache_keys:

View File

@@ -14,7 +14,7 @@ from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
from models import Department
from models import Department, Role
from schemas.common import BaseResponse, DeleteListParams
from schemas.department import AddDepartmentParams, GetDepartmentInfoResponse, \
GetDepartmentListResponse
@@ -79,8 +79,9 @@ async def delete_department_list(request: Request, params: DeleteListParams,
if department := await Department.get_or_none(id=item, del_flag=1):
if item in sub_departments:
await delete_department_recursive(department_id=department.id)
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:*'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:*')
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
return Response.success(msg="删除成功!")
@@ -116,6 +117,9 @@ async def update_department(request: Request, params: AddDepartmentParams, id: s
department.sort = params.sort
department.status = params.status
await department.save()
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
return Response.success(msg="修改成功!")
else:
return Response.error(msg="修改失败,部门不存在!")
@@ -201,3 +205,36 @@ async def get_department_list(
"page": page,
"pageSize": pageSize
})
@departmentAPI.get("/roleList/{id}", response_model=GetDepartmentListResponse, response_class=JSONResponse,
summary="用户获取部门角色列表")
@Log(title="获取部门角色列表", business_type=BusinessType.SELECT)
@Auth(["department:btn:list"])
async def get_department_role_list(
request: Request,
id: str = Path(..., description="部门ID"),
current_user: dict = Depends(LoginController.get_current_user)
):
sub_departments = current_user.get("sub_departments")
if id not in sub_departments:
return Response.error(msg="查询失败,无权限!")
data = await Role.filter(department__id=id).values(
id="id",
department_id="department__id",
department_name="department__name",
department_phone="department__phone",
department_principal="department__principal",
department_email="department__email",
role_name="name",
role_code="code",
role_id="id",
create_time="create_time",
update_time="update_time"
)
return Response.success(data={
"result": data,
"total": len(data),
"page": 1,
"pageSize": 9999
})

508
api/generate.py Normal file
View File

@@ -0,0 +1,508 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/28 17:28
# @UpdateTime : 2025/02/28 17:28
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
import uuid
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Path, Query, Depends, Request
from fastapi.responses import JSONResponse
from jinja2 import Environment, FileSystemLoader
from tortoise import Tortoise
from annotation.auth import Auth
from annotation.log import Log
from config.constant import MYSQL_TO_PYTHON_TYPE, BusinessType
from controller.login import LoginController
from models.generate import GenerateInfo, GenerateColumn
from schemas.common import BaseResponse, DeleteListParams
from schemas.generate import GetTablesListResponse, AddGenerateInfoParams, UpdateGenerateInfoParams, \
GetGenerateInfoResponse, GetGenerateInfoListResponse
from utils.common import bytes2human
from utils.generate import Generate
from utils.response import Response
generateAPI = APIRouter(
prefix="/generate",
dependencies=[Depends(LoginController.get_current_user)]
)
@generateAPI.get("/tables", response_class=JSONResponse, response_model=GetTablesListResponse,
summary="获取数据库中的所有表信息")
@Log(title="获取数据库中的所有表信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:tables"])
async def get_database_tables(request: Request):
"""
获取当前数据库中的所有表信息,包括:
- 表名
- 表注释
- 记录行数
- 数据大小
- 索引大小
- 创建时间
- 最后更新时间(如果支持)
"""
sql = """SELECT TABLE_NAME,TABLE_COMMENT,TABLE_ROWS,DATA_LENGTH,INDEX_LENGTH,CREATE_TIME,UPDATE_TIME FROM information_schema.tables WHERE table_schema = DATABASE();"""
result = await Tortoise.get_connection("default").execute_query_dict(sql)
# 将结果中的键转换为小写
formatted_result = [{k.lower(): v for k, v in row.items()} for row in result]
for row in formatted_result:
for key, value in row.items():
if key == "data_length" or key == "index_length":
row[key] = bytes2human(value)
return Response.success(data={
"page": 1,
"total": len(formatted_result),
"pageSize": 99999,
"result": formatted_result
})
@generateAPI.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="添加生成表信息")
@Log(title="添加生成表信息", business_type=BusinessType.INSERT)
@Auth(permission_list=["generate:btn:add"])
async def add_generate_info(request: Request, params: AddGenerateInfoParams):
if await GenerateInfo.get_or_none(table_name=params.table_name, del_flag=1):
return Response.error(msg="该表信息已存在!")
gen = await GenerateInfo.create(
author=params.author,
class_name=params.class_name,
table_comment=params.table_comment,
table_name=params.table_name,
prefix=params.prefix,
remark=params.remark,
permission_id=params.permission_id,
description=params.description
)
if gen:
sql = """SELECT * FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = %s;"""
result = await Tortoise.get_connection("default").execute_query_dict(sql, [params.table_name])
# 转换为小写键名
formatted_result = [{k.lower(): v for k, v in row.items()} for row in result]
formatted_result.sort(key=lambda x: x['ordinal_position'])
common_column = ["id", "del_flag", "create_by", "update_by", "create_time", "update_time"]
common_set = {
"id": {
"is_insert": False,
"is_edit": False,
"is_list": True,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"del_flag": {
"is_insert": False,
"is_edit": False,
"is_list": False,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"create_by": {
"is_insert": False,
"is_edit": False,
"is_list": False,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"update_by": {
"is_insert": False,
"is_edit": False,
"is_list": False,
"is_query": False,
"is_required": False,
"is_hide": True,
},
"create_time": {
"is_insert": False,
"is_edit": False,
"is_list": True,
"is_query": False,
"is_required": False,
"is_hide": False,
},
"update_time": {
"is_insert": False,
"is_edit": False,
"is_list": True,
"is_query": False,
"is_required": False,
"is_hide": False,
}
}
for column in formatted_result:
if await GenerateColumn.get_or_none(table_id=gen.id, column_name=column["column_name"], del_flag=1):
continue
is_insert = True
is_edit = True
is_list = True
is_query = True
is_required = True
is_hide = False
if column["column_name"] in common_column:
is_insert = common_set[column["column_name"]]["is_insert"]
is_edit = common_set[column["column_name"]]["is_edit"]
is_list = common_set[column["column_name"]]["is_list"]
is_query = common_set[column["column_name"]]["is_query"]
is_required = common_set[column["column_name"]]["is_required"]
is_hide = common_set[column["column_name"]]["is_hide"]
await GenerateColumn.create(
table=gen,
index=column["ordinal_position"],
column_name=column["column_name"],
column_comment=column["column_comment"],
column_type=column["column_type"],
python_type=MYSQL_TO_PYTHON_TYPE.get(column["column_type"]) if MYSQL_TO_PYTHON_TYPE.get(
column["column_type"]) else MYSQL_TO_PYTHON_TYPE.get(column["data_type"], "str"),
python_name=column["column_name"].lower(),
is_insert=is_insert,
is_edit=is_edit,
is_list=is_list,
is_query=is_query,
is_required=is_required,
query_way="__icontains",
show_type="input",
is_hide=is_hide,
)
return Response.success(msg="添加成功!")
return Response.error(msg="添加失败!")
@generateAPI.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除生成表信息")
@generateAPI.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除生成表信息")
@Log(title="删除生成表信息", business_type=BusinessType.DELETE)
@Auth(permission_list=["generate:btn:delete"])
async def delete_generate_info(request: Request, id: str = Path(description="生成表信息ID")):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.del_flag = 0
await GenerateColumn.filter(table_id=table.id, del_flag=1).update(del_flag=0)
await table.save()
return Response.success(msg="删除成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse,
summary="批量删除生成表信息")
@generateAPI.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除生成表信息")
@Log(title="批量删除生成表信息", business_type=BusinessType.DELETE)
@Auth(permission_list=["generate:btn:delete"])
async def delete_generate_info_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.del_flag = 0
await GenerateColumn.filter(table_id=table.id, del_flag=1).update(del_flag=0)
await table.save()
return Response.success(msg="删除成功!")
@generateAPI.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新生成表信息")
@generateAPI.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="更新生成表信息")
@Log(title="更新生成表信息", business_type=BusinessType.UPDATE)
@Auth(permission_list=["generate:btn:update"])
async def update_generate_info(
request: Request,
params: AddGenerateInfoParams,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table.author = params.info.author
table.class_name = params.info.class_name
table.permission_id = params.info.permission_id
table.prefix = params.info.prefix
table.remark = params.info.remark
table.table_comment = params.info.table_comment
table.table_name = params.info.table_name
table.description = params.info.description
await table.save()
return Response.success(msg="更新成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.get("/info/{id}", response_class=JSONResponse, response_model=GetGenerateInfoResponse,
summary="获取生成表信息")
@Log(title="获取生成表信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:info"])
async def get_generate_info(request: Request, id: str = Path(description="生成表信息ID")):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
table = {
"id": table.id,
"author": table.author,
"class_name": table.class_name,
"permission_id": table.permission_id,
"prefix": table.prefix,
"remark": table.remark,
"table_comment": table.table_comment,
"table_name": table.table_name,
"description": table.description,
"create_time": table.create_time,
"update_time": table.update_time,
}
columns = await GenerateColumn.filter(table_id=table["id"], del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
table["columns"] = columns
return Response.success(data=table)
return Response.error(msg="该生成表信息不存在!")
@generateAPI.put("/updateColumns/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="更新生成表列信息")
@generateAPI.post("/updateColumns/{id}", response_class=JSONResponse, response_model=BaseResponse,
summary="更新生成表列信息")
@Log(title="更新生成表列信息", business_type=BusinessType.UPDATE)
@Auth(permission_list=["generate:btn:updateColumns"])
async def update_generate_info_columns(
request: Request,
params: UpdateGenerateInfoParams,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
for column in params.columns:
if gen_column := await GenerateColumn.get_or_none(id=column.id, table_id=table.id, del_flag=1):
gen_column.column_comment = column.column_comment
gen_column.column_name = column.column_name
gen_column.column_type = column.column_type
gen_column.python_name = column.python_name
gen_column.python_type = column.python_type
gen_column.query_way = column.query_way
gen_column.show_type = column.show_type
gen_column.is_insert = column.is_insert
gen_column.is_edit = column.is_edit
gen_column.is_list = column.is_list
gen_column.is_query = column.is_query
gen_column.is_required = column.is_required
gen_column.is_hide = column.is_hide
await gen_column.save()
return Response.success(msg="更新成功!")
return Response.error(msg="该生成表信息不存在!")
@generateAPI.get("/list", response_class=JSONResponse, response_model=GetGenerateInfoListResponse,
summary="查询生成表信息列表")
@Log(title="查询生成表信息列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["generate:btn:list"])
async def get_generate_info_list(
request: Request,
page: int = Query(default=1, description="页码"),
pageSize: int = Query(default=10, description="每页数量"),
table_comment: Optional[str] = Query(default=None, description="表注释"),
permission_id: Optional[str] = Query(default=None, description="权限ID"),
):
filterArgs = {
f'{k}__contains': v for k, v in {
'table_comment': table_comment,
'permission_id': permission_id
}.items() if v
}
result = await GenerateInfo.filter(**filterArgs, del_flag=1).order_by("-create_time").offset(
(page - 1) * pageSize).limit(
pageSize).values(
id="id",
author="author",
class_name="class_name",
permission_id="permission_id",
prefix="prefix",
remark="remark",
table_comment="table_comment",
table_name="table_name",
description="description",
create_time="create_time",
update_time="update_time",
)
for item in result:
columns = await GenerateColumn.filter(table_id=item["id"], del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
item["columns"] = columns
return Response.success(data={
"result": result,
"total": await GenerateInfo.filter(**filterArgs, del_flag=1).count(),
"page": page,
"pageSize": pageSize,
})
@generateAPI.get("/code/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="生成代码")
@Log(title="生成代码", business_type=BusinessType.GENCODE)
@Auth(permission_list=["generate:btn:code"])
async def generate_code(
request: Request,
id: str = Path(description="生成表信息ID")
):
if table := await GenerateInfo.get_or_none(id=id, del_flag=1):
info = {
"author": table.author,
"class_name": table.class_name,
"permission_id": table.permission_id,
"prefix": table.prefix,
"remark": table.remark,
"table_comment": table.table_comment,
"table_name": table.table_name,
"description": table.description,
}
columns = await GenerateColumn.filter(table_id=table.id, del_flag=1).order_by("index").values(
id="id",
table_id="table__id",
table_name="table__table_name",
index="index",
column_comment="column_comment",
column_name="column_name",
column_type="column_type",
python_name="python_name",
python_type="python_type",
query_way="query_way",
show_type="show_type",
is_insert="is_insert",
is_edit="is_edit",
is_list="is_list",
is_query="is_query",
is_required="is_required",
is_hide="is_hide",
create_time="create_time",
update_time="update_time",
)
def generate_uuid():
return str(uuid.uuid4())
def current_time():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
comment_env = Environment(loader=FileSystemLoader('templates'))
comment_env.globals["uuid4"] = generate_uuid # 注册到 Jinja2
comment_env.globals["now"] = current_time
data = Generate.prepare_template_data(info, columns)
model_template = comment_env.get_template('python/model.py.jinja')
model_code = model_template.render(data)
schemas_template = comment_env.get_template('python/schemas.py.jinja')
schemas_code = schemas_template.render(data)
api_py_template = comment_env.get_template('python/api.py.jinja')
api_py_code = api_py_template.render(data)
type_template = comment_env.get_template('typescript/type.d.ts.jinja')
type_code = type_template.render(data)
api_ts_template = comment_env.get_template('typescript/api.ts.jinja')
api_ts_code = api_ts_template.render(data)
hook_template = comment_env.get_template('typescript/hook.tsx.jinja')
hook_code = hook_template.render(data)
vue_env = Environment(
loader=FileSystemLoader('templates/vue'),
block_start_string="[%", # 替换 Jinja2 代码块的起始标记
block_end_string="%]", # 替换 Jinja2 代码块的结束标记
variable_start_string="[[",
variable_end_string="]]",
comment_start_string="[#",
comment_end_string="#]"
)
index_template = vue_env.get_template('index.vue.jinja')
index_code = index_template.render(data)
form_template = vue_env.get_template('form.vue.jinja')
form_code = form_template.render(data)
sql_template = comment_env.get_template('sql.jinja')
sql_code = sql_template.render(data)
data = {
"backend": [
{
"label": f"/models/{info['class_name'].lower()}.py",
"value": model_code,
"type": "python",
"path": f"/python/models/{info['class_name'].lower()}.py"
},
{
"label": f"/schemas/{info['class_name'].lower()}.py",
"value": schemas_code,
"type": "python",
"path": f"/python/schemas/{info['class_name'].lower()}.py"
},
{
"label": f"/api/{info['class_name'].lower()}.py",
"value": api_py_code,
"type": "python",
"path": f"/python/api/{info['class_name'].lower()}.py"
},
],
"frontend": [
{
"label": f"/types/{info['class_name'].lower()}.d.ts",
"value": type_code,
"type": "typescript",
"path": f"/types/{info['class_name'].lower()}.d.ts"
},
{
"label": f"/api/{info['class_name'].lower()}.ts",
"value": api_ts_code,
"type": "typescript",
"path": f"/api/{info['class_name'].lower()}.ts"
},
{
"label": f"/{info['class_name'].lower()}/utils/hook.tsx",
"value": hook_code,
"type": "typescript",
"path": f"/{info['class_name'].lower()}/utils/hook.tsx"
},
{
"label": f"/{info['class_name'].lower()}/index.vue",
"value": index_code,
"type": "vue",
"path": f"/{info['class_name'].lower()}/index.vue"
},
{
"label": f"/{info['class_name'].lower()}/components/form.vue",
"value": form_code,
"type": "vue",
"path": f"/{info['class_name'].lower()}/components/form.vue"
},
],
"sql": [
{
"label": f"{info['table_name']}.sql",
"value": sql_code,
"type": "sql",
"path": f"{info['table_name']}.sql"
}
]
}
return Response.success(data=data)
return Response.error(msg="生成失败!")

View File

@@ -274,11 +274,7 @@ async def get_i18n_list(request: Request,
@Auth(["i18n:btn:infoList"])
async def get_i18n_info_list(request: Request, id: str = Path(description="国际化内容语言ID")):
if locale := await Locale.get_or_none(id=id, del_flag=1):
result = await request.app.state.redis.get(f'{RedisKeyConfig.TRANSLATION_INFO.key}:{id}')
if result:
result = eval(result)
return Response.success(data=result)
data = await I18n.filter(locale_id=locale.id, del_flag=1).values(
data = await I18n.filter(locale_id=locale.id, del_flag=1).order_by("key").values(
id="id",
key="key",
translation="translation",
@@ -292,13 +288,6 @@ async def get_i18n_info_list(request: Request, id: str = Path(description="国
result = {}
for i18n in data:
result[f"{i18n['key']}"] = i18n["translation"]
await request.app.state.redis.set(f'{RedisKeyConfig.TRANSLATION_INFO.key}:{id}',
str(jsonable_encoder({
"data": result,
"locale": locale.code,
"name": locale.name,
})),
ex=timedelta(minutes=60))
return Response.success(data={
"data": result,
"locale": locale.code,

View File

@@ -5,13 +5,14 @@
# @File : log.py
# @Software : PyCharm
# @Comment : 本程序
from typing import Optional
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.auth import Auth, hasAuth
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
@@ -40,14 +41,12 @@ async def get_login_log(request: Request,
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
user_id = current_user.get("id")
online_user_list = await LoginController.get_online_user(request, sub_departments)
online_user_list = list(
filter(lambda x: x["department_id"] in sub_departments, jsonable_encoder(online_user_list)))
filterArgs = {
f'{k}__contains': v for k, v in {
'username': username,
'nickname': nickname,
'department_id': department_id,
}.items() if v
}
if status is not None:
@@ -56,9 +55,21 @@ async def get_login_log(request: Request,
startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['login_time__range'] = [startTime, endTime]
if await hasAuth(request, "login:btn:admin"):
online_user_list = list(
filter(lambda x: x["department_id"] in sub_departments, jsonable_encoder(online_user_list)))
if not department_id:
filterArgs['user__department__id__in'] = sub_departments
result = await LoginLog.filter(**filterArgs, del_flag=1).offset(
else:
filterArgs['user__department__id'] = department_id
else:
online_user_list = list(
filter(lambda x: x["user_id"] == user_id, jsonable_encoder(online_user_list)))
if department_id:
filterArgs['user__department__id'] = department_id
else:
filterArgs["user__department__id"] = current_user.get("department_id")
result = await LoginLog.filter(**filterArgs, user__del_flag=1, del_flag=1).offset(
(page - 1) * pageSize).limit(pageSize).values(
id="id",
user_id="user__id",
@@ -82,7 +93,7 @@ async def get_login_log(request: Request,
if item["session_id"] == log["session_id"]:
log["online"] = True
return Response.success(data={
"total": await LoginLog.filter(**filterArgs, del_flag=1, ).count(),
"total": await LoginLog.filter(**filterArgs, del_flag=1, user__del_flag=1, ).count(),
"result": result,
"page": page,
})
@@ -154,13 +165,47 @@ async def delete_login_log(request: Request, params: DeleteListParams,
@logAPI.get("/operation", response_class=JSONResponse, response_model=GetOperationLogResponse,
summary="用户获取操作日志")
@Auth(permission_list=["operation:btn:list"])
async def get_operation_log(request: Request,
page: int = Query(default=1, description="页码"),
name: Optional[str] = Query(default=None, description="操作名称"),
type: Optional[str] = Query(default=None, description="操作类型"),
pageSize: int = Query(default=10, description="每页数量"),
username: Optional[str] = Query(default=None, description="用户账号"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"),
status: Optional[str] = Query(default=None, description="登录状态"),
current_user: dict = Depends(LoginController.get_current_user),
):
sub_departments = current_user.get("sub_departments")
user_id = current_user.get("id")
result = await OperationLog.filter(operator_id=user_id, del_flag=1).offset((page - 1) * pageSize).limit(
filterArgs = {
f'{k}__contains': v for k, v in {
'operation_name': name,
'operation_type': type,
'operator__username': username,
'operator__nickname': nickname,
}.items() if v
}
if status is not None:
filterArgs['status'] = status
if startTime and endTime:
startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['operation_time__range'] = [startTime, endTime]
if await hasAuth(request, "operation:btn:admin"):
if not department_id:
filterArgs['department__id__in'] = sub_departments
else:
filterArgs['department__id'] = department_id
else:
filterArgs['operator__id'] = user_id
if department_id:
filterArgs['department__id'] = department_id
result = await OperationLog.filter(**filterArgs, operator__del_flag=1, del_flag=1).offset(
(page - 1) * pageSize).limit(
pageSize).values(
id="id",
operation_name="operation_name",
@@ -184,9 +229,10 @@ async def get_operation_log(request: Request,
cost_time="cost_time"
)
return Response.success(data={
"total": await OperationLog.filter(operator_id=user_id).count(),
"total": await OperationLog.filter(**filterArgs, del_flag=1, operator__del_flag=1).count(),
"result": result,
"page": page,
"pageSize": pageSize
})
@@ -196,30 +242,28 @@ async def get_operation_log(request: Request,
summary="用户删除操作日志")
@Log(title="用户删除操作日志", business_type=BusinessType.DELETE)
@Auth(permission_list=["operation:btn:delete"])
async def delete_operation_log(id: str = Path(..., description="操作日志id"),
async def delete_operation_log(request: Request, id: str = Path(..., description="操作日志id"),
current_user: dict = Depends(LoginController.get_current_user)):
if log := await OperationLog.get_or_none(id=id):
if log.operator == current_user.get("id"):
sub_departments = current_user.get("sub_departments")
if log := await OperationLog.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
log.del_flag = 0
await log.save()
return Response.success(msg="删除成功")
else:
return Response.failure(msg="无权限删除")
else:
return Response.failure(msg="删除失败,操作日志不存在!")
@logAPI.delete("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse,
summary="用户删除操作日志")
summary="用户批量删除操作日志")
@logAPI.post("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse,
summary="用户删除操作日志")
summary="用户批量删除操作日志")
@Log(title="用户批量删除操作日志", business_type=BusinessType.DELETE)
@Auth(permission_list=["operation:btn:delete"])
async def delete_operation_log(params: DeleteListParams,
async def delete_operation_log(request: Request, params: DeleteListParams,
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
for id in set(params.ids):
if log := await OperationLog.get_or_none(id=id):
if log.operator == current_user.get("id"):
if log := await OperationLog.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
log.del_flag = 0
await log.save()
return Response.success(msg="删除成功")

View File

@@ -18,7 +18,7 @@ from config.constant import BusinessType
from config.constant import RedisKeyConfig
from controller.login import CustomOAuth2PasswordRequestForm, LoginController
from controller.query import QueryController
from models import Department, User
from models import Department, User, Role, UserRole, LoginLog, OperationLog
from schemas.common import BaseResponse
from schemas.login import LoginParams, GetUserInfoResponse, LoginResponse, GetCaptchaResponse, GetEmailCodeParams, \
ResetPasswordParams
@@ -94,7 +94,7 @@ async def login(
async def register(request: Request, params: RegisterUserParams):
register_enabled = (
True
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:register_enabled')
if await request.app.state.redis.get(f'{RedisKeyConfig.SYSTEM_CONFIG.key}:account_register_enabled')
== 'true'
else False
)
@@ -106,7 +106,14 @@ async def register(request: Request, params: RegisterUserParams):
if await QueryController.register_user_before(username=params.username, phone=params.phone, email=params.email):
return Response.error(msg="注册失败,用户已存在!")
params.password = await Password.get_password_hash(input_password=params.password)
# 默认分配注册用户
userRole = await Role.get_or_none(department__name="注册用户", code="user", del_flag=1).values(
department_id="department__id", id="id")
if not params.department_id:
params.department_id = userRole.get("department_id", "")
department = await Department.get_or_none(id=params.department_id)
userRole = await Role.get_or_none(department__id=department.id, code="user", del_flag=1).values(id="id")
print(userRole)
user = await User.create(
username=params.username,
password=params.password,
@@ -118,6 +125,11 @@ async def register(request: Request, params: RegisterUserParams):
status=params.status,
)
if user:
# 默认分配普通用户角色
await UserRole.create(
user_id=user.id,
role_id=userRole.get("id", ""),
)
userParams = LoginParams(
username=params.username,
password=params.password
@@ -140,7 +152,7 @@ async def register(request: Request, params: RegisterUserParams):
result.pop("session_id")
result.pop("userInfo")
return Response.success(msg="注册成功!", data=result)
return Response.error(msg="注册成功!")
return Response.success(msg="注册成功!")
else:
return Response.error(msg="注册失败!")
@@ -221,7 +233,7 @@ async def info(
@loginAPI.get("/getRoutes", response_class=JSONResponse, summary="获取路由信息")
# @Log(title="获取路由信息", business_type=BusinessType.SELECT)
@Log(title="获取路由信息", business_type=BusinessType.SELECT)
async def get_routes(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
routes = await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{current_user["id"]}')
@@ -259,3 +271,31 @@ async def logout(request: Request, status: bool = Depends(LoginController.logout
if status:
return Response.success(data="退出成功!")
return Response.error(data="登出失败!")
@loginAPI.post("/unsubscribe", response_class=JSONResponse, response_model=BaseResponse, summary="用户注销")
@Log(title="用户注销", business_type=BusinessType.FORCE)
async def unsubscribe(request: Request, current_user: dict = Depends(LoginController.get_current_user)):
id = current_user.get("id")
session_id = current_user.get("session_id")
if user := await User.get_or_none(id=id, del_flag=1):
user.del_flag = 0
await user.save()
redis_token = await request.app.state.redis.get(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
if redis_token:
await request.app.state.redis.delete(f'{RedisKeyConfig.ACCESS_TOKEN.key}:{session_id}')
# 移除用户角色
await UserRole.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户登录日志
await LoginLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户操作日志
await OperationLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 更新用户信息缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{id}')
# 更新用户路由缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_ROUTES.key}:{id}')
return Response.success(data="注销成功!")
else:
return Response.error(data="注销失败!")

View File

@@ -10,11 +10,11 @@ from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.auth import Auth, hasAdmin
from annotation.log import Log
from config.constant import BusinessType
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
from models import Permission
from models import Permission,RolePermission
from schemas.common import BaseResponse
from schemas.permission import AddPermissionParams, GetPermissionInfoResponse, GetPermissionListResponse
from utils.response import Response
@@ -51,8 +51,17 @@ async def add_permission(request: Request, params: AddPermissionParams):
leave_transition=params.leave_transition,
fixed_tag=params.fixed_tag,
hidden_tag=params.hidden_tag,
is_admin=params.is_admin
)
if permission:
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="新增权限成功!")
else:
return Response.error(msg="新增权限失败!")
@@ -64,13 +73,34 @@ async def add_permission(request: Request, params: AddPermissionParams):
@Auth(permission_list=["permission:btn:delete"])
async def delete_permission(request: Request, id: str = Path(description="权限ID")):
if permission := await Permission.get_or_none(id=id, del_flag=1):
permission.del_flag = 0
await permission.save()
# 移除角色权限
await delete_permission_recursive(permission_id=permission.id)
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="删除权限成功!")
else:
return Response.error(msg="删除权限失败,权限不存在!")
async def delete_permission_recursive(permission_id: str):
"""
递归删除权限及其附属权限
:param permission_id: 权限ID
:return:
"""
await Permission.filter(id=permission_id, del_flag=1).update(del_flag=0)
await RolePermission.filter(permission_id=permission_id, del_flag=1).update(del_flag=0)
sub_permissions = await Permission.filter(parent_id=permission_id, del_flag=1).all()
for sub_department in sub_permissions:
await delete_permission_recursive(sub_department.id)
return True
@permissionAPI.put("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@permissionAPI.post("/update/{id}", response_model=BaseResponse, response_class=JSONResponse, summary="更新权限")
@Log(title="更新权限", business_type=BusinessType.UPDATE)
@@ -98,7 +128,16 @@ async def update_permission(request: Request, params: AddPermissionParams, id: s
permission.leave_transition = params.leave_transition
permission.fixed_tag = params.fixed_tag
permission.hidden_tag = params.hidden_tag
permission.is_admin = params.is_admin
await permission.save()
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')
if userInfos:
await request.app.state.redis.delete(*userInfos)
# 更新用户路由缓存
userRoutes = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_ROUTES.key}*')
if userRoutes:
await request.app.state.redis.delete(*userRoutes)
return Response.success(msg="更新权限成功!")
else:
return Response.error(msg="更新权限失败,权限不存在!")
@@ -137,6 +176,7 @@ async def get_permission(request: Request, id: str = Path(description="权限ID"
fixed_tag="fixed_tag",
show_link="show_link",
show_parent="show_parent",
is_admin="is_admin"
)
return Response.success(msg="查询权限详情成功!", data=permission)
else:
@@ -171,7 +211,9 @@ async def get_permission_list(
enterTransition: Optional[str] = Query(default=None, description="进场动画"),
leaveTransition: Optional[str] = Query(default=None, description="离场动画"),
fixedTag: Optional[bool] = Query(default=None, description="固定标签页"),
hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页")
hiddenTag: Optional[bool] = Query(default=None, description="隐藏标签页"),
isAdmin: Optional[bool] = Query(default=None, description="是否为管理专属页面"),
current_user: dict = Depends(LoginController.get_current_user),
):
filterArgs = {
f'{k}__contains': v for k, v in {
@@ -195,9 +237,13 @@ async def get_permission_list(
"enter_transition": enterTransition,
"leave_transition": leaveTransition,
"fixed_tag": fixedTag,
"hidden_tag": hiddenTag
"hidden_tag": hiddenTag,
"is_admin": isAdmin
}.items() if v
}
department_id = current_user.get("department_id", "")
if not await hasAdmin(request, department_id):
filterArgs["is_admin"] = False
total = await Permission.filter(**filterArgs, del_flag=1).count()
result = await Permission.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).order_by(
'rank').values(
@@ -226,7 +272,8 @@ async def get_permission_list(
hidden_tag="hidden_tag",
fixed_tag="fixed_tag",
show_link="show_link",
show_parent="show_parent"
show_parent="show_parent",
is_admin="is_admin"
)
return Response.success(data={
"total": total,

View File

@@ -10,7 +10,7 @@ from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.auth import Auth, hasAuth, hasAdmin
from annotation.log import Log
from config.constant import BusinessType, RedisKeyConfig
from controller.login import LoginController
@@ -31,7 +31,7 @@ roleAPI = APIRouter(
@Auth(permission_list=["role:btn:add"])
async def add_role(request: Request, params: AddRoleParams,
current_user: dict = Depends(LoginController.get_current_user)):
if await Role.get_or_none(code=params.role_code, department_id=params.department_id, del_flag=1):
if await Role.get_or_none(code=params.code, department_id=params.department_id, del_flag=1):
return Response.error(msg="角色编码已存在!")
sub_departments = current_user.get("sub_departments")
if params.department_id not in sub_departments:
@@ -47,10 +47,10 @@ async def add_role(request: Request, params: AddRoleParams,
)
else:
role = await Role.create(
code=params.role_code,
name=params.role_name,
code=params.code,
name=params.name,
status=params.status,
description=params.role_description,
description=params.description,
department_id=None,
)
if role:
@@ -194,8 +194,16 @@ async def get_role_list(
"status": status
}.items() if v
}
if await hasAuth(request, "role:btn:admin"):
if not department_id:
filterArgs["department__id__in"] = current_user.get("sub_departments")
else:
filterArgs["department__id"] = current_user.get("department_id")
else:
if department_id:
filterArgs["department__id"] = department_id
else:
filterArgs["department__id"] = current_user.get("department_id")
total = await Role.filter(**filterArgs, del_flag=1).count()
data = await Role.filter(**filterArgs, del_flag=1).offset(
(page - 1) * pageSize).limit(
@@ -231,6 +239,11 @@ async def add_role_permission(request: Request, params: AddRolePermissionParams,
id: str = Path(..., description="角色ID"),
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
if await hasAdmin(request, current_user.get("department_id")):
department_permissions = await Permission.filter(del_flag=1).values("id")
else:
department_permissions = await Permission.filter(is_admin=False, del_flag=1).values("id")
department_permissions = filterKeyValues(department_permissions, "id")
if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments):
# 已有角色权限
rolePermissions = await RolePermission.filter(role_id=id, del_flag=1).values("permission_id")
@@ -239,6 +252,8 @@ async def add_role_permission(request: Request, params: AddRolePermissionParams,
add_list = set(params.permission_ids).difference(set(rolePermissions))
# 循环添加角色权限
for item in add_list:
if item not in department_permissions:
continue
permission = await Permission.get_or_none(id=item, del_flag=1)
if permission:
await RolePermission.create(
@@ -290,10 +305,15 @@ async def update_role_permission(request: Request, params: AddRolePermissionPara
id: str = Path(..., description="角色ID"),
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
if await hasAdmin(request, current_user.get("department_id")):
department_permissions = await Permission.filter(del_flag=1).values("id")
else:
department_permissions = await Permission.filter(is_admin=False, del_flag=1).values("id")
department_permissions = await filterKeyValues(department_permissions, key="id", convert_type=str)
if role := await Role.get_or_none(id=id, del_flag=1, department__id__in=sub_departments):
# 已有角色权限
rolePermissions = await RolePermission.filter(role_id=role.id, del_flag=1).values("permission_id")
rolePermissions = await filterKeyValues(rolePermissions, "permission_id")
rolePermissions = await filterKeyValues(rolePermissions, key="permission_id", convert_type=str)
# 利用集合筛选出角色权限中不存在的权限
delete_list = set(rolePermissions).difference(set(params.permission_ids))
# 利用集合筛选出角色权限中新增的权限
@@ -303,6 +323,8 @@ async def update_role_permission(request: Request, params: AddRolePermissionPara
await RolePermission.filter(role_id=id, permission_id=item, del_flag=1).update(del_flag=0)
# 循环添加角色权限
for item in add_list:
if item not in department_permissions:
continue
await RolePermission.create(role_id=id, permission_id=item)
# 更新用户信息缓存
userInfos = await request.app.state.redis.keys(f'{RedisKeyConfig.USER_INFO.key}*')

View File

@@ -14,12 +14,14 @@ import psutil
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from controller.login import LoginController
from schemas.server import GetServerInfoResponse, CpuInfo, MemoryInfo, SystemInfo, PythonInfo, SystemFiles, \
GetSystemInfoResult
from utils.common import bytes2human
from utils.log import logger
from utils.response import Response
serverAPI = APIRouter(
@@ -30,6 +32,7 @@ serverAPI = APIRouter(
@serverAPI.get("", response_class=JSONResponse, response_model=GetServerInfoResponse, summary="获取服务器信息")
@Log(title="获取服务器信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["server:btn:info"])
async def get_server_info(request: Request):
# CPU信息
# 获取CPU总核心数
@@ -96,6 +99,7 @@ async def get_server_info(request: Request):
io = psutil.disk_partitions()
sys_files = []
for i in io:
try:
o = psutil.disk_usage(i.device)
disk_data = SystemFiles(
dirName=i.device,
@@ -107,6 +111,9 @@ async def get_server_info(request: Request):
usage=f'{psutil.disk_usage(i.device).percent}%',
)
sys_files.append(disk_data)
except Exception as e:
logger.error(f"获取磁盘信息失败:{e}")
continue
result = GetSystemInfoResult(cpu=cpu, memory=mem, system=sys, python=py, systemFiles=sys_files)
return Response.success(data=result)

View File

@@ -9,7 +9,7 @@ import os
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request
from fastapi import APIRouter, Depends, Path, Query, UploadFile, File, Request, Form
from fastapi.responses import JSONResponse
from annotation.auth import Auth
@@ -20,14 +20,14 @@ from controller.login import LoginController
from controller.query import QueryController
from exceptions.exception import ModelValidatorException
from models import File as FileModel
from models import Role, Department
from models import Role, Department, OperationLog, LoginLog
from models.user import User, UserRole
from schemas.common import BaseResponse, DeleteListParams
from schemas.department import GetDepartmentListResponse
from schemas.file import UploadFileResponse
from schemas.user import AddUserParams, GetUserListResponse, GetUserInfoResponse, UpdateUserParams, \
AddUserRoleParams, GetUserRoleInfoResponse, UpdateUserRoleParams, GetUserPermissionListResponse, \
ResetPasswordParams
ResetPasswordParams, UpdateBaseUserInfoParams
from utils.common import filterKeyValues
from utils.password import Password
from utils.response import Response
@@ -76,6 +76,18 @@ async def delete_user(
if user := await User.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
user.del_flag = 0
await user.save()
# 移除用户角色
await UserRole.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户登录日志
await LoginLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 移除用户操作日志
await OperationLog.filter(user_id=user.id, del_flag=1).update(del_flag=0)
# 更新用户信息缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{id}')
# 更新用户路由缓存
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_ROUTES.key}:{id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_ROUTES.key}:{id}')
return Response.success(msg="删除成功!")
else:
return Response.error(msg="删除失败,用户不存在!")
@@ -422,3 +434,82 @@ async def reset_user_password(request: Request, params: ResetPasswordParams, id:
await user.save()
return Response.success(msg="重置密码成功!")
return Response.failure(msg="用户不存在!")
@userAPI.put("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
summary="更新基础个人信息")
@userAPI.post("/updateBaseUserInfo", response_model=BaseResponse, response_class=JSONResponse,
summary="更新基础个人信息")
@Log(title="更新基础个人信息", business_type=BusinessType.UPDATE)
async def update_base_userinfo(params: UpdateBaseUserInfoParams, request: Request,
current_user: dict = Depends(LoginController.get_current_user)):
user = await User.get_or_none(id=current_user.get("id"), del_flag=1)
if user:
user.nickname = params.name
user.gender = params.gender
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@userAPI.post("/updatePassword", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新密码")
@Log(title="用户更新密码", business_type=BusinessType.UPDATE)
async def update_user_password(request: Request, oldPassword: str = Form(description="用户旧密码"),
newPassword: str = Form(description="用户新密码"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(oldPassword)
if user.password != password:
return Response.error(msg="旧密码错误!")
newPassword = await Password.get_password_hash(newPassword)
user.password = newPassword
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@userAPI.post("/updatePhone", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新手机号")
@Log(title="用户更新手机号", business_type=BusinessType.UPDATE)
async def update_user_phone(request: Request, password: str = Form(description="用户密码"),
phone: str = Form(description="用户手机号"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(password)
if user.password != password:
return Response.error("更改失败,请正确输入旧密码")
phoneStatus = await User.filter(phone=phone, del_flag=1).count()
if phoneStatus:
return Response.error(f"更改失败,手机号:{phone}已绑定其他账号!")
user.phone = phone
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")
@userAPI.put("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@userAPI.post("/updateEmail", response_class=JSONResponse, response_model=BaseResponse, summary="用户更新邮箱")
@Log(title="用户更新邮箱", business_type=BusinessType.UPDATE)
async def update_user_email(request: Request, password: str = Form(description="用户密码"),
email: str = Form(description="用户邮箱"),
current_user: dict = Depends(LoginController.get_current_user)):
if user := await User.get_or_none(id=current_user.get("id"), del_flag=1):
password = await Password.get_password_hash(password)
if user.password != password:
return Response.error("更改失败,请正确输入旧密码")
emailStatus = await User.filter(email=email, del_flag=1).count()
if emailStatus:
return Response.error(f"更改失败,邮箱:{email}已绑定其他账号!")
user.email = email
await user.save()
if await request.app.state.redis.get(f'{RedisKeyConfig.USER_INFO.key}:{user.id}'):
await request.app.state.redis.delete(f'{RedisKeyConfig.USER_INFO.key}:{user.id}')
return Response.success(msg="更新成功!")
return Response.error(msg="更新失败!")

2
app.py
View File

@@ -23,6 +23,7 @@ from api.permission import permissionAPI
from api.role import roleAPI
from api.server import serverAPI
from api.user import userAPI
from api.generate import generateAPI
from config.database import init_db, close_db
from config.env import AppConfig
from config.get_redis import Redis
@@ -87,6 +88,7 @@ api_list = [
{'api': serverAPI, 'tags': ['服务器管理']},
{'api': i18nAPI, 'tags': ['国际化管理']},
{'api': configApi, 'tags': ['配置管理']},
{'api': generateAPI, 'tags': ['代码生成管理']},
]
for api in api_list:

View File

@@ -265,3 +265,72 @@ class RedisKeyConfig(Enum):
"""国际化类型,存储国际化类型及其配置信息。"""
SYSTEM_CONFIG = {'key': 'system_config', 'remark': '系统配置信息'}
"""系统配置信息,存储系统的配置信息。"""
# MYSQL类型和 Python类型的映射关系
MYSQL_TO_PYTHON_TYPE = {
"int": "int",
"bigint": "int",
"smallint": "int",
"tinyint": "int",
"tinyint(1)": "bool", # MySQL 的 tinyint(1) 通常用作布尔值
"float": "float",
"double": "float",
"decimal": "float",
"char": "str",
"varchar": "str",
"text": "str",
"longtext": "str",
"date": "date",
"datetime": "datetime",
"timestamp": "datetime",
"time": "time",
"json": "dict",
"binary": "bytes",
"varbinary": "bytes",
"blob": "bytes",
"tinyblob": "bytes",
"mediumblob": "bytes",
"longblob": "bytes"
}
MYSQL_TO_TORTOISE_TYPE = {
# 数值类型
"tinyint": "BooleanField", # 通常用于布尔值
"smallint": "IntField",
"mediumint": "IntField",
"int": "IntField",
"integer": "IntField",
"bigint": "BigIntField",
"decimal": "DecimalField",
"numeric": "DecimalField",
"float": "FloatField",
"double": "FloatField",
"real": "FloatField",
# 字符串类型
"char": "CharField",
"varchar": "CharField",
"tinytext": "TextField",
"text": "TextField",
"mediumtext": "TextField",
"longtext": "TextField",
# 日期和时间
"date": "DateField",
"datetime": "DatetimeField",
"timestamp": "DatetimeField",
"time": "TimeField",
"year": "IntField",
# 二进制数据
"binary": "BinaryField",
"varbinary": "BinaryField",
"blob": "BinaryField",
"tinyblob": "BinaryField",
"mediumblob": "BinaryField",
"longblob": "BinaryField",
# JSON
"json": "JSONField",
}

View File

@@ -5,9 +5,13 @@
# @File : database.py
# @Software : PyCharm
# @Comment : 本程序
import asyncio
import logging
import subprocess
import sys
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from tortoise import Tortoise
@@ -136,3 +140,41 @@ async def configure_tortoise_logging(enable_logging: bool = True, log_level: int
else:
# 如果禁用日志,设置日志级别为 WARNING 以抑制大部分输出
tortoise_logger.setLevel(logging.WARNING)
async def backup_database():
"""
备份数据库
"""
logger.info("开始备份数据库")
# 配置数据库连接信息
backup_dir = Path().cwd() / "sql" # 备份文件存储的目录
# 如果 migrations 目录不存在,则创建
backup_dir.mkdir(parents=True, exist_ok=True)
# 生成备份文件名,格式为 dbYYYYMMDDHHMMSS.sql
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
backup_filename = f"db{timestamp}.sql"
backup_filepath = backup_dir / backup_filename # 使用 Path 对象组合路径
# 构造 mysqldump 命令
command = [
"mysqldump",
"-u", DataBaseConfig.db_username,
f"-p{DataBaseConfig.db_password}", # 直接传递密码
DataBaseConfig.db_database,
"--result-file=" + str(backup_filepath) # 指定备份文件路径
]
# 使用 asyncio.to_thread 来在线程中执行阻塞操作
await asyncio.to_thread(run_mysqldump, command)
def run_mysqldump(command):
"""在阻塞线程中执行 mysqldump 命令"""
try:
subprocess.run(command, check=True)
logger.info(f"数据库备份已完成,文件保存为 {command[-1]}")
except subprocess.CalledProcessError as e:
logger.error(f"备份失败,错误: {e}")

View File

@@ -130,7 +130,7 @@ class LoginController:
userInfo = await QueryController.get_user_info(user_id=user_id)
await request.app.state.redis.set(f'{RedisKeyConfig.USER_INFO.key}:{user_id}',
str(jsonable_encoder(userInfo)),
ex=timedelta(minutes=5))
ex=timedelta(minutes=2))
if not userInfo:
logger.warning('用户token不合法')
raise AuthException(data='', message='用户token不合法')

View File

@@ -132,7 +132,8 @@ class QueryController:
keepAlive="permission__keep_alive",
hiddenTag="permission__hidden_tag",
showLink="permission__show_link",
showParent="permission__show_parent"
showParent="permission__show_parent",
isAdmin="permission__is_admin",
)
permissions.extend(permission)
return permissions

View File

@@ -9,6 +9,7 @@
from models.config import Config
from models.department import Department
from models.file import File
from models.generate import GenerateInfo, GenerateColumn
from models.i18n import I18n, Locale
from models.log import LoginLog, OperationLog
from models.permission import Permission
@@ -27,5 +28,7 @@ __all__ = [
'UserRole',
'I18n',
'Locale',
'Config'
'Config',
'GenerateInfo',
'GenerateColumn'
]

54
models/generate.py Normal file
View File

@@ -0,0 +1,54 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/21 03:37
# @UpdateTime : 2025/02/21 03:37
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
from tortoise import fields
from models.common import BaseModel
class GenerateInfo(BaseModel):
"""
代码生成表模型
"""
table_name = fields.CharField(max_length=255, default="", description="表名称", source_field="table_name")
table_comment = fields.CharField(max_length=255, default="", description="表注释", source_field="table_comment")
class_name = fields.CharField(max_length=255, default="", description="类名", source_field="class_name")
author = fields.CharField(max_length=255, default="", description="作者", source_field="author")
remark = fields.TextField(default="", description="备注", null=True, source_field="remark")
permission_id = fields.CharField(max_length=255, default="", description="权限ID", source_field="permission_id")
prefix = fields.CharField(max_length=255, default="", description="api前缀", source_field="prefix")
description = fields.TextField(default="", description="描述", null=True, source_field="description")
class Meta:
table = "generate_info"
table_description = "代码生成表"
class GenerateColumn(BaseModel):
"""
代码生成列模型
"""
table = fields.ForeignKeyField("models.GenerateInfo", related_name="columns", description="",
source_field="table_id")
index = fields.IntField(default=0, description="索引", source_field="index")
column_name = fields.CharField(max_length=255, default="", description="字段名称", source_field="column_name")
column_comment = fields.CharField(max_length=255, default="", description="字段注释", source_field="column_comment")
column_type = fields.CharField(max_length=255, default="", description="字段类型", source_field="column_type")
python_type = fields.CharField(max_length=255, default="", description="python类型", source_field="python_type")
python_name = fields.CharField(max_length=255, default="", description="python名称", source_field="python_name")
is_insert = fields.BooleanField(default=True, description="是否插入", source_field="is_insert")
is_edit = fields.BooleanField(default=True, description="是否编辑", source_field="is_edit")
is_list = fields.BooleanField(default=True, description="是否列表", source_field="is_list")
is_query = fields.BooleanField(default=True, description="是否查询", source_field="is_query")
is_required = fields.BooleanField(default=False, description="是否必填", source_field="is_required")
is_hide = fields.BooleanField(default=False, description="是否隐藏", source_field="is_hide")
query_way = fields.CharField(max_length=255, default="", description="查询方式", source_field="query_way")
show_type = fields.CharField(max_length=255, default="", description="显示类型", source_field="show_type")
class Meta:
table = "generate_column"
table_description = "代码生成列"

View File

@@ -280,6 +280,18 @@ class Permission(BaseModel):
- 映射到数据库字段 show_parent。
"""
is_admin = fields.BooleanField(
default=False,
description="是否为管理专属页面",
source_field="is_admin" # 映射到数据库字段 is_admin
)
"""
是否为管理专属页面。
- 是否为管理专属页面,仅管理员可见。
- 默认为 False。
- 映射到数据库字段 is_admin。
"""
class Meta:
table = "permission" # 数据库表名
table_description = "权限表" # 表描述

View File

@@ -17,13 +17,11 @@ class User(BaseModel):
username = fields.CharField(
max_length=255,
unique=True,
description="用户名",
source_field="username" # 映射到数据库字段 username
)
"""
用户名。
- 必须唯一。
- 最大长度为 255 个字符。
- 映射到数据库字段 username。
"""

136
schemas/generate.py Normal file
View File

@@ -0,0 +1,136 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/02/28 18:00
# @UpdateTime : 2025/02/28 18:00
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
from typing import List
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_camel, to_snake
from schemas.common import BaseResponse, ListQueryResult
class TableInfo(BaseModel):
"""
数据表信息模型
"""
model_config = ConfigDict(alias_generator=to_snake)
id: str = Field(default="", description="主键")
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
table_rows: int = Field(default=0, description="表行数")
data_length: str = Field(default="", description="表大小")
index_length: str = Field(default="", description="索引大小")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GenerateTableInfo(BaseModel):
"""
生成表信息模型
"""
model_config = ConfigDict(alias_generator=to_snake)
id: str = Field(default="", description="主键")
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
author: str = Field(default="", description="作者")
prefix: str = Field(default="", description="api前缀")
class_name: str = Field(default="", description="类名")
remark: str = Field(default="", description="备注")
description: str = Field(default="", description="描述")
permission_id: str = Field(default="", description="权限ID")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class GetTableListResult(ListQueryResult):
"""
获取数据表结果
"""
result: List[TableInfo] = Field(default=None, description="响应数据")
class GetTablesListResponse(BaseResponse):
"""
获取数据库表结果
"""
data: TableInfo = Field(default=None, description="响应数据")
class AddGenerateInfoParams(BaseModel):
"""
添加生成信息参数
"""
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
author: str = Field(default="", description="作者")
prefix: str = Field(default="", description="api前缀")
class_name: str = Field(default="", description="类名")
remark: str = Field(default="", description="备注")
permission_id: str = Field(default="", description="权限ID")
description: str = Field(default="", description="备注")
class GenerateColumnInfo(BaseModel):
"""
生成列信息
"""
model_config = ConfigDict(alias_generator=to_snake)
id: str = Field(default="", description="主键")
table_id: str = Field(default="", description="表ID")
table_name: str = Field(default="", description="表名称")
table_comment: str = Field(default="", description="表注释")
column_name: str = Field(default="", description="字段名称")
column_comment: str = Field(default="", description="字段注释")
column_type: str = Field(default="", description="字段类型")
python_type: str = Field(default="", description="python类型")
python_name: str = Field(default="", description="python名称")
is_insert: bool = Field(default=True, description="是否插入")
is_edit: bool = Field(default=True, description="是否编辑")
is_list: bool = Field(default=True, description="是否列表")
is_query: bool = Field(default=True, description="是否查询")
query_way: str = Field(default="", description="查询方式")
show_type: str = Field(default="", description="显示类型")
is_required: bool = Field(default=False, description="是否必填")
is_hide: bool = Field(default=False, description="是否隐藏")
index: int = Field(default=0, description="索引")
create_time: str = Field(default="", description="创建时间")
update_time: str = Field(default="", description="更新时间")
class UpdateGenerateInfoParams(BaseModel):
"""
更新生成信息参数
"""
columns: List[GenerateColumnInfo] = Field(default=None, description="生成列信息")
class GetGenerateInfoResult(GenerateTableInfo):
"""
获取生成信息结果
"""
columns: List[GenerateColumnInfo] = Field(default=None, description="生成列信息")
class GetGenerateInfoListResult(ListQueryResult):
"""
获取生成信息结果
"""
result: List[GetGenerateInfoResult] = Field(default=[], description="响应数据")
class GetGenerateInfoResponse(BaseResponse):
"""
获取生成信息结果
"""
data: GetGenerateInfoResult = Field(default=None, description="响应数据")
class GetGenerateInfoListResponse(BaseResponse):
"""
获取生成信息结果
"""
data: GetGenerateInfoListResult = Field(default=[], description="响应数据")

View File

@@ -43,6 +43,7 @@ class PermissionInfo(BaseModel):
fixed_tag: bool = Field(default=False, description="固定标签页")
show_link: bool = Field(default=True, description="显示菜单")
show_parent: bool = Field(default=True, description="显示父级菜单")
is_admin: bool = Field(default=False, description="是否为管理专属页面")
class Config:
json_schema_extra = {
@@ -72,7 +73,8 @@ class PermissionInfo(BaseModel):
"hidden_tag": False,
"fixed_tag": False,
"show_link": True,
"show_parent": True
"show_parent": True,
"is_admin": False
}
}
@@ -109,6 +111,7 @@ class AddPermissionParams(BaseModel):
show_parent: bool = Field(default=True, description="显示父级菜单")
parent_id: str = Field(default="", max_length=36, description="父级菜单ID")
menu_type: int = Field(default=0, description="菜单类型")
is_admin: bool = Field(default=False, description="是否为管理专属页面")
class Config:
json_schema_extra = {
@@ -133,7 +136,8 @@ class AddPermissionParams(BaseModel):
"show_link": True,
"show_parent": True,
"parent_id": "",
"menu_type": 0
"menu_type": 0,
"is_admin": False
}
}

View File

@@ -6,6 +6,7 @@
# @Software : PyCharm
# @Comment : 本程序
from datetime import datetime
from enum import IntEnum
from typing import Optional, List
from uuid import UUID
@@ -15,6 +16,11 @@ from pydantic_validation_decorator import Xss, NotBlank, Size, Network
from schemas.common import BaseResponse, ListQueryResult
class Gender(IntEnum):
MAN = 0
WOMAN = 1
class UserBase(BaseModel):
"""
用户表基础模型。
@@ -387,3 +393,19 @@ class GetUserStatisticsResponse(BaseResponse):
获取用户统计信息响应模型。
"""
data: GetUserStatisticsResult = Field(default=None, description="响应数据")
class UpdateBaseUserInfoParams(BaseModel):
"""修改基础信息参数"""
name: str
"""姓名"""
gender: Gender
"""性别"""
class Config:
json_schema_extra = {
"example": {
"name": "张三",
"gender": 1,
}
}

1075
sql/fastapi.sql Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,135 @@
# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于生成{{ table_comment }}增删改查接口
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Request, Query
from fastapi.responses import JSONResponse
from annotation.auth import Auth
from annotation.log import Log
from config.constant import BusinessType
from controller.login import LoginController
from models import {{ class_name }}
from schemas.common import BaseResponse, DeleteListParams
from schemas.{{ name }} import Add{{ class_name }}Params, Update{{ class_name }}Params, Get{{ class_name }}InfoResponse, Get{{ class_name }}ListResponse
from utils.response import Response
{{ table_name }}API= APIRouter(
prefix="{{ prefix }}",
dependencies=[Depends(LoginController.get_current_user)],
)
@{{ table_name }}API.post("/add", response_class=JSONResponse, response_model=BaseResponse, summary="新增{{ description }}")
@Log(title="新增{{ description }}", business_type=BusinessType.INSERT)
@Auth(permission_list=["{{ name }}:btn:add"])
async def add_{{ name }}(request: Request, params: Add{{ class_name }}Params):
if await {{ class_name }}.get_or_none(
{% for column in columns if column.is_insert %}
{{ column.python_name }} = params.{{ column.python_name }},
{% endfor %}
del_flag=1
):
return Response.error(msg="{{ description }}已存在!")
{{ name }} = await {{ class_name }}.create(
{% for column in columns if column.is_insert %}
{{ column.python_name }} = params.{{ column.python_name }},
{% endfor %}
)
if {{ name }}:
return Response.success(msg="新增成功!")
else:
return Response.error(msg="新增失败")
@{{ table_name }}API.delete("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除{{ description }}")
@{{ table_name }}API.post("/delete/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="删除{{ description }}")
@Log(title="删除{{ description }}", business_type=BusinessType.DELETE)
@Auth(permission_list=["{{ name }}:btn:delete"])
async def delete_{{ name }}(request: Request, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{{ name }}.del_flag = 0
await {{ name }}.save()
return Response.success(msg="删除成功")
else:
return Response.error(msg="{{ description }}不存在!")
@{{ table_name }}API.delete("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除{{ description }}")
@{{ table_name }}API.post("/deleteList", response_class=JSONResponse, response_model=BaseResponse, summary="批量删除{{ description }}")
@Log(title="批量删除{{ description }}", business_type=BusinessType.DELETE)
@Auth(permission_list=["{{ name }}:btn:delete"])
async def delete_{{ name }}_list(request: Request, params: DeleteListParams):
for id in set(params.ids):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{{ name }}.del_flag = 0
await {{ name }}.save()
return Response.success(msg="删除成功")
@{{ table_name }}API.put("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改{{ description }}")
@{{ table_name }}API.post("/update/{id}", response_class=JSONResponse, response_model=BaseResponse, summary="修改{{ description }}")
@Log(title="修改{{ description }}", business_type=BusinessType.UPDATE)
@Auth(permission_list=["{{ name }}:btn:update"])
async def update_{{ name }}(request: Request, params: Update{{ class_name }}Params, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
{% for column in columns if column.is_edit %}
{{ name }}.{{ column.python_name }} = params.{{ column.python_name }}
{% endfor %}
await {{ name }}.save()
return Response.success(msg="修改成功")
else:
return Response.error(msg="{{ description }}不存在")
@{{ table_name }}API.get("/info/{id}", response_class=JSONResponse, response_model=Get{{ class_name }}InfoResponse, summary="获取{{ description }}信息")
@Log(title="获取{{ description }}信息", business_type=BusinessType.SELECT)
@Auth(permission_list=["{{ name }}:btn:info"])
async def get_{{ name }}_info(request: Request, id: str = Path(description="{{ description }}ID")):
if {{ name }} := await {{ class_name }}.get_or_none(id=id, del_flag=1):
data = {
{% for column in columns if column.is_list %}
"{{ column.python_name }}":{{ name }}.{{ column.python_name }},
{% endfor %}
}
return Response.success(data=data)
else:
return Response.error(msg="{{ description }}不存在")
@{{ table_name }}API.get("/list", response_class=JSONResponse, response_model=Get{{ class_name }}ListResponse, summary="获取{{ description }}列表")
@Log(title="获取{{ description }}列表", business_type=BusinessType.SELECT)
@Auth(permission_list=["{{ name }}:btn:list"])
async def get_{{ name }}_list(
request: Request,
page: int = Query(default=1, description="当前页码"),
pageSize: int = Query(default=10, description="每页数量"),
{% for column in columns if column.is_query %}
{{ column.python_name }}: Optional[str] = Query(default=None, description="{{ column.column_comment }}"),
{% endfor %}
):
filterArgs={
{% for column in columns if column.is_query %}
"{{ column.python_name }}{{ column.query_way }}": {{ column.python_name }},
{% endfor %}
}
filterArgs = {k: v for k, v in filterArgs.items() if v is not None}
total = await {{ class_name }}.filter(**filterArgs, del_flag=1).count()
data = await {{ class_name }}.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(pageSize).values(
{% for column in columns if column.is_list %}
{{ column.python_name }} = "{{ column.python_name }}",
{% endfor %}
)
return Response.success(data={
"total": total,
"result": data,
"page": page,
"pageSize": pageSize,
})

View File

@@ -0,0 +1,44 @@
# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于{{ table_comment }}模型
from tortoise import fields
from models.common import BaseModel
class {{ class_name }}(BaseModel):
"""
{{ table_comment }}模型
"""
{% for column in columns if column.is_common == false %}
{%- set params = [] %}
{%- if column.max_length is not none %}{% set params = params + ["max_length=" ~ column.max_length] %}{% endif %}
{%- if column.is_nullable %}{% set params = params + ["null=True"] %}{% endif %}
{%- if column.is_unique %}{% set params = params + ["unique=True"] %}{% endif %}
{%- if column.default is not none %}{% set params = params + ["default=" ~ column.default] %}{% endif %}
{%- if column.column_comment %}{% set params = params + ['description="' ~ column.column_comment ~ '"'] %}{% endif %}
{%- if column.column_name %}{% set params = params + ['source_field="' ~ column.column_name ~ '"'] %}{% endif %}
{{ column.python_name }} = fields.{{ column.field_type }}({{ params | join(", ") }})
"""
{{ column.column_comment }}。
{%- if column.max_length is not none %}
- 最大长度为 {{ column.max_length }} 个字符
{%- endif %}
- 映射到数据库字段 {{ column.column_name }}
{%- if column.is_nullable %}
- 可为空
{%- endif %}
{%- if column.default is not none %}
- 默认值:{{ column.default }}
{%- endif %}
"""
{% endfor %}
class Meta:
table = "{{ table_name }}"
table_description = "{{ table_comment }}"

View File

@@ -0,0 +1,51 @@
# _*_ coding : UTF-8 _*_
# @Time : {{ current_time }}
# @UpdateTime : {{ current_time }}
# @Author : {{ author }}
# @File : {{ table_name }}.py
# @Comment : 本程序用于生成{{ table_comment }}参数和响应模型
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, Field, ConfigDict
from pydantic.alias_generators import to_snake
from schemas.common import BaseResponse, ListQueryResult
class {{ class_name }}Info(BaseModel):
"""{{ description }}信息"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
{% for column in columns if column.is_list %}
{{ column.python_name }}: {% if not column.is_required %}Optional[{% endif %}{{ column.python_type }}{% if not column.is_required %}]{% endif %} = Field(
{% if column.default is not none %}default={{ column.default }}, {% endif %}title="{{ column.column_comment }}"
)
{% endfor %}
class Add{{ class_name }}Params(BaseModel):
"""新增{{ description }}参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
{% for column in columns if column.is_insert %}
{{ column.python_name }}: {% if not column.is_required %}Optional[{% endif %}{{ column.python_type }}{% if not column.is_required %}]{% endif %} = Field(
{% if column.default is not none %}default={{ column.default }}, {% endif %}title="{{ column.column_comment }}"
)
{% endfor %}
class Update{{ class_name }}Params(BaseModel):
"""更新{{ description }}参数"""
model_config = ConfigDict(alias_generator=to_snake, populate_by_name=True)
{% for column in columns if column.is_edit %}
{{ column.python_name }}: {% if not column.is_required %}Optional[{% endif %}{{ column.python_type }}{% if not column.is_required %}]{% endif %} = Field(
{% if column.default is not none %}default={{ column.default }}, {% endif %}title="{{ column.column_comment }}"
)
{% endfor %}
class Get{{ class_name }}InfoResponse(BaseResponse):
"""获取{{ description }}信息响应"""
data: {{ class_name }}Info = Field(None, title="{{ table_comment }}信息")
class Get{{ class_name }}InfoListResult(ListQueryResult):
"""获取{{ description }}信息列表响应结果"""
result: List[{{ class_name }}Info] = Field(None, title="{{ table_comment }}信息列表")
class Get{{ class_name }}ListResponse(BaseResponse):
"""获取{{ description }}信息列表响应"""
data: Get{{ class_name }}InfoListResult = Field(None, title="{{ table_comment }}信息列表")

14
templates/sql.jinja Normal file
View File

@@ -0,0 +1,14 @@
-- {{ description }}权限
-- 按钮权限
-- 添加
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Add', '', '', '', 1, '', '', '', '', '', '', '{{ name }}:btn:add', '', 1, 0, 0, 0, 1, 0, 0);
-- 删除
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Delete', '', '', '', 2, '', '', '', '', '', '', '{{ name }}:btn:delete', '', 1, 0, 0, 0, 1, 0, 0);
-- 修改
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Update', '', '', '', 3, '', '', '', '', '', '', '{{ name }}:btn:update', '', 1, 0, 0, 0, 1, 0, 0);
-- 详情
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:Details', '', '', '', 4, '', '', '', '', '', '', '{{ name }}:btn:info', '', 1, 0, 0, 0, 1, 0, 0);
-- 数据列表
INSERT INTO `permission` VALUES ('{{ uuid4() }}', 1, '', '{{ now() }}', '', '{{ now() }}', 3, '{{ permission_id }}', 'buttons:DataList', '', '', '', 5, '', '', '', '', '', '', '{{ name }}:btn:list', '', 1, 0, 0, 0, 1, 0, 0);

View File

@@ -0,0 +1,42 @@
import { http } from "@/utils/http";
import type {
{{ class_name }}Info,
Get{{ class_name }}ListParams,
Add{{ class_name }}Params,
Update{{ class_name }}Params,
} from "types/{{ name }}";
import { filterEmptyObject } from "./utils";
/** 添加{{ description }}数据 */
export const postAdd{{ class_name }}API = (data: Add{{ class_name }}Params) => {
return http.request<null>("post", "/api{{ prefix }}/add", { data });
};
/** 删除{{ description }}数据 */
export const delete{{ class_name }}API = (id: string) => {
return http.request<null>("delete", `/api{{ prefix }}/delete/${id}`);
};
/** 批量删除{{ description }}数据 */
export const delete{{ class_name }}ListAPI = (ids: string[]) => {
return http.request<null>("delete", "/api{{ prefix }}/delete", {
data: { ids },
});
};
/** 修改{{ description }}数据 */
export const putUpdate{{ class_name }}API = (data: Update{{ class_name }}Params, id: string) => {
return http.request<null>("put", `/api{{ prefix }}/update/${id}`, { data });
};
/** 获取{{ description }}信息 */
export const get{{ class_name }}InfoAPI = (id: string) => {
return http.request<{{ class_name }}Info>("get", `/api{{ prefix }}/info/${id}`);
};
/** 获取{{ description }}列表 */
export const get{{ class_name }}ListAPI = (params: Get{{ class_name }}ListParams) => {
return http.request<QueryListResult<{{ class_name }}Info>>("get", "/api{{ prefix }}/list", {
params: filterEmptyObject(params),
});
};

View File

@@ -0,0 +1,282 @@
import dayjs from "dayjs";
import editForm from "../components/form.vue";
import { message } from "@/utils/message";
import { type Ref, ref, reactive, onMounted, h, toRaw } from "vue";
import type { {{ class_name }}Info } from "types/{{ name }}";
import type { PaginationProps } from "@pureadmin/table";
import { addDialog } from "@/components/ReDialog";
import {
delete{{ class_name }}API,
delete{{ class_name }}ListAPI,
get{{ class_name }}ListAPI,
postAdd{{ class_name }}API,
putUpdate{{ class_name }}API
} from "@/api/{{ name }}";
import { getKeyList } from "@pureadmin/utils";
export const use{{ class_name }} = (tableRef: Ref) => {
/**
* 查询表单
*/
const form = reactive({
{% for column in columns if column.is_query %}
/** {{ column.column_comment }} */
{{ column.column_name }}: "",
{% endfor %}
});
/**
* 表单Ref
*/
const formRef = ref(null);
/**
* 数据列表
*/
const dataList = ref<{{ class_name }}Info[]>([]);
/**
* 加载状态
*/
const loading = ref(true);
/**
* 已选数量
*/
const selectedNum = ref<number>(0);
/**
* 分页参数
*/
const pagination = reactive<PaginationProps>({
total: 0,
pageSize: 10,
currentPage: 1,
background: true,
pageSizes: [10, 20, 30, 40, 50]
});
/**
* 表格列设置
*/
const columns: TableColumnList = [
{
label: "勾选列", // 如果需要表格多选此处label必须设置
type: "selection",
fixed: "left",
reserveSelection: true // 数据刷新后保留选项
},
{% for column in columns if column.is_list %}
{
label: "{{ column.column_comment }}",
prop: "{{ column.column_name }}",
hide: {{ "true" if column.is_hide else "false" }}{% if column.python_type == "datetime" %},
formatter: ({ {{ column.column_name }} }) =>
dayjs({{ column.column_name }}).format("YYYY-MM-DD HH:mm:ss"){% endif %}
},
{% endfor %}
{
label: "操作",
fixed: "right",
width: 220,
slot: "operation"
}
];
/**
* 初次查询
*/
const onSearch = async () => {
loading.value = true;
const res = await get{{ class_name }}ListAPI({
page: pagination.currentPage,
pageSize: pagination.pageSize,
...toRaw(form)
});
if (res.success) {
dataList.value = res.data.result;
pagination.total = res.data.total;
pagination.currentPage = res.data.page;
pagination.pageSize = res.data.pageSize;
}
message(res.msg, {
type: res.success ? "success" : "error"
});
loading.value = false;
};
/**
* 重置表单
* @param formEl 表单ref
* @returns
*/
const resetForm = async (formEl: any) => {
if (!formEl) return;
formEl.resetFields();
await onSearch();
};
/**
* 处理删除
* @param row
*/
const handleDelete = async (row: {{ class_name }}Info) => {
const res = await delete{{ class_name }}API(row.id);
if (res.success) {
onSearch();
}
message(res.msg, {
type: res.success ? "success" : "error"
});
};
/**
* 处理每页数量变化
*/
const handleSizeChange = async (val: number) => {
loading.value = true;
const res = await get{{ class_name }}ListAPI({
page: pagination.currentPage,
pageSize: val,
...toRaw(form)
});
if (res.success) {
dataList.value = res.data.result;
pagination.total = res.data.total;
pagination.currentPage = res.data.page;
pagination.pageSize = res.data.pageSize;
}
message(res.msg, {
type: res.success ? "success" : "error"
});
loading.value = false;
};
/**
* 处理页码变化
* @param val
*/
const handleCurrentChange = async (val: number) => {
loading.value = true;
const res = await get{{ class_name }}ListAPI({
page: val,
pageSize: pagination.pageSize,
...toRaw(form)
});
if (res.success) {
dataList.value = res.data.result;
pagination.total = res.data.total;
pagination.currentPage = res.data.page;
pagination.pageSize = res.data.pageSize;
}
message(res.msg, {
type: res.success ? "success" : "error"
});
loading.value = false;
};
/** 当CheckBox选择项发生变化时会触发该事件 */
const handleSelectionChange = async (val: any) => {
selectedNum.value = val.length;
// 重置表格高度
tableRef.value.setAdaptive();
};
/** 取消选择 */
const onSelectionCancel = async () => {
selectedNum.value = 0;
// 用于多选表格,清空用户的选择
tableRef.value.getTableRef().clearSelection();
};
/**
* 批量删除
*/
const onbatchDel = async () => {
// 返回当前选中的行
const curSelected = tableRef.value.getTableRef().getSelectionRows();
const res = await delete{{ class_name }}ListAPI(getKeyList(curSelected, "id"));
if (res.success) {
message(res.msg, {
type: "success"
});
tableRef.value.getTableRef().clearSelection();
onSearch();
} else {
message(res.msg, { type: "error", duration: 5000 });
}
};
const openDialog = async (title = "新增", row?: {{ class_name }}Info) => {
addDialog({
title: `${title}配置`,
props: {
formInline: {
/** 方式 */
title:title,
{% for column in columns if column.is_list %}
/** {{ column.column_comment }} */
{{ column.python_name }}: row?.{{ column.python_name }} ?? "",
{% endfor %}
}
},
width: "45%",
draggable: true,
fullscreenIcon: true,
closeOnClickModal: false,
contentRenderer: () =>
h(editForm, {
formInline: {
/** 方式 */
title:title,
{% for column in columns if column.is_list %}
/** {{ column.column_comment }} */
{{ column.python_name }}: row?.{{ column.python_name }} ?? "",
{% endfor %}
},
ref: formRef
}),
beforeSure: async (done, {}) => {
const FormData = formRef.value.newFormInline;
if (title === "新增") {
let addForm = {
{% for column in columns if column.is_insert %}
/** {{ column.column_comment }} */
{{ column.python_name }}: FormData.{{ column.python_name }} ?? "",
{% endfor %}
};
const res = await postAdd{{ class_name }}API(addForm);
if (res.success) {
done();
await onSearch();
}
message(res.msg, { type: res.success ? "success" : "error" });
} else {
let updateForm = {
{% for column in columns if column.is_edit %}
/** {{ column.column_comment }} */
{{ column.python_name }}: FormData.{{ column.python_name }} ?? "",
{% endfor %}
};
const res = await putUpdate{{ class_name }}API(updateForm, row.id);
if (res.success) {
done();
await onSearch();
}
message(res.msg, { type: res.success ? "success" : "error" });
}
}
});
};
/**
* 页面加载执行
*/
onMounted(async () => {
await onSearch();
});
return {
form,
dataList,
loading,
pagination,
columns,
selectedNum,
openDialog,
onSearch,
resetForm,
handleDelete,
handleSizeChange,
handleCurrentChange,
handleSelectionChange,
onSelectionCancel,
onbatchDel
};
};

View File

@@ -0,0 +1,37 @@
/** {{ description }}信息 */
export interface {{ class_name }}Info {
{% for column in columns if column.is_list %}
/** {{ column.column_comment }} */
{{ column.python_name }}: {{ column.typescript_type }};
{% endfor %}
}
/** 获取{{ description }}列表参数 */
export interface Get{{ class_name }}ListParams {
/** 当前页 */
page: number;
/** 每页数量 */
pageSize: number;
{% for column in columns if column.is_query %}
/** {{ column.column_comment }} */
{{ column.python_name }}?: string;
{% endfor %}
}
/** 添加{{ description }}数据参数 */
export interface Add{{ class_name }}Params {
{% for column in columns if column.is_insert %}
/** {{ column.column_comment }} */
{{ column.python_name }}{% if not column.is_required %}?{% endif %}: {{ column.typescript_type }};
{% endfor %}
}
/** 更新{{ description }}数据参数 */
export interface Update{{ class_name }}Params {
{% for column in columns if column.is_edit %}
/** {{ column.column_comment }} */
{{ column.python_name }}{% if not column.is_required %}?{% endif %}: {{ column.typescript_type }};
{% endfor %}
}

View File

@@ -0,0 +1,100 @@
<template>
<el-form
ref="ruleFormRef"
:model="newFormInline"
:rules="formRules"
label-width="82px"
>
<el-row :gutter="30">
[% for column in columns if column.is_insert or column.is_edit %]
<re-col :value="24" :xm="24" :sm="24">
<el-form-item label="[[ column.column_comment ]]" prop="[[ column.python_name ]]">
[% if column.show_type == "input" %]
<el-input
v-model="newFormInline.[[ column.python_name ]]"
placeholder="请输入[[ column.column_comment ]]~"
clearable
/>
[% elif column.show_type == "textarea" %]
<el-input
v-model="newFormInline.[[ column.python_name ]]"
type="textarea"
placeholder="请输入[[ column.column_comment ]]~"
clearable
/>
[% elif column.show_type == "select" %]
<el-select
v-model="newFormInline.[[ column.python_name ]]"
placeholder="请选择[[ column.column_comment ]]~"
filterable
clearable
>
<el-option
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.label"
:value="item.value" />
</el-select>
[% elif column.show_type == "radio" %]
<el-radio-group v-model="newFormInline.[[ column.python_name ]]">
<el-radio
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value"
>{{ item.label }}
</el-radio>
</el-radio-group>
[% elif column.show_type == "checkbox" %]
<el-checkbox-group v-model="newFormInline.[[ column.python_name ]]">
<el-checkbox
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value">
{{ item.label }}
</el-checkbox>
</el-checkbox-group>
[% elif column.show_type == "datetime" %]
<el-date-picker
v-model="newFormInline.[[ column.python_name ]]"
type="datetime"
placeholder="请选择[[ column.column_comment ]]~"
format="YYYY-MM-DD HH:mm:ss"
value-format="x" />
[% endif %]
</el-form-item>
</re-col>
[% endfor %]
</el-row>
</el-form>
</template>
<script setup lang="ts">
import { reactive, ref } from "vue";
import ReCol from "@/components/ReCol";
import type { FormRules } from "element-plus";
import { [[ class_name ]]Info } from "types/[[ name ]]";
interface FormItemProps {
[% for column in columns if column.is_list %]
/** [[ column.column_comment ]] */
[[ column.python_name ]]: [[ column.typescript_type ]];
[% endfor %]
}
interface FormProps {
formInline: FormItemProps;
}
const props = withDefaults(defineProps<FormProps>(), {
formInline: () => ({
[% for column in columns if column.is_list %]
/** [[ column.column_comment ]] */
[[ column.python_name ]]: "",
[% endfor %]
})
});
const newFormInline = ref<[[ class_name ]]Info>(props.formInline);
/** 自定义表单规则校验 */
const formRules = reactive<FormRules>({
[% for column in columns if column.is_insert or column.is_edit %]
[[ column.python_name ]]: [{ required: [[ 'true' if column.is_required else 'false' ]], message: "请输入[[ column.column_comment ]]~", trigger: "blur" }],
[% endfor %]
});
defineExpose({ newFormInline });
</script>

View File

@@ -0,0 +1,237 @@
<template>
<div class="main">
<el-form
ref="formRef"
:inline="true"
:model="form"
class="search-form bg-bg_color w-[99/100] pl-8 pt-[12px]"
>
[% for column in columns if column.is_query %]
<el-form-item label="[[ column.column_comment ]]" prop="[[ column.python_name ]]">
[% if column.show_type == "input" %]
<el-input
v-model="form.[[ column.python_name ]]"
placeholder="请输入[[ column.column_comment ]]~"
class="!w-[200px]"
clearable
/>
[% elif column.show_type == "textarea" %]
<el-input
v-model="form.[[ column.python_name ]]"
type="textarea"
placeholder="请输入[[ column.column_comment ]]~"
class="!w-[200px]"
clearable
/>
[% elif column.show_type == "select" %]
<el-select
v-model="form.[[ column.python_name ]]"
placeholder="请选择[[ column.column_comment ]]~"
class="!w-[200px]"
clearable
>
<el-option
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.label"
:value="item.value" />
</el-select>
[% elif column.show_type == "radio" %]
<el-radio-group v-model="form.[[ column.python_name ]]">
<el-radio
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value"
>{{ item.label }}
</el-radio>
</el-radio-group>
[% elif column.show_type == "checkbox" %]
<el-checkbox-group v-model="form.[[ column.python_name ]]">
<el-checkbox
v-for="item in options.[[ column.python_name ]]"
:key="item.value"
:label="item.value">
{{ item.label }}
</el-checkbox>
</el-checkbox-group>
[% elif column.show_type == "datetime" %]
<el-date-picker
v-model="form.[[ column.python_name ]]"
type="datetime"
placeholder="请选择[[ column.column_comment ]]~"
format="YYYY-MM-DD HH:mm:ss"
value-format="x" />
[% endif %]
</el-form-item>
[% endfor %]
<el-form-item>
<el-button
type="primary"
:icon="useRenderIcon('ri:search-line')"
:loading="loading"
@click="onSearch"
>
{{ t("buttons:Search") }}
</el-button>
<el-button :icon="useRenderIcon(Refresh)" @click="resetForm(formRef)">
{{ t("buttons:Reset") }}
</el-button>
</el-form-item>
</el-form>
<PureTableBar title="[[ description ]]管理" :columns="columns" @refresh="onSearch">
<template #buttons>
<el-button
v-if="hasAuth('[[ name ]]:btn:add')"
type="primary"
:icon="useRenderIcon(AddFill)"
@click="openDialog('新增')"
>
{{ t("buttons:Add") }}
</el-button>
</template>
<template v-slot="{ size, dynamicColumns }">
<div
v-if="selectedNum > 0"
v-motion-fade
class="bg-[var(--el-fill-color-light)] w-full h-[46px] mb-2 pl-4 flex items-center"
>
<div class="flex-auto">
<span
style="font-size: var(--el-font-size-base)"
class="text-[rgba(42,46,54,0.5)] dark:text-[rgba(220,220,242,0.5)]"
>
已选 {{ selectedNum }} 项
</span>
<el-button type="primary" text @click="onSelectionCancel">
{{ t("buttons:Deselect") }}
</el-button>
</div>
<el-popconfirm
v-if="hasAuth('[[ name ]]:btn:delete')"
title="是否确认删除?"
@confirm="onbatchDel"
>
<template #reference>
<el-button type="danger" text class="mr-1">
{{ t("buttons:DeleteInBatches") }}
</el-button>
</template>
</el-popconfirm>
</div>
<pure-table
ref="tableRef"
row-key="id"
adaptive
border
stripe
:adaptiveConfig="{ offsetBottom: 45 }"
align-whole="center"
table-layout="auto"
:loading="loading"
:size="size"
:data="dataList"
:columns="dynamicColumns"
:pagination="pagination"
:paginationSmall="size === 'small' ? true : false"
:header-cell-style="{
background: 'var(--el-fill-color-light)',
color: 'var(--el-text-color-primary)'
}"
@selection-change="handleSelectionChange"
@page-size-change="handleSizeChange"
@page-current-change="handleCurrentChange"
>
<template #operation="{ row }">
<el-button
class="reset-margin"
link
type="primary"
:size="size"
:disabled="!hasAuth('[[ name ]]:btn:update')"
:icon="useRenderIcon(EditPen)"
@click="openDialog('修改', row)"
>
{{ t("buttons:Update") }}
</el-button>
<el-popconfirm
:title="`是否确认删除配置名为 ${row.name} 的这条数据`"
@confirm="handleDelete(row)"
>
<template #reference>
<el-button
class="reset-margin"
link
:disabled="!hasAuth('[[ name ]]:btn:delete')"
type="danger"
:size="size"
:icon="useRenderIcon(Delete)"
>
{{ t("buttons:Delete") }}
</el-button>
</template>
</el-popconfirm>
</template>
</pure-table>
</template>
</PureTableBar>
</div>
</template>
<script setup lang="ts">
defineOptions({
name: "[[ class_name ]]Index"
});
import { ref } from "vue";
import { use[[ class_name ]] } from "./utils/hook";
import { useI18n } from "vue-i18n";
import { PureTableBar } from "@/components/RePureTableBar";
import { useRenderIcon } from "@/components/ReIcon/src/hooks";
import Delete from "@iconify-icons/ep/delete";
import EditPen from "@iconify-icons/ep/edit-pen";
import Refresh from "@iconify-icons/ep/refresh";
import AddFill from "@iconify-icons/ri/add-circle-line";
const { t } = useI18n();
import { hasAuth } from "@/utils/auth";
/**
* 表格Ref
*/
const tableRef = ref();
const formRef = ref();
const {
form,
dataList,
loading,
pagination,
columns,
selectedNum,
onSearch,
openDialog,
resetForm,
handleDelete,
handleSizeChange,
handleCurrentChange,
handleSelectionChange,
onSelectionCancel,
onbatchDel
} = use[[ class_name ]](tableRef);
</script>
<style scoped lang="scss">
:deep(.el-dropdown-menu__item i) {
margin: 0;
}
:deep(.el-button:focus-visible) {
outline: none;
}
.main-content {
margin: 24px 24px 0 !important;
}
.search-form {
:deep(.el-form-item) {
margin-bottom: 12px;
}
}
</style>

View File

@@ -5,6 +5,8 @@
# @File : common.py
# @Software : PyCharm
# @Comment : 本程序
from typing import List, Any, Optional, Type
def bytes2human(n, format_str='%(value).1f%(symbol)s'):
"""Used by various scripts. See:
@@ -26,11 +28,13 @@ def bytes2human(n, format_str='%(value).1f%(symbol)s'):
return format_str % dict(symbol=symbols[0], value=n)
async def filterKeyValues(dataList: list, key: str) -> list:
async def filterKeyValues(dataList: List[dict], key: str, default: Any = None, convert_type: Optional[Type] = None) -> List[Any]:
"""
获取列表字段数据
:param dataList: 数据列表
:param key: 关键字
:return:
获取列表字段数据,并可选择进行类型转换。
:param dataList: 数据列表(列表中的元素是字典)
:param key: 要提取的字段
:param default: 如果字段不存在,返回的默认值
:param convert_type: 需要转换的类型(如 int、str、float 等),默认为 None 不转换
:return: 提取并转换后的值列表
"""
return [item[key] for item in dataList]
return [convert_type(item.get(key, default)) if convert_type else item.get(key, default) for item in dataList]

112
utils/generate.py Normal file
View File

@@ -0,0 +1,112 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/03/03 00:03
# @UpdateTime : 2025/03/03 00:03
# @Author : sonder
# @File : generate.py
# @Software : PyCharm
# @Comment : 本程序
from datetime import datetime
from config.constant import MYSQL_TO_TORTOISE_TYPE
class Generate:
"""
代码生成成工具
"""
@classmethod
def convert_mysql_type(cls, mysql_type: str):
"""
将 MySQL 字段类型转换为 Tortoise-ORM 的字段类型。
:param mysql_type: MySQL 的字段类型,如 "varchar(255)""int(11)"
:return: Tortoise-ORM 对应的字段类型,如 "CharField""IntField"
"""
# 提取字段的基本类型(去掉括号及长度信息)
base_type = mysql_type.split("(")[0].lower()
# 处理特殊情况tinyint(1) -> BooleanField
if base_type == "tinyint":
if "(1)" in mysql_type: # tinyint(1) 视为 Boolean
return "BooleanField", None
return "IntField", None # 其他 tinyint 作为 IntField 处理
max_length = None
if "(" in mysql_type and base_type in {"char", "varchar"}:
max_length = int(mysql_type.split("(")[1].split(")")[0]) # 提取最大长度
field_type = MYSQL_TO_TORTOISE_TYPE.get(base_type, "TextField")
return field_type, max_length
@classmethod
def prepare_template_data(cls, table_info, columns):
TYPE_DEFINE = {
"int": 0,
"str": "",
"bool": False,
"float": 0.0,
"datetime": "",
"list": [],
"dict": {},
"set": set(),
"tuple": (),
"bytes": b"",
"None": 'null',
}
PYTHON_TO_TS = {
"int": "number",
"float": "number",
"str": "string",
"bool": "boolean",
"datetime": "string",
"Optional[int]": "number | null",
"Optional[str]": "string | null",
"Optional[bool]": "boolean | null",
"Optional[float]": "number | null",
"Optional[datetime]": "string | null",
"List[int]": "number[]",
"List[str]": "string[]",
"List[bool]": "boolean[]",
"List[datetime]": "string[]",
"Dict[str, Any]": "Record<string, any>",
"Any": "any",
}
common_column = ["id", "del_flag", "create_by", "update_by", "create_time", "update_time"]
"""组织数据,供 Jinja2 渲染"""
return {
"author": table_info.get('author', ""),
"prefix": table_info["prefix"],
"table_name": table_info["table_name"],
"class_name": table_info["class_name"],
"table_comment": table_info.get("table_comment", ""),
"description": table_info.get("description", ""),
"name": table_info.get('class_name', "").lower(),
"permission_id": table_info.get('permission_id', ""),
"columns": [
{
"python_name": col["python_name"],
"field_type": cls.convert_mysql_type(col["column_type"])[0],
"max_length": cls.convert_mysql_type(col["column_type"])[1],
"is_common": col["python_name"] in common_column,
"is_nullable": not col["is_required"],
"is_unique": col.get("is_unique", False),
"default": col.get("default", None),
"column_comment": col.get("column_comment", ""),
"column_name": col["column_name"],
"is_required": col.get("is_required", False),
"is_edit": col.get("is_edit", False),
"is_list": col.get("is_list", False),
"is_query": col.get("is_query", False),
"is_insert": col.get("is_insert", False),
"is_hide": col.get("is_hide", False),
"query_way": col.get("query_way", "="),
"show_type": col.get("show_type", "input"),
"python_type": col.get("python_type", "str"),
"define": TYPE_DEFINE.get(col.get("python_type", "str"), None),
"typescript_type": PYTHON_TO_TS.get(col.get("python_type", "str"), "any"),
}
for col in columns
if col["python_name"] and col["column_type"]
],
"current_time": datetime.now().strftime("%Y/%m/%d %H:%M:%S")
}