2025-08-13 18:05:26 +02:00

89 lines
2.6 KiB
Python

import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional, Union

from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from app.core.config import settings
from app.models.user import User
from app.schemas.user import UserCreate
# Password hashing
# Module-level passlib context shared by verify_password() and
# get_password_hash() below; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored hash.

    Args:
        plain_password: The candidate password in plaintext.
        hashed_password: The stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the two values.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password: str) -> str:
    """Return the hash of *password* as produced by ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def create_access_token(
    subject: Union[str, int], expires_delta: Optional[timedelta] = None
) -> str:
    """Create a signed JWT access token for *subject*.

    Args:
        subject: Identifier stored, converted to ``str``, in the ``sub`` claim.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token, signed with ``settings.JWT_SECRET_KEY`` using
        ``settings.JWT_ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy and would otherwise silently fall back to the default lifetime.
    if expires_delta is None:
        lifetime = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    else:
        lifetime = expires_delta

    # Use a timezone-aware "now": datetime.utcnow() returns a naive value and
    # is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + lifetime

    claims = {"exp": expire, "sub": str(subject)}
    return jwt.encode(
        claims, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM
    )
def verify_token(token: str) -> Optional[str]:
    """Decode a JWT and return its ``sub`` claim.

    The token is decoded with ``settings.JWT_SECRET_KEY`` and
    ``settings.JWT_ALGORITHM``.

    Returns:
        The value of the ``sub`` claim, or ``None`` when decoding raises
        ``JWTError`` or the decoded payload has no ``sub`` entry.
    """
    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        return None
    else:
        return claims.get("sub")
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
    """Look up *username* and check *password* against the stored hash.

    Returns:
        The matching ``User`` when the lookup finds one and
        ``verify_password`` accepts the password; otherwise ``None``.
    """
    candidate = db.query(User).filter(User.username == username).first()
    # Short-circuits exactly like two guard clauses: the password is only
    # checked when a user row was found.
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return None
def get_user_by_username(db: Session, username: str) -> Optional[User]:
    """Return the first ``User`` whose username equals *username*, if any."""
    by_username = db.query(User).filter(User.username == username)
    return by_username.first()
def get_user_by_email(db: Session, email: str) -> Optional[User]:
    """Return the first ``User`` whose email equals *email*, if any."""
    by_email = db.query(User).filter(User.email == email)
    return by_email.first()
def create_user(db: Session, user: UserCreate) -> User:
    """Create and persist a new user.

    The plaintext password from *user* is hashed with ``get_password_hash``
    before storage. The record is created with a random UUID4 string as its
    id, ``is_active=True`` and ``is_superuser=False``.

    Args:
        db: Session used to persist the record.
        user: Payload providing ``username``, ``email`` and ``password``.

    Returns:
        The persisted ``User``, refreshed from the database.

    Raises:
        Exception: Whatever ``db.commit()`` raises is re-raised unchanged
            after the session has been rolled back.
    """
    db_user = User(
        id=str(uuid.uuid4()),
        username=user.username,
        email=user.email,
        hashed_password=get_password_hash(user.password),
        is_active=True,
        is_superuser=False,
    )
    db.add(db_user)
    try:
        db.commit()
    except Exception:
        # A failed commit leaves the session in a failed transaction; roll
        # back so the caller can keep using it, then propagate the error.
        db.rollback()
        raise
    db.refresh(db_user)
    return db_user