前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python学习笔记(6)---OAut

Python学习笔记(6)---OAut

作者头像
py3study
发布2020-01-10 16:14:51
4180
发布2020-01-10 16:14:51
举报
文章被收录于专栏:python3python3

OAuth: (开放授权)

wKiom1WEMbKC3Nb5AAIMnP63WMs308.jpg
wKiom1WEMbKC3Nb5AAIMnP63WMs308.jpg

OAuth的授权模式:

  • 授权码模式: 功能最完善,流程最严密
  • 简码模式: 不通过第三方应用程序服务器,直接在浏览器中向认证服务器申请指令
  • 密码模式:用户向客户端提供用户名和密码
  • 客户端模式:

OAuth授权服务器:

在logindemo.py中添加:

代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import base64
import random
import time

from flask import Flask, request, redirect

app = Flask(__name__)

users = {
    "xxxx": ["xxxxx"]
}

auth_code = {}

redirect_uri='http://localhost:5000/client/passport'
# 给用户添加账号
client_id = 'xxxxxx'
users[client_id] = []
# 授权服务器需要保存重定向uri
oauth_redirect_uri = []

def gen_token(uid):
    token = base64.b64encode(':'.join([str(uid), str(random.random()), str(time.time() + 7200)]))
    users[uid].append(token)
    return token


# 生成授权码:
def gen_auth_code(uri):
    code = random.randint(0, 10000)
    auth_code[code] = uri
    return code


def verify_token(token):
    _token = base64.b64decode(token)
    if not users.get(_token.split(':')[0])[-1] == token:
        return -1
    if float(_token.split(':')[-1]) >= time.time():
        return 1
    else:
        return 0

@app.route('/', methods=['GET'])
def index():
    print request.headers
    return 'hello'

@app.route('/login', methods=['GET'])
def login():
    uid, pw = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).split(':')
    if users.get(uid)[0] == pw:
        return gen_token(uid)
    else:
        return 'error'


#授权码的发放:
@app.route('/oauth', methods=['GET'])
def oauth():
    # 验证用户授权
    if request.args.get('user'):
        if users.get(request.args.get('user'))[0] == request.args.get('pw') and oauth_redirect_uri:
            uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0])
            return redirect(uri)
    if request.args.get('code'): # 若请求中携带授权码,
        # 比对uri
        if auth_code.get(int(request.args.get('code'))) == request.args.get('redirect_uri'):
            return gen_token(request.args.get('client_id'))# 发放token
    if request.args.get('redirect_uri'):
        oauth_redirect_uri.append(request.args.get('redirect_uri'))
    return 'please login'


# 重定向
# 用户访问客户端的login目录,客户端将用户重定向到授权服务端的oauth
@app.route('/client/login', methods=['GET'])
def client_login():
    uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (
        client_id, redirect_uri)
    return redirect(uri)


@app.route('/client/passport', methods=['POST', 'GET'])
def client_passport():
    code = request.args.get('code')
    uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)
    return redirect(uri)

@app.route('/test1', methods=['GET'])
def test():
    token = request.args.get('token')
    if verify_token(token) == 1:
        return 'data'
    else:
        return 'error'





if __name__ == '__main__':
    app.run(debug=True)

在requests_t.py

代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import requests



r = requests.get('http://localhost:5000/client/login')
print r.text
print r.history
print r.url
uri_login = r.url.split('?')[0] + '?user=zx&pw=thystar'
r2 = requests.get(uri_login)
print r2.text
r = requests.get('http://127.0.0.1:5000/test1', params={'token': r2.text})
print r.text

Flask渲染页面集Cookies;

Cookies的加密方法:

对源代码的修改:

logindemo.py

代码语言:javascript
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import base64
import random
import time
import json
import hmac
from datetime import datetime, timedelta

from flask import Flask, request, redirect, make_response

app = Flask(__name__)

users = {
    "zx": ["thystar"]
}

redirect_uri='http://localhost:5000/client/passport'
client_id = 'thystar'
users[client_id] = []
auth_code = {}

oauth_redirect_uri = []

TIMEOUT = 3600 * 2


# 新版本的token生成器
def gen_token(data):
    '''
    :param data: dict type
    :return: base64 str
    '''
    data = data.copy()
    if "salt" not in data:
        data["salt"] = unicode(random.random()).decode("ascii")
    if "expires" not in data:
        data["expires"] = time.time() + TIMEOUT
    payload = json.dumps(data).encode("utf8")
    # 生成签名
    sig = _get_signature(payload)
    return encode_token_bytes(payload + sig)

