from datetime import datetime, timedelta from typing import Optional, Union import uuid from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from app.core.config import settings from app.models.user import User from app.schemas.user import UserCreate # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Hash a password.""" return pwd_context.hash(password) def create_access_token( subject: Union[str, int], expires_delta: timedelta = None ) -> str: """Create a JWT access token.""" if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode = {"exp": expire, "sub": str(subject)} encoded_jwt = jwt.encode(to_encode, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[str]: """Verify a JWT token and return the subject.""" try: payload = jwt.decode( token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM] ) token_data = payload.get("sub") return token_data except JWTError: return None def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: """Authenticate a user with username and password.""" user = db.query(User).filter(User.username == username).first() if not user: return None if not verify_password(password, user.hashed_password): return None return user def get_user_by_username(db: Session, username: str) -> Optional[User]: """Get user by username.""" return db.query(User).filter(User.username == username).first() def get_user_by_email(db: Session, email: str) -> Optional[User]: """Get user by email.""" return db.query(User).filter(User.email == email).first() def create_user(db: Session, user: UserCreate) -> User: """Create a new user.""" hashed_password = get_password_hash(user.password) db_user = User( id=str(uuid.uuid4()), username=user.username, email=user.email, hashed_password=hashed_password, is_active=True, is_superuser=False, ) db.add(db_user) db.commit() db.refresh(db_user) return db_user