개발공부

[Flask 에서 JWT 사용] 회원가입 / 로그인 API에서, 토큰 생성해서 처리하는 방법 본문

Python/Flask

[Flask 에서 JWT 사용] 회원가입 / 로그인 API에서, 토큰 생성해서 처리하는 방법

mscha 2022. 6. 20. 18:07

app.py

from flask import Flask
from flask_jwt_extended import JWTManager
from flask_restful import Api
from config import Config

from resources.recipe import RecipeListResource
from resources.recipe_info import RecipeResource
from resources.recipe_publish import RecipePublishResource
from resources.user import UserLoginResource, UserRegisterResource

app = Flask(__name__)

# 환경변수 셋팅
app.config.from_object(Config)

# JWT 토큰 라이브러리만들기
jwt = JWTManager(app)

api = Api(app)

# 경로와 리소스(API 코드)를 연결한다.
# <int:recipe_id>
# 클라이언트에서 정수 recipe_id를 받아서 변수로 저장한다.
api.add_resource(RecipeListResource, '/recipes')
api.add_resource(RecipeResource, '/recipes/<int:recipe_id>')
api.add_resource(RecipePublishResource, '/recipes/<int:recipe_id>/publish')
api.add_resource(UserRegisterResource, '/users/register')
api.add_resource(UserLoginResource, '/users/login')
if __name__ == '__main__' :
    app.run()

mysql_connection.py

import mysql.connector

def get_connection() :
    connection = mysql.connector.connect(
        host = '본인의 host',
        database = 'recipe_db',
        user = 'recipe_user',
        password = '패스워드',
    )
    return connection

utils.py

from passlib.hash import pbkdf2_sha256

# 원문 비밀번호를, 암호화 하는 함수
# 암호화는 할 수 있지만 복호화는 안되는 hash 사용
# 단방향 암호화
# hash의 패턴을 파악하지 못하도록 salt(seed)를 설정하여 사용한다.
def hash_password(original_password) :
    salt = '시드로 사용할 문자열'
    password = original_password + salt
    password = pbkdf2_sha256.hash(password)
    return password

# 비밀번호가 맞는지 확인하는 함수
# True, False로 리턴한다.
def check_password(original_password, hashed_password) :
    salt = '시드로 사용할 문자열'
    check = pbkdf2_sha256.verify(original_password+salt, hashed_password)
    return check

user.py

import datetime
from http import HTTPStatus
from flask import request
from flask_jwt_extended import create_access_token
from flask_restful import Resource
from mysql.connector.errors import Error
from mysql_connection import get_connection
import mysql.connector
from email_validator import validate_email, EmailNotValidError

from utils import check_password, hash_password

class UserRegisterResource(Resource) : 
    def post(self) :
        # {
        #     "username" : "홍길동",
        #     "email" : "abc@naver.com",
        #     "password" : "1234"
        # }
        # api 실행 코드를 여기에 작성
        
        # 1. 클라이언트가 body에 보내준 json을 받아온다.
        data = request.get_json()

        # 2. 이메일 주소 형식이 제대로 된 주소 형식인지
        # 확인하는 코드
        try:
            email = validate_email(data['email']).email
            print(email)
        except EmailNotValidError as e:
            # email is not valid, exception message is human-readable
            print(str(e))
            return {'error' : str(e)}, 400

        # 3. 비밀번호의 길이를 체크한다.
        # 비밀번호의 길이는 4자리 이상, 12자리 이하로만!
        if len(data['password']) < 4 or len(data['password']) > 12 :
            return {'error' : '비밀번호의 길이를 확인하세요.'}, 400
        
        # 4. 비밀번호를 암호화 한다.
        hashed_password = hash_password(data['password'])
        print(hashed_password)
        
        # 5. 데이터베이스에 회원 정보를 저장한다!
        # 받아온 데이터를 디비 저장하면 된다.
        try :
            # 데이터 insert
            # 1. DB에 연결
            connection = get_connection()
            
            # 2. 쿼리문 만들기
            query = '''insert into user
                        (username, email, password)
                        values
                        (%s, %s, %s);'''
                    
            # recode 는 튜플 형태로 만든다.
            recode = (data['username'], data['email'], hashed_password)

            # 3. 커서를 가져온다.
            cursor = connection.cursor()

            # 4. 쿼리문을 커서를 이용해서 실행한다.
            cursor.execute(query, recode)

            # 5. 커넥션을 커밋해줘야 한다 => 디비에 영구적으로 반영하라는 뜻
            connection.commit()

            # 5-1. DB에 저장된 아이디 값 가져오기.
            user_id = cursor.lastrowid

            # 6. 자원 해제
            cursor.close()
            connection.close()

        except mysql.connector.Error as e :
            print(e)
            cursor.close()
            connection.close()
            return {"error" : str(e)}, 503

        # user_id를 바로 보내면 안되고,
        # JWT로 암호화해서 보내준다.
        # 암호화 하는 코드
        access_token = create_access_token(user_id, expires_delta = datetime.timedelta(minutes=1))

        # 숫자는 HTTPStatus 번호이다.
        return {"result" : "success", "access_token" : access_token}, 200


