Newer
Older
geekbrain_io_web / app / services / gitbucket.py
import asyncio
from typing import List, Dict, Any
from app.config import get_settings
import httpx

# Simple in-memory cache: { 'data': [...], 'timestamp': float }
_cache: Dict[str, Any] = {"data": [], "timestamp": 0.0}

async def fetch_repos(force_refresh: bool = False) -> List[Dict[str, Any]]:
    """
    Fetch public repositories from GitBucket API with caching.

    Returns list of dicts with 'name' and 'description' keys.
    On error, returns cached data if available, else empty list.
    """
    settings = get_settings()
    now = asyncio.get_running_loop().time()

    # Check cache validity
    if not force_refresh and (now - _cache["timestamp"]) < settings.cache_ttl:
        return _cache["data"]

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(str(settings.gitbucket_url), timeout=10.0)
            response.raise_for_status()
            data = response.json()

            # Extract only name and description
            repos = [
                {"name": repo["name"], "description": repo.get("description", "")}
                for repo in data
            ]

            # Update cache
            _cache["data"] = repos
            _cache["timestamp"] = now
            return repos
    except Exception as e:
        # Log error (in real app use logging)
        print(f"GitBucket API error: {e}")
        # Return cached data if available, otherwise empty list
        return _cache["data"] if _cache["data"] else []