前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Django OAuth2 和 JWT 案例

Django OAuth2 和 JWT 案例

作者头像
用户1416054
发布2018-08-02 11:44:40
1.3K0
发布2018-08-02 11:44:40
举报

Django OAuth2 和 JWT 案例

Posted August 08, 2017

在重写 Ansible 监控平台时, 需要前后端分离, 并且需要使用公司的账户系统。 而前后端认证我一直采取的 JWT 认证规范,具体为什么这么选择, 这里不多讲。而符合DRF 的JWT 框架, 默认使用的是 Django 自带的账户系统做的。 所以再 OAuth2 和 JWT 结合需要做点工作。

OAuth2认证方法

此步骤主要包含, 从资源服务器交换 Token, 然后根据 token 获取当前用户的 profile 信息, 一般为 email 和 avatar 信息. 然后创建 Django 自带的 User。 也可以通过函数实现。

Python

# -*- coding: utf-8 -*-

"""
web.auth
~~~~~~~~

Cable 认证组件

这里只接受 code, 通过 code 及相对应的密钥换取 Token 后

获取用户信息, 并根据数据库是否有此用户。

如果没有此用户则创建, 并设置未激活状态

如果有此用户, 并且处于未激活状态, 则提示用户找管理员激活

如果已经激活, 返回登录此用户并返回 jwt.
"""

import requests
from web.models import Profile
from web.models import User
from rest_framework.exceptions import APIException


class TeambitionAuthenticationFailed(APIException):
    status_code = 401
    default_code = 'Unauthorized'


class CableOAuth2(object):

    access_token_url = "https://account.teambition.com/oauth2/access_token"
    user_profile_url = "https://api.teambition.com/api/users/me"

    def __init__(self, CK, SK):
        self.CK = CK
        self.SK = SK

    def get_profile(self, email=""):
        profile = Profile.objects.filter(user__email=email).first()

        return profile

    def get_user(self, email=""):
        user = User.objects.filter(email=email).first()

        return user

    def is_actived(self, user):
        return user.is_active

    def get_access_token(self, code):
        payload = {
            'client_id': self.CK,
            'client_secret': self.SK,
            'code': code,
            'grant_type': 'Bearer'
        }

        resp = self.request(
            "POST",
            self.access_token_url,
            data=payload
        )

        data = resp.json()
        return data.get('access_token', None)

    def get_me(self, access_token):
        """
        获取用户信息
        """
        resp = self.request(
            "GET",
            self.user_profile_url + '?access_token=' + access_token
        )

        return resp.json()

    def request(self, method, url, **kwargs):
        resp = requests.request(method, url, **kwargs)

        try:
            resp.raise_for_status()
        except requests.HTTPError as e:
            m = resp.json()

            detail = "%s %s" % (url, str(e))
            if 'message' in m and 'name' in m:
                detail = "%s %s" % (m["name"], m["message"])

            raise TeambitionAuthenticationFailed(detail)
        return resp

    def authentication(self, code):
        access_token = self.get_access_token(code)

        # 如果没有获取到 access token 那么后面就不继续了
        # 直接报错给客户端
        if not access_token:
            detail = "Get access token failure"
            raise TeambitionAuthenticationFailed(detail)

        me = self.get_me(access_token)

        # 这里做下安全保护, 如果非 teambition 域名的用户
        # 直接返回, 并给出非法警告.
        if not me["email"].endswith('teambition.com'):
            detail = "非法操作, 您不是 teambtion 员工,请立即停止此行为!!"
            raise TeambitionAuthenticationFailed(detail)

        user = self.get_user(me["email"])
        profile = self.get_profile(me["email"])

        if not user:
            user = User(
                username=me["email"].split("@")[0],
                email=me["email"],
                is_active=False
            )

            user.save()

        profile_data = {
            "avatar": me["avatarUrl"]
        }

        Profile.objects.update_or_create(user=user, defaults=profile_data)

        return user

JWT APIView*

引入的包

Python

from django.utils.translation import ugettext as _
from rest_framework_jwt.serializers import JSONWebTokenSerializer
from rest_framework_jwt.serializers import Serializer
from rest_framework_jwt.serializers import jwt_payload_handler
from rest_framework_jwt.serializers import jwt_encode_handler
from rest_framework_jwt.serializers import jwt_decode_handler
from rest_framework_jwt.serializers import jwt_get_username_from_payload
from rest_framework_jwt.views import JSONWebTokenAPIView
from rest_framework_jwt.views import jwt_response_payload_handler
from rest_framework_jwt.settings import api_settings
from rest_framework import serializers
from rest_framework import status
from rest_framework.response import Response
from django.conf import settings
from web.auth import CableOAuth2

由于 OAuth2 返回时仅返回 code, 所以需要在JWTSerializer中获取此 code 并通过上面方法认证.

Python

class TeambitionJWTSerializer(Serializer):

    def validate(self, attrs):
        code = self.initial_data.get("code", None)

        oa = CableOAuth2(CK=settings.OAUTH_CLIENT_KEY, SK=settings.OAUTH_SECRET_KEY)

        user = oa.authentication(code)

        if user:
            if not user.is_active:
                msg = _('User account is not actived or disabled.')
                raise serializers.ValidationError(msg)

            payload = jwt_payload_handler(user)

            return {
                'token': jwt_encode_handler(payload),
                'user': user
            }
        else:
            msg = _('Unable to log in with provided credentials.')
            raise serializers.ValidationError(msg)

默认的 JWT APIView 方法是 POST, OAuth2 Callback URL 是 GET 方式, 所以需要自定义个JWTView, 目的是通过回调ˇ GET 的方式获得 Code.

Python

class TeambitionJWTAPIView(JSONWebTokenAPIView):

    def get(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.GET)

        if serializer.is_valid():
            user = serializer.object.get('user') or request.user
            token = serializer.object.get('token')
            response_data = jwt_response_payload_handler(token, user, request)
            response = Response(response_data)
            if api_settings.JWT_AUTH_COOKIE:
                expiration = (datetime.utcnow() +
                              api_settings.JWT_EXPIRATION_DELTA)
                response.set_cookie(api_settings.JWT_AUTH_COOKIE,
                                    token,
                                    expires=expiration,
                                    httponly=True)
            return response

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

绑定到 Serializer

Raw

class TeambitionObtainJSONWebToken(TeambitionJWTAPIView):
    serializer_class = TeambitionJWTSerializer


teambition_obtain_jwt_token = TeambitionObtainJSONWebToken.as_view()

替换默认的签发路径

并把资源服务的应用程序回掉地址改为http://<host:port>/jwt/teambition/obtain

Python

from django.conf.urls import url, include
from web.auth.jwt import teambition_obtain_jwt_token
from rest_framework_jwt.views import obtain_jwt_token
from rest_framework_jwt.views import refresh_jwt_token
from rest_framework_jwt.views import verify_jwt_token


urlpatterns = [
    # oauth2 
    url(r'^jwt/teambition/obtain', teambition_obtain_jwt_token),

    # username & password auth
    # url(r'^jwt/obtain', obtain_jwt_token),
    url(r'^jwt/refresh', refresh_jwt_token),
    url(r'^jwt/verify', verify_jwt_token),
]
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Django OAuth2 和 JWT 案例
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档