fahrengit-451/geoblock_watcher/watcher.py
Albert Armea 08d61b7ac7 Add support for HTML responses when blocked
Change written by Claude Code:

❯ Consider the geofencing rules defined at <config/geo_rules.yml> and consumed
by <geoblock_watcher/watcher.py>. Make it so that you can pass in an HTML file
response instead of a short plain text body. For this exercise, you may assume
that the entire contents of the HTML (HTML, CSS, JS, image resources) will be
included inline in the file. You may have to modify the <docker-compose.yml> to
 provide a new (read-only) bind-mount for these files.
2026-03-22 03:00:31 +00:00

306 lines
12 KiB
Python

#!/usr/bin/env python3
"""
geoblock_watcher/watcher.py
────────────────────────────────────────────────────────────────────────────
Watches geo_rules.yml for changes, renders three nginx config snippets into
/app/geoblock/, then signals the nginx container to reload its configuration.
Key constraint: nginx `return` requires a literal integer status code — it
cannot take a variable. We therefore render one map variable and one `if`
block *per distinct status code* per repo, so every `return` statement has a
hardcoded integer.
Rendered files
──────────────
repo_maps.conf (stub — logic lives in repo_vars.conf)
repo_vars.conf per-repo map blocks: region key → body string (or "")
repo_locations.conf per-repo location blocks with one `if` per status code
"""
import hashlib
import logging
import os
import re
import signal
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any
import docker
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Log to stdout so container log collectors (e.g. `docker logs`) see everything.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    stream=sys.stdout,
)
log = logging.getLogger(__name__)

# Rules file watched for changes (bind-mounted from the host per the header note).
RULES_FILE = Path("/app/host/geo_rules.yml")
# Directory where the rendered nginx snippets are written.
OUTPUT_DIR = Path("/app/geoblock")
# Directory holding self-contained HTML block pages, as seen by this watcher.
# presumably a read-only bind-mount shared with nginx — TODO confirm in docker-compose.yml
GEOBLOCK_PAGES_DIR = Path("/app/geoblock_pages")
# The same pages directory as seen from inside the nginx container (used as `root`).
NGINX_PAGES_ROOT = "/etc/nginx/geoblock_pages"
# Name of the nginx container to signal; overridable via environment.
NGINX_CONTAINER = os.environ.get("NGINX_CONTAINER_NAME", "nginx")

