AgendaTasks/backend/app/services.py
m3mo 911f192c38 Add JWT-based user authentication to backend
- Create User model with bcrypt password hashing
- Add auth routes: register, login, refresh, me
- Implement JWT access and refresh tokens
- Add get_current_user dependency for protected routes
- Update Task model with user_id foreign key for data isolation
- Update TaskService to filter tasks by authenticated user
- Add auth configuration (secret key, token expiry)
2026-02-02 22:57:38 +01:00

92 lines
2.6 KiB
Python

from typing import Optional
from sqlalchemy.orm import Session
from .models import Task
from .schemas import TaskCreate, TaskUpdate
class TaskService:
def __init__(self, db: Session, user_id: str):
self.db = db
self.user_id = user_id
def get_tasks_by_date(
self, date: str, status: Optional[str] = None
) -> list[Task]:
query = self.db.query(Task).filter(
Task.date == date,
Task.user_id == self.user_id
)
if status == "active":
query = query.filter(Task.is_done == False)
elif status == "done":
query = query.filter(Task.is_done == True)
return query.order_by(Task.created_at.desc()).all()
def get_task_by_id(self, task_id: str) -> Optional[Task]:
return self.db.query(Task).filter(
Task.id == task_id,
Task.user_id == self.user_id
).first()
def create_task(self, task_data: TaskCreate) -> Task:
task = Task(
title=task_data.title,
description=task_data.description,
date=task_data.date,
time=task_data.time,
priority=task_data.priority.value,
user_id=self.user_id,
)
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def update_task(self, task_id: str, task_data: TaskUpdate) -> Optional[Task]:
task = self.get_task_by_id(task_id)
if not task:
return None
update_data = task_data.model_dump(exclude_unset=True)
if "priority" in update_data and update_data["priority"]:
update_data["priority"] = update_data["priority"].value
for field, value in update_data.items():
setattr(task, field, value)
self.db.commit()
self.db.refresh(task)
return task
def delete_task(self, task_id: str) -> bool:
task = self.get_task_by_id(task_id)
if not task:
return False
self.db.delete(task)
self.db.commit()
return True
def toggle_task_status(self, task_id: str) -> Optional[Task]:
task = self.get_task_by_id(task_id)
if not task:
return None
task.is_done = not task.is_done
self.db.commit()
self.db.refresh(task)
return task
def reschedule_task(self, task_id: str, target_date: str) -> Optional[Task]:
task = self.get_task_by_id(task_id)
if not task:
return None
task.date = target_date
self.db.commit()
self.db.refresh(task)
return task