前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >后端Rabc权限控制

后端Rabc权限控制

作者头像
以谁为师
发布2023-09-12 09:39:01
1420
发布2023-09-12 09:39:01
举报

权限控制范围:

  • 前端动态菜单
  • 前端按钮显示权限
  • 后端API请求限制
后端Rabc权限控制_自定义
后端Rabc权限控制_自定义

前端

权限列表

后端Rabc权限控制_白名单_02
后端Rabc权限控制_白名单_02

角色关联权限

后端Rabc权限控制_redis_03
后端Rabc权限控制_redis_03

用户权限

后端Rabc权限控制_白名单_04
后端Rabc权限控制_白名单_04
后端Rabc权限控制_redis_05
后端Rabc权限控制_redis_05

后端

获取当前用户权限

代码语言:javascript
复制
# get /user/userinfo/
{
    "msg": "成功",
    "errors": "",
    "code": 20000,
    "data": {
        "id": 1,
        "username": "admin",
        "name": "管理员",
        "job": "",
        "mobile": "",
        "email": "admin@qq.com",
        "permissions": [
            "admin"
        ]
    }
}

自定义访问权限策略

代码语言:javascript
复制
# ezops/utils/permissions.py:

class UserLock(APIException):
    status_code = status.HTTP_400_BAD_REQUEST
    default_detail = '用户已被锁定,请联系管理员!!!'
    default_code = 'not_authenticated'
    #  "User account is disabled."

class RbacPermission(BasePermission):
    """
    自定义Rbac权限认证
    """
    @staticmethod
    def api_uri(uri):
        base_api = settings.BASE_API
        uri = '/' + base_api + '/' + uri + '/'
        return re.sub('/+', '/', uri)

    def has_permission(self, request, view):
        # -------------------- 白名单  -------------------- #
        for safe_url in settings.WHITE_LIST:
            if re.match('{}'.format(safe_url), request.path):
                logger.info("自定义rbac认证方式: 白名单接口")
                return True
        # -------------------- admin 管理员白名单  -------------------- #
        if 'admin' in request.user.roles.values_list('name', flat=True) or request.user.is_superuser: 
            logger.info("自定义rbac认证方式: admin 放行")
            return True

        if request.user.is_active:
            if False:  # 禁用redis
            # if settings.REDIS_STATUS:
                # -------------------- 缓存rbac  -------------------- #
                logger.info(f"自定义rbac认证方式: redis缓存规则, userId:{request.user.id}")
                orm_userinfo = Users.objects.get(pk=request.user.id).get_user_info()  
                logger.info(f"orm_userinfo:{orm_userinfo}")
                conn = get_redis_connection('user_info')
                url_keys = conn.hkeys('user_permissions_manage')
                for url_key in url_keys:
                    if re.match('{}'.format(self.api_uri(url_key.decode())), request.path):
                        redis_key = url_key.decode()
                        break
                    else:
                        return True
                permissions = json.loads(conn.hget('user_permissions_manage', redis_key).decode())
                method_hit = False
                # -------------------- 同一接口配置不同权限验证  -------------------- #
                for permission in permissions:
                    if permission.get('method') == request.method:
                        method_hit = True
                        if permission.get('sign') in orm_userinfo['permissions']:
                            info = 'sign: {}'.format(permission.get('sign'))
                            logger.info(info)
                            return True
                        else:
                            if method_hit:
                                return False
                            else:
                                 return True
            else:
                # -------------------- 数据库中权限列表  -------------------- #
                logger.info("自定义rbac认证方式: 数据库规则")
                permissions = Permission.objects.all()
                user_permissions_manage = {}
                for i in permissions:
                    if  not i.menu and i.path:
                        user_permissions_manage[i.sign] = { 'path':i.path,'method':i.method,'id':i.id}
                # -------------------- 用户permissions -------------------- #
                orm_userinfo = Users.objects.get(pk=request.user.id).get_user_info()  
                logger.info(f"用户userinfo:{orm_userinfo}")
                logger.info(f"user_permissions_manage:{user_permissions_manage}")
                # -------------------- 匹配路径和method -------------------- #
                for permission in user_permissions_manage:
                    user_permission_path = self.api_uri(user_permissions_manage[permission]['path'])
                    if re.match(user_permission_path,request.path) : # 请求路径匹配
                        if  request.method == user_permissions_manage[permission]['method'] and permission in orm_userinfo['permissions']:
                            info = '[+] {} {} {}'.format(user_permission_path,user_permissions_manage[permission]['method'],permission)
                            logger.info(info)
                            return True
                        else:
                            info = '[+] GET url: {} - {} - {}'.format(request.path,request.method,permission)
                            logger.info(info)
                            return True
        else:
            raise UserLock()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 权限控制范围:
  • 前端
    • 权限列表
      • 角色关联权限
        • 用户权限
        • 后端
          • 获取当前用户权限
            • 自定义访问权限策略
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档