前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flask单点登录竟然只要几行代码就能搞定!

Flask单点登录竟然只要几行代码就能搞定!

作者头像
Python进击者
发布2020-09-14 11:24:28
3.4K0
发布2020-09-14 11:24:28
举报
文章被收录于专栏:JAVAandPython君JAVAandPython君

前言

好久不见,很久没有写原创了,主要是在忙着自己的一些事情。今天给大家写一篇之前在项目当中遇到的一个需求---flask接入统一认证服务。

现在单点登录已经是非常的普遍了, CAS(Central Authentication Service)统一认证服务也是属于我们必备的知识和技能,今天就来带大家学习如何接入统一认证服务。

当然,如果你没有学习单点登录相关的知识,可能本文有些内容不会那么容易理解。

Flask-CAS

首先想要接入统一认证服务,我们必须搞懂单点登录的原理:

上面这幅图可以说是CAS单点登录的核心了,只要看懂这幅图,那么单点登录的基本原理你也就能够理解了。这幅图并不难懂,只需要跟着它的顺序来进行阅读即可,这里不过多阐述。

关于flask的单点登录,主要依靠的是一个开源项目,这里给大家链接:

https://github.com/cameronbwhite/Flask-CAS

这个项目不能说非常的完美,但是flask的单点登录还是可以实现的。

当然,在我们实际开发当中,直接使用它的API可能无法满足我们的需求,这个时候就需要去修改它的源码了,好在这个开源项目的源码并不是非常的多,而且思路也还算比较清晰,所以可读性还不错。

所以的代码都在上图所示的几个文件当中

__ init__.py:

代码语言:javascript
复制
"""
flask_cas.__init__
"""

import flask
from flask import current_app

# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
    from flask import _app_ctx_stack as stack
except ImportError:
    from flask import _request_ctx_stack as stack

from . import routing

from functools import wraps

class CAS(object):
    """
    Required Configs:

    |Key             |
    |----------------|
    |CAS_SERVER      |
    |CAS_AFTER_LOGIN |

    Optional Configs:

    |Key                        | Default               |
    |---------------------------|-----------------------|
    |CAS_TOKEN_SESSION_KEY      | _CAS_TOKEN            |
    |CAS_USERNAME_SESSION_KEY   | CAS_USERNAME          |
    |CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES        |
    |CAS_LOGIN_ROUTE            | '/cas'                |
    |CAS_LOGOUT_ROUTE           | '/cas/logout'         |
    |CAS_VALIDATE_ROUTE         | '/cas/serviceValidate'|
    |CAS_AFTER_LOGOUT           | None                  |
    """

    def __init__(self, app=None, url_prefix=None):
        self._app = app
        if app is not None:
            self.init_app(app, url_prefix)

    def init_app(self, app, url_prefix=None):
        # Configuration defaults
        app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
        app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
        app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES')
        app.config.setdefault('CAS_LOGIN_ROUTE', '/cas')
        app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout')
        app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate')
        # Requires CAS 2.0
        app.config.setdefault('CAS_AFTER_LOGOUT', None)
        # Register Blueprint
        app.register_blueprint(routing.blueprint, url_prefix=url_prefix)

        # Use the newstyle teardown_appcontext if it's available,
        # otherwise fall back to the request context
        if hasattr(app, 'teardown_appcontext'):
            app.teardown_appcontext(self.teardown)
        else:
            app.teardown_request(self.teardown)

    def teardown(self, exception):
        ctx = stack.top
    
    @property
    def app(self):
        return self._app or current_app

    @property
    def username(self):
        return flask.session.get(
            self.app.config['CAS_USERNAME_SESSION_KEY'], None)

    @property
    def attributes(self):
        return flask.session.get(
            self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None)

    @property
    def token(self):
        return flask.session.get(
            self.app.config['CAS_TOKEN_SESSION_KEY'], None)

def login():
    return flask.redirect(flask.url_for('cas.login', _external=True))

def logout():
    return flask.redirect(flask.url_for('cas.logout', _external=True))

def login_required(function):
    @wraps(function)
    def wrap(*args, **kwargs):
        if 'CAS_USERNAME' not in flask.session:
            flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = (
                flask.request.script_root +
                flask.request.full_path
            )
            return login()
        else:
            return function(*args, **kwargs)
    return wrap

这个文件主要写的是一些配置和基本的函数。

cas_urls.py:

代码语言:javascript
复制
"""
flask_cas.cas_urls

Functions for creating urls to access CAS.
"""

try:
    from urllib import quote
    from urllib import urlencode
    from urlparse import urljoin
except ImportError:
    from urllib.parse import quote
    from urllib.parse import urljoin
    from urllib.parse import urlencode


