개발공부

[Flask 에서 JWT 사용] 토큰 유효기간 만료 시키는 방법 본문

Python/Flask

[Flask 에서 JWT 사용] 토큰 유효기간 만료 시키는 방법

mscha 2022. 6. 21. 10:25

토큰 유효기간 만료 시키는 방법은

config.py의 

JWT_ACCESS_TOKEN_EXPIRES = True 설정을 하고

access_token 생성 부분에

access_token = create_access_token(user_info['id'], expires_delta = datetime.timedelta(minutes=3))

위와 같은 식으로 datetime의 timedelta를 이용해 토큰의 만료 기간을 설정해준다.

 

예제

 

config.py

class Config :
    JWT_SECRET_KEY = 'mykey'
    JWT_ACCESS_TOKEN_EXPIRES = True
    PROPAGATE_EXCEPTIONS = True

 

my_connection.py

import mysql.connector

def get_connection() :
    connection = mysql.connector.connect(
        host = 'yh-db.cesawiuivilv.ap-northeast-2.rds.amazonaws.com',
        database = 'recipe_db',
        user = 'recipe_user',
        password = 'recipe1234',
    )
    return connection

utils.py

from passlib.hash import pbkdf2_sha256

# 원문 비밀번호를, 암호화 하는 함수
# 암호화는 할 수 있지만 복호화는 안되는 hash 사용
# 단방향 암호화
# hash의 패턴을 파악하지 못하도록 salt(seed)를 설정하여 사용한다.
def hash_password(original_password) :
    salt = 'yh*hello12'
    password = original_password + salt
    password = pbkdf2_sha256.hash(password)
    return password

# 비밀번호가 맞는지 확인하는 함수
# True, False로 리턴한다.
def check_password(original_password, hashed_password) :
    salt = 'yh*hello12'
    check = pbkdf2_sha256.verify(original_password+salt, hashed_password)
    return check

app.py

from flask import Flask
from flask_jwt_extended import JWTManager
from flask_restful import Api
from config import Config

from resources.user import UserLoginResource, UserRegisterResource

app = Flask(__name__)

# 환경변수 셋팅
app.config.from_object(Config)

# JWT 토큰 라이브러리만들기
jwt = JWTManager(app)

api = Api(app)

# 경로와 리소스(API 코드)를 연결한다.

api.add_resource(UserRegisterResource, '/users/register')
api.add_resource(UserLoginResource, '/users/login')


if __name__ == '__main__' :
    app.run()

 

/resource/user.py

import datetime
from http import HTTPStatus
from flask import request
from flask_jwt_extended import create_access_token, get_jwt_identity, jwt_required
from flask_restful import Resource
from mysql.connector.errors import Error
from mysql_connection import get_connection
import mysql.connector
from email_validator import validate_email, EmailNotValidError

from utils import check_password, hash_password

class UserRegisterResource(Resource) : 
    def post(self) :
        # {
        #     "username" : "홍길동",
        #     "email" : "abc@naver.com",
        #     "password" : "1234"
        # }
        # api 실행 코드를 여기에 작성
        
        # 1. 클라이언트가 body에 보내준 json을 받아온다.
        data = request.get_json()

        # 2. 이메일 주소 형식이 제대로 된 주소 형식인지
        # 확인하는 코드
        try:
            email = validate_email(data['email']).email
            print(email)
        except EmailNotValidError as e:
            # email is not valid, exception message is human-readable
            print(str(e))
            return {'error' : str(e)}, 400

        # 3. 비밀번호의 길이를 체크한다.
        # 비밀번호의 길이는 4자리 이상, 12자리 이하로만!
        if len(data['password']) < 4 or len(data['password']) > 12 :
            return {'error' : '비밀번호의 길이를 확인하세요.'}, 400
        
        # 4. 비밀번호를 암호화 한다.
        hashed_password = hash_password(data['password'])
        print(hashed_password)
        
        # 5. 데이터베이스에 회원 정보를 저장한다!
        # 받아온 데이터를 디비 저장하면 된다.
        try :
            # 데이터 insert
            # 1. DB에 연결
            connection = get_connection()
            
            # 2. 쿼리문 만들기
            query = '''insert into user
                        (username, email, password)
                        values
                        (%s, %s, %s);'''
                    
            # recode 는 튜플 형태로 만든다.
            recode = (data['username'], data['email'], hashed_password)

            # 3. 커서를 가져온다.
            cursor = connection.cursor()

            # 4. 쿼리문을 커서를 이용해서 실행한다.
            cursor.execute(query, recode)

            # 5. 커넥션을 커밋해줘야 한다 => 디비에 영구적으로 반영하라는 뜻
            connection.commit()

            # 5-1. DB에 저장된 아이디 값 가져오기.
            user_id = cursor.lastrowid

            # 6. 자원 해제
            cursor.close()
            connection.close()

        except mysql.connector.Error as e :
            print(e)
            cursor.close()
            connection.close()
            return {"error" : str(e)}, 503

        # user_id를 바로 보내면 안되고,
        # JWT로 암호화해서 보내준다.
        # 암호화 하는 코드
        # expires_delta = datetime.timedelta(minutes=3)
        # 토큰이 3분뒤 만료된다.
        access_token = create_access_token(user_id, expires_delta = datetime.timedelta(minutes=3))

        # 숫자는 HTTPStatus 번호이다.
        return {"result" : "success", "access_token" : access_token}, 200


