FastAPI で JWT 認証ミドルウェアを使用する

FastAPI 向けのミドルウェアはまだ少ないため、JWT 認証は自前で実装する必要があります。ただし、Starlette 自体が認証関連の仕組みを提供しているので、AuthenticationBackend をカスタマイズするだけで実現できます。今回は、ミドルウェアで JWT トークンをデコードし、ペイロード内のユーザー情報を取得する方法を実装します。
独自に定義したペイロードの形式は以下の通りです。
{
"usid": "SkDQBhEjUfygRSeEBech", //UUID Short
"uname": "テストユーザー名", // ユーザー名
"mid":"700010001" // メンバー ID
}
呼び出しコード
# Create the application and register an authentication middleware that
# delegates to the JWT backend. "YOUR_SECRET_KEY" is a placeholder for the
# key the tokens are signed with.
# NOTE(review): FastAPI, AuthenticationMiddleware and JWTAuthenticationBackend
# must already be imported/defined at this point -- confirm against the
# surrounding project.
app = FastAPI()
app.add_middleware(
    AuthenticationMiddleware,
    backend=JWTAuthenticationBackend(secret_key="YOUR_SECRET_KEY"),
)
完全なコード
import jwt
from starlette.authentication import (
AuthenticationBackend, AuthenticationError, BaseUser, AuthCredentials,
UnauthenticatedUser )
class JWTUser(BaseUser):
    """User object populated from the claims of a JWT.

    Stores the three identifying values, the raw token string and the full
    payload dictionary so that request handlers can read them back.

    Args:
        user_id_short: Short user identifier.
        member_number: Member number.
        user_name: User name.
        token: The JWT string this user was built from.
        payload: The complete payload dictionary.
    """

    def __init__(self, user_id_short: str, member_number: str, user_name: str,
                 token: str, payload: dict) -> None:
        self.user_name = user_name
        self.user_id_short = user_id_short
        self.member_number = member_number
        self.token = token
        self.payload = payload

    @property
    def is_authenticated(self) -> bool:
        """Always ``True``; this class represents an authenticated user."""
        return True

    @property
    def display_name(self) -> str:
        """Return the user name unchanged."""
        # TODO: member-name post-processing; open question whether to mask
        # part of it with '*'.
        return self.user_name

    @property
    def display_user_id_short(self) -> str:
        """Return the short user id unchanged."""
        # Hook for member-id post-processing (currently none).
        return self.user_id_short

    @property
    def display_member_number(self) -> str:
        """Return the member number unchanged."""
        # TODO: member-number post-processing; open question whether to mask
        # part of it with '*'.
        return self.member_number
