- Flutter frontend with Provider state management - FastAPI backend with SQLAlchemy ORM - Internationalization support (EN/DE) - Clean Architecture folder structure - GoRouter for navigation - GetIt for dependency injection
84 lines
2.4 KiB
Python
84 lines
2.4 KiB
Python
from typing import Optional
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .models import Task
|
|
from .schemas import TaskCreate, TaskUpdate
|
|
|
|
|
|
class TaskService:
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def get_tasks_by_date(
|
|
self, date: str, status: Optional[str] = None
|
|
) -> list[Task]:
|
|
query = self.db.query(Task).filter(Task.date == date)
|
|
|
|
if status == "active":
|
|
query = query.filter(Task.is_done == False)
|
|
elif status == "done":
|
|
query = query.filter(Task.is_done == True)
|
|
|
|
return query.order_by(Task.created_at.desc()).all()
|
|
|
|
def get_task_by_id(self, task_id: str) -> Optional[Task]:
|
|
return self.db.query(Task).filter(Task.id == task_id).first()
|
|
|
|
def create_task(self, task_data: TaskCreate) -> Task:
|
|
task = Task(
|
|
title=task_data.title,
|
|
description=task_data.description,
|
|
date=task_data.date,
|
|
time=task_data.time,
|
|
priority=task_data.priority.value,
|
|
)
|
|
self.db.add(task)
|
|
self.db.commit()
|
|
self.db.refresh(task)
|
|
return task
|
|
|
|
def update_task(self, task_id: str, task_data: TaskUpdate) -> Optional[Task]:
|
|
task = self.get_task_by_id(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
update_data = task_data.model_dump(exclude_unset=True)
|
|
if "priority" in update_data and update_data["priority"]:
|
|
update_data["priority"] = update_data["priority"].value
|
|
|
|
for field, value in update_data.items():
|
|
setattr(task, field, value)
|
|
|
|
self.db.commit()
|
|
self.db.refresh(task)
|
|
return task
|
|
|
|
def delete_task(self, task_id: str) -> bool:
|
|
task = self.get_task_by_id(task_id)
|
|
if not task:
|
|
return False
|
|
|
|
self.db.delete(task)
|
|
self.db.commit()
|
|
return True
|
|
|
|
def toggle_task_status(self, task_id: str) -> Optional[Task]:
|
|
task = self.get_task_by_id(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
task.is_done = not task.is_done
|
|
self.db.commit()
|
|
self.db.refresh(task)
|
|
return task
|
|
|
|
def reschedule_task(self, task_id: str, target_date: str) -> Optional[Task]:
|
|
task = self.get_task_by_id(task_id)
|
|
if not task:
|
|
return None
|
|
|
|
task.date = target_date
|
|
self.db.commit()
|
|
self.db.refresh(task)
|
|
return task
|