feat: 初始化仓库

This commit is contained in:
2025-02-12 02:38:29 +08:00
commit 46e9e79670
67 changed files with 8960 additions and 0 deletions

7
utils/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/18 01:59
# @UpdateTime : 2025/01/18 01:59
# @Author : sonder
# @File : __init__.py.py
# @Software : PyCharm
# @Comment : 本程序

112
utils/captcha.py Normal file
View File

@@ -0,0 +1,112 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/26 20:59
# @UpdateTime : 2025/01/26 20:59
# @Author : sonder
# @File : captcha.py
# @Software : PyCharm
# @Comment : 本程序
import base64
import io
import os
import random
import string
from PIL import Image, ImageDraw, ImageFont
class Captcha:
"""
验证码类
"""
@classmethod
async def create_captcha(cls, captcha_type: str = "0"):
"""
生成验证码
:param captcha_type: 验证码类型0为数字1为数字和字母
:return: 验证码图片和验证码base64字符串
"""
# 创建空白图像
image = Image.new('RGB', (120, 40), color='#EAEAEA')
draw = ImageDraw.Draw(image)
# 设置字体
font_path = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'font', 'MiSans-Medium.ttf')
font = ImageFont.truetype(font_path, size=25)
if captcha_type == '0':
# 生成两个0-9之间的随机整数
num1 = random.randint(0, 9)
num2 = random.randint(0, 9)
# 从运算符列表中随机选择一个
operational_character_list = ['+', '-', '*']
operational_character = random.choice(operational_character_list)
# 根据选择的运算符进行计算
if operational_character == '+':
result = num1 + num2
elif operational_character == '-':
result = num1 - num2
else:
result = num1 * num2
# 生成算术题文本
text = f'{num1} {operational_character} {num2} = ?'
# 计算文本宽度以居中显示
text_width = draw.textlength(text, font=font)
x = (120 - text_width) / 2
draw.text((x, 5), text, fill='blue', font=font)
else:
# 生成随机字母和数字组合
result = ''.join(random.choices(string.ascii_letters + string.digits, k=4))
# 绘制每个字符,并添加随机旋转和倾斜
x = 10
for char in result:
# 创建单个字符的图像
char_image = Image.new('RGBA', (25, 40), color=(234, 234, 234, 0))
char_draw = ImageDraw.Draw(char_image)
char_draw.text((0, 0), char, font=font, fill=(0, 0, 255))
# 随机旋转字符
char_image = char_image.rotate(random.randint(-40, 40), expand=1)
# 随机倾斜字符
char_image = char_image.transform(char_image.size, Image.AFFINE,
(1, random.uniform(-0.3, 0.3), 0, 0, 1, 0))
# 将字符粘贴到主图像上
image.paste(char_image, (x, 0), char_image)
x += 25
# 添加干扰元素
cls._add_noise(image)
cls._add_lines(image)
# 将图像数据保存到内存中
buffer = io.BytesIO()
image.save(buffer, format='PNG')
# 将图像数据转换为base64字符串
base64_string = base64.b64encode(buffer.getvalue()).decode()
return [base64_string, result]
@staticmethod
def _add_noise(image):
"""
添加噪点干扰
"""
draw = ImageDraw.Draw(image)
for _ in range(150): # 添加100个噪点
x = random.randint(0, 120)
y = random.randint(0, 40)
draw.point((x, y), fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))
@staticmethod
def _add_lines(image):
"""
添加干扰线
"""
draw = ImageDraw.Draw(image)
for _ in range(10): # 添加5条干扰线
x1 = random.randint(0, 120)
y1 = random.randint(0, 40)
x2 = random.randint(0, 120)
y2 = random.randint(0, 40)
draw.line((x1, y1, x2, y2),
fill=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)),
width=1)

36
utils/common.py Normal file
View File

