Files
ossm-configurator/scripts/check_updates.py

269 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Check for upstream updates to vendored files.
Queries GitHub API to detect if upstream files have changed since
they were pinned. Produces a report of up-to-date and out-of-date entries.
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
import requests
class GitHubAPI:
    """Simple GitHub API client for checking upstream updates.

    Handles optional token auth, inter-request throttling, and
    rate-limit-aware retries.
    """

    def __init__(self, token: Optional[str] = None, delay: float = 0.5):
        """
        Args:
            token: GitHub API token. Falls back to the GITHUB_API_TOKEN or
                GITHUB_TOKEN environment variables when not provided.
            delay: Minimum delay between consecutive API requests, in seconds.
        """
        self.token = token or os.getenv('GITHUB_API_TOKEN') or os.getenv('GITHUB_TOKEN')
        self.session = requests.Session()
        # Always advertise the API version; authentication is optional.
        # (Previously the Accept header was only sent when a token existed.)
        self.session.headers.update({'Accept': 'application/vnd.github.v3+json'})
        if self.token:
            self.session.headers.update({'Authorization': f'token {self.token}'})
        self.base_url = 'https://api.github.com'
        self.delay = delay  # Delay between requests in seconds
        self.last_request_time = 0.0  # time.time() of the previous request

    def _wait_for_rate_limit(self, response: requests.Response) -> None:
        """Wait if rate limited, using reset time from headers.

        A 403 may be a rate-limit rejection or a plain permission error;
        only sleep for the former.
        """
        if response.status_code != 403:
            return
        rate_limit_remaining = response.headers.get('X-RateLimit-Remaining', '1')
        if rate_limit_remaining == '0' or 'rate limit' in response.text.lower():
            reset_time = response.headers.get('X-RateLimit-Reset')
            # Guard against a missing or malformed reset header.
            if reset_time and reset_time.isdigit():
                wait_seconds = max(0, int(reset_time) - int(time.time())) + 1
                print(f" Rate limit exceeded. Waiting {wait_seconds} seconds until reset...", file=sys.stderr)
                time.sleep(wait_seconds)
            else:
                # Fallback: wait 60 seconds
                print(" Rate limit exceeded. Waiting 60 seconds...", file=sys.stderr)
                time.sleep(60)

    def _rate_limit_delay(self) -> None:
        """Add delay between requests to avoid hitting rate limits."""
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        if time_since_last < self.delay:
            time.sleep(self.delay - time_since_last)
        self.last_request_time = time.time()

    def _make_request(self, method: str, url: str, max_retries: int = 3, **kwargs) -> requests.Response:
        """Make a request with rate limit handling and retries.

        Retries on 403 rate-limit responses (after sleeping to the reset)
        and on transport errors with exponential backoff. Re-raises the
        last transport error when all attempts fail.
        """
        for attempt in range(max_retries):
            self._rate_limit_delay()
            try:
                response = self.session.request(method, url, **kwargs)
                if response.status_code == 403:
                    # Possibly rate limited: wait (if so), then retry.
                    self._wait_for_rate_limit(response)
                    if attempt < max_retries - 1:
                        continue
                # Slow down proactively when the quota is nearly exhausted.
                remaining = response.headers.get('X-RateLimit-Remaining')
                if remaining and remaining.isdigit():
                    remaining_int = int(remaining)
                    if remaining_int < 10:
                        print(f" Warning: Only {remaining_int} API requests remaining. Adding delay...", file=sys.stderr)
                        time.sleep(2)
                return response
            except requests.RequestException as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f" Request failed, retrying in {wait_time}s... ({e})", file=sys.stderr)
                    time.sleep(wait_time)
                else:
                    raise
        # Every iteration returns, continues, or raises; this is unreachable.
        # (Replaces a dead `return response` that could NameError.)
        raise RuntimeError("request retry loop exited without a response")

    def get_latest_commit_sha(self, owner: str, repo: str, path: str, ref: str) -> Optional[str]:
        """Get the latest commit SHA that modified a file at the given ref.

        Returns None when the file/ref cannot be resolved or the API call fails.
        """
        commits_url = f"{self.base_url}/repos/{owner}/{repo}/commits"
        params = {
            'path': path,
            'sha': ref,
            'per_page': 1
        }
        try:
            response = self._make_request('GET', commits_url, params=params)
            response.raise_for_status()
            commits = response.json()
            if commits:
                return commits[0]['sha']
            # No commit history returned: fall back to resolving the ref itself.
            # If ref already looks like a full 40-char hex SHA, use it as-is.
            if len(ref) == 40 and all(c in '0123456789abcdef' for c in ref.lower()):
                return ref
            # Try to resolve as a branch head...
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/heads/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']
            # ...then as a tag.
            ref_url = f"{self.base_url}/repos/{owner}/{repo}/git/ref/tags/{ref}"
            ref_response = self._make_request('GET', ref_url)
            if ref_response.status_code == 200:
                return ref_response.json()['object']['sha']
            return None
        except requests.RequestException as e:
            print(f"Error checking updates for {owner}/{repo}/{path}@{ref}: {e}", file=sys.stderr)
            return None
