2024-04-10T14:53:52 Copying approximately 9861 rows...
INSERT LOW_PRIORITY IGNORE INTO `test_db`.`_ptosc_new` (`id`, `k`, `c`, `pad`) SELECT `id`, `k`, `c`, `pad` FROM `test_db`.`ptosc` FORCE INDEX(`PRIMARY`) WHERE ((`id` >= ?)) AND ((`id` <= ?)) LOCK IN SHARE MODE /*pt-online-schema-change 214419 copy nibble*/
SELECT /*!40001 SQL_NO_CACHE */ `id` FROM `test_db`.`ptosc` FORCE INDEX(`PRIMARY`) WHERE ((`id` >= ?)) ORDER BY `id` LIMIT ?, 2 /*next chunk boundary*/
2024-04-10T14:53:52 Copied rows OK.
2024-04-10T14:53:52 Analyzing new table...
2024-04-10T14:53:52 Swapping tables...
RENAME TABLE `test_db`.`ptosc` TO `test_db`.`_ptosc_old`, `test_db`.`_ptosc_new` TO `test_db`.`ptosc`
2024-04-10T14:53:52 Swapped original and new tables OK.
2024-04-10T14:53:52 Dropping old table...
DROP TABLE IF EXISTS `test_db`.`_ptosc_old`
2024-04-10T14:53:52 Dropped old table `test_db`.`_ptosc_old` OK.
from datetime import datetime,timedelta
from jose import jwt
from typing import Optional
import logging,os
# Key used to sign tokens, read from the environment
SECRET_KEY = os.environ.get('SECRET_KEY')
# Raw `hours` setting from the environment (None when the variable is unset)
hours = os.environ.get('hours')
def creat_access_token(user_id: str,
                       username: str,
                       hour: Optional[int] = None) -> str:
    """Create a signed access token (JWT) for a user.

    Args:
        user_id: Identifier stored in the token's ``user_id`` claim.
        username: Name stored in the token's ``username`` claim.
        hour: Token lifetime in hours. When omitted (or ``None``) the
            module-level ``hours`` setting is converted with ``int()``.

    Returns:
        The encoded token produced by ``jwt.encode`` with HS256.

    Raises:
        ValueError: If ``hour`` is not given and ``hours`` is unset, or
            ``hours`` cannot be parsed as an integer.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Resolve the default at call time; evaluating int(hours) in the
    # signature would fail at import time when the setting is missing.
    if hour is None:
        if hours is None:
            raise ValueError(
                "token lifetime not given and the 'hours' environment variable is not set"
            )
        hour = int(hours)

    # Expiry time (timezone-aware UTC; datetime.utcnow() is deprecated)
    expire = datetime.now(timezone.utc) + timedelta(hours=hour)
    # `exp` carries the expiry time alongside the user claims
    to_encode = {"exp": expire, "user_id": user_id, "username": username}
    # Generate the token, signed with the HS256 algorithm
    access_token = jwt.encode(to_encode, SECRET_KEY, algorithm='HS256')
    return access_token
from jose.exceptions import ExpiredSignatureError,JWTError,JWSError,JWKError
from jose import jwt
from fastapi import Header
# Key used to verify token signatures, read from the environment
SECRET_KEY = os.environ.get('SECRET_KEY')
def check_access_token(token: str = Header(...)):
    """Decode and verify an access token.

    Args:
        token: The encoded token; declared with FastAPI's ``Header(...)``,
            so it is taken from the request headers when used as a route
            parameter or dependency.

    Returns:
        The decoded payload when verification succeeds, otherwise ``None``
        (expired token or any other JWT error). Callers must check for
        ``None`` before using the result.
    """
    # NOTE(review): failures are only printed and reported as None; consider
    # rejecting the request explicitly (e.g. a 401 response) — confirm with callers.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        print('token验证成功!')
        return payload
    except ExpiredSignatureError:
        # Handled before JWTError so an expired token gets its own message.
        print('token过期')
    except JWTError:
        print('token验证失败')
    return None
from fastapi import APIRouter,Depends
from models.cms.user import User
from sqlalchemy.orm import Session
from schemas.cms.user import login
import logging
from plugin.pulgin_sqlalchamy import db
from libs.security import creat_access_token
# Logger registered under the name 'uvicorn'
log = logging.getLogger('uvicorn')
# Router on which the endpoints below are registered
user = APIRouter()
def get_db():
    """Yield the database session and close it once the consumer is done.

    Written as a generator so it can be used with ``Depends``: the code
    after ``yield`` runs when the generator is resumed or closed.
    """
    # NOTE(review): `db` is the object imported from plugin.pulgin_sqlalchamy,
    # so every call yields (and then closes) the same object — confirm this
    # is safe to share between requests.
    try:
        yield db
    finally:
        db.close()
@user.post('/login')
async def login(data: login, db: Session = Depends(get_db)):
    """Check the submitted credentials and issue an access token.

    Args:
        data: Request body; its ``username`` and ``password`` fields are used.
            (The annotation refers to the imported ``login`` schema, which
            this function's own name shadows after definition.)
        db: Session provided by the ``get_db`` dependency.

    Returns:
        The token string from ``creat_access_token`` when a matching user
        exists, otherwise the message '账号密码不正确'.
    """
    # NOTE(review): the password is matched against the stored column as-is,
    # which implies plaintext storage — should be a salted-hash comparison.
    account = (
        db.query(User)
        .filter_by(username=data.username, password=data.password)
        .first()
    )
    # No matching username/password pair
    if not account:
        return '账号密码不正确'
    # Credentials matched: create the token
    return creat_access_token(account.id, data.username)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。