@@ -0,0 +1,36 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/19 00:52
# @UpdateTime : 2025/01/19 00:52
# @Author : sonder
# @File : common.py
# @Software : PyCharm
# @Comment : 本程序
def bytes2human(n, format_str='%(value).1f%(symbol)s'):
"""Used by various scripts. See:
http://goo.gl/zeJZl
>>> bytes2human(10000)
'9.8K'
>>> bytes2human(100001221)
'95.4M'
"""
symbols = ('B', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
prefix = {}
for i, s in enumerate(symbols[1:]):
prefix[s] = 1 << (i + 1) * 10
for symbol in reversed(symbols[1:]):
if n >= prefix[symbol]:
value = float(n) / prefix[symbol]
return format_str % locals()
return format_str % dict(symbol=symbols[0], value=n)
async def filterKeyValues(dataList: list, key: str) -> list:
"""
获取列表字段数据
:param dataList: 数据列表
:param key: 关键字
:return:
"""
return [item[key] for item in dataList]

180
utils/cron.py Normal file
View File

@@ -0,0 +1,180 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/19 00:53
# @UpdateTime : 2025/01/19 00:53
# @Author : sonder
# @File : cron.py
# @Software : PyCharm
# @Comment : 本程序
import re
from datetime import datetime
class Cron:
"""
Cron表达式工具类
"""
@classmethod
def __valid_range(cls, search_str: str, start_range: int, end_range: int):
match = re.match(r'^(\d+)-(\d+)$', search_str)
if match:
start, end = int(match.group(1)), int(match.group(2))
return start_range <= start < end <= end_range
return False
@classmethod
def __valid_sum(
cls, search_str: str, start_range_a: int, start_range_b: int, end_range_a: int, end_range_b: int,
sum_range: int
):
match = re.match(r'^(\d+)/(\d+)$', search_str)
if match:
start, end = int(match.group(1)), int(match.group(2))
return (
start_range_a <= start <= start_range_b
and end_range_a <= end <= end_range_b
and start + end <= sum_range
)
return False
@classmethod
def validate_second_or_minute(cls, second_or_minute: str):
"""
校验秒或分钟值是否正确
:param second_or_minute: 秒或分钟值
:return: 校验结果
"""
if (
second_or_minute == '*'
or ('-' in second_or_minute and cls.__valid_range(second_or_minute, 0, 59))
or ('/' in second_or_minute and cls.__valid_sum(second_or_minute, 0, 58, 1, 59, 59))
or re.match(r'^(?:[0-5]?\d|59)(?:,[0-5]?\d|59)*$', second_or_minute)
):
return True
return False
@classmethod
def validate_hour(cls, hour: str):
"""
校验小时值是否正确
:param hour: 小时值
:return: 校验结果
"""
if (
hour == '*'
or ('-' in hour and cls.__valid_range(hour, 0, 23))
or ('/' in hour and cls.__valid_sum(hour, 0, 22, 1, 23, 23))
or re.match(r'^(?:0|[1-9]|1\d|2[0-3])(?:,(?:0|[1-9]|1\d|2[0-3]))*$', hour)
):
return True
return False
@classmethod
def validate_day(cls, day: str):
"""
校验日值是否正确
:param day: 日值
:return: 校验结果
"""
if (
day in ['*', '?', 'L']
or ('-' in day and cls.__valid_range(day, 1, 31))
or ('/' in day and cls.__valid_sum(day, 1, 30, 1, 30, 31))
or ('W' in day and re.match(r'^(?:[1-9]|1\d|2\d|3[01])W$', day))
or re.match(r'^(?:0|[1-9]|1\d|2[0-9]|3[0-1])(?:,(?:0|[1-9]|1\d|2[0-9]|3[0-1]))*$', day)
):
return True
return False
@classmethod
def validate_month(cls, month: str):
"""
校验月值是否正确
:param month: 月值
:return: 校验结果
"""
if (
month == '*'
or ('-' in month and cls.__valid_range(month, 1, 12))
or ('/' in month and cls.__valid_sum(month, 1, 11, 1, 11, 12))
or re.match(r'^(?:0|[1-9]|1[0-2])(?:,(?:0|[1-9]|1[0-2]))*$', month)
):
return True
return False
@classmethod
def validate_week(cls, week: str):
"""
校验周值是否正确
:param week: 周值
:return: 校验结果
"""
if (
week in ['*', '?']
or ('-' in week and cls.__valid_range(week, 1, 7))
or ('#' in week and re.match(r'^[1-7]#[1-4]$', week))
or ('L' in week and re.match(r'^[1-7]L$', week))
or re.match(r'^[1-7](?:(,[1-7]))*$', week)
):
return True
return False
@classmethod
def validate_year(cls, year: str):
"""
校验年值是否正确
:param year: 年值
:return: 校验结果
"""
current_year = int(datetime.now().year)
future_years = [current_year + i for i in range(9)]
if (
year == '*'
or ('-' in year and cls.__valid_range(year, current_year, 2099))
or ('/' in year and cls.__valid_sum(year, current_year, 2098, 1, 2099 - current_year, 2099))
or ('#' in year and re.match(r'^[1-7]#[1-4]$', year))
or ('L' in year and re.match(r'^[1-7]L$', year))
or (
(len(year) == 4 or ',' in year)
and all(int(item) in future_years and current_year <= int(item) <= 2099 for item in year.split(','))
)
):
return True
return False
@classmethod
def validate_cron_expression(cls, cron_expression: str):
"""
校验Cron表达式是否正确
:param cron_expression: Cron表达式
:return: 校验结果
"""
values = cron_expression.split()
if len(values) != 6 and len(values) != 7:
return False
second_validation = cls.validate_second_or_minute(values[0])
minute_validation = cls.validate_second_or_minute(values[1])
hour_validation = cls.validate_hour(values[2])
day_validation = cls.validate_day(values[3])
month_validation = cls.validate_month(values[4])
week_validation = cls.validate_week(values[5])
validation = (
second_validation
and minute_validation
and hour_validation
and day_validation
and month_validation
and week_validation
)
if len(values) == 6:
return validation
if len(values) == 7:
year_validation = cls.validate_year(values[6])
return validation and year_validation

124
utils/log.py Normal file
View File

@@ -0,0 +1,124 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/18 02:08
# @UpdateTime : 2025/01/18 02:08
# @Author : sonder
# @File : log.py
# @Software : PyCharm
# @Comment : 本程序
import os
import sys
import time
from loguru import logger
# 日志存储目录
log_path = os.path.join(os.getcwd(), 'logs')
if not os.path.exists(log_path):
os.makedirs(log_path) # 如果目录不存在,则创建
# 按天创建日志目录
daily_log_path = os.path.join(log_path, time.strftime("%Y-%m-%d"))
if not os.path.exists(daily_log_path):
os.makedirs(daily_log_path)
# 定义按级别分开的日志文件路径
log_path_debug = os.path.join(daily_log_path, 'debug.log')
log_path_info = os.path.join(daily_log_path, 'info.log')
log_path_error = os.path.join(daily_log_path, 'error.log')
log_path_warning = os.path.join(daily_log_path, 'warning.log')
log_path_sql = os.path.join(daily_log_path, 'sql.log') # SQL 查询日志文件
# 定义合并后的日志文件路径
log_path_all = os.path.join(daily_log_path, 'all.log')
# 移除默认的日志处理器
logger.remove()
# 添加控制台日志处理器(彩色输出)
logger.add(
sink=sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="DEBUG", # 控制台输出所有级别的日志
colorize=True, # 启用彩色输出
enqueue=True, # 异步写入日志
)
# 添加按级别分开的日志文件处理器
logger.add(
sink=log_path_debug,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
level="DEBUG", # 只记录 DEBUG 级别的日志
rotation="50 MB", # 日志文件大小达到 50MB 时轮换
retention="30 days", # 日志文件保留 30 天
compression="zip", # 压缩旧日志文件
encoding="utf-8",
enqueue=True, # 异步写入日志
filter=lambda record: record["level"].name == "DEBUG", # 只处理 DEBUG 级别的日志
)
logger.add(
sink=log_path_info,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
level="INFO", # 只记录 INFO 级别的日志
rotation="50 MB",
retention="30 days",
compression="zip",
encoding="utf-8",
enqueue=True,
filter=lambda record: record["level"].name == "INFO", # 只处理 INFO 级别的日志
)
logger.add(
sink=log_path_warning,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
level="WARNING", # 只记录 WARNING 级别的日志
rotation="50 MB",
retention="30 days",
compression="zip",
encoding="utf-8",
enqueue=True,
filter=lambda record: record["level"].name == "WARNING", # 只处理 WARNING 级别的日志
)
logger.add(
sink=log_path_error,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
level="ERROR", # 只记录 ERROR 级别的日志
rotation="50 MB",
retention="30 days",
compression="zip",
encoding="utf-8",
enqueue=True,
filter=lambda record: record["level"].name == "ERROR", # 只处理 ERROR 级别的日志
)
# 添加 SQL 查询日志文件处理器
logger.add(
sink=log_path_sql,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
level="DEBUG", # 记录所有 SQL 查询日志
rotation="50 MB",
retention="30 days",
compression="zip",
encoding="utf-8",
enqueue=True,
filter=lambda record: "tortoise.db_client" in record["name"], # 只处理 SQL 查询日志
)
# 添加合并后的日志文件处理器
logger.add(
sink=log_path_all,
format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {name}:{function}:{line} - {message}",
level="DEBUG", # 记录所有级别的日志
rotation="50 MB",
retention="30 days",
compression="zip",
encoding="utf-8",
enqueue=True,
)
# 自定义日志颜色
logger.level("DEBUG", color="<blue>") # DEBUG 级别为蓝色
logger.level("INFO", color="<green>") # INFO 级别为绿色
logger.level("WARNING", color="<yellow>") # WARNING 级别为金色
logger.level("ERROR", color="<red>") # ERROR 级别为红色

105
utils/mail.py Normal file
View File

@@ -0,0 +1,105 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/26 21:51
# @UpdateTime : 2025/01/26 21:51
# @Author : sonder
# @File : mail.py
# @Software : PyCharm
# @Comment : 本程序
import random
from datetime import timedelta
from email.message import EmailMessage
from email.utils import formataddr
from aiosmtplib import send
from fastapi import Request
from jinja2 import Environment, FileSystemLoader
from config.constant import RedisKeyConfig
from config.env import AppConfig, EmailConfig
from utils.log import logger
class Email:
"""
邮件发送类,用于发送邮件。
"""
@classmethod
async def generate_verification_code(cls, length: int = 4) -> str:
"""
随机生成数字验证码
:param length: 验证码长度
:return:
"""
return ''.join(str(random.randint(0, 9)) for _ in range(length))
@classmethod
async def send_email(cls, request: Request, username: str, title: str = "注册", mail: str = "") -> bool:
"""
发送邮件
:param request: 请求对象
:param username: 用户账号
:param title: 邮件标题
:param mail: 邮箱地址
"""
code = await cls.generate_verification_code(4)
codeStr = ""
for i in code:
codeStr += f"""<button class="button">{i}</button>"""
env = Environment(loader=FileSystemLoader('templates'))
template = env.get_template('mail_en.html')
content = template.render(
TITLE=title,
CODE=codeStr,
PROJECTNAME=AppConfig.app_name,
)
subject = f"{AppConfig.app_name}-{title} Verification Code"
sendName = AppConfig.app_name
hostname = EmailConfig.email_host
port = EmailConfig.email_port
message = EmailMessage()
message["From"] = formataddr((sendName, EmailConfig.email_username))
message["To"] = mail
message["Subject"] = subject
message.set_content(content, subtype="html")
try:
await send(
message,
hostname=hostname,
port=port,
username=EmailConfig.email_username,
password=EmailConfig.email_password
)
await request.app.state.redis.set(f"{RedisKeyConfig.EMAIL_CODES.key}:{mail}-{username}", code, ex=timedelta(minutes=2))
logger.info(f"发送邮件至{mail}成功,验证码:{code}")
return True
except Exception as e:
logger.error(e)
return False
@classmethod
async def verify_code(cls, request: Request, username: str, mail: str, code: str) -> dict:
"""
验证验证码
:param request: 请求对象
:param username: 用户账号
:param mail: 邮箱地址
:param code: 验证码
"""
redis_code = await request.app.state.redis.get(f"{RedisKeyConfig.EMAIL_CODES.key}:{mail}-{username}")
if redis_code is None:
return {
"status": False,
"msg": "验证码已过期"
}
if str(redis_code).lower() == code.lower():
await request.app.state.redis.delete(f"{RedisKeyConfig.EMAIL_CODES.key}:{mail}-{username}")
return {
"status": True,
"msg": "验证码正确"
}
return {
"status": False,
"msg": "验证码错误"
}

46
utils/password.py Normal file
View File

@@ -0,0 +1,46 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/18 02:34
# @UpdateTime : 2025/01/18 02:34
# @Author : sonder
# @File : password.py
# @Software : PyCharm
# @Comment : 本程序
import hashlib
from config.env import JwtConfig
class Password:
"""
密码工具类
"""
@classmethod
async def verify_password(cls, plain_password, hashed_password):
"""
工具方法:校验当前输入的密码与数据库存储的密码是否一致
:param plain_password: 当前输入的密码
:param hashed_password: 数据库存储的密码
:return: 校验结果
"""
salt = JwtConfig.jwt_salt
# 将盐值和密码拼接在一起
password_with_salt = (salt + plain_password).encode('utf-8')
# 使用SHA256算法对拼接后的密码进行加密
password_hashed = hashlib.sha256(password_with_salt).hexdigest()
return password_hashed == hashed_password
@classmethod
async def get_password_hash(cls, input_password: str):
"""
工具方法:对当前输入的密码进行加密
:param input_password: 输入的密码
:return: 加密成功的密码
"""
salt = JwtConfig.jwt_salt
# 将盐值和密码拼接在一起
password_with_salt = (salt + input_password).encode('utf-8')
# 使用SHA256算法对拼接后的密码进行加密
return hashlib.sha256(password_with_salt).hexdigest()

225
utils/response.py Normal file
View File

@@ -0,0 +1,225 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/18 02:13
# @UpdateTime : 2025/01/18 02:13
# @Author : sonder
# @File : response.py
# @Software : PyCharm
# @Comment : 本程序
from datetime import datetime
from typing import Any, Dict, Optional
from fastapi import status
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from config.constant import HttpStatusConstant
class Response:
"""
响应工具类,用于统一封装接口返回格式。
"""
@classmethod
def _build_response(
cls,
code: int,
msg: str,
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
success: bool = True,
) -> Dict:
"""
构建统一的响应结果字典。
:param code: 状态码
:param msg: 响应消息
:param data: 响应数据
:param rows: 响应行数据(通常用于分页)
:param dict_content: 自定义字典内容
:param model_content: 自定义 Pydantic 模型内容
:param success: 是否成功
:return: 统一的响应结果字典
"""
result = {
"code": code,
"msg": msg,
"success": success,
"time": datetime.now().isoformat(), # 添加时间戳
}
# 添加可选字段
if data is not None:
result["data"] = data
if rows is not None:
result["rows"] = rows
if dict_content is not None:
result.update(dict_content)
if model_content is not None:
result.update(model_content.model_dump(by_alias=True))
return result
@classmethod
def success(
cls,
msg: str = "操作成功",
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> JSONResponse:
"""
成功响应方法。
:param msg: 响应消息,默认为 "操作成功"
:param data: 响应数据
:param rows: 响应行数据(通常用于分页)
:param dict_content: 自定义字典内容
:param model_content: 自定义 Pydantic 模型内容
:return: JSONResponse 对象
"""
result = cls._build_response(
code=HttpStatusConstant.SUCCESS,
msg=msg,
data=data,
rows=rows,
dict_content=dict_content,
model_content=model_content,
success=True,
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def failure(
cls,
msg: str = "操作失败",
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> JSONResponse:
"""
失败响应方法。
:param msg: 响应消息,默认为 "操作失败"
:param data: 响应数据
:param rows: 响应行数据(通常用于分页)
:param dict_content: 自定义字典内容
:param model_content: 自定义 Pydantic 模型内容
:return: JSONResponse 对象
"""
result = cls._build_response(
code=400,
msg=msg,
data=data,
rows=rows,
dict_content=dict_content,
model_content=model_content,
success=False,
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def unauthorized(
cls,
msg: str = "登录信息已过期,访问系统资源失败",
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> JSONResponse:
"""
未认证响应方法。
:param msg: 响应消息,默认为 "登录信息已过期,访问系统资源失败"
:param data: 响应数据
:param rows: 响应行数据(通常用于分页)
:param dict_content: 自定义字典内容
:param model_content: 自定义 Pydantic 模型内容
:return: JSONResponse 对象
"""
result = cls._build_response(
code=HttpStatusConstant.UNAUTHORIZED,
msg=msg,
data=data,
rows=rows,
dict_content=dict_content,
model_content=model_content,
success=False,
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def forbidden(
cls,
msg: str = "该用户无此接口权限",
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> JSONResponse:
"""
未授权响应方法。
:param msg: 响应消息,默认为 "该用户无此接口权限"
:param data: 响应数据
:param rows: 响应行数据(通常用于分页)
:param dict_content: 自定义字典内容
:param model_content: 自定义 Pydantic 模型内容
:return: JSONResponse 对象
"""
result = cls._build_response(
code=HttpStatusConstant.FORBIDDEN,
msg=msg,
data=data,
rows=rows,
dict_content=dict_content,
model_content=model_content,
success=False,
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def error(
cls,
msg: str = "接口异常",
data: Optional[Any] = None,
rows: Optional[Any] = None,
dict_content: Optional[Dict] = None,
model_content: Optional[BaseModel] = None,
) -> JSONResponse:
"""
错误响应方法。
:param msg: 响应消息,默认为 "接口异常"
:param data: 响应数据
:param rows: 响应行数据(通常用于分页)
:param dict_content: 自定义字典内容
:param model_content: 自定义 Pydantic 模型内容
:return: JSONResponse 对象
"""
result = cls._build_response(
code=HttpStatusConstant.ERROR,
msg=msg,
data=data,
rows=rows,
dict_content=dict_content,
model_content=model_content,
success=False,
)
return JSONResponse(status_code=status.HTTP_200_OK, content=jsonable_encoder(result))
@classmethod
def streaming(cls, data: Any) -> StreamingResponse:
"""
流式响应方法。
:param data: 流式传输的内容
:return: StreamingResponse 对象
"""
return StreamingResponse(content=data, status_code=status.HTTP_200_OK)

109
utils/string.py Normal file
View File

@@ -0,0 +1,109 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/18 23:58
# @UpdateTime : 2025/01/18 23:58
# @Author : sonder
# @File : string.py
# @Software : PyCharm
# @Comment : 本程序
from typing import List
from config.constant import CommonConstant
class String:
"""
字符串工具类
"""
@classmethod
def is_blank(cls, string: str) -> bool:
"""
校验字符串是否为''或全空格
:param string: 需要校验的字符串
:return: 校验结果
"""
if string is None:
return False
str_len = len(string)
if str_len == 0:
return True
else:
for i in range(str_len):
if string[i] != ' ':
return False
return True
@classmethod
def is_empty(cls, string) -> bool:
"""
校验字符串是否为''或None
:param string: 需要校验的字符串
:return: 校验结果
"""
return string is None or len(string) == 0
@classmethod
def is_http(cls, link: str):
"""
判断是否为http(s)://开头
:param link: 链接
:return: 是否为http(s)://开头
"""
return link.startswith(CommonConstant.HTTP) or link.startswith(CommonConstant.HTTPS)
@classmethod
def contains_ignore_case(cls, search_str: str, compare_str: str):
"""
查找指定字符串是否包含指定字符串同时串忽略大小写
:param search_str: 查找的字符串
:param compare_str: 比对的字符串
:return: 查找结果
"""
if compare_str and search_str:
return compare_str.lower() in search_str.lower()
return False
@classmethod
def contains_any_ignore_case(cls, search_str: str, compare_str_list: List[str]):
"""
查找指定字符串是否包含指定字符串列表中的任意一个字符串同时串忽略大小写
:param search_str: 查找的字符串
:param compare_str_list: 比对的字符串列表
:return: 查找结果
"""
if search_str and compare_str_list:
for compare_str in compare_str_list:
return cls.contains_ignore_case(search_str, compare_str)
return False
@classmethod
def startswith_case(cls, search_str: str, compare_str: str):
"""
查找指定字符串是否以指定字符串开头
:param search_str: 查找的字符串
:param compare_str: 比对的字符串
:return: 查找结果
"""
if compare_str and search_str:
return search_str.startswith(compare_str)
return False
@classmethod
def startswith_any_case(cls, search_str: str, compare_str_list: List[str]):
"""
查找指定字符串是否以指定字符串列表中的任意一个字符串开头
:param search_str: 查找的字符串
:param compare_str_list: 比对的字符串列表
:return: 查找结果
"""
if search_str and compare_str_list:
for compare_str in compare_str_list:
return cls.startswith_case(search_str, compare_str)
return False

114
utils/upload.py Normal file
View File

@@ -0,0 +1,114 @@
# _*_ coding : UTF-8 _*_
# @Time : 2025/01/18 02:37
# @UpdateTime : 2025/01/18 02:37
# @Author : sonder
# @File : upload.py
# @Software : PyCharm
# @Comment : 本程序
import os
import random
from datetime import datetime
from fastapi import UploadFile
from config.env import UploadConfig
class Upload:
"""
上传工具类
"""
@classmethod
def generate_random_number(cls):
"""
生成3位数字构成的字符串
:return: 3位数字构成的字符串
"""
random_number = random.randint(1, 999)
return f'{random_number:03}'
@classmethod
def check_file_exists(cls, filepath: str):
"""
检查文件是否存在
:param filepath: 文件路径
:return: 校验结果
"""
return os.path.exists(filepath)
@classmethod
def check_file_extension(cls, file: UploadFile):
"""
检查文件后缀是否合法
:param file: 文件对象
:return: 校验结果
"""
file_extension = file.filename.rsplit('.', 1)[-1]
if file_extension in UploadConfig.DEFAULT_ALLOWED_EXTENSION:
return True
return False
@classmethod
def check_file_timestamp(cls, filename: str):
"""
校验文件时间戳是否合法
:param filename: 文件名称
:return: 校验结果
"""
timestamp = filename.rsplit('.', 1)[0].split('_')[-1].split(UploadConfig.UPLOAD_MACHINE)[0]
try:
datetime.strptime(timestamp, '%Y%m%d%H%M%S')
return True
except ValueError:
return False
@classmethod
def check_file_machine(cls, filename: str):
"""
校验文件机器码是否合法
:param filename: 文件名称
:return: 校验结果
"""
if filename.rsplit('.', 1)[0][-4] == UploadConfig.UPLOAD_MACHINE:
return True
return False
@classmethod
def check_file_random_code(cls, filename: str):
"""
校验文件随机码是否合法
:param filename: 文件名称
:return: 校验结果
"""
valid_code_list = [f'{i:03}' for i in range(1, 999)]
if filename.rsplit('.', 1)[0][-3:] in valid_code_list:
return True
return False
@classmethod
def generate_file(cls, filepath: str):
"""
根据文件生成二进制数据
:param filepath: 文件路径
:yield: 二进制数据
"""
with open(filepath, 'rb') as response_file:
yield from response_file
@classmethod
def delete_file(cls, filepath: str):
"""
根据文件路径删除对应文件
:param filepath: 文件路径
"""
os.remove(filepath)