class UserLoginResource(Resource) :
    def post(self) :
        # {
        #     "email" : "abc@naver.com",
        #     "password" : "1234"
        # }   

        # 1. 클라이언트가 body에 보내준 json을 받아온다.
        data = request.get_json()

        # 2. 이메일로, DB에서 이 이메일과 일치하는 데이터를 가져온다.
        # 데이터를 select 해온다.
        # 디비로부터 데이터를 받아서, 클라이언트에 보내준다.
        try :
            # 1. DB에 연결
            connection = get_connection()
            
            # 2. 쿼리문 만들기
            query = '''select * from user
                        where email = %s;'''                 

            record = (data['email'],)

            # 3. 커서를 가져온다.
            # select를 할 때는 dictionary = True로 설정한다.
            cursor = connection.cursor(dictionary = True)

            # 4. 쿼리문을 커서를 이용해서 실행한다.
            cursor.execute(query, record)

            # 5. select 문은, 아래 함수를 이용해서, 데이터를 받아온다.
            result_list = cursor.fetchall()

            # 중요! 디비에서 가져온 timstamp는 
            # 파이썬의 datetime 으로 자동 변경된다.
            # 문제는 이 데이터를 json으로 바로 보낼 수 없으므로,
            # 문자열로 바꿔서 다시 저장해서 보낸다.
            i=0
            for record in result_list :
                result_list[i]['created_at'] = record['created_at'].isoformat()
                result_list[i]['updated_at'] = record['updated_at'].isoformat()
                i = i+1

            # 6. 자원 해제
            cursor.close()
            connection.close()
        except mysql.connector.Error as e :
            print(e)
            cursor.close()
            connection.close()
            return {"error" : str(e)}, 503


        # 3. result_list의 행의 갯수가 1개이면, 
        # 유저 데이터를 정상적으로 받아온 것이고,
        # 행의 갯수가 0이면, 클라이언트가 요청한 이메일은, 
        # 회원가입이 되어있지 않은 이메일이다.
        if len(result_list) != 1 :
            return {"error" : '회원가입이 안된 이메일입니다.'}, 400
        

        
        # 4. 클라이언트로부터 받은 비밀번호와 DB의 비밀번호가 같은지 확인한다.
        user_info = result_list[0]
        hashed_password = user_info['password']
        if check_password(data['password'], hashed_password) == False :
            return {'error' : '비밀번호가 틀렸습니다.'}, 400          
        
        # user_id를 바로 보내면 안되고,
        # JWT로 암호화해서 보내준다.
        # 암호화 하는 코드
        # expires_delta = datetime.timedelta(minutes=3)
        # 토큰이 3분뒤 만료된다.
        access_token = create_access_token(user_info['id'], expires_delta = datetime.timedelta(minutes=3))

        return {
            "result" : "success",
            "access_token" : access_token}, 200