class UserLoginResource(Resource) :
    def post(self) :
        # {
        #     "email" : "abc@naver.com",
        #     "password" : "1234"
        # }   

        # 1. 클라이언트가 body에 보내준 json을 받아온다.
        data = request.get_json()

        # 2. 이메일로, DB에서 이 이메일과 일치하는 데이터를 가져온다.
        # 데이터를 select 해온다.
        # 디비로부터 데이터를 받아서, 클라이언트에 보내준다.
        try :
            # 1. DB에 연결
            connection = get_connection()
            
            # 2. 쿼리문 만들기
            query = '''select * from user
                        where email = %s;'''                 

            record = (data['email'],)

            # 3. 커서를 가져온다.
            # select를 할 때는 dictionary = True로 설정한다.
            cursor = connection.cursor(dictionary = True)

            # 4. 쿼리문을 커서를 이용해서 실행한다.
            cursor.execute(query, record)

            # 5. select 문은, 아래 함수를 이용해서, 데이터를 받아온다.
            result_list = cursor.fetchall()

            # 중요! 디비에서 가져온 timstamp는 
            # 파이썬의 datetime 으로 자동 변경된다.
            # 문제는 이 데이터를 json으로 바로 보낼 수 없으므로,
            # 문자열로 바꿔서 다시 저장해서 보낸다.
            i=0
            for record in result_list :
                result_list[i]['created_at'] = record['created_at'].isoformat()
                result_list[i]['updated_at'] = record['updated_at'].isoformat()
                i = i+1

            # 6. 자원 해제
            cursor.close()
            connection.close()
        except mysql.connector.Error as e :
            print(e)
            cursor.close()
            connection.close()
            return {"error" : str(e)}, 503


        # 3. result_list의 행의 갯수가 1개이면, 
        # 유저 데이터를 정상적으로 받아온 것이고,
        # 행의 갯수가 0이면, 클라이언트가 요청한 이메일은, 
        # 회원가입이 되어있지 않은 이메일이다.
        if len(result_list) != 1 :
            return {"error" : '회원가입이 안된 이메일입니다.'}, 400
        

        
        # 4. 클라이언트로부터 받은 비밀번호와 DB의 비밀번호가 같은지 확인한다.
        user_info = result_list[0]
        hashed_password = user_info['password']
        if check_password(data['password'], hashed_password) == False :
            return {'error' : '비밀번호가 틀렸습니다.'}, 400          
        
        # user_id를 바로 보내면 안되고,
        # JWT로 암호화해서 보내준다.
        # 암호화 하는 코드
        access_token = create_access_token(user_info['id'], expires_delta = datetime.timedelta(minutes=1))

        return {
            "result" : "success",
            "access_token" : access_token}, 200

 

postman에서 확인하기

 

회원가입 API를 만든후 json을 아래와같이 입력한다음 send를 하겠다.

그럼 아래와같이 성공했다는 메시지와 user_id에대한 jwt가 나온다.

 

Mysql Workbench로 가서 user 테이블에 정상적으로 김민수에대한 내용일 생겼는지 확인해보자.

 

원래 있던 user 테이블

 

김민수가 추가된 user 테이블

 

이제 만들어진 계정으로 로그인이 잘 되는지 테스트해보자.

아래와 같은 로그인 API를 만든후 json 문을 입력한후 send 해보자.

아래와 같이 잘 실행되는 것을 볼 수 있다.