前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据库|Flask+Redis实现登录权限管理

数据库|Flask+Redis实现登录权限管理

作者头像
算法与编程之美
发布2020-09-24 10:53:45
1.5K0
发布2020-09-24 10:53:45
举报

@home.route('/')@login_required()def test():     return jsonify({         'code': 0,         'msg': 'test',     })

1 准备工作

Redis简单来说就是远程字典服务,通常也被称为数据结构服务器,因为他的值(value)可以是多种形式的。在开始之前,需要先安装Redis,这里先不过多赘述,需要注意的是安装完成后需要设置密码,具体方法可以在百度上搜索,很简单。首先在Flask配置文件添加Redis的配置信息。

app.config['REDIS_HOST'] = 'localhost' #Redis的ip地址,本机的就是localhost app.config['REDIS_PORT'] = 6379 #Redis端口,默认为6379 app.config['REDIS_DB'] = '0' app.config['REDIS_PWD'] = 'yourpassword' #Redis的密码

2 Redis数据库操作

在utils文件目录下新建redis_utils.py文件,作用是对redis数据库进行操作。代码如下

import pickle import redis from flask import current_app as app class Redis(object): """ redis数据库操作 """ @staticmethod def _get_r(): host = app.config['REDIS_HOST'] port = app.config['REDIS_PORT'] db = app.config['REDIS_DB'] passwd = app.config['REDIS_PWD'] r = redis.StrictRedis(host=host, port=port, db=db, password=passwd) return r @classmethod def write(self, key, value, expire=None): """ 写入键值对 """ # 判断是否有过期时间,没有就设置默认值 if expire: expire_in_seconds = expire else: expire_in_seconds = app.config['EXPIRES_IN'] r = self._get_r() r.set(key, value, ex=expire_in_seconds) @classmethod def write_dict(self, key, value, expire=None): ''' 将内存数据二进制通过序列号转为文本流,再存入redis ''' if expire: expire_in_seconds = expire else: expire_in_seconds = app.config['REDIS_EXPIRE'] r = self._get_r() r.set(pickle.dumps(key), pickle.dumps(value), ex=expire_in_seconds) @classmethod def read_dict(self, key): ''' 将文本流从redis中读取并反序列化,返回 ''' r = self._get_r() data = r.get(pickle.dumps(key)) if data is None: return None return pickle.loads(data) @classmethod def read(self, key): """ 读取键值对内容 """ r = self._get_r() value = r.get(key) return value.decode('utf-8') if value else value @classmethod def hset(self, name, key, value): """ 写入hash表 """ r = self._get_r() r.hset(name, key, value) @classmethod def hmset(self, key, *value): """ 读取指定hash表的所有给定字段的值 """ r = self._get_r() value = r.hmset(key, *value) return value @classmethod def hget(self, name, key): """ 读取指定hash表的键值 """ r = self._get_r() value = r.hget(name, key) return value.decode('utf-8') if value else value @classmethod def hgetall(self, name): """ 获取指定hash表所有的值 """ r = self._get_r() return r.hgetall(name) @classmethod def delete(self, *names): """ 删除一个或者多个 """ r = self._get_r() r.delete(*names) @classmethod def hdel(self, name, key): """ 删除指定hash表的键值 """ r = self._get_r() r.hdel(name, key) @classmethod def expire(self, name, expire=None): """ 设置过期时间 """ if expire: expire_in_seconds = expire else: expire_in_seconds = app.config['REDIS_EXPIRE'] r = self._get_r() r.expire(name, expire_in_seconds)

3 实现用户登录验证

首先注册一个user的蓝图

app.register_blueprint(user_blueprint, url_prefix="/user/") user = Blueprint("user", __name__)

在permisson目录下新建一个user.py。这里是写跟登录有关的接口的。首先是登录验证,大概思路是先接收用户名与密码,然后校验参数,两者都不能为空,接着用用户名去user数据库查找是否存在此用户,如果查找结果为空,则返回一个错误码。接着校验接收到的密码与数据库的密码是否匹配。(在存入密码的时候不能直接存明文,需要加密,此处用到了werkzeug.security这个库进行加密。)校验密码还是用这个库的check_password_hash,只需传入需要验证的两个密码。再用户名与密码都正确的情况下,生成一个token,以后客户端只需要带上这个token来请求即可。这时token需要存入Redis中,客户端传来的token就和Redis中的token作对比。当生成token后,这就登录成功了,然后返回token等用户信息。接下来是login的代码

