#!/usr/bin/env python3
"""
Download and pin external asset files from GitHub.

Downloads files specified in a manifest, pins them to commit SHAs,
computes checksums, and optionally syncs vendor metadata back to
site JSON files.
"""

import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional

import requests


class GitHubAPI:
    """Simple GitHub API client with rate limit handling."""

    def __init__(self, token: Optional[str] = None, delay: float = 0.5):
        self.token = token or os.getenv('GITHUB_API_TOKEN') or os.getenv('GITHUB_TOKEN')
        self.session = requests.Session()
        if self.token:
            self.session.headers.update({
                'Authorization': f'token {self.token}',
                'Accept': 'application/vnd.github.v3+json'
            })
        self.base_url = 'https://api.github.com'
        self.delay = delay  # Delay between requests in seconds
        self.last_request_time = 0.0

    def _wait_for_rate_limit(self, response: requests.Response) -> None:
        """Wait if rate limited, using the reset time from the response headers."""
        if response.status_code == 403:
            # Distinguish a rate-limit 403 from an ordinary permission error
            rate_limit_remaining = response.headers.get('X-RateLimit-Remaining', '1')
            if rate_limit_remaining == '0' or 'rate limit' in response.text.lower():
                reset_time = response.headers.get('X-RateLimit-Reset')
                if reset_time:
                    reset_timestamp = int(reset_time)
                    wait_seconds = max(0, reset_timestamp - int(time.time())) + 1
                    print(f"  Rate limit exceeded. Waiting {wait_seconds} seconds until reset...",
                          file=sys.stderr)
                    time.sleep(wait_seconds)
                else:
                    # Fallback: wait 60 seconds
                    print("  Rate limit exceeded. Waiting 60 seconds...", file=sys.stderr)
                    time.sleep(60)

    def _rate_limit_delay(self) -> None:
        """Sleep between requests to avoid hitting rate limits."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.delay:
            time.sleep(self.delay - time_since_last)
        self.last_request_time = time.time()

    def _make_request(self, method: str, url: str, max_retries: int = 3,
                      **kwargs) -> requests.Response:
        """Make a request with rate limit handling and retries."""
        for attempt in range(max_retries):
            self._rate_limit_delay()
            try:
                response = self.session.request(method, url, **kwargs)

                # If we hit the rate limit, wait it out and retry
                if response.status_code == 403:
                    self._wait_for_rate_limit(response)
                    if attempt < max_retries - 1:
                        continue

                # Warn when the remaining quota is nearly exhausted
                remaining = response.headers.get('X-RateLimit-Remaining')
                if remaining and int(remaining) < 10:
                    print(f"  Warning: Only {remaining} API requests remaining. Adding delay...",
                          file=sys.stderr)
                    time.sleep(2)

                return response
            except requests.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"  Request failed, retrying in {wait_time}s... ({e})", file=sys.stderr)
                    time.sleep(wait_time)
                else:
                    raise
        # Unreachable in practice; keeps control flow explicit for type checkers
        raise requests.RequestException(f"Request to {url} failed after {max_retries} attempts")

    def get_default_branch(self, owner: str, repo: str) -> str:
        """Get the default branch for a repository."""
        url = f"{self.base_url}/repos/{owner}/{repo}"
        try:
            response = self._make_request('GET', url)
            response.raise_for_status()
            return response.json().get('default_branch', 'main')
        except requests.RequestException as e:
            print(f"Warning: Could not get default branch for {owner}/{repo}: {e}",
                  file=sys.stderr)
            return 'main'

    def get_file_sha(self, owner: str, repo: str, path: str, ref: str) -> Optional[str]:
        """
        Get the commit SHA that last modified a file at a given ref.

        Uses the Contents API to verify the file exists, then the Commits API
        to find the commit that last touched it.
        """
        # First, verify the file exists at this ref
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {'ref': ref}
        try:
            response = self._make_request('GET', url, params=params)
            if response.status_code == 404:
                # File doesn't exist at this ref; try the default branch
                default_branch = self.get_default_branch(owner, repo)
                if default_branch != ref:
                    params['ref'] = default_branch
                    response = self._make_request('GET', url, params=params)
            response.raise_for_status()

            # The Contents API returns 'sha', which is the blob SHA, not a
            # commit SHA. Ask the Commits API for the commit that last
            # modified this file instead.
            commits_url = f"{self.base_url}/repos/{owner}/{repo}/commits"
            commits_params = {'path': path, 'sha': ref, 'per_page': 1}
            commits_response = self._make_request('GET', commits_url, params=commits_params)
            commits_response.raise_for_status()
            commits = commits_response.json()
            if commits:
                return commits[0]['sha']

            # Fallback: use the ref as-is if it's already a full SHA
            if len(ref) == 40 and all(c in '0123456789abcdef' for c in ref.lower()):
                return ref

            # Last resort: resolve the ref as a branch head...
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/heads/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']

            # ...or as a tag
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/tags/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']

            return None
        except requests.RequestException as e:
            print(f"Error getting file SHA for {owner}/{repo}/{path}@{ref}: {e}",
                  file=sys.stderr)
            return None

    def get_license(self, owner: str, repo: str, sha: str) -> Optional[str]:
        """Try to detect a license from the repository root at the given SHA."""
        license_files = ['LICENSE', 'LICENSE.txt', 'LICENSE.md', 'LICENCE', 'LICENCE.txt']
        for license_file in license_files:
            url = f"{self.base_url}/repos/{owner}/{repo}/contents/{license_file}"
            params = {'ref': sha}
            try:
                response = self._make_request('GET', url, params=params)
                if response.status_code == 200:
                    # Found a license file; return a pinned raw URL to it
                    return f"https://raw.githubusercontent.com/{owner}/{repo}/{sha}/{license_file}"
            except requests.RequestException:
                continue

        # Fall back to the license GitHub detected for the repository
        try:
            repo_url = f"{self.base_url}/repos/{owner}/{repo}"
            response = self._make_request('GET', repo_url)
            response.raise_for_status()
            license_info = response.json().get('license')
            if license_info:
                return license_info.get('spdx_id') or license_info.get('url')
        except requests.RequestException:
            pass

        return None
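
# A manifest entry, as this script reads and writes it, looks roughly like the
# sketch below. The values are illustrative; the field set is inferred from
# how entries are consumed and updated in this file, not from a published
# schema:
#
#   {
#     "id": "example-bracket",                       # hypothetical ID
#     "source_repo": "owner/repo",
#     "source_path": "stl/bracket.stl",
#     "source_ref": "main",                          # optional; defaults to 'main'
#     "local_path": "vendor/bracket.stl",
#     "orig_site_json": "site/data/device.json",     # optional, used by --sync-site
#     "orig_item_id": "bracket-1",                   # optional, used by --sync-site
#     "pinned_sha": "<40-char commit SHA>",          # written by this script
#     "pinned_raw_url": "https://raw.githubusercontent.com/...",
#     "checksum_sha256": "<hex digest>",
#     "last_checked": "<ISO 8601 UTC timestamp>",
#     "upstream_latest_sha": "<40-char commit SHA>",
#     "status": "up-to-date"                         # or "error"
#   }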


def compute_sha256(file_path: Path) -> str:
    """Compute the SHA256 checksum of a file."""
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            sha256.update(chunk)
    return sha256.hexdigest()


def download_file(url: str, dest_path: Path) -> bool:
    """Download a file from a URL to a destination path."""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        # Create parent directories
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # Stream the body to disk
        with open(dest_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}", file=sys.stderr)
        return False
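

# A minimal sketch of how a pinned file could be re-verified against the
# checksum recorded in its manifest entry, using compute_sha256 above. This
# helper is illustrative only and is not called anywhere else in this script.
def verify_pinned_file(entry: Dict, repo_root: Path) -> bool:
    """Return True if the local copy matches the recorded SHA256 checksum."""
    local_path = repo_root / entry.get('local_path', '')
    expected = entry.get('checksum_sha256')
    if not expected or not local_path.is_file():
        return False
    return compute_sha256(local_path) == expected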


def update_manifest_entry(
    entry: Dict,
    api: GitHubAPI,
    repo_root: Path,
    dry_run: bool = False
) -> Dict:
    """Update a single manifest entry by downloading and pinning the file."""
    source_repo = entry['source_repo']
    owner, repo = source_repo.split('/', 1)
    source_path = entry['source_path']
    source_ref = entry.get('source_ref', 'main')

    print(f"Processing {entry['id']} from {source_repo}/{source_path}@{source_ref}...")

    # Get the commit SHA for the file
    commit_sha = api.get_file_sha(owner, repo, source_path, source_ref)
    if not commit_sha:
        print(f"  Warning: Could not resolve SHA for {source_ref}, skipping", file=sys.stderr)
        entry['status'] = 'error'
        return entry

    # Build the pinned raw URL
    pinned_raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{commit_sha}/{source_path}"

    # Determine the local path
    local_path = Path(entry['local_path'])
    if not local_path.is_absolute():
        local_path = repo_root / local_path

    if dry_run:
        print(f"  [DRY RUN] Would download to {local_path}")
        print(f"  [DRY RUN] Pinned SHA: {commit_sha}")
        entry['pinned_sha'] = commit_sha
        entry['pinned_raw_url'] = pinned_raw_url
        entry['last_checked'] = datetime.now(timezone.utc).isoformat()
        entry['upstream_latest_sha'] = commit_sha
        entry['status'] = 'up-to-date'
        return entry

    # Download the file
    print(f"  Downloading from {pinned_raw_url}...")
    if not download_file(pinned_raw_url, local_path):
        entry['status'] = 'error'
        return entry

    # Compute the checksum
    checksum = compute_sha256(local_path)
    print(f"  Checksum: {checksum[:16]}...")

    # Detect license info
    license_info = api.get_license(owner, repo, commit_sha)

    # Update the entry
    entry['pinned_sha'] = commit_sha
    entry['pinned_raw_url'] = pinned_raw_url
    entry['checksum_sha256'] = checksum
    entry['last_checked'] = datetime.now(timezone.utc).isoformat()
    entry['upstream_latest_sha'] = commit_sha
    entry['status'] = 'up-to-date'
    if license_info:
        entry['license'] = license_info

    return entry


def sync_to_site_json(entry: Dict, repo_root: Path) -> bool:
    """Sync vendor metadata back to the original site JSON file."""
    orig_json_path = entry.get('orig_site_json')
    orig_item_id = entry.get('orig_item_id')
    if not orig_json_path or not orig_item_id:
        return False

    json_path = repo_root / orig_json_path
    if not json_path.exists():
        print(f"  Warning: Site JSON file not found: {json_path}", file=sys.stderr)
        return False

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        vendor_metadata = {
            'manifest_id': entry['id'],
            'local_path': entry['local_path'],
            'pinned_sha': entry['pinned_sha'],
            'pinned_raw_url': entry['pinned_raw_url'],
            'checksum_sha256': entry['checksum_sha256'],
            'last_checked': entry['last_checked'],
            'status': entry['status']
        }

        # Walk the nested structure looking for a part with the target ID in
        # any of the known part arrays, and attach the vendor metadata to it.
        def find_and_update_part(obj, target_id) -> bool:
            if isinstance(obj, dict):
                for key in ('printedParts', 'bodyParts', 'knobs'):
                    if key in obj and isinstance(obj[key], list):
                        for part in obj[key]:
                            if isinstance(part, dict) and part.get('id') == target_id:
                                part.setdefault('vendor', {}).update(vendor_metadata)
                                return True
                # Recurse into nested values
                return any(find_and_update_part(value, target_id) for value in obj.values())
            if isinstance(obj, list):
                return any(find_and_update_part(item, target_id) for item in obj)
            return False

        if not find_and_update_part(data, orig_item_id):
            print(f"  Warning: Could not find part with id '{orig_item_id}' in {json_path}",
                  file=sys.stderr)
            return False

        # Rewrite the file with 2-space indentation (this normalizes, rather
        # than preserves, any existing formatting)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        print(f"  Updated {json_path}")
        return True
    except (json.JSONDecodeError, IOError) as e:
        print(f"  Error updating {json_path}: {e}", file=sys.stderr)
        return False
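

# After a successful sync, the matched part in the site JSON gains a 'vendor'
# block shaped like this (illustrative values, mirroring vendor_metadata above):
#
#   {
#     "id": "bracket-1",
#     "vendor": {
#       "manifest_id": "example-bracket",
#       "local_path": "vendor/bracket.stl",
#       "pinned_sha": "<commit SHA>",
#       "pinned_raw_url": "https://raw.githubusercontent.com/...",
#       "checksum_sha256": "<hex digest>",
#       "last_checked": "<ISO 8601 UTC timestamp>",
#       "status": "up-to-date"
#     }
#   }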


def main():
    parser = argparse.ArgumentParser(
        description='Download and pin external asset files from GitHub'
    )
    parser.add_argument(
        '--manifest', type=Path, default=Path('manifest/vendor_manifest.json'),
        help='Path to manifest file (default: manifest/vendor_manifest.json)'
    )
    parser.add_argument(
        '--entry', type=str,
        help='Process only a specific manifest entry by ID'
    )
    parser.add_argument(
        '--dry-run', action='store_true',
        help='Show what would be done without downloading files'
    )
    parser.add_argument(
        '--sync-site', action='store_true',
        help='Sync vendor metadata back to site JSON files'
    )
    parser.add_argument(
        '--delay', type=float, default=0.5,
        help='Delay between API requests in seconds (default: 0.5)'
    )
    args = parser.parse_args()

    # Resolve paths relative to the repository root (one level above this script)
    repo_root = Path(__file__).resolve().parent.parent
    manifest_path = (repo_root / args.manifest).resolve()

    if not manifest_path.exists():
        print(f"Error: Manifest file not found: {manifest_path}", file=sys.stderr)
        sys.exit(1)

    # Load the manifest
    with open(manifest_path, 'r', encoding='utf-8') as f:
        manifest_data = json.load(f)

    # Index entries by ID if the manifest is a list
    if isinstance(manifest_data, list):
        manifest = {entry['id']: entry for entry in manifest_data}
    else:
        manifest = manifest_data

    # Filter entries if --entry is specified
    if args.entry:
        if args.entry not in manifest:
            print(f"Error: Entry '{args.entry}' not found in manifest", file=sys.stderr)
            sys.exit(1)
        entries_to_process = {args.entry: manifest[args.entry]}
    else:
        entries_to_process = manifest

    # Initialize the GitHub API client with the requested delay
    api = GitHubAPI(delay=args.delay)

    # Process entries
    updated_count = 0
    for entry_id, entry in entries_to_process.items():
        updated_entry = update_manifest_entry(entry, api, repo_root, dry_run=args.dry_run)
        manifest[entry_id] = updated_entry
        # Skip syncing entries that failed; they lack the pinned fields the
        # site JSON update needs
        if args.sync_site and not args.dry_run and updated_entry.get('status') != 'error':
            sync_to_site_json(updated_entry, repo_root)
        updated_count += 1

    # Write the updated manifest back as a list sorted by ID
    if not args.dry_run:
        manifest_list = sorted(manifest.values(), key=lambda x: x['id'])
        with open(manifest_path, 'w', encoding='utf-8') as f:
            json.dump(manifest_list, f, indent=2)
        print(f"\nUpdated manifest with {updated_count} entries.")
    else:
        print(f"\n[DRY RUN] Would update {updated_count} entries.")


if __name__ == '__main__':
    main()
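
# Example invocations (assuming this script lives one directory below the
# repository root, as the path resolution in main() implies; the script
# filename shown here is hypothetical):
#
#   python scripts/vendor_assets.py --dry-run
#   python scripts/vendor_assets.py --entry example-bracket
#   python scripts/vendor_assets.py --sync-site --delay 1.0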