# Common proxy directives appended to every generated repo `location` block.
PROXY_DIRECTIVES = """\
proxy_pass http://forgejo:3000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
client_max_body_size 512m;
proxy_request_buffering off;
proxy_buffering off;
proxy_read_timeout 600s;
proxy_send_timeout 600s;"""
# ── Helpers ───────────────────────────────────────────────────────────────────
def _var_name(repo_path: str) -> str:
"""Convert /alice/my-repo → geoblock_alice_my_repo"""
sanitised = re.sub(r"[^a-zA-Z0-9]", "_", repo_path.strip("/"))
return f"geoblock_{sanitised}"
def _escape_body(body: str) -> str:
return (
body
.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("'", "\\'")
.replace("\n", " ")
)
# ── Renderer ──────────────────────────────────────────────────────────────────
def _normalize_file_path(body_file: str) -> str:
"""Ensure the path starts with / for use in nginx try_files."""
path = body_file.strip()
return path if path.startswith("/") else f"/{path}"
def render_clean(rules_data: dict[str, Any]) -> tuple[str, str, str]:
    """
    Render the three nginx snippet bodies from the parsed geo_rules.yml data.

    Returns (repo_maps_conf, repo_vars_conf, repo_locations_conf).
    For each repo we emit:
      • One map per distinct status code:
            map $geoip2_region_key $geoblock_<repo>_<status> { ... }
        For text body rules, the value is the escaped body string when blocked.
        For body_file rules, the value is "1" when blocked (a flag).
      • One location block with one `if` per distinct status code.
            Text body:  if ($var != "") { return <status> "$var"; }
            File body:  error_page <status> @<var>_page;
                        if ($var != "") { return <status>; }
      • For body_file rules, a named location outside the repo location block:
            location @<var>_page { internal; root ...; try_files <file> =500; }
    """
    repos: list[dict] = rules_data.get("repos", [])
    header = "# Generated by geoblock_watcher — do not edit manually.\n\n"
    vars_blocks: list[str] = []
    loc_blocks: list[str] = []
    named_locs: list[str] = []  # file-serving named locations (server-level)
    for repo in repos:
        path: str = repo["path"].rstrip("/")
        base_var: str = _var_name(path)
        rules: list[dict] = repo.get("rules", [])
        # status_info: status -> {"is_file": bool, "file": str, "entries": [(locale, val)]}
        status_info: dict[int, dict] = {}
        for rule in rules:
            status = int(rule["status"])
            is_file = "body_file" in rule
            if status not in status_info:
                # The first rule seen for a status decides body vs body_file mode.
                status_info[status] = {
                    "is_file": is_file,
                    "file": _normalize_file_path(rule["body_file"]) if is_file else "",
                    "entries": [],
                }
            else:
                existing_is_file = status_info[status]["is_file"]
                if existing_is_file != is_file:
                    # Mixed modes for one status can't be rendered cleanly;
                    # warn and keep the mode chosen by the first rule.
                    log.warning(
                        "%s: status %d has mixed body/body_file rules — "
                        "treating all as %s.",
                        path, status, "body_file" if existing_is_file else "body",
                    )
            if is_file:
                # Best-effort existence check only; at request time nginx's
                # try_files falls back to =500 if the file is missing.
                if GEOBLOCK_PAGES_DIR.exists():
                    full = GEOBLOCK_PAGES_DIR / rule["body_file"].lstrip("/")
                    if not full.exists():
                        log.warning("body_file not found: %s", full)
                # Map value is just a "blocked" flag; the file serves the body.
                value = "1"
            else:
                value = _escape_body(str(rule.get("body", "Blocked")))
            for locale in rule.get("locales", []):
                status_info[status]["entries"].append((locale.strip(), value))
        # ── One map variable per distinct status code ──────────────────────
        for status, info in status_info.items():
            var = f"{base_var}_{status}"
            entries = info["entries"]
            vars_blocks.append(f"# {path} — HTTP {status}")
            vars_blocks.append(f"map $geoip2_region_key ${var} {{")
            vars_blocks.append(f' default "";')
            # State-level rules first (more specific)
            for locale, val in entries:
                if "-" in locale:
                    vars_blocks.append(f' "{locale}" "{val}";')
            # Country-level rules second; regex matches the country alone or
            # any of its state-qualified keys (e.g. "US" matches "US-CA").
            for locale, val in entries:
                if "-" not in locale:
                    vars_blocks.append(f' "~^{re.escape(locale)}(-|$)" "{val}";')
            vars_blocks.append("}")
            vars_blocks.append("")
        # ── Location block ─────────────────────────────────────────────────
        loc_blocks.append(f"# Geo-block for {path}")
        loc_blocks.append(f"location ^~ {path} {{")
        for status in sorted(status_info.keys()):
            info = status_info[status]
            var = f"{base_var}_{status}"
            if info["is_file"]:
                # `return <status>;` carries no body; the error_page directive
                # routes the response through the named location that serves
                # the HTML file.
                loc_blocks.append(f' error_page {status} @{var}_page;')
                loc_blocks.append(f' if (${var} != "") {{')
                loc_blocks.append(f' return {status};')
                loc_blocks.append(f' }}')
                named_locs.append(f"# HTML error page for {path} — HTTP {status}")
                named_locs.append(f"location @{var}_page {{")
                named_locs.append(f" internal;")
                named_locs.append(f" default_type text/html;")
                named_locs.append(f" root {NGINX_PAGES_ROOT};")
                named_locs.append(f" try_files {info['file']} =500;")
                named_locs.append(f"}}")
                named_locs.append("")
            else:
                # nginx `return` needs a literal integer status, hence one
                # hardcoded `if`/`return` pair per status code.
                loc_blocks.append(f' if (${var} != "") {{')
                loc_blocks.append(f' return {status} "${var}";')
                loc_blocks.append(f' }}')
        loc_blocks.append(PROXY_DIRECTIVES)
        loc_blocks.append("}")
        loc_blocks.append("")
    maps_conf = header + "# (Region key mapping done inline in repo_vars.conf)\n"
    vars_conf = header + "\n".join(vars_blocks)
    locs_conf = header + "\n".join(loc_blocks + named_locs)
    return maps_conf, vars_conf, locs_conf
