banner
李大仁博客

李大仁博客

天地虽大,但有一念向善,心存良知,虽凡夫俗子,皆可为圣贤。

FastAPI 使用JWT認證的中間件

FastAPI 使用 JWT 認證的中間件:FastAPI 現成的中間件還是太少,JWT 認證需要單獨開發。Starlette 本身提供了認證相關的實現,只需要自定義一個 AuthenticationBackend 即可。本次我們以中間件的方式解析 JWT 令牌,獲取 payload 裡面的用戶信息。

私有定義的 payload 內容格式如下

{
"usid": "SkDQBhEjUfygRSeEBech", //UUID Short
"uname": "測試用戶名", //Username
"mid":"700010001" // 會員 ID
}

調用代碼

# Create the application and register the JWT backend with the
# authentication middleware.
app = FastAPI()
app.add_middleware(
    AuthenticationMiddleware,
    backend=JWTAuthenticationBackend(secret_key="YOUR_SECRET_KEY"),
)

完整的代碼

import jwt

from starlette.authentication import (
AuthenticationBackend, AuthenticationError, BaseUser, AuthCredentials,
UnauthenticatedUser )

class JWTUser(BaseUser):
    """Authenticated user built from a decoded JWT payload.

    Stores the user fields taken from the token together with the raw
    token string and the full decoded payload.
    """

    def __init__(self, user_id_short: str, member_number: str, user_name: str,
                 token: str, payload: dict) -> None:
        """Store the user fields, the raw token and the decoded payload.

        :param user_id_short: short user id taken from the payload
        :param member_number: member number taken from the payload
        :param user_name: user name taken from the payload
        :param token: the encoded JWT string
        :param payload: the decoded JWT payload
        """
        self.user_name = user_name
        self.user_id_short = user_id_short
        self.member_number = member_number
        self.token = token
        self.payload = payload

    @property
    def is_authenticated(self) -> bool:
        """Always ``True``: instances are only created for decoded tokens."""
        return True

    @property
    def display_name(self) -> str:
        """Return the user name as stored."""
        return self.user_name  # user-name handling; mask with '*'?

    @property
    def display_user_id_short(self) -> str:
        """Return the short user id as stored."""
        return self.user_id_short  # user-id handling

    @property
    def display_member_number(self) -> str:
        """Return the member number as stored."""
        return self.member_number  # member-number handling; mask with '*'?

class JWTAuthenticationBackend(AuthenticationBackend):
    """Authentication backend that reads a JWT from the ``Authorization`` header.

    The header must have the form ``"<prefix> <token>"``. The token is decoded
    with the configured secret and algorithm, and the user fields are read
    from the payload using the configured field names.
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256', prefix: str = 'JWT',
                 user_name_field: str = 'uname', user_id_field: str = 'usid',
                 member_number_field: str = 'mid'):
        """Configure how tokens are decoded and which payload fields are read.

        :param secret_key: key passed to ``jwt.decode``
        :param algorithm: name of the accepted signing algorithm
        :param prefix: expected authorization scheme in the header
        :param user_name_field: payload key holding the user name
        :param user_id_field: payload key holding the short user id
        :param member_number_field: payload key holding the member number
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.prefix = prefix
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field

    @classmethod
    def get_token_from_header(cls, authorization: str, prefix: str):
        """Parse the Authorization header value and return only the token.

        :param authorization: raw header value, expected as ``"<scheme> <token>"``
        :param prefix: expected scheme, compared case-insensitively
        :return: the token part of the header
        :raises AuthenticationError: if the value does not split into exactly
            two whitespace-separated parts, or the scheme does not match
        """
        try:
            scheme, token = authorization.split()
        except ValueError as e:
            raise AuthenticationError('無法分離Authorization方案和令牌') from e
        if scheme.lower() != prefix.lower():
            raise AuthenticationError(f'不支持的Authorization方案 {scheme}')
        return token

    async def authenticate(self, request):
        """Authenticate a request from its ``Authorization`` header.

        :param request: connection object exposing ``headers``
        :return: ``None`` when the header is absent, otherwise a tuple of
            ``AuthCredentials(["jwt_authenticated"])`` and a ``JWTUser``
        :raises AuthenticationError: if the header is malformed, the token
            fails to decode, or a configured payload field is missing
        """
        if "Authorization" not in request.headers:
            return None

        auth = request.headers["Authorization"]
        token = self.get_token_from_header(authorization=auth, prefix=self.prefix)

        # jwt.decode documents `algorithms` as a list of allowed names, so a
        # single configured name is wrapped; a sequence is passed through.
        if isinstance(self.algorithm, str):
            algorithms = [self.algorithm]
        else:
            algorithms = list(self.algorithm)

        try:
            payload = jwt.decode(token, key=self.secret_key, algorithms=algorithms)
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(str(e)) from e

        # A validly signed token may still lack the expected fields; report
        # that as an authentication failure instead of a bare KeyError.
        try:
            user_id_short = payload[self.user_id_field]
            member_number = payload[self.member_number_field]
            user_name = payload[self.user_name_field]
        except KeyError as e:
            raise AuthenticationError(f'令牌缺少必要字段 {e}') from e

        user = JWTUser(user_id_short=user_id_short, member_number=member_number,
                       user_name=user_name, token=token, payload=payload)
        return AuthCredentials(["jwt_authenticated"]), user

class JWTWebSocketAuthenticationBackend(AuthenticationBackend):
    """Authentication backend that reads a JWT from a query-string parameter.

    The token is decoded with the configured secret and algorithm, and the
    user fields are read from the payload using the configured field names.
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256', query_param_name: str = 'jwt',
                 user_name_field: str = 'uname', user_id_field: str = 'usid',
                 member_number_field: str = 'mid'):
        """Configure how tokens are located, decoded and mapped to a user.

        :param secret_key: key passed to ``jwt.decode``
        :param algorithm: name of the accepted signing algorithm
        :param query_param_name: query parameter that carries the token
        :param user_name_field: payload key holding the user name
        :param user_id_field: payload key holding the short user id
        :param member_number_field: payload key holding the member number
        """
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.query_param_name = query_param_name
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field

    async def authenticate(self, request):
        """Authenticate a connection from its query-string token.

        :param request: connection object exposing ``query_params``
        :return: empty ``AuthCredentials`` with an ``UnauthenticatedUser`` when
            the parameter is absent, otherwise a tuple of
            ``AuthCredentials(["jwt_authenticated"])`` and a ``JWTUser``
        :raises AuthenticationError: if the token fails to decode or a
            configured payload field is missing
        """
        if self.query_param_name not in request.query_params:
            return AuthCredentials(), UnauthenticatedUser()

        token = request.query_params[self.query_param_name]

        # jwt.decode documents `algorithms` as a list of allowed names, so a
        # single configured name is wrapped; a sequence is passed through.
        if isinstance(self.algorithm, str):
            algorithms = [self.algorithm]
        else:
            algorithms = list(self.algorithm)

        try:
            payload = jwt.decode(token, key=self.secret_key, algorithms=algorithms)
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(str(e)) from e

        # A validly signed token may still lack the expected fields; report
        # that as an authentication failure instead of a bare KeyError.
        try:
            user_id_short = payload[self.user_id_field]
            member_number = payload[self.member_number_field]
            user_name = payload[self.user_name_field]
        except KeyError as e:
            raise AuthenticationError(f'令牌缺少必要字段 {e}') from e

        user = JWTUser(user_id_short=user_id_short, member_number=member_number,
                       user_name=user_name, token=token, payload=payload)
        return AuthCredentials(["jwt_authenticated"]), user
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。