""" File API routes for file operations and analysis """ import os import aiofiles from typing import List, Dict from fastapi import APIRouter, HTTPException, UploadFile, File from pydantic import BaseModel from app.core.config import settings router = APIRouter() class FileInfo(BaseModel): """File information model""" path: str name: str size: int extension: str is_supported: bool language: str = None class FileContent(BaseModel): """File content model""" path: str content: str language: str = None size: int class DirectoryStructure(BaseModel): """Directory structure model""" name: str path: str is_file: bool children: List['DirectoryStructure'] = [] file_info: FileInfo = None @router.get("/supported-extensions") async def get_supported_extensions(): """Get list of supported file extensions""" return { "extensions": settings.SUPPORTED_EXTENSIONS, "max_file_size": settings.MAX_FILE_SIZE } @router.post("/analyze", response_model=List[FileInfo]) async def analyze_files(file_paths: List[str]): """Analyze multiple files and return their information""" file_infos = [] for file_path in file_paths: try: if not os.path.exists(file_path): continue stat = os.stat(file_path) if stat.st_size > settings.MAX_FILE_SIZE: continue name = os.path.basename(file_path) extension = os.path.splitext(name)[1].lower() is_supported = extension in settings.SUPPORTED_EXTENSIONS # Determine language from extension language = get_language_from_extension(extension) file_info = FileInfo( path=file_path, name=name, size=stat.st_size, extension=extension, is_supported=is_supported, language=language ) file_infos.append(file_info) except Exception as e: print(f"Error analyzing file {file_path}: {e}") continue return file_infos @router.post("/read", response_model=FileContent) async def read_file_content(file_path: str): """Read and return file content""" try: if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found") stat = os.stat(file_path) if stat.st_size > settings.MAX_FILE_SIZE: raise HTTPException( status_code=413, detail=f"File too large. Maximum size is {settings.MAX_FILE_SIZE} bytes" ) extension = os.path.splitext(file_path)[1].lower() if extension not in settings.SUPPORTED_EXTENSIONS: raise HTTPException( status_code=415, detail=f"Unsupported file type: {extension}" ) async with aiofiles.open(file_path, 'r', encoding='utf-8') as f: content = await f.read() language = get_language_from_extension(extension) return FileContent( path=file_path, content=content, language=language, size=len(content) ) except UnicodeDecodeError: raise HTTPException( status_code=415, detail="File contains non-text content or unsupported encoding" ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/read-multiple", response_model=List[FileContent]) async def read_multiple_files(file_paths: List[str]): """Read multiple files and return their contents""" file_contents = [] for file_path in file_paths: try: if not os.path.exists(file_path): continue stat = os.stat(file_path) if stat.st_size > settings.MAX_FILE_SIZE: continue extension = os.path.splitext(file_path)[1].lower() if extension not in settings.SUPPORTED_EXTENSIONS: continue async with aiofiles.open(file_path, 'r', encoding='utf-8') as f: content = await f.read() language = get_language_from_extension(extension) file_content = FileContent( path=file_path, content=content, language=language, size=len(content) ) file_contents.append(file_content) except Exception as e: print(f"Error reading file {file_path}: {e}") continue return file_contents @router.get("/directory-structure") async def get_directory_structure(path: str, max_depth: int = 3): """Get directory structure for file explorer""" try: if not os.path.exists(path): raise HTTPException(status_code=404, detail="Directory not found") if not os.path.isdir(path): raise HTTPException(status_code=400, detail="Path is not a directory") structure = await build_directory_structure(path, max_depth) return structure except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/upload") async def upload_file(file: UploadFile = File(...)): """Upload a file for analysis""" try: if file.size > settings.MAX_FILE_SIZE: raise HTTPException( status_code=413, detail=f"File too large. Maximum size is {settings.MAX_FILE_SIZE} bytes" ) extension = os.path.splitext(file.filename)[1].lower() if extension not in settings.SUPPORTED_EXTENSIONS: raise HTTPException( status_code=415, detail=f"Unsupported file type: {extension}" ) content = await file.read() content_str = content.decode('utf-8') language = get_language_from_extension(extension) return FileContent( path=file.filename, content=content_str, language=language, size=len(content_str) ) except UnicodeDecodeError: raise HTTPException( status_code=415, detail="File contains non-text content or unsupported encoding" ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) async def build_directory_structure(path: str, max_depth: int, current_depth: int = 0) -> DirectoryStructure: """Recursively build directory structure""" name = os.path.basename(path) if not name: name = path structure = DirectoryStructure( name=name, path=path, is_file=False ) if current_depth >= max_depth: return structure try: entries = os.listdir(path) entries.sort() for entry in entries: if entry.startswith('.'): # Skip hidden files continue entry_path = os.path.join(path, entry) if os.path.isfile(entry_path): # File entry stat = os.stat(entry_path) extension = os.path.splitext(entry)[1].lower() is_supported = extension in settings.SUPPORTED_EXTENSIONS language = get_language_from_extension(extension) file_info = FileInfo( path=entry_path, name=entry, size=stat.st_size, extension=extension, is_supported=is_supported, language=language ) file_structure = DirectoryStructure( name=entry, path=entry_path, is_file=True, file_info=file_info ) structure.children.append(file_structure) elif os.path.isdir(entry_path): # Directory entry dir_structure = await build_directory_structure( entry_path, max_depth, current_depth + 1 ) structure.children.append(dir_structure) except PermissionError: pass # Skip directories we can't read return structure def get_language_from_extension(extension: str) -> str: """Map file extension to programming language""" extension_map = { '.py': 'python', '.js': 'javascript', '.ts': 'typescript', '.jsx': 'javascript', '.tsx': 'typescript', '.java': 'java', '.go': 'go', '.rs': 'rust', '.cpp': 'cpp', '.cc': 'cpp', '.cxx': 'cpp', '.c': 'c', '.h': 'c', '.hpp': 'cpp', '.cs': 'csharp', '.php': 'php', '.rb': 'ruby', '.swift': 'swift', '.kt': 'kotlin', '.scala': 'scala', '.sh': 'shell', '.bash': 'shell', '.zsh': 'shell', '.html': 'html', '.htm': 'html', '.css': 'css', '.scss': 'scss', '.sass': 'sass', '.sql': 'sql', '.md': 'markdown', '.yaml': 'yaml', '.yml': 'yaml', '.json': 'json', '.xml': 'xml' } return extension_map.get(extension.lower(), 'text')