def check_entry(entry: Dict, api: 'GitHubAPI') -> Dict:
    """Check a single manifest entry for upstream updates.

    Mutates *entry* in place and returns it, setting:
      - 'upstream_latest_sha': latest SHA touching the file, or None
      - 'last_checked': UTC ISO timestamp of this check (always stamped,
        even when the upstream lookup fails — previously skipped on failure)
      - 'status': 'up-to-date', 'out-of-date', or 'unknown'
    """
    owner, repo = entry['source_repo'].split('/', 1)
    source_path = entry['source_path']
    source_ref = entry.get('source_ref', 'main')
    pinned_sha = entry.get('pinned_sha')
    # Record when this check happened regardless of the outcome below.
    entry['last_checked'] = datetime.now(timezone.utc).isoformat()
    latest_sha = api.get_latest_commit_sha(owner, repo, source_path, source_ref)
    entry['upstream_latest_sha'] = latest_sha
    if not latest_sha or not pinned_sha:
        # Can't compare without both SHAs.
        entry['status'] = 'unknown'
    elif latest_sha == pinned_sha:
        entry['status'] = 'up-to-date'
    else:
        entry['status'] = 'out-of-date'
    return entry
def main():
    """Entry point: load the manifest, check every entry, print and
    optionally write a JSON report, and exit non-zero when any entry
    is out-of-date."""
    parser = argparse.ArgumentParser(
        description='Check for upstream updates to vendored files'
    )
    parser.add_argument(
        '--manifest',
        type=Path,
        default=Path('manifest/vendor_manifest.json'),
        help='Path to manifest file (default: manifest/vendor_manifest.json)'
    )
    parser.add_argument(
        '--output',
        type=Path,
        help='Path to write report JSON (optional)'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=0.5,
        help='Delay between API requests in seconds (default: 0.5)'
    )
    args = parser.parse_args()

    # Resolve paths relative to the project root (one level above scripts/).
    script_dir = Path(__file__).parent.parent
    manifest_path = (script_dir / args.manifest).resolve()
    if not manifest_path.exists():
        print(f"Error: Manifest file not found: {manifest_path}", file=sys.stderr)
        sys.exit(1)

    with open(manifest_path, 'r', encoding='utf-8') as f:
        manifest_data = json.load(f)
    # The manifest may be a mapping of id -> entry or a plain list of entries.
    if isinstance(manifest_data, dict):
        manifest_list = list(manifest_data.values())
    else:
        manifest_list = manifest_data

    api = GitHubAPI(delay=args.delay)

    print("Checking for upstream updates...")
    updated_entries = []
    out_of_date_count = 0
    for entry in manifest_list:
        updated_entry = check_entry(entry, api)
        updated_entries.append(updated_entry)
        if updated_entry['status'] == 'out-of-date':
            out_of_date_count += 1
            print(f" ⚠️ {updated_entry['id']}: OUT-OF-DATE")
            print(f" Pinned: {updated_entry.get('pinned_sha', 'N/A')[:8]}...")
            print(f" Latest: {updated_entry.get('upstream_latest_sha', 'N/A')[:8]}...")
        elif updated_entry['status'] == 'up-to-date':
            print(f"{updated_entry['id']}: up-to-date")
        else:
            print(f" ? {updated_entry['id']}: {updated_entry['status']}")

    unknown_count = sum(1 for e in updated_entries if e['status'] == 'unknown')
    report = {
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'total_entries': len(updated_entries),
        'up_to_date': sum(1 for e in updated_entries if e['status'] == 'up-to-date'),
        'out_of_date': out_of_date_count,
        'unknown': unknown_count,
        'entries': updated_entries
    }

    # Write report if requested
    if args.output:
        output_path = (script_dir / args.output).resolve()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, sort_keys=False)
        print(f"\nReport written to {output_path}")

    print("\nSummary:")
    print(f" Total entries: {report['total_entries']}")
    print(f" Up-to-date: {report['up_to_date']}")
    print(f" Out-of-date: {report['out_of_date']}")
    print(f" Unknown: {report['unknown']}")

    # Non-zero exit lets CI fail the build when updates are pending.
    if out_of_date_count > 0:
        print(f"\n⚠️ {out_of_date_count} entries need updates!", file=sys.stderr)
        sys.exit(1)
    if unknown_count > 0:
        # Don't claim success when some entries could not be checked.
        print(f"\n? {unknown_count} entries could not be checked.")
    else:
        print("\n✓ All entries are up-to-date.")


if __name__ == '__main__':
    main()