181 lines
5.5 KiB
Python
Executable File
181 lines
5.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
GitHub webhook handler for detecting upstream changes.
|
|
|
|
Receives push events from GitHub and triggers vendor update checks
|
|
for affected repositories and paths.
|
|
"""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from flask import Flask, request, jsonify
|
|
|
|
app = Flask(__name__)
|
|
|
|
|
|
def verify_signature(payload_body: bytes, signature_header: str, secret: str) -> bool:
    """Verify a GitHub webhook HMAC-SHA256 signature.

    Args:
        payload_body: Raw request body bytes, exactly as received.
        signature_header: Value of the ``X-Hub-Signature-256`` header,
            expected in the form ``sha256=<hex digest>``.
        secret: Shared webhook secret used as the HMAC key.

    Returns:
        True only when the header equals the digest computed over the body
        with the secret. An empty secret or a missing/empty header is
        always rejected.
    """
    if not secret or not signature_header:
        return False

    digest = hmac.new(
        secret.encode('utf-8'),
        msg=payload_body,
        digestmod=hashlib.sha256,
    ).hexdigest()
    expected_signature = "sha256=" + digest

    # compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters, and the header is caller-controlled. Comparing
    # bytes keeps the constant-time check while turning such input into a
    # plain mismatch instead of an unhandled exception.
    return hmac.compare_digest(
        expected_signature.encode('utf-8'),
        signature_header.encode('utf-8'),
    )
|
|
|
|
|
|
def load_manifest(manifest_path: Path) -> List[Dict]:
    """Load vendor manifest entries from a JSON file.

    The file may contain either a top-level list of entries or an object
    holding the list under the ``entries`` key.

    Args:
        manifest_path: Location of the manifest JSON file.

    Returns:
        The list of manifest entries. An empty list is returned when the
        file is missing or unreadable, is not valid UTF-8 JSON, or does not
        have one of the two supported shapes.
    """
    try:
        with open(manifest_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError (invalid UTF-8 bytes).
        return []

    if isinstance(data, dict):
        data = data.get('entries')

    # Anything other than a list (including a non-list "entries" value)
    # would break callers that iterate over entries.
    return data if isinstance(data, list) else []
|
|
|
|
|
|
def find_affected_entries(push_event: Dict, manifest: List[Dict]) -> List[str]:
    """Find manifest entries affected by a push event.

    An entry is affected when its ``source_repo`` equals the pushed
    repository's ``full_name`` and its ``source_path`` exactly matches a
    file listed as added, modified or removed in any commit of the push.

    Args:
        push_event: Parsed push event payload.
        manifest: Manifest entries; each is expected to carry ``id``,
            ``source_repo`` and ``source_path``.

    Returns:
        IDs of the affected entries, in manifest order. Entries that are not
        dicts or have no ``id`` are skipped rather than raising.
    """
    # Payload fields may be absent or JSON null, so never chain .get() on them.
    repository = push_event.get('repository')
    if not isinstance(repository, dict):
        return []

    repo_full_name = repository.get('full_name')
    if not repo_full_name:
        return []

    changed_files = set()
    for commit in push_event.get('commits') or []:
        if not isinstance(commit, dict):
            continue
        for change_kind in ('added', 'modified', 'removed'):
            changed_files.update(commit.get(change_kind) or [])

    # Match changed files against manifest entries
    affected = []
    for entry in manifest:
        if not isinstance(entry, dict):
            continue
        entry_id = entry.get('id')
        if entry_id is None:
            # Nothing useful to report for an entry without an identifier.
            continue
        if (
            entry.get('source_repo') == repo_full_name
            and entry.get('source_path') in changed_files
        ):
            affected.append(entry_id)

    return affected
|
|
|
|
|
|
@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle GitHub webhook POST requests.

    Flow:
        1. If ``WEBHOOK_SECRET`` is set, verify ``X-Hub-Signature-256``
           against the raw body and answer 401 on mismatch.
        2. Parse the JSON body (400 on failure) and ignore every event
           whose ``X-GitHub-Event`` header is not ``push`` (200).
        3. Load the vendor manifest and determine which entries the push
           touches; answer 200 with a message when none are affected.
        4. Run ``scripts/check_updates.py`` synchronously and return its
           output (200), or 500 if it times out or cannot be started.

    Returns:
        A ``(response, status_code)`` tuple with a JSON body.
    """
    # Get webhook secret from environment
    webhook_secret = os.getenv('WEBHOOK_SECRET')

    # Verify signature if secret is configured
    # NOTE(review): with no secret configured the request is accepted
    # unauthenticated -- confirm that is intended outside local development.
    if webhook_secret:
        signature = request.headers.get('X-Hub-Signature-256', '')
        if not verify_signature(request.data, signature, webhook_secret):
            return jsonify({'error': 'Invalid signature'}), 401

    # Parse event
    try:
        event = request.json
    except Exception as e:
        return jsonify({'error': f'Invalid JSON: {e}'}), 400
    event_type = request.headers.get('X-GitHub-Event', '')

    # Only process push events
    if event_type != 'push':
        return jsonify({'message': f'Ignoring event type: {event_type}'}), 200

    # A body can parse successfully yet not be an object (e.g. `null` or a
    # list); everything below calls .get() on the payload, so reject it here
    # instead of failing with an unhandled AttributeError.
    if not isinstance(event, dict):
        return jsonify({'error': 'Invalid JSON: expected a JSON object payload'}), 400

    # "repository" may be missing or null in a malformed payload.
    repository = event.get('repository')
    repo_full_name = repository.get('full_name') if isinstance(repository, dict) else None

    # Load manifest (paths are resolved three directories above this file)
    repo_root = Path(__file__).parent.parent.parent
    manifest_path = repo_root / 'manifest' / 'vendor_manifest.json'
    manifest = load_manifest(manifest_path)

    # Find affected entries
    affected_ids = find_affected_entries(event, manifest)

    if not affected_ids:
        return jsonify({
            'message': 'No affected manifest entries',
            'repo': repo_full_name
        }), 200

    # Log the event
    repo_name = repo_full_name or 'unknown'
    print(f"Webhook: Push event for {repo_name} affects {len(affected_ids)} entries: {affected_ids}")

    # Trigger check/update flow
    # In a production environment, you might want to enqueue this as a background job
    # For now, we'll just log and optionally run check_updates.py
    check_script = repo_root / 'scripts' / 'check_updates.py'
    try:
        # Run check_updates.py to see what needs updating. Arguments are
        # passed as a list (no shell), and the call blocks for up to 5 minutes.
        result = subprocess.run(
            [sys.executable, str(check_script), '--manifest', str(manifest_path)],
            capture_output=True,
            text=True,
            timeout=300
        )
    except subprocess.TimeoutExpired:
        return jsonify({
            'message': 'Update check timed out',
            'affected_entries': affected_ids
        }), 500
    except Exception as e:
        return jsonify({
            'error': f'Failed to run update check: {e}',
            'affected_entries': affected_ids
        }), 500

    if result.returncode == 0:
        return jsonify({
            'message': 'Update check completed',
            'affected_entries': affected_ids,
            'check_output': result.stdout
        }), 200

    # Non-zero exit: some entries are out-of-date
    return jsonify({
        'message': 'Updates available',
        'affected_entries': affected_ids,
        'check_output': result.stdout,
        'stderr': result.stderr
    }), 200
|
|
|
|
|
|
@app.route('/health', methods=['GET'])
def health():
    """Liveness probe: always answers HTTP 200 with ``{"status": "ok"}``."""
    status_body = jsonify({'status': 'ok'})
    return status_body, 200
|
|
|
|
|
|
if __name__ == '__main__':
    # Local development entry point: port comes from PORT (default 5000) and
    # debug mode is enabled only when FLASK_DEBUG is "true" (case-insensitive).
    listen_port = int(os.getenv('PORT', 5000))
    debug_enabled = os.getenv('FLASK_DEBUG', 'false').lower() == 'true'
    app.run(host='0.0.0.0', port=listen_port, debug=debug_enabled)

# When this module is imported instead (serverless/production deployment,
# e.g. AWS Lambda, Google Cloud Functions), nothing runs here: the Flask
# `app` object defined above is what the host is expected to use.
|