@@ -5,9 +5,8 @@
# @File : user.py
# @File : user.py
# @Software : PyCharm
# @Software : PyCharm
# @Comment : 本程序
# @Comment : 本程序
import calendar
import os
import os
from datetime import datetime , timedelta
from datetime import datetime
from typing import Optional
from typing import Optional
from fastapi import APIRouter , Depends , Path , Query , UploadFile , File , Request
from fastapi import APIRouter , Depends , Path , Query , UploadFile , File , Request
@@ -15,32 +14,34 @@ from fastapi.responses import JSONResponse
from annotation . auth import Auth
from annotation . auth import Auth
from annotation . log import Log
from annotation . log import Log
from config . constant import BusinessType
from config . constant import BusinessType , RedisKeyConfig
from config . env import UploadConfig
from config . env import UploadConfig
from controller . login import LoginController
from controller . login import LoginController
from controller . query import QueryController
from controller . query import QueryController
from exceptions . exception import ModelValidatorException
from exceptions . exception import ModelValidatorException
from models import File as FileModel
from models import File as FileModel
from models import Role , Department , QueryCode
from models import Role , Department
from models . user import User , UserRole
from models . user import User , UserRole
from schemas . common import BaseResponse
from schemas . common import BaseResponse
from schemas . department import GetDepartmentListResponse
from schemas . department import GetDepartmentListResponse
from schemas . file import UploadFileResponse
from schemas . file import UploadFileResponse
from schemas . user import AddUserParams , GetUserListResponse , GetUserInfoResponse , UpdateUserParams , \
from schemas . user import AddUserParams , GetUserListResponse , GetUserInfoResponse , UpdateUserParams , \
AddUserRoleParams , GetUserRoleInfoResponse , UpdateUserRoleParams , GetUserPermissionListResponse , \
AddUserRoleParams , GetUserRoleInfoResponse , UpdateUserRoleParams , GetUserPermissionListResponse , \
ResetPasswordParams , GetUserStatisticsResponse
ResetPasswordParams
from utils . common import filterKeyValues
from utils . common import filterKeyValues
from utils . password import Password
from utils . password import Password
from utils . response import Response
from utils . response import Response
userAPI = APIRouter ( prefix = " /user " , dependencies = [ Depends ( LoginController . get_current_user ) ] )
userAPI = APIRouter ( prefix = " /user " )
@userAPI.post ( " /add " , response_class = JSONResponse , response_model = BaseResponse , summary = " 新增用户 " )
@userAPI.post ( " /add " , response_class = JSONResponse , response_model = BaseResponse , summary = " 新增用户 " )
@Log ( title = " 新增用户 " , business_type = BusinessType . INSERT )
@Log ( title = " 新增用户 " , business_type = BusinessType . INSERT )
@Auth ( [ " user:btn:addUser " ] )
async def add_user (
async def add_user (
request : Request ,
request : Request ,
params : AddUserParams
params : AddUserParams ,
current_user : dict = Depends ( LoginController . get_current_user )
) :
) :
if await QueryController . register_user_before ( username = params . username , phone = params . phone , email = params . email ) :
if await QueryController . register_user_before ( username = params . username , phone = params . phone , email = params . email ) :
return Response . error ( msg = " 添加失败,用户已存在! " )
return Response . error ( msg = " 添加失败,用户已存在! " )
@@ -65,10 +66,14 @@ async def add_user(
@userAPI.delete ( " /delete/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 删除用户 " )
@userAPI.delete ( " /delete/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 删除用户 " )
@userAPI.post ( " /delete/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 删除用户 " )
@userAPI.post ( " /delete/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 删除用户 " )
@Log ( title = " 删除用户 " , business_type = BusinessType . DELETE )
@Log ( title = " 删除用户 " , business_type = BusinessType . DELETE )
@Auth ( [ " user:btn:deleteUser " ] )
async def delete_user (
async def delete_user (
request : Request ,
request : Request ,
id : str = Path ( . . . , description = " 用户ID " ) ) :
id : str = Path ( . . . , description = " 用户ID " ) ,
if user := await User . get_or_none ( id = id ) :
current_user : dict = Depends ( LoginController . get_current_user )
) :
sub_departments = current_user . get ( " sub_departments " )
if user := await User . get_or_none ( id = id , department__id__in = sub_departments ) :
await user . delete ( )
await user . delete ( )
return Response . success ( msg = " 删除成功! " )
return Response . success ( msg = " 删除成功! " )
else :
else :
@@ -78,11 +83,15 @@ async def delete_user(
@userAPI.put ( " /update/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 更新用户 " )
@userAPI.put ( " /update/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 更新用户 " )
@userAPI.post ( " /update/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 更新用户 " )
@userAPI.post ( " /update/ {id} " , response_class = JSONResponse , response_model = BaseResponse , summary = " 更新用户 " )
@Log ( title = " 更新用户 " , business_type = BusinessType . UPDATE )
@Log ( title = " 更新用户 " , business_type = BusinessType . UPDATE )
@Auth ( [ " user:btn:updateUser " ] )
async def update_user (
async def update_user (
request : Request ,
request : Request ,
params : UpdateUserParams ,
params : UpdateUserParams ,
id : str = Path ( . . . , description = " 用户ID " ) ) :
id : str = Path ( . . . , description = " 用户ID " ) ,
if user := await User . get_or_none ( id = id ) :
current_user : dict = Depends ( LoginController . get_current_user )
) :
sub_departments = current_user . get ( " sub_departments " )
if user := await User . get_or_none ( id = id , department__id__in = sub_departments ) :
user . username = params . username
user . username = params . username
user . nickname = params . nickname
user . nickname = params . nickname
user . phone = params . phone
user . phone = params . phone
@@ -94,6 +103,8 @@ async def update_user(
else :
else :
user . department = None
user . department = None
await user . save ( )
await user . save ( )
if await request . app . state . redis . get ( f ' { RedisKeyConfig . USER_INFO . key } : { id } ' ) :
await request . app . state . redis . delete ( f ' { RedisKeyConfig . USER_INFO . key } : { id } ' )
return Response . success ( msg = " 更新成功! " )
return Response . success ( msg = " 更新成功! " )
else :
else :
return Response . error ( msg = " 更新失败,用户不存在! " )
return Response . error ( msg = " 更新失败,用户不存在! " )
@@ -101,7 +112,9 @@ async def update_user(
@userAPI.get ( " /info/ {id} " , response_class = JSONResponse , response_model = GetUserInfoResponse , summary = " 获取用户信息 " )
@userAPI.get ( " /info/ {id} " , response_class = JSONResponse , response_model = GetUserInfoResponse , summary = " 获取用户信息 " )
@Log ( title = " 获取用户信息 " , business_type = BusinessType . SELECT )
@Log ( title = " 获取用户信息 " , business_type = BusinessType . SELECT )
async def get_user_info ( request : Request , id : str = Path ( . . . , description = " 用户ID " ) ) :
@Auth ( [ " user:btn:Userinfo " ] )
async def get_user_info ( request : Request , id : str = Path ( . . . , description = " 用户ID " ) ,
current_user : dict = Depends ( LoginController . get_current_user ) ) :
if user := await User . get_or_none ( id = id ) :
if user := await User . get_or_none ( id = id ) :
user = await user . first ( ) . values (
user = await user . first ( ) . values (
id = " id " ,
id = " id " ,
@@ -113,6 +126,7 @@ async def get_user_info(request: Request, id: str = Path(..., description="用
nickname = " nickname " ,
nickname = " nickname " ,
gender = " gender " ,
gender = " gender " ,
status = " status " ,
status = " status " ,
avatar = " avatar " ,
department_id = " department__id " ,
department_id = " department__id " ,
)
)
return Response . success ( data = user )
return Response . success ( data = user )
@@ -134,7 +148,9 @@ async def get_user_list(
gender : Optional [ str ] = Query ( default = None , description = " 性别 " ) ,
gender : Optional [ str ] = Query ( default = None , description = " 性别 " ) ,
status : Optional [ str ] = Query ( default = None , description = " 状态 " ) ,
status : Optional [ str ] = Query ( default = None , description = " 状态 " ) ,
department_id : Optional [ str ] = Query ( default = None , description = " 部门ID " ) ,
department_id : Optional [ str ] = Query ( default = None , description = " 部门ID " ) ,
current_user : dict = Depends ( LoginController . get_current_user )
) :
) :
sub_departments = current_user . get ( " sub_departments " )
filterArgs = {
filterArgs = {
f ' { k } __contains ' : v for k , v in {
f ' { k } __contains ' : v for k , v in {
' username ' : username ,
' username ' : username ,
@@ -146,6 +162,8 @@ async def get_user_list(
' department_id ' : department_id
' department_id ' : department_id
} . items ( ) if v
} . items ( ) if v
}
}
if not department_id :
filterArgs [ ' department_id__in ' ] = sub_departments
total = await User . filter ( * * filterArgs ) . count ( )
total = await User . filter ( * * filterArgs ) . count ( )
result = await User . filter ( * * filterArgs ) . offset ( ( page - 1 ) * pageSize ) . limit ( pageSize ) . values (
result = await User . filter ( * * filterArgs ) . offset ( ( page - 1 ) * pageSize ) . limit ( pageSize ) . values (
id = " id " ,
id = " id " ,
@@ -155,6 +173,7 @@ async def get_user_list(
email = " email " ,
email = " email " ,
phone = " phone " ,
phone = " phone " ,
nickname = " nickname " ,
nickname = " nickname " ,
avatar = " avatar " ,
gender = " gender " ,
gender = " gender " ,
status = " status " ,
status = " status " ,
department_id = " department__id " ,
department_id = " department__id " ,
@@ -168,13 +187,19 @@ async def get_user_list(
@userAPI.post ( " /addRole " , response_model = BaseResponse , response_class = JSONResponse , summary = " 添加用户角色 " )
@userAPI.post ( " /addRole " , response_model = BaseResponse , response_class = JSONResponse , summary = " 添加用户角色 " )
@Log ( title = " 添加用户角色 " , business_type = BusinessType . INSERT )
@Log ( title = " 添加用户角色 " , business_type = BusinessType . INSERT )
async def add_user_role ( request : Request , params : AddUserRoleParams ) :
@Auth ( [ " user:btn:addRole " ] )
if await UserRole . get_or_none ( user_id = params . user_id , role_id = params . role_id , del_flag = 1 ) :
async def add_user_role ( request : Request , params : AddUserRoleParams ,
current_user : dict = Depends ( LoginController . get_current_user ) ) :
sub_departments = current_user . get ( " sub_departments " )
if await UserRole . get_or_none ( user_id = params . user_id , role_id = params . role_id , del_flag = 1 ,
user__department__id__in = sub_departments ) :
return Response . error ( msg = " 该用户已存在该角色! " )
return Response . error ( msg = " 该用户已存在该角色! " )
if user := await User . get_or_none ( id = params . user_id , del_flag = 1 ) :
if user := await User . get_or_none ( id = params . user_id , del_flag = 1 , department__id__in = sub_departments ):
if role := await Role . get_or_none ( id = params . role_id , del_flag = 1 ) :
if role := await Role . get_or_none ( id = params . role_id , del_flag = 1 , department__id__in = sub_departments ):
userRole = await UserRole . create ( user_id = user . id , role_id = role . id )
userRole = await UserRole . create ( user_id = user . id , role_id = role . id )
if userRole :
if userRole :
if await request . app . state . redis . get ( f ' { RedisKeyConfig . USER_INFO . key } : { user . id } ' ) :
await request . app . state . redis . delete ( f ' { RedisKeyConfig . USER_INFO . key } : { user . id } ' )
return Response . success ( msg = " 添加成功! " )
return Response . success ( msg = " 添加成功! " )
else :
else :
return Response . error ( msg = " 添加失败! " )
return Response . error ( msg = " 添加失败! " )
@@ -189,9 +214,14 @@ async def add_user_role(request: Request, params: AddUserRoleParams):
@userAPI.post ( " /deleteRole/ {id} " , response_model = BaseResponse , response_class = JSONResponse ,
@userAPI.post ( " /deleteRole/ {id} " , response_model = BaseResponse , response_class = JSONResponse ,
summary = " 删除用户角色 " )
summary = " 删除用户角色 " )
@Log ( title = " 删除用户角色 " , business_type = BusinessType . DELETE )
@Log ( title = " 删除用户角色 " , business_type = BusinessType . DELETE )
async def delete_user_role ( request : Request , id : str = Path ( description = " 用户角色ID " ) ) :
@Auth ( [ " user:btn:deleteRole " ] )
if userR ole := await UserRole . get_or_none ( id = id , del_flag = 1 ) :
async def delete_ user_r ole( request : Request , id : str = Path ( description = " 用户角色ID " ) ,
current_user : dict = Depends ( LoginController . get_current_user ) ) :
sub_departments = current_user . get ( " sub_departments " )
if userRole := await UserRole . get_or_none ( id = id , del_flag = 1 , user__department__id__in = sub_departments ) :
await userRole . delete ( )
await userRole . delete ( )
if await request . app . state . redis . get ( f ' { RedisKeyConfig . USER_INFO . key } : { current_user . get ( " id " ) } ' ) :
await request . app . state . redis . delete ( f ' { RedisKeyConfig . USER_INFO . key } : { current_user . get ( " id " ) } ' )
return Response . success ( msg = " 删除成功! " )
return Response . success ( msg = " 删除成功! " )
else :
else :
return Response . error ( msg = " 删除失败,用户角色不存在! " )
return Response . error ( msg = " 删除失败,用户角色不存在! " )
@@ -201,9 +231,13 @@ async def delete_user_role(request: Request, id: str = Path(description="用户
@userAPI.post ( " /updateRole " , response_model = BaseResponse , response_class = JSONResponse ,
@userAPI.post ( " /updateRole " , response_model = BaseResponse , response_class = JSONResponse ,
summary = " 修改用户角色 " )
summary = " 修改用户角色 " )
@Log ( title = " 修改用户角色 " , business_type = BusinessType . UPDATE )
@Log ( title = " 修改用户角色 " , business_type = BusinessType . UPDATE )
async def update_user_role ( request : Request , params : UpdateUserRoleParams ) :
@Auth ( [ " user:btn:updateRole " ] )
async def update_user_role ( request : Request , params : UpdateUserRoleParams ,
current_user : dict = Depends ( LoginController . get_current_user ) ) :
sub_departments = current_user . get ( " sub_departments " )
# 获取用户已有角色
# 获取用户已有角色
userRoles = await UserRole . filter ( user_id = params . user_id , del_flag = 1 ) . values ( " role_id " )
userRoles = await UserRole . filter ( user_id = params . user_id , del_flag = 1 ,
user__department__id__in = sub_departments ) . values ( " role_id " )
userRoles = await filterKeyValues ( userRoles , " role_id " )
userRoles = await filterKeyValues ( userRoles , " role_id " )
# 利用集合找到需要添加的角色
# 利用集合找到需要添加的角色
addRoles = set ( params . role_ids ) . difference ( set ( userRoles ) )
addRoles = set ( params . role_ids ) . difference ( set ( userRoles ) )
@@ -211,20 +245,26 @@ async def update_user_role(request: Request, params: UpdateUserRoleParams):
deleteRoles = set ( userRoles ) . difference ( set ( params . role_ids ) )
deleteRoles = set ( userRoles ) . difference ( set ( params . role_ids ) )
# 添加角色
# 添加角色
for role_id in addRoles :
for role_id in addRoles :
if role := await Role . get_or_none ( id = role_id , del_flag = 1 ) :
if role := await Role . get_or_none ( id = role_id , del_flag = 1 , department__id__in = sub_departments ):
await UserRole . create ( user_id = params . user_id , role_id = role . id )
await UserRole . create ( user_id = params . user_id , role_id = role . id )
# 删除角色
# 删除角色
for role_id in deleteRoles :
for role_id in deleteRoles :
if userRole := await UserRole . get_or_none ( user_id = params . user_id , role_id = role_id , del_flag = 1 ) :
if userRole := await UserRole . get_or_none ( user_id = params . user_id , role_id = role_id , del_flag = 1 ,
user__department__id__in = sub_departments ) :
await userRole . delete ( )
await userRole . delete ( )
if await request . app . state . redis . get ( f ' { RedisKeyConfig . USER_INFO . key } : { params . user_id } ' ) :
await request . app . state . redis . delete ( f ' { RedisKeyConfig . USER_INFO . key } : { params . user_id } ' )
return Response . success ( msg = " 修改成功! " )
return Response . success ( msg = " 修改成功! " )
@userAPI.get ( " /roleInfo/ {id} " , response_model = GetUserRoleInfoResponse , response_class = JSONResponse ,
@userAPI.get ( " /roleInfo/ {id} " , response_model = GetUserRoleInfoResponse , response_class = JSONResponse ,
summary = " 获取用户角色信息 " )
summary = " 获取用户角色信息 " )
@Log ( title = " 获取用户角色信息 " , business_type = BusinessType . SELECT )
@Log ( title = " 获取用户角色信息 " , business_type = BusinessType . SELECT )
async def get_user_role_info ( request : Request , id : str = Path ( description = " 用户角色ID " ) ) :
@Auth ( [ " user:btn:roleInfo " ] )
if userR ole := await UserRole . get_or_none ( id = id , del_flag = 1 ) :
async def get_ user_r ole_info ( request : Request , id : str = Path ( description = " 用户角色ID " ) ,
current_user : dict = Depends ( LoginController . get_current_user ) ) :
sub_departments = current_user . get ( " sub_departments " )
if userRole := await UserRole . get_or_none ( id = id , del_flag = 1 , user__department__id__in = sub_departments ) :
data = await userRole . first ( ) . values (
data = await userRole . first ( ) . values (
id = " id " ,
id = " id " ,
user_id = " user__id " ,
user_id = " user__id " ,
@@ -243,11 +283,14 @@ async def get_user_role_info(request: Request, id: str = Path(description="用
@userAPI.get ( " /roleList/ {id} " , response_model = GetDepartmentListResponse , response_class = JSONResponse ,
@userAPI.get ( " /roleList/ {id} " , response_model = GetDepartmentListResponse , response_class = JSONResponse ,
summary = " 获取用户角色列表 " )
summary = " 获取用户角色列表 " )
@Log ( title = " 获取用户角色列表 " , business_type = BusinessType . SELECT )
@Log ( title = " 获取用户角色列表 " , business_type = BusinessType . SELECT )
@Auth ( [ " user:btn:roleList " ] )
async def get_user_role_list (
async def get_user_role_list (
request : Request ,
request : Request ,
id : str = Path ( description = " 用户ID " ) ,
id : str = Path ( description = " 用户ID " ) ,
current_user : dict = Depends ( LoginController . get_current_user )
) :
) :
result = await UserRole . filter ( user_id = id ) . values (
sub_departments = current_user . get ( " sub_departments " )
result = await UserRole . filter ( user_id = id , del_flag = 1 , user__department__id__in = sub_departments ) . values (
id = " id " ,
id = " id " ,
department_id = " user__department__id " ,
department_id = " user__department__id " ,
department_name = " user__department__name " ,
department_name = " user__department__name " ,
@@ -270,8 +313,11 @@ async def get_user_role_list(
@userAPI.get ( " /permissionList/ {id} " , response_class = JSONResponse , response_model = GetUserPermissionListResponse ,
@userAPI.get ( " /permissionList/ {id} " , response_class = JSONResponse , response_model = GetUserPermissionListResponse ,
summary = " 获取用户权限列表 " )
summary = " 获取用户权限列表 " )
@Log ( title = " 获取用户权限列表 " , business_type = BusinessType . SELECT )
@Log ( title = " 获取用户权限列表 " , business_type = BusinessType . SELECT )
async def get_user_ permission_l ist( request : Request , id : str = Path ( description = " 用户ID " ) ) :
@Auth ( [ " user:btn: permissionL ist" ] )
permissions = await QueryController . get_user_permissions ( user_id = id )
async def get_user_permission_list ( request : Request , id : str = Path ( description = " 用户ID " ) ,
current_user : dict = Depends ( LoginController . get_current_user ) ) :
sub_departments = current_user . get ( " sub_departments " )
permissions = await QueryController . get_user_permissions ( user_id = id , sub_departments = sub_departments )
permissions = await filterKeyValues ( permissions , " id " )
permissions = await filterKeyValues ( permissions , " id " )
# 获取用户角色
# 获取用户角色
return Response . success ( data = list ( set ( permissions ) ) )
return Response . success ( data = list ( set ( permissions ) ) )
@@ -279,11 +325,13 @@ async def get_user_permission_list(request: Request, id: str = Path(description=
@userAPI.post ( " /avatar/ {id} " , response_model = UploadFileResponse , response_class = JSONResponse , summary = " 上传用户头像 " )
@userAPI.post ( " /avatar/ {id} " , response_model = UploadFileResponse , response_class = JSONResponse , summary = " 上传用户头像 " )
@Log ( title = " 上传用户头像 " , business_type = BusinessType . UPDATE )
@Log ( title = " 上传用户头像 " , business_type = BusinessType . UPDATE )
@Auth ( [ " user:btn:uploadAvatar " ] )
async def upload_user_avatar (
async def upload_user_avatar (
request : Request ,
request : Request ,
id : str = Path ( description = " 用户ID " ) ,
id : str = Path ( description = " 用户ID " ) ,
file : UploadFile = File ( . . . ) ) :
file : UploadFile = File ( . . . ) , current_user : dict = Depends ( LoginController . get_current_user ) ):
if user : = await U ser. get_or_none ( id = id ) :
sub_departments = current_u ser. get( " sub_departments " )
if user := await User . get_or_none ( id = id , department__id__in = sub_departments ) :
image_mimetypes = [
image_mimetypes = [
' image/jpeg ' ,
' image/jpeg ' ,
' image/png ' ,
' image/png ' ,
@@ -335,6 +383,8 @@ async def upload_user_avatar(
create_time = " create_time " ,
create_time = " create_time " ,
update_time = " update_time " ,
update_time = " update_time " ,
)
)
if await request . app . state . redis . get ( f ' { RedisKeyConfig . USER_INFO . key } : { user . id } ' ) :
await request . app . state . redis . delete ( f ' { RedisKeyConfig . USER_INFO . key } : { user . id } ' )
return Response . success ( data = result )
return Response . success ( data = result )
return Response . failure ( msg = " 用户不存在! " )
return Response . failure ( msg = " 用户不存在! " )
@@ -343,105 +393,10 @@ async def upload_user_avatar(
@userAPI.post ( " /resetPassword/ {id} " , response_model = BaseResponse , response_class = JSONResponse , summary = " 重置用户密码 " )
@userAPI.post ( " /resetPassword/ {id} " , response_model = BaseResponse , response_class = JSONResponse , summary = " 重置用户密码 " )
@Log ( title = " 重置用户密码 " , business_type = BusinessType . UPDATE )
@Log ( title = " 重置用户密码 " , business_type = BusinessType . UPDATE )
@Auth ( permission_list = [ " user:btn:reset_password " ] )
@Auth ( permission_list = [ " user:btn:reset_password " ] )
async def reset_user_password ( request : Request , params : ResetPasswordParams , id : str = Path ( description = " 用户ID " ) ) :
async def reset_user_password ( request : Request , params : ResetPasswordParams , id : str = Path ( description = " 用户ID " ) ,
if user := await User . get_or_none ( id = id ) :
current_user : dict = Depends ( LoginController . get_current_user ) ) :
if user := await User . get_or_none ( id = id , department__id__in = current_user . get ( " sub_departments " ) ) :
user . password = await Password . get_password_hash ( params . password )
user . password = await Password . get_password_hash ( params . password )
await user . save ( )
await user . save ( )
return Response . success ( msg = " 重置密码成功! " )
return Response . success ( msg = " 重置密码成功! " )
return Response . failure ( msg = " 用户不存在! " )
return Response . failure ( msg = " 用户不存在! " )
@userAPI.get ( " /statistics " , response_model = GetUserStatisticsResponse , response_class = JSONResponse ,
summary = " 获取用户查询统计 " )
@Log ( title = " 获取用户查询统计 " , business_type = BusinessType . SELECT )
async def get_user_statistics ( request : Request , current_user : dict = Depends ( LoginController . get_current_user ) ) :
user_id = current_user . get ( " id " )
# 获取当前时间
now = datetime . now ( )
# 今日开始时间
today_start_time = now . replace ( hour = 0 , minute = 0 , second = 0 , microsecond = 0 )
# 今日结束时间
today_end_time = now . replace ( hour = 23 , minute = 59 , second = 59 , microsecond = 999999 )
# 当月开始时间
this_month_start_time = now . replace ( day = 1 , hour = 0 , minute = 0 , second = 0 , microsecond = 0 )
# 计算当月的最后一天
this_month_last_day = calendar . monthrange ( now . year , now . month ) [ 1 ]
# 当月结束时间
this_month_end_time = now . replace (
day = this_month_last_day , hour = 23 , minute = 59 , second = 59 , microsecond = 999999
)
# 计算上个月的年和月
if now . month == 1 : # 处理1月( 上月是去年的12月)
last_month_year = now . year - 1
last_month = 12
else :
last_month_year = now . year
last_month = now . month - 1
# 上月开始时间
last_month_start_time = datetime ( last_month_year , last_month , 1 , 0 , 0 , 0 , 0 )
# 计算上个月最后一天
last_month_last_day = calendar . monthrange ( last_month_year , last_month ) [ 1 ]
# 上月结束时间
last_month_end_time = datetime ( last_month_year , last_month , last_month_last_day , 23 , 59 , 59 , 999999 )
# 计算今日查询数量
today_count = await QueryCode . filter ( create_time__gte = today_start_time , create_time__lte = today_end_time ,
session__operator__id = user_id ) . count ( )
# 计算当月查询数量
this_month_count = await QueryCode . filter ( create_time__gte = this_month_start_time ,
create_time__lte = this_month_end_time ,
session__operator__id = user_id ) . count ( )
# 计算上月查询数量
last_month_count = await QueryCode . filter ( create_time__gte = last_month_start_time ,
create_time__lte = last_month_end_time ,
session__operator__id = user_id ) . count ( )
async def get_last_14_days_count ( status : int = 1 ) :
today = datetime . now ( ) . replace ( hour = 0 , minute = 0 , second = 0 , microsecond = 0 )
# 保存结果的列表
result = [ ]
# 遍历最近7天( 包括今天)
for i in range ( 14 ) :
day = today - timedelta ( days = i )
# 当天的开始时间和结束时间
day_start = day # 00:00:00
day_end = ( day + timedelta ( days = 1 ) ) # 23:59:59
# 统计当天查询数量
count = await QueryCode . filter (
create_time__gte = day_start ,
create_time__lte = day_end ,
session__operator__id = user_id ,
status = status
) . count ( )
# 添加到结果列表(格式化日期为 YYYY-MM-DD)
result . append ( { " date " : day . strftime ( " % Y- % m- %d " ) , " count " : count } )
# 结果按日期升序排列(可选,确保从过去到现在排序)
result . sort ( key = lambda x : x [ " date " ] )
return result
# 过去14天内的查询数量
before_14day_count_success = await get_last_14_days_count ( 1 )
before_14day_count_fail = await get_last_14_days_count ( 0 )
return Response . success ( data = {
" today_count " : today_count ,
" this_month_count " : this_month_count ,
" last_month_count " : last_month_count ,
" before_14day_count_success " : before_14day_count_success ,
" before_14day_count_fail " : before_14day_count_fail ,
} )