fahrengit-451/geoblock_watcher/watcher.py
Albert Armea cf99cd50f2 Initial commit
Change written by Claude Sonnet 4.6:

I need to set up a Git hosting service for personal projects where certain repositories need to be blocked using the locale corresponding to the user’s IP address.

Here are my constraints:
*   The entire system will be running on a single VPS instance. I should be able to `docker compose up -d` the whole thing.
*   For each repository for which this feature is enabled, I need to be able to set the blocked locales down to the state level, along with a custom HTTP status code and response body.
*   You may assume that the IP address of the request is where it actually came from — for this exercise, if the user uses a VPN to bypass the restriction, that is on them.
*   To simplify a reverse proxy setup, you may assume that all Git operations will happen over HTTPS. I will firewall off SSH access.
*    I will be using Let's Encrypt for HTTPS.

Some suggestions from prior research:
*   nginx seems like a reasonable reverse proxy that supports all of the requirements, but you may use a different one if it is simpler to implement or maintain.
*   I can obtain a MaxMind API key to get a geo-IP lookup table. If you use this, you will need to add a service that automatically retrieves the table at a reasonable frequency.
*   Forgejo seems like a reasonable, lightweight Git service, but you may use a different one if you’re aware of one that actually supports these requirements out of the box.

Write me a production-ready `docker-compose.yml` and supporting scripts or configuration scaffolding for me to implement this.
2026-03-21 18:34:50 +00:00

474 lines
19 KiB
Python

#!/usr/bin/env python3
"""
geoblock_watcher.py
────────────────────────────────────────────────────────────────────────────
Watches geo_rules.yml for changes, renders three nginx config snippets into
/app/geoblock/, then signals the nginx container to reload its configuration.
Rendered files
──────────────
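repo_maps.conf
    Placeholder only. The original design put a single nginx `map` block
    body here (compound GeoIP key "CC-SUBDIV" → per-repo decision token),
    meant to be included inside the existing map block in nginx.conf; the
    renderer used by apply_rules() now writes just a header comment because
    the region-key mapping is done inline in repo_vars.conf.
repo_vars.conf
    Three `map` blocks per repo: a decision map from the compound GeoIP key
    to the final "$geoblock_<var>" variable value ("" = allow, or
    "status:body"), plus "$geoblock_<var>_status" and "$geoblock_<var>_body"
    maps that split that value.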
repo_locations.conf
One `location` block per repo. When the variable is non-empty the
block immediately returns the encoded status + body; otherwise the
request falls through to the main proxy_pass location.
"""
import hashlib
import logging
import os
import re
import signal
import sys
import time
from pathlib import Path
from typing import Any
import docker
import yaml
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Log to stdout with ISO-like timestamps so `docker logs` shows the output.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%dT%H:%M:%S",
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

# Input rules file and the directory the rendered nginx snippets go to.
RULES_FILE = Path("/app/geo_rules.yml")
OUTPUT_DIR = Path("/app/geoblock")

# Name used to look up the nginx container; overridable via the environment.
NGINX_CONTAINER = os.getenv("NGINX_CONTAINER_NAME", "nginx")
# ── Helpers ───────────────────────────────────────────────────────────────────
def _var_name(repo_path: str) -> str:
"""Convert a repo path like /alice/my-repo → geoblock_alice_my_repo."""
sanitised = re.sub(r"[^a-zA-Z0-9]", "_", repo_path.strip("/"))
return f"geoblock_{sanitised}"
def _escape_body(body: str) -> str:
"""Escape a string for safe embedding in an nginx config string literal."""
return body.replace("\\", "\\\\").replace('"', '\\"').replace("'", "\\'").replace("\n", " ")
def _token(repo_index: int, rule_index: int) -> str:
"""Unique short token used to link the map blocks together."""
return f"repo{repo_index}_rule{rule_index}"
# ── Renderer ──────────────────────────────────────────────────────────────────
def render(rules_data: dict[str, Any]) -> tuple[str, str, str]:
    """
    Incrementally build the three nginx snippets from the parsed rules.

    For every entry in ``rules_data["repos"]`` this appends per-rule maps, a
    final decision map, status/body split maps and a ``location`` block.

    NOTE(review): apply_rules() calls render_clean(), not this function; no
    caller of render() is visible in this file.

    Returns (repo_maps_conf, repo_vars_conf, repo_locations_conf) as strings.
    """
    repos: list[dict] = rules_data.get("repos", [])
    maps_lines: list[str] = [
        "# Generated by geoblock_watcher — do not edit manually.",
        "# Included inside the map block in nginx.conf.",
        "",
    ]
    vars_lines: list[str] = [
        "# Generated by geoblock_watcher — do not edit manually.",
        "",
    ]
    loc_lines: list[str] = [
        "# Generated by geoblock_watcher — do not edit manually.",
        "",
    ]
    for ri, repo in enumerate(repos):
        path: str = repo["path"].rstrip("/")
        var: str = _var_name(path)
        rules: list[dict] = repo.get("rules", [])
        # ── Map block: region key → token ─────────────────────────────────────
        # Build a mapping from locale → token. More-specific (state-level)
        # rules are added first so nginx map "first match" semantics apply.
        # NOTE(review): state_entries / country_entries are filled below but
        # never appended to maps_lines, so repo_maps.conf only ever contains
        # its header lines.
        state_entries: list[str] = []
        country_entries: list[str] = []
        for rj, rule in enumerate(rules):
            tok = _token(ri, rj)
            status = int(rule["status"])
            body = _escape_body(str(rule.get("body", "Blocked")))
            # Encoded decision value shared by all locales of this rule.
            value = f"{status}:{body}"
            for locale in rule.get("locales", []):
                locale = locale.strip()
                key = f'"{locale}"'
                entry = f" {key:<20} {tok!r}_{ri}_{rj};"
                if "-" in locale:
                    state_entries.append(entry)
                else:
                    # Country-only key — pad subdivision with empty string so
                    # it matches both "CC-" (no subdivision) and we also add
                    # a regex fallback below.
                    country_entries.append(entry)
            # Emit the per-rule value variable (token → "status:body")
            vars_lines.append(f"# {path} — rule {rj}: {rule.get('locales', [])}")
            vars_lines.append(f'map $geoip2_region_key ${var}_r{rj} {{')
            vars_lines.append(f' default "";')
            for locale in rule.get("locales", []):
                locale = locale.strip()
                if "-" in locale:
                    # State-level: exact match on "CC-SUBDIV"
                    vars_lines.append(f' "{locale}" "{value}";')
                else:
                    # Country-level: match any subdivision of this country
                    vars_lines.append(f' ~^{re.escape(locale)}- "{value}";')
                    # Also match when subdivision is absent ("CC-")
                    vars_lines.append(f' "{locale}-" "{value}";')
            vars_lines.append("}")
            vars_lines.append("")
        # Aggregate rule variables into the final per-repo variable.
        # The first non-empty rule variable wins.
        # NOTE(review): rule_vars is computed but not used afterwards; the
        # final map below is rebuilt from the rules directly.
        rule_vars = [f"${var}_r{rj}" for rj in range(len(rules))]
        vars_lines.append(f"# Final decision variable for {path}")
        vars_lines.append(f"map $geoip2_region_key ${var} {{")
        vars_lines.append(f' default "";')
        for locale_list, status_body in _aggregate_locales(rules):
            # Locales are used here as returned by _aggregate_locales (not
            # stripped, unlike in the per-rule maps above).
            for locale in locale_list:
                if "-" in locale:
                    vars_lines.append(f' "{locale}" "{status_body}";')
                else:
                    vars_lines.append(f' ~^{re.escape(locale)}- "{status_body}";')
                    vars_lines.append(f' "{locale}-" "{status_body}";')
        vars_lines.append("}")
        vars_lines.append("")
        # ── Location block ────────────────────────────────────────────────────
        # Intercept /<owner>/<repo> and any sub-paths.
        # nginx location matching: we use a case-sensitive prefix match.
        # Git HTTPS also accesses /<owner>/<repo>.git — covered by the prefix.
        loc_lines.append(f"# Geo-block for {path}")
        loc_lines.append(f"location ^~ {path} {{")
        loc_lines.append(f" if (${var} != \"\") {{")
        # Split "status:body" at runtime using map — but nginx `if` can't do
        # string splitting, so we embed status and body as separate variables.
        # We use a nested map approach: the decision var encodes both, and we
        # resolve them with two additional map lookups.
        # NOTE(review): the emitted `return` takes its status code from a
        # variable; confirm nginx accepts a non-literal code here.
        loc_lines.append(f" set $__status ${var}_status;")
        loc_lines.append(f" set $__body ${var}_body;")
        loc_lines.append(f" return $__status \"$__body\";")
        loc_lines.append(f" }}")
        loc_lines.append(f" # No block — fall through to main proxy")
        loc_lines.append(f" proxy_pass http://forgejo:3000;")
        loc_lines.append(f" proxy_set_header Host $host;")
        loc_lines.append(f" proxy_set_header X-Real-IP $remote_addr;")
        loc_lines.append(f" proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;")
        loc_lines.append(f" proxy_set_header X-Forwarded-Proto $scheme;")
        loc_lines.append(f" client_max_body_size 512m;")
        loc_lines.append(f" proxy_request_buffering off;")
        loc_lines.append(f" proxy_buffering off;")
        loc_lines.append(f" proxy_read_timeout 600s;")
        loc_lines.append(f" proxy_send_timeout 600s;")
        loc_lines.append(f"}}")
        loc_lines.append("")
        # Status and body split maps for this repo
        # The lines appended from here to the _replace_split_maps() call are
        # an unterminated stub (header, default, entries, no closing brace)
        # that the helper is meant to replace.
        vars_lines.append(f"# Status / body split maps for {path}")
        vars_lines.append(f"map ${var} ${var}_status {{")
        vars_lines.append(f' default 403;')
        for locale_list, status_body in _aggregate_locales(rules):
            status = status_body.split(":", 1)[0]
            for locale in locale_list:
                if "-" in locale:
                    vars_lines.append(f' "{status_body}" {status};')
                    break
                else:
                    vars_lines.append(f' "~^{re.escape(status_body)}" {status};')
        # Simpler: map the encoded value directly
        vars_lines = _replace_split_maps(vars_lines, var, rules)
        vars_lines.append("")
    return (
        "\n".join(maps_lines),
        "\n".join(vars_lines),
        "\n".join(loc_lines),
    )
def _aggregate_locales(rules: list[dict]) -> list[tuple[list[str], str]]:
"""Return [(locale_list, 'status:body'), …] for all rules."""
result = []
for rule in rules:
status = int(rule["status"])
body = _escape_body(str(rule.get("body", "Blocked")))
result.append((rule.get("locales", []), f"{status}:{body}"))
return result
def _replace_split_maps(vars_lines: list[str], var: str, rules: list[dict]) -> list[str]:
"""
Replace the incomplete split-map stubs with correct status+body maps.
We rebuild the tail of vars_lines for the current repo.
"""
# Remove any partial split map lines we may have added above
while vars_lines and (
vars_lines[-1].startswith(f"map ${var}_status") or
vars_lines[-1].startswith(f" ") or
vars_lines[-1] in ("}", "")
):
last = vars_lines[-1]
vars_lines.pop()
if last.startswith(f"map ${var}_status"):
break
# Status map
vars_lines.append(f"map ${var} ${var}_status {{")
vars_lines.append(f' default 403;')
seen_sv: set[str] = set()
for rule in rules:
status = int(rule["status"])
body = _escape_body(str(rule.get("body", "Blocked")))
encoded = f"{status}:{body}"
if encoded not in seen_sv:
vars_lines.append(f' "{encoded}" {status};')
seen_sv.add(encoded)
vars_lines.append("}")
vars_lines.append("")
# Body map
vars_lines.append(f"map ${var} ${var}_body {{")
vars_lines.append(f' default "Blocked";')
seen_bv: set[str] = set()
for rule in rules:
status = int(rule["status"])
body = _escape_body(str(rule.get("body", "Blocked")))
encoded = f"{status}:{body}"
if encoded not in seen_bv:
vars_lines.append(f' "{encoded}" "{body}";')
seen_bv.add(encoded)
vars_lines.append("}")
return vars_lines
# ── Clean renderer (replaces the incremental one above) ───────────────────────
def render_clean(rules_data: dict[str, Any]) -> tuple[str, str, str]:
    """
    Cleanly render all three config files.

    For every repo in ``rules_data["repos"]`` this emits, into
    repo_vars.conf, a decision map ($geoip2_region_key → "status:body" or
    "") plus status and body split maps, and, into repo_locations.conf, a
    ``location ^~ <path>`` block that returns the configured status/body for
    blocked regions and otherwise proxies to Forgejo.

    nginx's ``return`` directive needs a literal numeric status code, so the
    location block contains one ``if``/``return`` pair per distinct status
    code instead of returning a variable code.

    Args:
        rules_data: Parsed geo_rules.yml. Missing or null "repos", "rules"
            and "locales" are treated as empty lists.

    Returns:
        (repo_maps_conf, repo_vars_conf, repo_locations_conf).

    Raises:
        KeyError, ValueError, TypeError: if a repo lacks "path" or a rule
            lacks a valid integer "status".
    """
    # A bare `repos:` / `rules:` / `locales:` key parses to None in YAML;
    # handle that like an absent key rather than failing on iteration.
    repos: list[dict] = rules_data.get("repos") or []
    header = "# Generated by geoblock_watcher — do not edit manually.\n\n"
    # Proxy settings emitted at the end of every location block.
    proxy_lines = (
        " proxy_pass http://forgejo:3000;",
        " proxy_set_header Host $host;",
        " proxy_set_header X-Real-IP $remote_addr;",
        " proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;",
        " proxy_set_header X-Forwarded-Proto $scheme;",
        " client_max_body_size 512m;",
        " proxy_request_buffering off;",
        " proxy_buffering off;",
        " proxy_read_timeout 600s;",
        " proxy_send_timeout 600s;",
        "}",
        "",
    )
    vars_blocks: list[str] = []
    loc_blocks: list[str] = []
    for repo in repos:
        path: str = repo["path"].rstrip("/")
        var: str = _var_name(path)

        # Encode every rule once: (status, escaped body, "status:body", locales).
        encoded_rules: list[tuple[int, str, str, list[str]]] = []
        for rule in repo.get("rules") or []:
            status = int(rule["status"])
            body = _escape_body(str(rule.get("body", "Blocked")))
            locales = [locale.strip() for locale in rule.get("locales") or []]
            encoded_rules.append((status, body, f"{status}:{body}", locales))

        # Distinct encoded values in first-seen order, for the split maps.
        distinct: dict[str, tuple[int, str]] = {}
        for status, body, encoded, _locales in encoded_rules:
            distinct.setdefault(encoded, (status, body))

        # ── Per-repo decision map ──────────────────────────────────────────────
        # Maps the compound GeoIP region key to "status:escapedBody" or "".
        vars_blocks.append(f"# Decision map for {path}")
        vars_blocks.append(f"map $geoip2_region_key ${var} {{")
        vars_blocks.append(' default "";')
        # State-level rules first (more specific → rendered first)
        for _status, _body, encoded, locales in encoded_rules:
            vars_blocks.extend(
                f' "{locale}" "{encoded}";' for locale in locales if "-" in locale
            )
        # Country-level rules second
        for _status, _body, encoded, locales in encoded_rules:
            # nginx map supports regex; match "CC-<anything>" and "CC"
            vars_blocks.extend(
                f' "~^{re.escape(locale)}(-|$)" "{encoded}";'
                for locale in locales
                if "-" not in locale
            )
        vars_blocks.append("}")
        vars_blocks.append("")

        # ── Status split map ───────────────────────────────────────────────────
        vars_blocks.append(f"map ${var} ${var}_status {{")
        vars_blocks.append(" default 403;")
        # An allowed request (empty decision) must not resolve to a real
        # status code, otherwise the per-status `if` below would block it.
        vars_blocks.append(' "" "";')
        vars_blocks.extend(
            f' "{encoded}" {status};' for encoded, (status, _body) in distinct.items()
        )
        vars_blocks.append("}")
        vars_blocks.append("")

        # ── Body split map ─────────────────────────────────────────────────────
        vars_blocks.append(f"map ${var} ${var}_body {{")
        vars_blocks.append(' default "Blocked";')
        vars_blocks.extend(
            f' "{encoded}" "{body}";' for encoded, (_status, body) in distinct.items()
        )
        vars_blocks.append("}")
        vars_blocks.append("")

        # ── Location block ─────────────────────────────────────────────────────
        # NOTE(review): `^~ <path>` is a plain prefix match, so it also
        # covers sibling repos whose names merely start with this path.
        loc_blocks.append(f"# Geo-block for {path}")
        loc_blocks.append(f"location ^~ {path} {{")
        # One literal-code `return` per distinct status among the rules.
        for status in dict.fromkeys(status for status, _body in distinct.values()):
            loc_blocks.append(f" if (${var}_status = {status}) {{")
            loc_blocks.append(f' return {status} "${var}_body";')
            loc_blocks.append(" }")
        loc_blocks.extend(proxy_lines)

    # repo_maps.conf is now empty (we use inline regex maps in repo_vars.conf)
    maps_conf = header + "# (Region key mapping is now done inline in repo_vars.conf)\n"
    vars_conf = header + "\n".join(vars_blocks)
    locs_conf = header + "\n".join(loc_blocks)
    return maps_conf, vars_conf, locs_conf
# ── Writer & nginx reload ─────────────────────────────────────────────────────
_last_hash: str = ""
def _file_hash(path: Path) -> str:
return hashlib.sha256(path.read_bytes()).hexdigest()
def apply_rules(force: bool = False) -> None:
    """Re-render the nginx snippets from the rules file and reload nginx.

    The rules file is read exactly once; the same bytes are hashed and
    parsed, so the recorded hash always corresponds to what was rendered.
    Nothing is rendered when the hash equals the last applied one, unless
    *force* is true. Read, parse, render and write failures are logged and
    abort the run without reloading nginx and without updating the stored
    hash, so the next call retries.

    Args:
        force: Render and reload even if the file content is unchanged.
    """
    global _last_hash

    # Read directly instead of exists()+read: the file may disappear in
    # between (e.g. while an editor replaces it).
    try:
        raw = RULES_FILE.read_bytes()
    except FileNotFoundError:
        log.warning("Rules file not found: %s — skipping.", RULES_FILE)
        return
    except OSError as exc:
        log.error("Cannot read %s: %s — skipping reload.", RULES_FILE, exc)
        return

    current_hash = hashlib.sha256(raw).hexdigest()
    if not force and current_hash == _last_hash:
        log.debug("Rules file unchanged — nothing to do.")
        return

    log.info("Rules file changed — re-rendering nginx config snippets.")
    try:
        # Passing bytes lets PyYAML do the decoding instead of relying on
        # the container's locale encoding.
        rules_data = yaml.safe_load(raw) or {}
    except yaml.YAMLError as exc:
        log.error("YAML parse error in %s: %s — skipping reload.", RULES_FILE, exc)
        return

    try:
        maps_conf, vars_conf, locs_conf = render_clean(rules_data)
    except Exception as exc:  # noqa: BLE001
        log.error("Render error: %s — skipping reload.", exc, exc_info=True)
        return

    snippets = (
        ("repo_maps.conf", maps_conf),
        ("repo_vars.conf", vars_conf),
        ("repo_locations.conf", locs_conf),
    )
    try:
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        for filename, content in snippets:
            # The generated header contains non-ASCII characters, so the
            # encoding is pinned rather than left to the locale.
            (OUTPUT_DIR / filename).write_text(content, encoding="utf-8")
    except OSError as exc:
        log.error(
            "Failed to write config snippets to %s: %s — skipping reload.",
            OUTPUT_DIR,
            exc,
        )
        return

    log.info("Config snippets written to %s.", OUTPUT_DIR)
    _reload_nginx()
    _last_hash = current_hash
def _reload_nginx() -> None:
    """Send SIGHUP to the nginx container to trigger a graceful config reload.

    The container is looked up by NGINX_CONTAINER. Docker's ``name`` filter
    can return several containers, so a container whose name equals
    NGINX_CONTAINER exactly is preferred; otherwise the first result is used.
    All failures are logged, never raised. The Docker client is closed
    before returning.
    """
    client = None
    try:
        client = docker.from_env()
        containers = client.containers.list(filters={"name": NGINX_CONTAINER})
        if not containers:
            log.warning("nginx container '%s' not found — skipping reload.", NGINX_CONTAINER)
            return
        container = next(
            (c for c in containers if c.name == NGINX_CONTAINER),
            containers[0],
        )
        container.kill(signal="HUP")
        log.info("Sent SIGHUP to nginx container '%s'.", container.name)
    except Exception as exc:  # noqa: BLE001
        log.error("Failed to reload nginx: %s", exc, exc_info=True)
    finally:
        # A new client is created on every reload; release its connections.
        if client is not None:
            try:
                client.close()
            except Exception as exc:  # noqa: BLE001
                log.debug("Error while closing Docker client: %s", exc)
# ── Watchdog ──────────────────────────────────────────────────────────────────
class RulesHandler(FileSystemEventHandler):
    """Re-apply the rules whenever a filesystem event touches RULES_FILE.

    Modified, created and moved events are all handled. A rename-based
    atomic replace (write a temp file, rename it over the rules file)
    arrives as a moved event whose destination is the rules file.
    """

    @staticmethod
    def _is_rules_file(raw_path) -> bool:
        """Return True if *raw_path* is non-empty and resolves to RULES_FILE."""
        if not raw_path:
            return False
        return Path(raw_path).resolve() == RULES_FILE.resolve()

    def on_modified(self, event):
        # Moved events also carry dest_path; other event types may not.
        touched = (event.src_path, getattr(event, "dest_path", ""))
        if not any(self._is_rules_file(candidate) for candidate in touched):
            return
        log.info("Detected change in %s.", RULES_FILE)
        time.sleep(0.2)  # debounce
        try:
            apply_rules()
        except Exception:  # noqa: BLE001
            # Keep an unexpected failure from escaping into the thread that
            # delivers filesystem events.
            log.exception("Unexpected error while applying rules.")

    # on_created handles the case where the file is (re)created in place
    on_created = on_modified
    # on_moved handles the case where the file is replaced via rename
    on_moved = on_modified
def main() -> None:
    """Render once, then watch the rules file until terminated.

    After the initial forced render, a watchdog observer is scheduled on the
    directory containing RULES_FILE, and the rules are additionally
    re-checked every 60 seconds. SIGTERM and SIGINT stop the observer and
    exit with status 0.
    """
    log.info("geoblock_watcher starting. Watching %s", RULES_FILE)
    # Initial render on startup
    apply_rules(force=True)
    observer = Observer()
    observer.schedule(RulesHandler(), str(RULES_FILE.parent), recursive=False)
    observer.start()

    def _shutdown(signum, frame):  # noqa: ANN001
        log.info("Shutting down.")
        observer.stop()
        sys.exit(0)

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)
    try:
        while True:
            time.sleep(60)
            apply_rules()  # Periodic re-check (catches missed inotify events)
    finally:
        # Stop before joining: if the loop exits through an exception rather
        # than a signal, joining a still-running observer would never return.
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()