AgendaTasks/backend/app/services.py
m3mo cb308bbf68 Initial project setup with Clean Architecture
- Flutter frontend with Provider state management
- FastAPI backend with SQLAlchemy ORM
- Internationalization support (EN/DE)
- Clean Architecture folder structure
- GoRouter for navigation
- GetIt for dependency injection
2026-02-02 16:43:37 +01:00

84 lines
2.4 KiB
Python

from typing import Optional
from sqlalchemy.orm import Session
from .models import Task
from .schemas import TaskCreate, TaskUpdate
class TaskService:
def __init__(self, db: Session):
self.db = db
def get_tasks_by_date(
self, date: str, status: Optional[str] = None
) -> list[Task]:
query = self.db.query(Task).filter(Task.date == date)
if status == "active":
query = query.filter(Task.is_done == False)
elif status == "done":
query = query.filter(Task.is_done == True)
return query.order_by(Task.created_at.desc()).all()
def get_task_by_id(self, task_id: str) -> Optional[Task]:
return self.db.query(Task).filter(Task.id == task_id).first()
def create_task(self, task_data: TaskCreate) -> Task:
task = Task(
title=task_data.title,
description=task_data.description,
date=task_data.date,
time=task_data.time,
priority=task_data.priority.value,
)
self.db.add(task)
self.db.commit()
self.db.refresh(task)
return task
def update_task(self, task_id: str, task_data: TaskUpdate) -> Optional[Task]:
task = self.get_task_by_id(task_id)
if not task:
return None
update_data = task_data.model_dump(exclude_unset=True)
if "priority" in update_data and update_data["priority"]:
update_data["priority"] = update_data["priority"].value
for field, value in update_data.items():
setattr(task, field, value)
self.db.commit()
self.db.refresh(task)
return task
def delete_task(self, task_id: str) -> bool:
task = self.get_task_by_id(task_id)
if not task:
return False
self.db.delete(task)
self.db.commit()
return True
def toggle_task_status(self, task_id: str) -> Optional[Task]:
task = self.get_task_by_id(task_id)
if not task:
return None
task.is_done = not task.is_done
self.db.commit()
self.db.refresh(task)
return task
def reschedule_task(self, task_id: str, target_date: str) -> Optional[Task]:
task = self.get_task_by_id(task_id)
if not task:
return None
task.date = target_date
self.db.commit()
self.db.refresh(task)
return task