#!/usr/bin/env python3 """ GitHub webhook handler for detecting upstream changes. Receives push events from GitHub and triggers vendor update checks for affected repositories and paths. """ import hashlib import hmac import json import os import subprocess import sys from pathlib import Path from typing import Dict, List, Optional from flask import Flask, request, jsonify app = Flask(__name__) def verify_signature(payload_body: bytes, signature_header: str, secret: str) -> bool: """Verify GitHub webhook signature.""" if not secret: return False hash_object = hmac.new( secret.encode('utf-8'), msg=payload_body, digestmod=hashlib.sha256 ) expected_signature = "sha256=" + hash_object.hexdigest() return hmac.compare_digest(expected_signature, signature_header) def load_manifest(manifest_path: Path) -> List[Dict]: """Load vendor manifest.""" if not manifest_path.exists(): return [] try: with open(manifest_path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, list): return data elif isinstance(data, dict) and 'entries' in data: return data['entries'] return [] except (json.JSONDecodeError, IOError): return [] def find_affected_entries(push_event: Dict, manifest: List[Dict]) -> List[str]: """ Find manifest entries affected by a push event. Returns list of manifest entry IDs. """ affected = [] repo_full_name = push_event.get('repository', {}).get('full_name') if not repo_full_name: return affected commits = push_event.get('commits', []) changed_files = set() for commit in commits: changed_files.update(commit.get('added', [])) changed_files.update(commit.get('modified', [])) changed_files.update(commit.get('removed', [])) # Match changed files against manifest entries for entry in manifest: source_repo = entry.get('source_repo') source_path = entry.get('source_path') if source_repo == repo_full_name and source_path in changed_files: affected.append(entry['id']) return affected @app.route('/webhook', methods=['POST']) def webhook(): """Handle GitHub webhook POST requests.""" # Get webhook secret from environment webhook_secret = os.getenv('WEBHOOK_SECRET') # Verify signature if secret is configured if webhook_secret: signature = request.headers.get('X-Hub-Signature-256', '') if not verify_signature(request.data, signature, webhook_secret): return jsonify({'error': 'Invalid signature'}), 401 # Parse event try: event = request.json event_type = request.headers.get('X-GitHub-Event', '') except Exception as e: return jsonify({'error': f'Invalid JSON: {e}'}), 400 # Only process push events if event_type != 'push': return jsonify({'message': f'Ignoring event type: {event_type}'}), 200 # Load manifest script_dir = Path(__file__).parent.parent.parent manifest_path = script_dir / 'manifest' / 'vendor_manifest.json' manifest = load_manifest(manifest_path) # Find affected entries affected_ids = find_affected_entries(event, manifest) if not affected_ids: return jsonify({ 'message': 'No affected manifest entries', 'repo': event.get('repository', {}).get('full_name') }), 200 # Log the event repo_name = event.get('repository', {}).get('full_name', 'unknown') print(f"Webhook: Push event for {repo_name} affects {len(affected_ids)} entries: {affected_ids}") # Trigger check/update flow # In a production environment, you might want to enqueue this as a background job # For now, we'll just log and optionally run check_updates.py try: # Run check_updates.py to see what needs updating check_script = script_dir / 'scripts' / 'check_updates.py' result = subprocess.run( [sys.executable, str(check_script), '--manifest', str(manifest_path)], capture_output=True, text=True, timeout=300 ) if result.returncode == 0: return jsonify({ 'message': 'Update check completed', 'affected_entries': affected_ids, 'check_output': result.stdout }), 200 else: # Some entries are out-of-date return jsonify({ 'message': 'Updates available', 'affected_entries': affected_ids, 'check_output': result.stdout, 'stderr': result.stderr }), 200 except subprocess.TimeoutExpired: return jsonify({ 'message': 'Update check timed out', 'affected_entries': affected_ids }), 500 except Exception as e: return jsonify({ 'error': f'Failed to run update check: {e}', 'affected_entries': affected_ids }), 500 @app.route('/health', methods=['GET']) def health(): """Health check endpoint.""" return jsonify({'status': 'ok'}), 200 if __name__ == '__main__': # For local development port = int(os.getenv('PORT', 5000)) app.run(host='0.0.0.0', port=port, debug=os.getenv('FLASK_DEBUG', 'false').lower() == 'true') else: # For serverless/production deployment (e.g., AWS Lambda, Google Cloud Functions) # Export the Flask app pass