# ── Writer & nginx reload ─────────────────────────────────────────────────────
_last_hash: str = ""
def _file_hash(path: Path) -> str:
return hashlib.sha256(path.read_bytes()).hexdigest()
def apply_rules(force: bool = False) -> None:
    """Re-render the nginx snippets from the rules file and reload nginx.

    Does nothing when the rules file is missing, fails to parse, fails to
    render, or is byte-identical to the last applied version (unless
    *force* is true). On success, updates the module-level change tracker.
    """
    global _last_hash
    if not RULES_FILE.exists():
        log.warning("Rules file not found: %s — skipping.", RULES_FILE)
        return
    digest = _file_hash(RULES_FILE)
    if digest == _last_hash and not force:
        log.debug("Rules file unchanged — nothing to do.")
        return
    log.info("Rules file changed — re-rendering nginx config snippets.")
    try:
        rules_data = yaml.safe_load(RULES_FILE.read_text()) or {}
    except yaml.YAMLError as exc:
        log.error("YAML parse error in %s: %s — skipping reload.", RULES_FILE, exc)
        return
    try:
        rendered = render_clean(rules_data)
    except Exception as exc:
        log.error("Render error: %s — skipping reload.", exc, exc_info=True)
        return
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    # render_clean returns the three snippets in this exact order.
    names = ("repo_maps.conf", "repo_vars.conf", "repo_locations.conf")
    for filename, content in zip(names, rendered):
        (OUTPUT_DIR / filename).write_text(content)
    log.info("Config snippets written to %s.", OUTPUT_DIR)
    _reload_nginx()
    # Record the applied version so unchanged files are skipped next time.
    _last_hash = digest
def _reload_nginx() -> None:
    """Send SIGHUP to the nginx container so it re-reads its configuration.

    Best-effort: a missing container or a Docker API failure is logged and
    otherwise ignored so the watcher keeps running.
    """
    try:
        matches = docker.from_env().containers.list(
            filters={"name": NGINX_CONTAINER}
        )
        if not matches:
            log.warning("nginx container '%s' not found — skipping reload.", NGINX_CONTAINER)
            return
        target = matches[0]
        # SIGHUP makes nginx reload config without dropping connections.
        target.kill(signal="HUP")
        log.info("Sent SIGHUP to nginx container '%s'.", target.name)
    except Exception as exc:
        log.error("Failed to reload nginx: %s", exc, exc_info=True)
# ── Watchdog ──────────────────────────────────────────────────────────────────
class RulesHandler(FileSystemEventHandler):
    """Watchdog handler that re-applies rules when the rules file changes."""

    def on_modified(self, event):
        # Events arrive for every file in the watched directory; ignore
        # anything that is not the rules file itself.
        if Path(event.src_path).resolve() != RULES_FILE.resolve():
            return
        log.info("Detected change in %s.", RULES_FILE)
        time.sleep(0.2)  # debounce
        apply_rules()

    # Editors that replace-on-save surface the change as a create event.
    on_created = on_modified
def main() -> None:
    """Entry point: render once, then watch the rules file and poll forever."""
    log.info("geoblock_watcher starting. Watching %s", RULES_FILE)
    apply_rules(force=True)

    observer = Observer()
    # Watch the parent directory, not the file: bind-mounted files are often
    # replaced by rename, which a file-level watch would miss.
    observer.schedule(RulesHandler(), str(RULES_FILE.parent), recursive=False)
    observer.start()

    def _shutdown(signum, frame):
        log.info("Shutting down.")
        observer.stop()
        sys.exit(0)

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, _shutdown)

    try:
        # Periodic poll as a fallback in case a filesystem event is missed.
        while True:
            time.sleep(60)
            apply_rules()
    finally:
        observer.join()
# Run the watcher when executed as a script (the container entry point).
if __name__ == "__main__":
    main()