import asyncio
import logging
from typing import List, Dict, Any

import httpx

from app.config import get_settings
# Simple in-memory cache: { 'data': [...], 'timestamp': float }
_cache: Dict[str, Any] = {"data": [], "timestamp": 0.0}
async def fetch_repos(force_refresh: bool = False) -> List[Dict[str, Any]]:
    """Fetch public repositories from the GitBucket API, with caching.

    Args:
        force_refresh: When true, skip the cache and always query the API.

    Returns:
        A list of dicts with ``'name'`` and ``'description'`` keys. A missing
        or null description is normalised to an empty string. On any error
        the previously cached list is returned if there is one, otherwise an
        empty list.
    """
    settings = get_settings()
    now = asyncio.get_running_loop().time()

    # A zero timestamp means the cache has never been filled. The loop clock
    # is monotonic with an arbitrary origin, so ``now`` itself can be smaller
    # than the TTL; without this guard the empty initial cache would be
    # treated as fresh and the API would never be queried.
    last_fetch = _cache["timestamp"]
    cache_is_fresh = (
        bool(last_fetch) and (now - last_fetch) < settings.cache_ttl
    )
    if cache_is_fresh and not force_refresh:
        return _cache["data"]

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                str(settings.gitbucket_url), timeout=10.0
            )
            response.raise_for_status()
            data = response.json()

        # Keep only the fields callers need. ``or ""`` also covers an explicit
        # JSON null, which ``dict.get`` with a default would pass through.
        repos = [
            {
                "name": repo["name"],
                "description": repo.get("description") or "",
            }
            for repo in data
        ]
    except Exception as e:
        # Deliberately broad: this is a best-effort fetch, so any failure
        # (network, HTTP status, malformed payload) falls back to the cache.
        logging.getLogger(__name__).warning("GitBucket API error: %s", e)
        return _cache["data"] or []

    # Only a fully successful fetch replaces the cached listing.
    _cache["data"] = repos
    _cache["timestamp"] = now
    return repos