class JWTAuthenticationBackend(AuthenticationBackend):
    """Authentication backend that reads a JWT from the ``Authorization`` header.

    The header is expected to look like ``"<prefix> <token>"``. The token is
    decoded with ``jwt.decode`` and the configured claims are copied into a
    ``JWTUser``.

    Args:
        secret_key: Key passed to ``jwt.decode`` to verify the token.
        algorithm: Accepted signing algorithm (default ``'HS256'``).
        prefix: Expected authorization scheme, compared case-insensitively
            (default ``'JWT'``).
        user_name_field: Payload key holding the user name.
        user_id_field: Payload key holding the short user id.
        member_number_field: Payload key holding the member number.
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256', prefix: str = 'JWT',
                 user_name_field: str = 'uname', user_id_field: str = 'usid',
                 member_number_field: str = 'mid'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.prefix = prefix
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field

    @classmethod
    def get_token_from_header(cls, authorization: str, prefix: str):
        """Parse an Authorization header value and return only the token.

        Args:
            authorization: Raw header value, e.g. ``"JWT abc.def.ghi"``.
            prefix: Expected scheme; compared case-insensitively.

        Returns:
            The token part of the header.

        Raises:
            AuthenticationError: If the value does not split into exactly two
                whitespace-separated parts, or the scheme does not match
                ``prefix``.
        """
        try:
            scheme, token = authorization.split()
        except ValueError as e:
            raise AuthenticationError('Authorizationスキームとトークンを分離できませんでした') from e
        if scheme.lower() != prefix.lower():
            raise AuthenticationError(f'Authorizationスキーム {scheme} はサポートされていません')
        return token

    async def authenticate(self, request):
        """Authenticate ``request`` from its ``Authorization`` header.

        Returns:
            ``None`` when the header is absent; otherwise a tuple of
            ``AuthCredentials(["jwt_authenticated"])`` and a ``JWTUser``.

        Raises:
            AuthenticationError: If the header is malformed, the token fails
                ``jwt.decode``, or a configured claim is missing from the
                payload.
        """
        if "Authorization" not in request.headers:
            return None

        auth = request.headers["Authorization"]
        token = self.get_token_from_header(authorization=auth, prefix=self.prefix)

        # PyJWT expects a sequence of allowed algorithms. Passing the bare
        # string would make its membership check a substring test.
        if isinstance(self.algorithm, str):
            algorithms = [self.algorithm]
        else:
            algorithms = list(self.algorithm)

        try:
            payload = jwt.decode(token, key=self.secret_key, algorithms=algorithms)
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(str(e)) from e

        # A validly signed token may still lack the expected claims; report
        # that as an authentication failure instead of leaking a KeyError.
        try:
            user = JWTUser(
                user_id_short=payload[self.user_id_field],
                member_number=payload[self.member_number_field],
                user_name=payload[self.user_name_field],
                token=token,
                payload=payload,
            )
        except KeyError as e:
            raise AuthenticationError(f'トークンに必須クレーム {e.args[0]} がありません') from e

        return AuthCredentials(["jwt_authenticated"]), user
class JWTWebSocketAuthenticationBackend(AuthenticationBackend):
    """Authentication backend that reads a JWT from a query-string parameter.

    The token is taken from ``request.query_params[query_param_name]``,
    decoded with ``jwt.decode``, and the configured claims are copied into a
    ``JWTUser``.

    Args:
        secret_key: Key passed to ``jwt.decode`` to verify the token.
        algorithm: Accepted signing algorithm (default ``'HS256'``).
        query_param_name: Name of the query parameter carrying the token
            (default ``'jwt'``).
        user_name_field: Payload key holding the user name.
        user_id_field: Payload key holding the short user id.
        member_number_field: Payload key holding the member number.
    """

    def __init__(self, secret_key: str, algorithm: str = 'HS256', query_param_name: str = 'jwt',
                 user_name_field: str = 'uname', user_id_field: str = 'usid',
                 member_number_field: str = 'mid'):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.query_param_name = query_param_name
        self.user_name_field = user_name_field
        self.user_id_field = user_id_field
        self.member_number_field = member_number_field

    async def authenticate(self, request):
        """Authenticate ``request`` from its query-string token.

        Returns:
            ``(AuthCredentials(), UnauthenticatedUser())`` when the query
            parameter is absent; otherwise a tuple of
            ``AuthCredentials(["jwt_authenticated"])`` and a ``JWTUser``.

        Raises:
            AuthenticationError: If the token fails ``jwt.decode`` or a
                configured claim is missing from the payload.
        """
        if self.query_param_name not in request.query_params:
            return AuthCredentials(), UnauthenticatedUser()

        token = request.query_params[self.query_param_name]

        # PyJWT expects a sequence of allowed algorithms. Passing the bare
        # string would make its membership check a substring test.
        if isinstance(self.algorithm, str):
            algorithms = [self.algorithm]
        else:
            algorithms = list(self.algorithm)

        try:
            payload = jwt.decode(token, key=self.secret_key, algorithms=algorithms)
        except jwt.InvalidTokenError as e:
            raise AuthenticationError(str(e)) from e

        # A validly signed token may still lack the expected claims; report
        # that as an authentication failure instead of leaking a KeyError.
        try:
            user = JWTUser(
                user_id_short=payload[self.user_id_field],
                member_number=payload[self.member_number_field],
                user_name=payload[self.user_name_field],
                token=token,
                payload=payload,
            )
        except KeyError as e:
            raise AuthenticationError(f'トークンに必須クレーム {e.args[0]} がありません') from e

        return AuthCredentials(["jwt_authenticated"]), user