# 授权码生成器
def gen_auth_code(uri, user_id):
    code = random.randint(0,10000)
    auth_code[code] = [uri, user_id]
    return code

# 新版本的token验证
def verify_token(token):
    '''
    :param token: base64 str
    :return: dict type
    '''
    decoded_token = decode_token_bytes(str(token))
    payload = decoded_token[:-16]
    sig = decoded_token[-16:]
    # 生成签名
    expected_sig = _get_signature(payload)
    if sig != expected_sig:
        return {}
    data = json.loads(payload.decode("utf8"))
    if data.get('expires') >= time.time():
        return data
    return 0

# 使用hmac为消息生成签名
def _get_signature(value):
    """Calculate the HMAC signature for the given value."""
    return hmac.new('secret123456', value).digest()

# 下面两个函数将base64编码和解码单独封装
def encode_token_bytes(data):
    return base64.urlsafe_b64encode(data)

def decode_token_bytes(data):
    return base64.urlsafe_b64decode(data)


# 验证服务器端
@app.route('/index', methods=['POST', 'GET'])
def index():
    print request.headers
    return 'hello'

@app.route('/login', methods=['POST', 'GET'])
def login():
    uid, pw = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).split(':')
    if users.get(uid)[0] == pw:
        return gen_token(dict(user=uid, pw=pw))
    else:
        return 'error'

@app.route('/oauth', methods=['POST', 'GET'])
def oauth():
    # 处理表单登录, 同时设置Cookie
    if request.method == 'POST' and request.form['user']:
        u = request.form['user']
        p = request.form['pw']
        if users.get(u)[0] == p and oauth_redirect_uri:
            uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0], u)
            expire_date = datetime.now() + timedelta(minutes=1)
            resp = make_response(redirect(uri))
            resp.set_cookie('login', '_'.join([u, p]), expires=expire_date)
            return resp
    # 验证授权码,发放token
    if request.args.get('code'):
        auth_info = auth_code.get(int(request.args.get('code')))
        if auth_info[0] == request.args.get('redirect_uri'):
            # 可以在授权码的auth_code中存储用户名,编进token中
            return gen_token(dict(client_id=request.args.get('client_id'), user_id=auth_info[1]))
    # 如果登录用户有Cookie,则直接验证成功,否则需要填写登录表单
    if request.args.get('redirect_uri'):
        oauth_redirect_uri.append(request.args.get('redirect_uri'))
        if request.cookies.get('login'):
            u, p = request.cookies.get('login').split('_')
            if users.get(u)[0] == p:
                uri = oauth_redirect_uri[0] + '?code=%s' % gen_auth_code(oauth_redirect_uri[0], u)
                return redirect(uri)
        return '''
            <form action="" method="post">
                <p><input type=text name=user>
                <p><input type=text name=pw>
                <p><input type=submit value=Login>
            </form>
        '''


# 客户端
@app.route('/client/login', methods=['POST', 'GET'])
def client_login():
    uri = 'http://localhost:5000/oauth?response_type=code&client_id=%s&redirect_uri=%s' % (client_id, redirect_uri)
    return redirect(uri)

@app.route('/client/passport', methods=['POST', 'GET'])
def client_passport():
    code = request.args.get('code')
    uri = 'http://localhost:5000/oauth?grant_type=authorization_code&code=%s&redirect_uri=%s&client_id=%s' % (code, redirect_uri, client_id)
    return redirect(uri)


# 资源服务器端
@app.route('/test1', methods=['POST', 'GET'])
def test():
    token = request.args.get('token')
    ret = verify_token(token)
    if ret:
        return json.dumps(ret)
    else:
        return 'error'

if __name__ == '__main__':
    app.run(debug=True)

运行http://localhost:5000/client/login

登陆得到token,把token拼接到test1中测试

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-08-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
访问管理
访问管理(Cloud Access Management,CAM)可以帮助您安全、便捷地管理对腾讯云服务和资源的访问。您可以使用CAM创建子用户、用户组和角色,并通过策略控制其访问范围。CAM支持用户和角色SSO能力,您可以根据具体管理场景针对性设置企业内用户和腾讯云的互通能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档