@user.route('/login', methods=["POST"]) def login(): ''' 用户登录 :return:token ''' data = request.get_data() data = str(data, 'utf-8') res_dir = json.loads(data) print(res_dir) if res_dir is None: return NO_PARAMETER() # 获取前端传过来的参数 username = res_dir.get("username") password = res_dir.get("password") # 校验参数 if not all([username, password]): return jsonify(code=Code.NOT_NULL.value, msg="用户名和密码不能为空") try: user = User.query.filter_by(user_name=username).first() except Exception as e: print("login error:{}".format(e)) return jsonify(code=Code.REQUEST_ERROR.value, msg="获取信息失败") if user is None or not user.check_pwd(password) or user.del_flag == 2 or user.status == 2: return jsonify(code=Code.ERR_PWD.value, msg="用户名或密码错误") # 获取用户信息,传入生成token的方法,并接收返回的token # 获取用户角色 user_role = Role.query.join(UserRole, Role.id == UserRole.role_id).join(User, UserRole.user_id == user.id).filter( User.id == user.id).all() role_list = [i.role_key for i in user_role] token = create_token(user.id, user.user_name, role_list) data = {'token': token, 'userId': user.id, 'userName': user.user_name, 'nickname': user.nickname} # 记录登录ip将token存入rerdis try: user.login_ip = request.remote_addr print(user) user.update() Redis.write(f"token_{user.user_name}", token) except Exception as e: print(e) return jsonify(code=Code.UPDATE_DB_ERROR.value, msg="登录失败:"+str(e)) if token: # 把token返回给前端 return jsonify(code=Code.SUCCESS.value, msg="登录成功", data=data) else: return jsonify(code=Code.REQUEST_ERROR.value, msg="请求失败", data=token)

生成token的方法:

def create_token(user_id, user_name, role_list): ''' 生成token :param api_user:用户id :return: token ''' # 第一个参数是内部的私钥,这里写在共用的配置信息里了,如果只是测试可以写死 # 第二个参数是有效期(秒) s = Serializer(app.config['SECRET_KEY'], expires_in=app.config['EXPIRES_IN']) # 接收用户id转换与编码 token = None try: token = s.dumps({"id": user_id, "name": user_name, "role": role_list}).decode("ascii") except Exception as e: app.logger.error("获取token失败:{}".format(e)) return token

校验token方法:

def verify_token(token): ''' 校验token :param token: :return: 用户信息 or None ''' # 参数为私有秘钥,跟上面方法的秘钥保持一致 s = Serializer(app.config['SECRET_KEY']) try: # 转换为字典 data = s.loads(token) return data except Exception as e: app.logger.error(f"token转换失败:{e}") return None

注销登录,接收到token后作对比,成功后将redis中token删除。

@user.route('/logout', methods=["POST"]) @login_required() def logout(): ''' 注销方法:redis删除token :return: ''' try: token = request.headers["Authorization"] user = verify_token(token) if user: key = f"token_{user.get('name')}" redis_token = Redis.read(key) if redis_token: Redis.delete(key) return SUCCESS() else: return AUTH_ERR() except Exception as e: app.logger.error(f"注销失败") return REQUEST_ERROR()

检查是否登录:

@user.route('/check_token', methods=["POST"]) def check_token(): # 在请求头上拿到token token = request.headers["Authorization"] user = verify_token(token) if user: key = f"token_{user.get('name')}" redis_token = Redis.read(key) if redis_token == token: return SUCCESS(data=user.get('id')) else: return OTHER_LOGIN() else: return AUTH_ERR()

最后是需要做一个登录拦截器,意思是在客户端访问某些接口时,需要先进行登录验证,通过以后才能正常访问。

def login_required(*role): def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): try: # 在请求头上拿到token token = request.headers["Authorization"] except Exception as e: # 没接收的到token,给前端抛出错误 return jsonify(code=Code.NO_PARAMETER.value, msg='缺少参数token') s = Serializer(app.config['SECRET_KEY']) try: user = s.loads(token) if role: # 获取token中的权限列表如果在参数列表中则表示有权限,否则就表示没有权限 user_role = user['role'] result = [x for x in user_role if x in list(role)] if not result: return jsonify(code=Code.ERR_PERMISSOM.value, msg="权限不够") except Exception as e: return jsonify(code=Code.LOGIN_TIMEOUT.value, msg="登录已过期") return func(*args, **kw) return wrapper return decorator

只需要在需要的接口下面加上@login_required()即可。

例如:

@home.route('/') @login_required() def test(): return jsonify({ 'code': 0, 'msg': 'test', })

END

实习编辑 | 王文星 @home.route('/') 责 编 | 刘玉江

能力越强,责任越大。实事求是,严谨细致。

——where2go 团队

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 算法与编程之美 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档