def create_url(base, path=None, *query):
    """ Create a url.

    Creates a url by combining base, path, and the query's list of
    key/value pairs. Escaping is handled automatically. Any
    key/value pair with a value that is None is ignored.

    Keyword arguments:
    base -- The left most part of the url (ex. http://localhost:5000).
    path -- The path after the base (ex. /foo/bar).
    query -- A list of key value pairs (ex. [('key', 'value')]).

    Example usage:
    >>> create_url(
    ...     'http://localhost:5000',
    ...     'foo/bar',
    ...     ('key1', 'value'),
    ...     ('key2', None),     # Will not include None
    ...     ('url', 'http://example.com'),
    ... )
    'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
    """
    url = base
    # Add the path to the url if it's not None.
    if path is not None:
        url = urljoin(url, quote(path))
    # Remove key/value pairs with None values.
    query = filter(lambda pair: pair[1] is not None, query)
    # Add the query string to the url
    url = urljoin(url, '?{0}'.format(urlencode(list(query))))
    return url


def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None):
    """ Create a CAS login URL .

    Keyword arguments:
    cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
    cas_route -- The route where the CAS lives on server (ex. /cas)
    service -- (ex.  http://localhost:5000/login)
    renew -- "true" or "false"
    gateway -- "true" or "false"

    Example usage:
    >>> create_cas_login_url(
    ...     'http://sso.pdx.edu',
    ...     '/cas',
    ...     'http://localhost:5000',
    ... )
    'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
    """
    return create_url(
        cas_url,
        cas_route,
        ('service', service),
        ('renew', renew),
        ('gateway', gateway),
    )


def create_cas_logout_url(cas_url, cas_route, service=None):
    """ Create a CAS logout URL.

    Keyword arguments:
    cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
    cas_route -- The route where the CAS lives on server (ex. /cas/logout)
    url -- (ex.  http://localhost:5000/login)

    Example usage:
    >>> create_cas_logout_url(
    ...     'http://sso.pdx.edu',
    ...     '/cas/logout',
    ...     'http://localhost:5000',
    ... )
    'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000'
    """
    return create_url(
        cas_url,
        cas_route,
        ('service', service),
    )


def create_cas_validate_url(cas_url, cas_route, service, ticket,
                            renew=None):
    """ Create a CAS validate URL.

    Keyword arguments:
    cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
    cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate)
    service -- (ex.  http://localhost:5000/login)
    ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
    renew -- "true" or "false"

    Example usage:
    >>> create_cas_validate_url(
    ...     'http://sso.pdx.edu',
    ...     '/cas/serviceValidate',
    ...     'http://localhost:5000/login',
    ...     'ST-58274-x839euFek492ou832Eena7ee-cas'
    ... )
    'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas'
    """
    return create_url(
        cas_url,
        cas_route,
        ('service', service),
        ('ticket', ticket),
        ('renew', renew),
    )

这个文件主要就是去生成一些CAS登录相关的url以及跳转的url

routing.py:

代码语言:javascript
复制
import flask
from xmltodict import parse
from flask import current_app
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url


try:
    from urllib import urlopen
except ImportError:
    from urllib.request import urlopen

blueprint = flask.Blueprint('cas', __name__)


@blueprint.route('/login/')
def login():
    """
    This route has two purposes. First, it is used by the user
    to login. Second, it is used by the CAS to respond with the
    `ticket` after the user logs in successfully.

    When the user accesses this url, they are redirected to the CAS
    to login. If the login was successful, the CAS will respond to this
    route with the ticket in the url. The ticket is then validated.
    If validation was successful the logged in username is saved in
    the user's session under the key `CAS_USERNAME_SESSION_KEY` and
    the user's attributes are saved under the key
    'CAS_USERNAME_ATTRIBUTE_KEY'
    """

    cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']

    redirect_url = create_cas_login_url(
        current_app.config['CAS_SERVER'],
        current_app.config['CAS_LOGIN_ROUTE'],
        flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True))

    if 'ticket' in flask.request.args:
        flask.session[cas_token_session_key] = flask.request.args['ticket']

    if cas_token_session_key in flask.session:

        if validate(flask.session[cas_token_session_key]):
            if 'CAS_AFTER_LOGIN_SESSION_URL' in flask.session:
                redirect_url = flask.session.pop('CAS_AFTER_LOGIN_SESSION_URL')
            elif flask.request.args.get('origin'):
                redirect_url = flask.request.args['origin']
            else:
                redirect_url = flask.url_for(
                    current_app.config['CAS_AFTER_LOGIN'])
        else:
            del flask.session[cas_token_session_key]

    current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))

    return flask.redirect(redirect_url)


