#!/usr/bin/env python3
"""
Download and pin external asset files from GitHub.
Automatically scans website/src/data/components for parts with GitHub URLs,
updates the manifest, and then downloads/pins files.
"""
import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Generator, Any
from urllib.parse import urlparse, unquote

import requests


class GitHubAPI:
    """Simple GitHub API client with rate limit handling."""

    def __init__(self, token: Optional[str] = None, delay: float = 0.5):
        self.token = token or os.getenv('GITHUB_API_TOKEN') or os.getenv('GITHUB_TOKEN')
        self.session = requests.Session()
        if self.token:
            self.session.headers.update({
                'Authorization': f'token {self.token}',
                'Accept': 'application/vnd.github.v3+json'
            })
        self.base_url = 'https://api.github.com'
        self.delay = delay  # Delay between requests in seconds
        self.last_request_time = 0.0

    def _wait_for_rate_limit(self, response: requests.Response) -> None:
        """Wait if rate limited, using reset time from headers."""
        if response.status_code == 403:
            # Check if it's a rate limit error
            rate_limit_remaining = response.headers.get('X-RateLimit-Remaining', '1')
            if rate_limit_remaining == '0' or 'rate limit' in response.text.lower():
                reset_time = response.headers.get('X-RateLimit-Reset')
                if reset_time:
                    reset_timestamp = int(reset_time)
                    wait_seconds = max(0, reset_timestamp - int(time.time())) + 1
                    print(f" Rate limit exceeded. Waiting {wait_seconds} seconds until reset...", file=sys.stderr)
                    time.sleep(wait_seconds)
                else:
                    # Fallback: wait 60 seconds
                    print(" Rate limit exceeded. Waiting 60 seconds...", file=sys.stderr)
                    time.sleep(60)

    def _rate_limit_delay(self) -> None:
        """Add delay between requests to avoid hitting rate limits."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.delay:
            time.sleep(self.delay - time_since_last)
        self.last_request_time = time.time()

    def _make_request(self, method: str, url: str, max_retries: int = 3, **kwargs) -> requests.Response:
        """Make a request with rate limit handling and retries."""
        for attempt in range(max_retries):
            self._rate_limit_delay()
            try:
                response = self.session.request(method, url, **kwargs)
                # Check rate limit
                if response.status_code == 403:
                    self._wait_for_rate_limit(response)
                    # Retry the request after waiting
                    if attempt < max_retries - 1:
                        continue
                # Check remaining rate limit
                remaining = response.headers.get('X-RateLimit-Remaining')
                if remaining:
                    remaining_int = int(remaining)
                    if remaining_int < 10:
                        print(f" Warning: Only {remaining_int} API requests remaining. Adding delay...", file=sys.stderr)
                        time.sleep(2)
                return response
            except requests.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f" Request failed, retrying in {wait_time}s... ({e})", file=sys.stderr)
                    time.sleep(wait_time)
                else:
                    raise
        # Not reachable in practice (every iteration returns, continues, or
        # raises), but keeps the function total for static analysis.
        raise RuntimeError(f"Request to {url} failed after {max_retries} attempts")

    def get_default_branch(self, owner: str, repo: str) -> str:
        """Get default branch for a repository."""
        url = f"{self.base_url}/repos/{owner}/{repo}"
        try:
            response = self._make_request('GET', url)
            response.raise_for_status()
            return response.json().get('default_branch', 'main')
        except requests.RequestException as e:
            print(f"Warning: Could not get default branch for {owner}/{repo}: {e}", file=sys.stderr)
            return 'main'

    def get_file_sha(self, owner: str, repo: str, path: str, ref: str) -> Optional[str]:
        """
        Get the commit SHA that last modified a file at a given ref.
        Uses the Contents API to verify the file exists, then the Commits API
        to find the commit that last touched it.
        """
        # First, try to get file contents to verify it exists
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {'ref': ref}
        try:
            response = self._make_request('GET', url, params=params)
            if response.status_code == 404:
                # File doesn't exist at this ref, try default branch
                default_branch = self.get_default_branch(owner, repo)
                if default_branch != ref:
                    params['ref'] = default_branch
                    response = self._make_request('GET', url, params=params)
            response.raise_for_status()
            # The Contents API returns 'sha', but that is the blob SHA, not a
            # commit SHA. Find the commit that last modified this file.
            commits_url = f"{self.base_url}/repos/{owner}/{repo}/commits"
            commits_params = {
                'path': path,
                'sha': ref,
                'per_page': 1
            }
            commits_response = self._make_request('GET', commits_url, params=commits_params)
            commits_response.raise_for_status()
            commits = commits_response.json()
            if commits:
                return commits[0]['sha']
            # Fallback: use the ref as-is if it's already a SHA
            if len(ref) == 40 and all(c in '0123456789abcdef' for c in ref.lower()):
                return ref
            # Last resort: resolve the ref to a SHA as a branch...
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/heads/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']
            # ...or as a tag
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/tags/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']
            return None
        except requests.RequestException as e:
            print(f"Error getting file SHA for {owner}/{repo}/{path}@{ref}: {e}", file=sys.stderr)
            return None

    def get_license(self, owner: str, repo: str, sha: str) -> Optional[str]:
        """Try to detect license from repository root at given SHA."""
        license_files = ['LICENSE', 'LICENSE.txt', 'LICENSE.md', 'LICENCE', 'LICENCE.txt']
        for license_file in license_files:
            url = f"{self.base_url}/repos/{owner}/{repo}/contents/{license_file}"
            params = {'ref': sha}
            try:
                response = self._make_request('GET', url, params=params)
                if response.status_code == 200:
                    # Found a license file, return URL to it
                    return f"https://raw.githubusercontent.com/{owner}/{repo}/{sha}/{license_file}"
            except requests.RequestException:
                continue
        # Fall back to the license GitHub detected for the repository
        try:
            repo_url = f"{self.base_url}/repos/{owner}/{repo}"
            response = self._make_request('GET', repo_url)
            response.raise_for_status()
            repo_info = response.json()
            license_info = repo_info.get('license')
            if license_info:
                return license_info.get('spdx_id') or license_info.get('url')
        except requests.RequestException:
            pass
        return None
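

# Minimal usage sketch for the GitHubAPI client above (owner/repo/path values
# are placeholders, not taken from this repository):
#   api = GitHubAPI(delay=1.0)
#   sha = api.get_file_sha('acme', 'widgets', 'stl/arm.stl', 'main')
#   spdx = api.get_license('acme', 'widgets', sha) if sha else None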


def compute_sha256(file_path: Path) -> str:
    """Compute SHA256 checksum of a file."""
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            sha256.update(chunk)
    return sha256.hexdigest()
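
# compute_sha256 streams the file in 4 KiB chunks, so memory stays flat even
# for large vendored assets; the result matches `sha256sum <file>` on the
# command line.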


def download_file(url: str, dest_path: Path) -> bool:
    """Download a file from URL to destination path."""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        # Create parent directories
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        # Download file in chunks
        with open(dest_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}", file=sys.stderr)
        return False


def parse_github_url(url: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """
    Parse a GitHub URL into (owner, repo, ref, path).
    Supports:
    - https://github.com/owner/repo/blob/<ref>/path/to/file
    - https://github.com/owner/repo/raw/<ref>/path/to/file
    - https://raw.githubusercontent.com/owner/repo/<ref>/path/to/file
    """
    if not url or not isinstance(url, str):
        return None, None, None, None
    # Check if it's a GitHub URL
    if 'github.com' not in url:
        return None, None, None, None
    try:
        # Handle raw.githubusercontent.com
        if 'raw.githubusercontent.com' in url:
            parts = url.split('/')
            # https://raw.githubusercontent.com/OWNER/REPO/REF/PATH...
            # parts: ['https:', '', 'raw.githubusercontent.com', OWNER, REPO, REF, PATH...]
            if len(parts) >= 6:
                owner = parts[3]
                repo = parts[4]
                ref = parts[5]
                path = '/'.join(parts[6:]).split('?')[0]
                return owner, repo, ref, unquote(path)
        # Handle github.com blob/raw URLs
        parsed = urlparse(url)
        path_parts = parsed.path.strip('/').split('/')
        if len(path_parts) >= 4:
            owner = path_parts[0]
            repo = path_parts[1]
            mode = path_parts[2]  # 'blob' or 'raw'
            if mode in ('blob', 'raw'):
                ref = path_parts[3]
                file_path = '/'.join(path_parts[4:])
                # Both 'blob' and 'raw' views (with or without ?raw=true)
                # point at the same repository path, so treat them alike.
                return owner, repo, ref, unquote(file_path)
    except Exception:
        pass
    return None, None, None, None
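
# Illustrative parses (URLs are hypothetical):
#   parse_github_url('https://github.com/acme/widgets/blob/main/stl/arm.stl')
#     -> ('acme', 'widgets', 'main', 'stl/arm.stl')
#   parse_github_url('https://raw.githubusercontent.com/acme/widgets/main/stl/arm.stl')
#     -> ('acme', 'widgets', 'main', 'stl/arm.stl')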


def scan_site_components(components_dir: Path) -> Generator[Dict[str, Any], None, None]:
    """Recursively scan JSON files for parts with GitHub URLs."""
    for json_file in components_dir.rglob('*.json'):
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Breadth-first walk over the JSON structure to find parts
            queue = [data]
            while queue:
                item = queue.pop(0)
                if isinstance(item, dict):
                    # Check if this item is a part
                    if 'id' in item and 'url' in item and item['url']:
                        owner, repo, ref, source_path = parse_github_url(item['url'])
                        if owner and repo and source_path:
                            yield {
                                'id': item['id'],
                                'url': item['url'],
                                'owner': owner,
                                'repo': repo,
                                'ref': ref or 'main',
                                'source_path': source_path,
                                'orig_site_json': json_file
                            }
                    # Add children to queue
                    queue.extend(item.values())
                elif isinstance(item, list):
                    queue.extend(item)
        except (json.JSONDecodeError, IOError) as e:
            print(f"Warning: Could not read {json_file}: {e}", file=sys.stderr)


def regenerate_manifest(manifest_path: Path, repo_root: Path) -> Tuple[List[Dict], int]:
    """
    Regenerate manifest from site data.
    Preserves state of existing entries.
    Returns (new_manifest_list, changes_count).
    """
    print("Scanning website components to regenerate manifest...")
    # Load existing manifest to preserve state
    old_manifest = {}
    if manifest_path.exists():
        with open(manifest_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, list):
            old_manifest = {entry['id']: entry for entry in data}
    new_manifest = {}
    components_dir = repo_root / 'website/src/data/components'
    changes_count = 0
    if not components_dir.exists():
        print(f"Warning: Components directory not found: {components_dir}", file=sys.stderr)
        return list(old_manifest.values()), 0
    for part in scan_site_components(components_dir):
        part_id = part['id']
        old_entry = old_manifest.get(part_id)
        # Local path follows the scheme vendor/{owner}-{repo}/{path}
        local_path = f"vendor/{part['owner']}-{part['repo']}/{part['source_path']}"
        source_repo = f"{part['owner']}/{part['repo']}"
        orig_site_json = str(part['orig_site_json'].relative_to(repo_root))
        entry = {
            'id': part_id,
            'source_repo': source_repo,
            'source_path': part['source_path'],
            'source_ref': part['ref'],
            'local_path': local_path,
            'orig_site_json': orig_site_json,
            'orig_item_id': part_id
        }
        # Preserve state if the entry exists and its source config is unchanged
        if old_entry:
            config_changed = (
                old_entry.get('source_repo') != source_repo or
                old_entry.get('source_path') != part['source_path'] or
                old_entry.get('source_ref') != part['ref']
            )
            if not config_changed:
                # Copy pinned state over
                for key in ['pinned_sha', 'pinned_raw_url', 'checksum_sha256', 'last_checked', 'status', 'license', 'upstream_latest_sha']:
                    if key in old_entry:
                        entry[key] = old_entry[key]
                # Still count metadata moves (e.g. the site JSON relocated)
                if (old_entry.get('orig_site_json') != orig_site_json or
                        old_entry.get('local_path') != local_path):
                    changes_count += 1
            else:
                print(f" Config changed for {part_id}, resetting status.")
                entry['status'] = 'pending'
                entry['pinned_sha'] = None
                changes_count += 1
        else:
            print(f" New part found: {part_id}")
            entry['status'] = 'pending'
            entry['pinned_sha'] = None
            changes_count += 1
        new_manifest[part_id] = entry
    # Check for removed items by id (a net count would miscount when parts
    # are added and removed in the same scan)
    removed_ids = set(old_manifest) - set(new_manifest)
    if removed_ids:
        print(f" Removed {len(removed_ids)} parts that are no longer in site JSONs.")
        changes_count += len(removed_ids)
    return sorted(new_manifest.values(), key=lambda x: x['id']), changes_count
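
# A freshly regenerated entry, before pinning, looks roughly like this
# (values hypothetical):
#   {"id": "mount-arm", "source_repo": "acme/widgets",
#    "source_path": "stl/arm.stl", "source_ref": "main",
#    "local_path": "vendor/acme-widgets/stl/arm.stl",
#    "orig_site_json": "website/src/data/components/arms.json",
#    "status": "pending", "pinned_sha": null}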


def update_manifest_entry(
    entry: Dict,
    api: GitHubAPI,
    repo_root: Path,
    dry_run: bool = False
) -> Dict:
    """Update a single manifest entry by downloading and pinning the file."""
    source_repo = entry['source_repo']
    owner, repo = source_repo.split('/', 1)
    source_path = entry['source_path']
    source_ref = entry.get('source_ref', 'main')
    print(f"Processing {entry['id']} from {source_repo}/{source_path}@{source_ref}...")
    # Get commit SHA for the file
    commit_sha = api.get_file_sha(owner, repo, source_path, source_ref)
    if not commit_sha:
        print(f" Warning: Could not resolve SHA for {source_ref}, skipping", file=sys.stderr)
        entry['status'] = 'error'
        return entry
    # Build pinned raw URL
    pinned_raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{commit_sha}/{source_path}"
    # Determine local path
    local_path = Path(entry['local_path'])
    if not local_path.is_absolute():
        local_path = repo_root / local_path
    # Check if file exists and is already at the correct version
    current_pinned_sha = entry.get('pinned_sha')
    if current_pinned_sha == commit_sha and local_path.exists():
        if dry_run:
            print(f" [DRY RUN] File up to date ({commit_sha}), would skip download.")
        else:
            print(f" File up to date ({commit_sha}), skipping download.")
        # Ensure checksum is present
        if 'checksum_sha256' not in entry or not entry['checksum_sha256']:
            entry['checksum_sha256'] = compute_sha256(local_path)
        entry['pinned_sha'] = commit_sha
        entry['pinned_raw_url'] = pinned_raw_url
        entry['last_checked'] = datetime.now(timezone.utc).isoformat()
        entry['upstream_latest_sha'] = commit_sha
        entry['status'] = 'up-to-date'
        # If license is missing, try to fetch it; otherwise keep the existing one
        if 'license' not in entry and not dry_run:
            license_info = api.get_license(owner, repo, commit_sha)
            if license_info:
                entry['license'] = license_info
        return entry
    if dry_run:
        print(f" [DRY RUN] Would download to {local_path}")
        print(f" [DRY RUN] Pinned SHA: {commit_sha}")
        entry['pinned_sha'] = commit_sha
        entry['pinned_raw_url'] = pinned_raw_url
        entry['last_checked'] = datetime.now(timezone.utc).isoformat()
        entry['upstream_latest_sha'] = commit_sha
        entry['status'] = 'up-to-date'
        return entry
    # Download file
    print(f" Downloading from {pinned_raw_url}...")
    if not download_file(pinned_raw_url, local_path):
        entry['status'] = 'error'
        return entry
    # Compute checksum
    checksum = compute_sha256(local_path)
    print(f" Checksum: {checksum[:16]}...")
    # Get license info
    license_info = api.get_license(owner, repo, commit_sha)
    # Update entry
    entry['pinned_sha'] = commit_sha
    entry['pinned_raw_url'] = pinned_raw_url
    entry['checksum_sha256'] = checksum
    entry['last_checked'] = datetime.now(timezone.utc).isoformat()
    entry['upstream_latest_sha'] = commit_sha
    entry['status'] = 'up-to-date'
    if license_info:
        entry['license'] = license_info
    return entry
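
# Entry status lifecycle: 'pending' (new or reconfigured part) becomes
# 'up-to-date' once the file is pinned and downloaded, or 'error' if SHA
# resolution or the download fails; errored entries keep whatever pinned
# state they had before.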


def sync_to_site_json(entry: Dict, repo_root: Path) -> bool:
    """Sync vendor metadata back to the original site JSON file."""
    orig_json_path = entry.get('orig_site_json')
    orig_item_id = entry.get('orig_item_id')
    if not orig_json_path or not orig_item_id:
        return False
    json_path = repo_root / orig_json_path
    if not json_path.exists():
        print(f" Warning: Site JSON file not found: {json_path}", file=sys.stderr)
        return False
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Find the printed part in the nested structure
        def find_and_update_part(obj, target_id):
            if isinstance(obj, dict):
                # If this object IS the part (has the ID)
                if obj.get('id') == target_id:
                    if 'vendor' not in obj:
                        obj['vendor'] = {}
                    # Use .get() so entries that errored before pinning
                    # (and thus lack these keys) don't raise KeyError.
                    obj['vendor'].update({
                        'manifest_id': entry['id'],
                        'local_path': entry['local_path'],
                        'pinned_sha': entry.get('pinned_sha'),
                        'pinned_raw_url': entry.get('pinned_raw_url'),
                        'checksum_sha256': entry.get('checksum_sha256'),
                        'last_checked': entry.get('last_checked'),
                        'status': entry.get('status')
                    })
                    return True
                # Recursively search values
                for value in obj.values():
                    if find_and_update_part(value, target_id):
                        return True
            elif isinstance(obj, list):
                for item in obj:
                    if find_and_update_part(item, target_id):
                        return True
            return False

        if not find_and_update_part(data, orig_item_id):
            print(f" Warning: Could not find part with id '{orig_item_id}' in {json_path}", file=sys.stderr)
            return False
        # Write back to file (rewrites with 2-space indentation)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f" Updated {json_path}")
        return True
    except (json.JSONDecodeError, IOError) as e:
        print(f" Error updating {json_path}: {e}", file=sys.stderr)
        return False
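
# After a successful sync, the matched part in the site JSON gains a 'vendor'
# block, e.g. (values hypothetical):
#   "vendor": {"manifest_id": "mount-arm",
#              "local_path": "vendor/acme-widgets/stl/arm.stl",
#              "pinned_sha": "abc123...", "status": "up-to-date", ...}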


def main():
    parser = argparse.ArgumentParser(
        description='Download and pin external asset files from GitHub'
    )
    parser.add_argument(
        '--manifest',
        type=Path,
        default=Path('manifest/vendor_manifest.json'),
        help='Path to manifest file (default: manifest/vendor_manifest.json)'
    )
    parser.add_argument(
        '--entry',
        type=str,
        help='Process only a specific manifest entry by ID'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without downloading files'
    )
    parser.add_argument(
        '--no-sync',
        action='store_true',
        help='Skip syncing vendor metadata back to site JSON files'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=0.5,
        help='Delay between API requests in seconds (default: 0.5)'
    )
    parser.add_argument(
        '--no-scan',
        action='store_true',
        help='Skip scanning website for new components'
    )
    parser.add_argument(
        '--scan-only',
        action='store_true',
        help='Only scan website and update manifest, do not check/download files'
    )
    args = parser.parse_args()
    # Resolve paths: this script lives in scripts/, so the repo root is its
    # grandparent directory.
    repo_root = Path(__file__).resolve().parent.parent
    manifest_path = (repo_root / args.manifest).resolve()
    # Regenerate manifest from website scan (unless disabled)
    if not args.no_scan and not args.entry:
        manifest_list, changes = regenerate_manifest(manifest_path, repo_root)
        if changes > 0:
            print(f"Manifest regenerated with {changes} changes.")
            if not args.dry_run:
                manifest_path.parent.mkdir(parents=True, exist_ok=True)
                with open(manifest_path, 'w', encoding='utf-8') as f:
                    json.dump(manifest_list, f, indent=2, sort_keys=False)
        else:
            print("No changes in manifest structure detected.")
        if args.scan_only:
            return
        # Use the freshly regenerated manifest for processing
        manifest_data = manifest_list
    else:
        if not manifest_path.exists():
            print(f"Error: Manifest file not found: {manifest_path}", file=sys.stderr)
            sys.exit(1)
        with open(manifest_path, 'r', encoding='utf-8') as f:
            manifest_data = json.load(f)
    # Convert to dict if it's a list
    if isinstance(manifest_data, list):
        manifest = {entry['id']: entry for entry in manifest_data}
    else:
        manifest = manifest_data
    # Filter entries if --entry specified
    if args.entry:
        if args.entry not in manifest:
            print(f"Error: Entry '{args.entry}' not found in manifest", file=sys.stderr)
            sys.exit(1)
        entries_to_process = {args.entry: manifest[args.entry]}
    else:
        entries_to_process = manifest
    # Initialize GitHub API client with the configured delay
    api = GitHubAPI(delay=args.delay)
    # Process entries (the count includes entries that end in 'error')
    processed_count = 0
    for entry_id, entry in entries_to_process.items():
        updated_entry = update_manifest_entry(entry, api, repo_root, dry_run=args.dry_run)
        manifest[entry_id] = updated_entry
        if not args.no_sync and not args.dry_run:
            sync_to_site_json(updated_entry, repo_root)
        processed_count += 1
    # Write updated manifest
    if not args.dry_run:
        manifest_list = sorted(manifest.values(), key=lambda x: x['id'])
        with open(manifest_path, 'w', encoding='utf-8') as f:
            json.dump(manifest_list, f, indent=2, sort_keys=False)
        print(f"\nUpdated manifest with {processed_count} entries.")
    else:
        print(f"\n[DRY RUN] Would update {processed_count} entries.")


if __name__ == '__main__':
    main()