#!/usr/bin/env python3
"""
Download and pin external asset files from GitHub.

Automatically scans website/src/data/components for parts with GitHub URLs,
updates the manifest, and then downloads/pins files.
"""

import argparse
import hashlib
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Generator, Any
from urllib.parse import urlparse, unquote, parse_qs

import requests


class GitHubAPI:
    """Simple GitHub API client with rate limit handling."""

    def __init__(self, token: Optional[str] = None, delay: float = 0.5):
        self.token = token or os.getenv('GITHUB_API_TOKEN') or os.getenv('GITHUB_TOKEN')
        self.session = requests.Session()
        if self.token:
            self.session.headers.update({
                'Authorization': f'token {self.token}',
                'Accept': 'application/vnd.github.v3+json'
            })
        self.base_url = 'https://api.github.com'
        self.delay = delay  # Delay between requests in seconds
        self.last_request_time = 0

    def _wait_for_rate_limit(self, response: requests.Response) -> None:
        """Wait if rate limited, using reset time from headers."""
        if response.status_code == 403:
            # Check if it's a rate limit error
            rate_limit_remaining = response.headers.get('X-RateLimit-Remaining', '1')
            if rate_limit_remaining == '0' or 'rate limit' in response.text.lower():
                reset_time = response.headers.get('X-RateLimit-Reset')
                if reset_time:
                    reset_timestamp = int(reset_time)
                    wait_seconds = max(0, reset_timestamp - int(time.time())) + 1
                    print(f" Rate limit exceeded. Waiting {wait_seconds} seconds until reset...", file=sys.stderr)
                    time.sleep(wait_seconds)
                else:
                    # Fallback: wait 60 seconds
                    print(" Rate limit exceeded. Waiting 60 seconds...", file=sys.stderr)
                    time.sleep(60)

    def _rate_limit_delay(self) -> None:
        """Add delay between requests to avoid hitting rate limits."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.delay:
            time.sleep(self.delay - time_since_last)
        self.last_request_time = time.time()

    def _make_request(self, method: str, url: str, max_retries: int = 3, **kwargs) -> requests.Response:
        """Make a request with rate limit handling and retries."""
        for attempt in range(max_retries):
            self._rate_limit_delay()

            try:
                response = self.session.request(method, url, **kwargs)

                # Check rate limit
                if response.status_code == 403:
                    self._wait_for_rate_limit(response)
                    # Retry the request after waiting
                    if attempt < max_retries - 1:
                        continue

                # Check remaining rate limit
                remaining = response.headers.get('X-RateLimit-Remaining')
                if remaining:
                    remaining_int = int(remaining)
                    if remaining_int < 10:
                        print(f" Warning: Only {remaining_int} API requests remaining. Adding delay...", file=sys.stderr)
                        time.sleep(2)

                return response

            except requests.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f" Request failed, retrying in {wait_time}s... ({e})", file=sys.stderr)
                    time.sleep(wait_time)
                else:
                    raise

        return response

    def get_default_branch(self, owner: str, repo: str) -> str:
        """Get default branch for a repository."""
        url = f"{self.base_url}/repos/{owner}/{repo}"
        try:
            response = self._make_request('GET', url)
            response.raise_for_status()
            return response.json().get('default_branch', 'main')
        except requests.RequestException as e:
            print(f"Warning: Could not get default branch for {owner}/{repo}: {e}", file=sys.stderr)
            return 'main'

    def get_file_sha(self, owner: str, repo: str, path: str, ref: str) -> Optional[str]:
        """
        Get the commit SHA that last modified a file at a given ref.
        Uses the Contents API to get file info, then finds the commit.
        """
        # First, try to get file contents to verify it exists
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {'ref': ref}

        try:
            response = self._make_request('GET', url, params=params)
            if response.status_code == 404:
                # File doesn't exist at this ref, try default branch
                default_branch = self.get_default_branch(owner, repo)
                if default_branch != ref:
                    params['ref'] = default_branch
                    response = self._make_request('GET', url, params=params)

            response.raise_for_status()
            file_info = response.json()

            # Get the commit SHA from the file info.
            # The Contents API returns 'sha', which is the blob SHA, not a commit SHA,
            # so we need to find the commit that last modified this file.
            commits_url = f"{self.base_url}/repos/{owner}/{repo}/commits"
            commits_params = {
                'path': path,
                'sha': ref,
                'per_page': 1
            }

            commits_response = self._make_request('GET', commits_url, params=commits_params)
            commits_response.raise_for_status()
            commits = commits_response.json()

            if commits:
                return commits[0]['sha']

            # Fallback: use the ref as-is if it's already a SHA
            if len(ref) == 40 and all(c in '0123456789abcdef' for c in ref.lower()):
                return ref

            # Last resort: resolve ref to SHA
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/heads/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']

            # If ref is a tag
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/tags/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']

            return None

        except requests.RequestException as e:
            print(f"Error getting file SHA for {owner}/{repo}/{path}@{ref}: {e}", file=sys.stderr)
            return None

    def get_license(self, owner: str, repo: str, sha: str) -> Optional[str]:
        """Try to detect license from repository root at given SHA."""
        license_files = ['LICENSE', 'LICENSE.txt', 'LICENSE.md', 'LICENCE', 'LICENCE.txt']

        for license_file in license_files:
            url = f"{self.base_url}/repos/{owner}/{repo}/contents/{license_file}"
            params = {'ref': sha}

            try:
                response = self._make_request('GET', url, params=params)
                if response.status_code == 200:
                    # Found a license file, return URL to it
                    return f"https://raw.githubusercontent.com/{owner}/{repo}/{sha}/{license_file}"
            except requests.RequestException:
                continue

        # Try to get license from repository info
        try:
            repo_url = f"{self.base_url}/repos/{owner}/{repo}"
            response = self._make_request('GET', repo_url)
            response.raise_for_status()
            repo_info = response.json()
            license_info = repo_info.get('license')
            if license_info:
                return license_info.get('spdx_id') or license_info.get('url')
        except requests.RequestException:
            pass

        return None
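

# Minimal usage sketch for GitHubAPI (owner/repo values are illustrative):
#   api = GitHubAPI(delay=1.0)
#   branch = api.get_default_branch("octocat", "Hello-World")
#   commit_sha = api.get_file_sha("octocat", "Hello-World", "README.md", branch)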


def compute_sha256(file_path: Path) -> str:
    """Compute SHA256 checksum of a file."""
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            sha256.update(chunk)
    return sha256.hexdigest()


def download_file(url: str, dest_path: Path) -> bool:
    """Download a file from URL to destination path."""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        # Create parent directories
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # Download file
        with open(dest_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        return True
    except requests.RequestException as e:
        print(f"Error downloading {url}: {e}", file=sys.stderr)
        return False


def parse_github_url(url: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """
    Parse GitHub URL to return (owner, repo, ref, path).
    Supports:
    - https://github.com/owner/repo/blob/<ref>/path/to/file
    - https://github.com/owner/repo/raw/<ref>/path/to/file
    - https://raw.githubusercontent.com/owner/repo/<ref>/path/to/file
    """
    if not url or not isinstance(url, str):
        return None, None, None, None

    # Check if it's a GitHub URL
    if 'github.com' not in url:
        return None, None, None, None

    try:
        # Handle raw.githubusercontent.com
        if 'raw.githubusercontent.com' in url:
            match_parts = url.split('/')
            # https://raw.githubusercontent.com/OWNER/REPO/REF/PATH...
            # parts: [https:, , raw.githubusercontent.com, OWNER, REPO, REF, PATH...]
            if len(match_parts) >= 6:
                owner = match_parts[3]
                repo = match_parts[4]
                ref = match_parts[5]
                path = '/'.join(match_parts[6:]).split('?')[0]
                return owner, repo, ref, unquote(path)

        # Handle github.com blob/raw URLs
        parsed = urlparse(url)
        path = parsed.path.strip('/')
        path_parts = path.split('/')

        if len(path_parts) >= 4:
            owner = path_parts[0]
            repo = path_parts[1]
            mode = path_parts[2]  # 'blob' or 'raw'

            if mode in ('blob', 'raw'):
                ref = path_parts[3]
                file_path = '/'.join(path_parts[4:])

                # Check query params for ?raw=true
                query_params = parse_qs(parsed.query)
                if 'raw' in query_params or mode == 'raw':
                    return owner, repo, ref, unquote(file_path)

                # Also treat 'blob' as a valid source if we just want the path
                return owner, repo, ref, unquote(file_path)

    except Exception:
        pass

    return None, None, None, None
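

# For example (illustrative URL):
#   parse_github_url("https://github.com/octocat/Hello-World/blob/main/README.md")
#   returns ("octocat", "Hello-World", "main", "README.md")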


def scan_site_components(components_dir: Path) -> Generator[Dict[str, Any], None, None]:
    """Recursively scan JSON files for parts with GitHub URLs."""
    for json_file in components_dir.rglob('*.json'):
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Breadth-first walk over the JSON structure to find parts
            queue = [data]
            while queue:
                item = queue.pop(0)
                if isinstance(item, dict):
                    # Check if this item is a part
                    if 'id' in item and 'url' in item and item['url']:
                        owner, repo, ref, source_path = parse_github_url(item['url'])
                        if owner and repo and source_path:
                            yield {
                                'id': item['id'],
                                'url': item['url'],
                                'owner': owner,
                                'repo': repo,
                                'ref': ref or 'main',
                                'source_path': source_path,
                                'orig_site_json': json_file
                            }

                    # Add children to queue
                    queue.extend(item.values())
                elif isinstance(item, list):
                    queue.extend(item)

        except (json.JSONDecodeError, IOError) as e:
            print(f"Warning: Could not read {json_file}: {e}", file=sys.stderr)


def regenerate_manifest(manifest_path: Path, repo_root: Path) -> Tuple[List[Dict], int]:
    """
    Regenerate manifest from site data.
    Preserves state of existing entries.
    Returns (new_manifest_list, changes_count).
    """
    print("Scanning website components to regenerate manifest...")

    # Load existing manifest to preserve state
    old_manifest = {}
    if manifest_path.exists():
        with open(manifest_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        if isinstance(data, list):
            old_manifest = {entry['id']: entry for entry in data}

    new_manifest = {}
    components_dir = repo_root / 'website/src/data/components'
    changes_count = 0

    if not components_dir.exists():
        print(f"Warning: Components directory not found: {components_dir}", file=sys.stderr)
        return list(old_manifest.values()), 0

    for part in scan_site_components(components_dir):
        part_id = part['id']
        old_entry = old_manifest.get(part_id)

        # Calculate local path: vendor/{owner}-{repo}/{path}
        local_path = f"vendor/{part['owner']}-{part['repo']}/{part['source_path']}"
        source_repo = f"{part['owner']}/{part['repo']}"
        orig_site_json = str(part['orig_site_json'].relative_to(repo_root))

        entry = {
            'id': part_id,
            'source_repo': source_repo,
            'source_path': part['source_path'],
            'source_ref': part['ref'],
            'local_path': local_path,
            'orig_site_json': orig_site_json,
            'orig_item_id': part_id
        }

        # Preserve state if exists and config matches
        if old_entry:
            # Check if source config changed
            config_changed = (
                old_entry.get('source_repo') != source_repo or
                old_entry.get('source_path') != part['source_path'] or
                old_entry.get('source_ref') != part['ref']
            )

            if not config_changed:
                # Copy state
                for key in ['pinned_sha', 'pinned_raw_url', 'checksum_sha256', 'last_checked', 'status', 'license', 'upstream_latest_sha']:
                    if key in old_entry:
                        entry[key] = old_entry[key]
            else:
                print(f" Config changed for {part_id}, resetting status.")
                entry['status'] = 'pending'
                entry['pinned_sha'] = None
                changes_count += 1

            # Check if manifest info changed (e.g. orig_site_json moved)
            if (old_entry.get('orig_site_json') != orig_site_json or
                    old_entry.get('local_path') != local_path):
                changes_count += 1
        else:
            print(f" New part found: {part_id}")
            entry['status'] = 'pending'
            entry['pinned_sha'] = None
            changes_count += 1

        new_manifest[part_id] = entry

    # Check for removed items
    removed_count = len(old_manifest) - len(new_manifest)
    if removed_count > 0:
        print(f" Removed {removed_count} parts that are no longer in site JSONs.")
        changes_count += removed_count

    return sorted(new_manifest.values(), key=lambda x: x['id']), changes_count


def update_manifest_entry(
    entry: Dict,
    api: GitHubAPI,
    repo_root: Path,
    dry_run: bool = False
) -> Dict:
    """Update a single manifest entry by downloading and pinning the file."""
    source_repo = entry['source_repo']
    owner, repo = source_repo.split('/', 1)
    source_path = entry['source_path']
    source_ref = entry.get('source_ref', 'main')

    print(f"Processing {entry['id']} from {source_repo}/{source_path}@{source_ref}...")

    # Get commit SHA for the file
    commit_sha = api.get_file_sha(owner, repo, source_path, source_ref)
    if not commit_sha:
        print(f" Warning: Could not resolve SHA for {source_ref}, skipping", file=sys.stderr)
        entry['status'] = 'error'
        return entry

    # Build pinned raw URL
    pinned_raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/{commit_sha}/{source_path}"

    # Determine local path
    local_path = Path(entry['local_path'])
    if not local_path.is_absolute():
        local_path = repo_root / local_path

    # Check if file exists and is already at the correct version
    current_pinned_sha = entry.get('pinned_sha')
    if current_pinned_sha == commit_sha and local_path.exists():
        if dry_run:
            print(f" [DRY RUN] File up to date ({commit_sha}), would skip download.")
        else:
            print(f" File up to date ({commit_sha}), skipping download.")
            # Ensure checksum is present
            if 'checksum_sha256' not in entry or not entry['checksum_sha256']:
                entry['checksum_sha256'] = compute_sha256(local_path)

        entry['pinned_sha'] = commit_sha
        entry['pinned_raw_url'] = pinned_raw_url
        entry['last_checked'] = datetime.now(timezone.utc).isoformat()
        entry['upstream_latest_sha'] = commit_sha
        entry['status'] = 'up-to-date'

        # If license is missing, try to get it, otherwise keep existing
        if 'license' not in entry and not dry_run:
            license_info = api.get_license(owner, repo, commit_sha)
            if license_info:
                entry['license'] = license_info

        return entry

    if dry_run:
        print(f" [DRY RUN] Would download to {local_path}")
        print(f" [DRY RUN] Pinned SHA: {commit_sha}")
        entry['pinned_sha'] = commit_sha
        entry['pinned_raw_url'] = pinned_raw_url
        entry['last_checked'] = datetime.now(timezone.utc).isoformat()
        entry['upstream_latest_sha'] = commit_sha
        entry['status'] = 'up-to-date'
        return entry

    # Download file
    print(f" Downloading from {pinned_raw_url}...")
    if not download_file(pinned_raw_url, local_path):
        entry['status'] = 'error'
        return entry

    # Compute checksum
    checksum = compute_sha256(local_path)
    print(f" Checksum: {checksum[:16]}...")

    # Get license info
    license_info = api.get_license(owner, repo, commit_sha)

    # Update entry
    entry['pinned_sha'] = commit_sha
    entry['pinned_raw_url'] = pinned_raw_url
    entry['checksum_sha256'] = checksum
    entry['last_checked'] = datetime.now(timezone.utc).isoformat()
    entry['upstream_latest_sha'] = commit_sha
    entry['status'] = 'up-to-date'
    if license_info:
        entry['license'] = license_info

    return entry
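

# Sketch of a manifest entry after a successful pin (keys come from
# regenerate_manifest/update_manifest_entry above; values are illustrative):
#   {
#     "id": "<part-id>",
#     "source_repo": "owner/repo",
#     "source_path": "path/to/file",
#     "source_ref": "main",
#     "local_path": "vendor/owner-repo/path/to/file",
#     "orig_site_json": "website/src/data/components/<file>.json",
#     "orig_item_id": "<part-id>",
#     "pinned_sha": "<40-char commit SHA>",
#     "pinned_raw_url": "https://raw.githubusercontent.com/owner/repo/<sha>/path/to/file",
#     "checksum_sha256": "<sha256 hex digest>",
#     "last_checked": "<ISO 8601 UTC timestamp>",
#     "upstream_latest_sha": "<40-char commit SHA>",
#     "status": "up-to-date",
#     "license": "<SPDX id or license URL>"
#   }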


def sync_to_site_json(entry: Dict, repo_root: Path) -> bool:
    """Sync vendor metadata back to the original site JSON file."""
    orig_json_path = entry.get('orig_site_json')
    orig_item_id = entry.get('orig_item_id')

    if not orig_json_path or not orig_item_id:
        return False

    json_path = repo_root / orig_json_path
    if not json_path.exists():
        print(f" Warning: Site JSON file not found: {json_path}", file=sys.stderr)
        return False

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        # Find the part in the nested structure and attach vendor metadata
        def find_and_update_part(obj, target_id):
            if isinstance(obj, dict):
                # If this object IS the part (has the ID)
                if obj.get('id') == target_id:
                    if 'vendor' not in obj:
                        obj['vendor'] = {}
                    obj['vendor'].update({
                        'manifest_id': entry['id'],
                        'local_path': entry['local_path'],
                        'pinned_sha': entry['pinned_sha'],
                        'pinned_raw_url': entry['pinned_raw_url'],
                        'checksum_sha256': entry['checksum_sha256'],
                        'last_checked': entry['last_checked'],
                        'status': entry['status']
                    })
                    return True

                # Recursively search values
                for value in obj.values():
                    if find_and_update_part(value, target_id):
                        return True

            elif isinstance(obj, list):
                for item in obj:
                    if find_and_update_part(item, target_id):
                        return True

            return False

        if not find_and_update_part(data, orig_item_id):
            print(f" Warning: Could not find part with id '{orig_item_id}' in {json_path}", file=sys.stderr)
            return False

        # Write back to file with 2-space indentation
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        print(f" Updated {json_path}")
        return True

    except (json.JSONDecodeError, IOError) as e:
        print(f" Error updating {json_path}: {e}", file=sys.stderr)
        return False


def main():
    parser = argparse.ArgumentParser(
        description='Download and pin external asset files from GitHub'
    )
    parser.add_argument(
        '--manifest',
        type=Path,
        default=Path('manifest/vendor_manifest.json'),
        help='Path to manifest file (default: manifest/vendor_manifest.json)'
    )
    parser.add_argument(
        '--entry',
        type=str,
        help='Process only a specific manifest entry by ID'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without downloading files'
    )
    parser.add_argument(
        '--no-sync',
        action='store_true',
        help='Skip syncing vendor metadata back to site JSON files'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=0.5,
        help='Delay between API requests in seconds (default: 0.5)'
    )
    parser.add_argument(
        '--no-scan',
        action='store_true',
        help='Skip scanning website for new components'
    )
    parser.add_argument(
        '--scan-only',
        action='store_true',
        help='Only scan website and update manifest, do not check/download files'
    )

    args = parser.parse_args()

    # Resolve paths: the repo root is the parent of the directory containing this script
    script_dir = Path(__file__).parent.parent
    manifest_path = (script_dir / args.manifest).resolve()
    repo_root = script_dir

    # Regenerate manifest from website scan (unless disabled)
    if not args.no_scan and not args.entry:
        manifest_list, changes = regenerate_manifest(manifest_path, repo_root)
        if changes > 0:
            print(f"Manifest regenerated with {changes} changes.")
            if not args.dry_run:
                manifest_path.parent.mkdir(parents=True, exist_ok=True)
                with open(manifest_path, 'w', encoding='utf-8') as f:
                    json.dump(manifest_list, f, indent=2, sort_keys=False)
        else:
            print("No changes in manifest structure detected.")

        if args.scan_only:
            return

        # Use the freshly regenerated manifest for processing
        manifest_data = manifest_list
    else:
        if not manifest_path.exists():
            print(f"Error: Manifest file not found: {manifest_path}", file=sys.stderr)
            sys.exit(1)

        with open(manifest_path, 'r', encoding='utf-8') as f:
            manifest_data = json.load(f)

    # Convert to dict if it's a list
    if isinstance(manifest_data, list):
        manifest = {entry['id']: entry for entry in manifest_data}
    else:
        manifest = manifest_data

    # Filter entries if --entry specified
    if args.entry:
        if args.entry not in manifest:
            print(f"Error: Entry '{args.entry}' not found in manifest", file=sys.stderr)
            sys.exit(1)
        entries_to_process = {args.entry: manifest[args.entry]}
    else:
        entries_to_process = manifest

    # Initialize GitHub API client with the configured delay
    api = GitHubAPI(delay=args.delay)

    # Process entries
    updated_count = 0
    for entry_id, entry in entries_to_process.items():
        updated_entry = update_manifest_entry(entry, api, repo_root, dry_run=args.dry_run)
        manifest[entry_id] = updated_entry

        if not args.no_sync and not args.dry_run:
            sync_to_site_json(updated_entry, repo_root)

        updated_count += 1

    # Write updated manifest
    if not args.dry_run:
        manifest_list = sorted(manifest.values(), key=lambda x: x['id'])
        with open(manifest_path, 'w', encoding='utf-8') as f:
            json.dump(manifest_list, f, indent=2, sort_keys=False)
        print(f"\nUpdated manifest with {updated_count} entries.")
    else:
        print(f"\n[DRY RUN] Would update {updated_count} entries.")


if __name__ == '__main__':
    main()