@blueprint.route('/logout/')
def logout():
    """
    When the user accesses this route they are logged out.
    """

    cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
    cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']

    if cas_username_session_key in flask.session:
        del flask.session[cas_username_session_key]

    if cas_attributes_session_key in flask.session:
        del flask.session[cas_attributes_session_key]

    if(current_app.config['CAS_AFTER_LOGOUT'] is not None):
        redirect_url = create_cas_logout_url(
            current_app.config['CAS_SERVER'],
            current_app.config['CAS_LOGOUT_ROUTE'],
            current_app.config['CAS_AFTER_LOGOUT'])
    else:
        redirect_url = create_cas_logout_url(
            current_app.config['CAS_SERVER'],
            current_app.config['CAS_LOGOUT_ROUTE'])

    current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
    return flask.redirect(redirect_url)


def validate(ticket):
    """
    Will attempt to validate the ticket. If validation fails, then False
    is returned. If validation is successful, then True is returned
    and the validated username is saved in the session under the
    key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
    is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
    """

    cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
    cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']

    current_app.logger.debug("validating token {0}".format(ticket))

    cas_validate_url = create_cas_validate_url(
        current_app.config['CAS_SERVER'],
        current_app.config['CAS_VALIDATE_ROUTE'],
        flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True),
        ticket)

    current_app.logger.debug("Making GET request to {0}".format(
        cas_validate_url))

    xml_from_dict = {}
    isValid = False

    try:
        xmldump = urlopen(cas_validate_url).read().strip().decode('utf8', 'ignore')
        xml_from_dict = parse(xmldump)
        isValid = True if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] else False
    except ValueError:
        current_app.logger.error("CAS returned unexpected result")

    if isValid:
        current_app.logger.debug("valid")
        xml_from_dict = xml_from_dict["cas:serviceResponse"]["cas:authenticationSuccess"]
        username = xml_from_dict["cas:user"]
        flask.session[cas_username_session_key] = username

        if "cas:attributes" in xml_from_dict:
            attributes = xml_from_dict["cas:attributes"]

            if "cas:memberOf" in attributes:
                if not isinstance(attributes["cas:memberOf"], list):
                    attributes["cas:memberOf"] = attributes["cas:memberOf"].lstrip('[').rstrip(']').split(',')
                    for group_number in range(0, len(attributes['cas:memberOf'])):
                        attributes['cas:memberOf'][group_number] = attributes['cas:memberOf'][group_number].lstrip(' ').rstrip(' ')
                else:
                    for index in range(0, len(attributes['cas:memberOf'])):
                        attributes["cas:memberOf"][index] = attributes["cas:memberOf"][index].lstrip('[').rstrip(']').split(',')
                        for group_number in range(0, len(attributes['cas:memberOf'][index])):
                            attributes['cas:memberOf'][index][group_number] = attributes['cas:memberOf'][index][group_number].lstrip(' ').rstrip(' ')

            flask.session[cas_attributes_session_key] = attributes
    else:
        current_app.logger.debug("invalid")

    return isValid

这个文件内容可以说是整个项目的核心所在,主要包含了登录、登出、验证三个函数,这三个函数也是我们自定义时候重点修改的地方。

如果你有什么需要自定义的地方可以着重看这个文件。

项目的基本情况给大家介绍了,那么怎么去使用它呢? 我建议首先看一下readme文件,但是readme文件中的使用方法可能大部分人会报错,所以这里我提供一下之前写的一个demo,能够打开即用。

hello.py:

代码语言:javascript
复制
import flask
from flask import Flask
from flask_cas import CAS
from flask_cas import login_required

app = Flask(__name__)
cas = CAS(app)
# 此处填写server端地址
app.config['CAS_SERVER'] = 'https://server.cas.com:8443/cas'
# 填写cas登录后跳转的路由
app.config['CAS_AFTER_LOGIN'] = 'login'
app.config['SECRET_KEY'] ='dasdasdasdasdada123123423423aASDAS'
# 全局取消证书验证
import ssl
ssl._create_default_https_context = ssl._create_unverified_context


@app.route('/')
@login_required
def login():
    return flask.render_template(
        'demo.html',
        username=cas.username,
    )


if __name__ == '__main__':
    app.run("app1.cas.com", 9100, debug=True)  # 此处填写客户端路由

demo.html:

代码语言:javascript
复制
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>{{username}}</h1>
</body>
</html>

大家会发现,使用这个库所涉及的代码非常的少。

总结

关于flask-cas这个库,我的建议是去阅读源码并且尝试着去修改,因为涉及的代码并不是非常多。想要看更多关于单点登录相关的文章可以留言告诉我~

如果这篇文章对你有所帮助,麻烦给个点赞在看吧!!!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python进击者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Flask-CAS
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档