- Create User model with bcrypt password hashing - Add auth routes: register, login, refresh, me - Implement JWT access and refresh tokens - Add get_current_user dependency for protected routes - Update Task model with user_id foreign key for data isolation - Update TaskService to filter tasks by authenticated user - Add auth configuration (secret key, token expiry)
59 lines
1.9 KiB
Python
59 lines
1.9 KiB
Python
from typing import Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .models import User
|
|
from .schemas import UserCreate
|
|
from .utils import get_password_hash, verify_password, create_access_token, create_refresh_token, decode_token
|
|
|
|
|
|
class AuthService:
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def get_user_by_email(self, email: str) -> Optional[User]:
|
|
return self.db.query(User).filter(User.email == email).first()
|
|
|
|
def get_user_by_id(self, user_id: str) -> Optional[User]:
|
|
return self.db.query(User).filter(User.id == user_id).first()
|
|
|
|
def create_user(self, user_data: UserCreate) -> User:
|
|
password_hash = get_password_hash(user_data.password)
|
|
user = User(
|
|
email=user_data.email,
|
|
name=user_data.name,
|
|
password_hash=password_hash,
|
|
)
|
|
self.db.add(user)
|
|
self.db.commit()
|
|
self.db.refresh(user)
|
|
return user
|
|
|
|
def authenticate_user(self, email: str, password: str) -> Optional[User]:
|
|
user = self.get_user_by_email(email)
|
|
if not user:
|
|
return None
|
|
if not verify_password(password, user.password_hash):
|
|
return None
|
|
return user
|
|
|
|
def create_tokens(self, user: User) -> dict:
|
|
access_token = create_access_token(data={"sub": user.id, "email": user.email})
|
|
refresh_token = create_refresh_token(data={"sub": user.id, "email": user.email})
|
|
return {
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
"token_type": "bearer",
|
|
}
|
|
|
|
def refresh_tokens(self, refresh_token: str) -> Optional[dict]:
|
|
payload = decode_token(refresh_token)
|
|
if payload is None or payload.get("type") != "refresh":
|
|
return None
|
|
|
|
user_id = payload.get("sub")
|
|
user = self.get_user_by_id(user_id)
|
|
if user is None:
|
|
return None
|
|
|
|
return self.create_tokens(user)
|