feat: 操作日志添加按钮级权限控制

This commit is contained in:
2025-02-22 23:27:53 +08:00
parent ebe180f2f0
commit 75a163910d

View File

@@ -5,8 +5,9 @@
# @File : log.py
# @Software : PyCharm
# @Comment : 本程序
from typing import Optional
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, Path, Query, Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
@@ -154,13 +155,39 @@ async def delete_login_log(request: Request, params: DeleteListParams,
@logAPI.get("/operation", response_class=JSONResponse, response_model=GetOperationLogResponse,
summary="用户获取操作日志")
@Auth(permission_list=["operation:btn:list"])
async def get_operation_log(request: Request,
page: int = Query(default=1, description="页码"),
name: Optional[str] = Query(default=None, description="操作名称"),
type: Optional[str] = Query(default=None, description="操作类型"),
pageSize: int = Query(default=10, description="每页数量"),
username: Optional[str] = Query(default=None, description="用户账号"),
nickname: Optional[str] = Query(default=None, description="用户昵称"),
department_id: Optional[str] = Query(default=None, description="部门ID"),
startTime: Optional[str] = Query(default=None, description="开始时间"),
endTime: Optional[str] = Query(default=None, description="结束时间"),
status: Optional[str] = Query(default=None, description="登录状态"),
current_user: dict = Depends(LoginController.get_current_user),
):
user_id = current_user.get("id")
result = await OperationLog.filter(operator_id=user_id, del_flag=1).offset((page - 1) * pageSize).limit(
sub_departments = current_user.get("sub_departments")
filterArgs = {
f'{k}__contains': v for k, v in {
'operation_name': name,
'operation_type': type,
'operator__username': username,
'operator__nickname': nickname,
'department_id': department_id,
}.items() if v
}
if status is not None:
filterArgs['status'] = status
if startTime and endTime:
startTime = datetime.fromtimestamp(float(startTime) / 1000)
endTime = datetime.fromtimestamp(float(endTime) / 1000)
filterArgs['operation_time__range'] = [startTime, endTime]
if not department_id:
filterArgs['department__id__in'] = sub_departments
result = await OperationLog.filter(**filterArgs, del_flag=1).offset((page - 1) * pageSize).limit(
pageSize).values(
id="id",
operation_name="operation_name",
@@ -184,9 +211,10 @@ async def get_operation_log(request: Request,
cost_time="cost_time"
)
return Response.success(data={
"total": await OperationLog.filter(operator_id=user_id).count(),
"total": await OperationLog.filter(**filterArgs, del_flag=1).count(),
"result": result,
"page": page,
"pageSize": pageSize
})
@@ -196,30 +224,28 @@ async def get_operation_log(request: Request,
summary="用户删除操作日志")
@Log(title="用户删除操作日志", business_type=BusinessType.DELETE)
@Auth(permission_list=["operation:btn:delete"])
async def delete_operation_log(id: str = Path(..., description="操作日志id"),
async def delete_operation_log(request: Request, id: str = Path(..., description="操作日志id"),
current_user: dict = Depends(LoginController.get_current_user)):
if log := await OperationLog.get_or_none(id=id):
if log.operator == current_user.get("id"):
sub_departments = current_user.get("sub_departments")
if log := await OperationLog.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
log.del_flag = 0
await log.save()
return Response.success(msg="删除成功")
else:
return Response.failure(msg="无权限删除")
else:
return Response.failure(msg="删除失败,操作日志不存在!")
@logAPI.delete("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse,
summary="用户删除操作日志")
summary="用户批量删除操作日志")
@logAPI.post("/deleteList/operation", response_model=BaseResponse, response_class=JSONResponse,
summary="用户删除操作日志")
summary="用户批量删除操作日志")
@Log(title="用户批量删除操作日志", business_type=BusinessType.DELETE)
@Auth(permission_list=["operation:btn:delete"])
async def delete_operation_log(params: DeleteListParams,
async def delete_operation_log(request: Request, params: DeleteListParams,
current_user: dict = Depends(LoginController.get_current_user)):
sub_departments = current_user.get("sub_departments")
for id in set(params.ids):
if log := await OperationLog.get_or_none(id=id):
if log.operator == current_user.get("id"):
if log := await OperationLog.get_or_none(id=id, department__id__in=sub_departments, del_flag=1):
log.del_flag = 0
await log.save()
return Response.success(msg="删除成功")