mirror of
https://github.com/aarmea/fahrengit-451.git
synced 2026-03-22 08:58:15 +00:00
Change written by Claude Sonnet 4.6: I need to set up a Git hosting service for personal projects where certain repositories need to be blocked using the locale corresponding to the user’s IP address. Here are my constraints: * The entire system will be running on a single VPS instance. I should be able to `docker compose up -d` the whole thing. * For each repository that this feature is enabled, I need to be able to set the blocked locales down to the state level, along with a custom HTTP status code and response body. * You may assume that the IP address of the request is where it actually came from — for this exercise, if the user uses a VPN to bypass the restriction, that is on them. * To simplify a reverse proxy setup, you may assume that all Git operations will happen over HTTPS. I will firewall off SSH access. * I will be using Let's Encrypt for HTTPS. Some suggestions from prior research: * nginx seems like a reasonable reverse proxy that supports all of the requirements, but you may use a different one if it is simpler to implement or maintain. * I can obtain a MaxMind API key to get a geo-IP lookup table. If you use this, you will need to add a service that automatically retrieves the table at a reasonable frequency. * Forgejo seems like a reasonable, lightweight Git service, but you may use a different one if you’re aware of one that actually supports these requirements out of the box. Write me a production-ready `docker-compose.yml` and supporting scripts or configuration scaffolding for me to implement this.
474 lines
19 KiB
Python
474 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
geoblock_watcher.py
────────────────────────────────────────────────────────────────────────────
Watches geo_rules.yml for changes, renders three nginx config snippets into
/app/geoblock/, then signals the nginx container to reload its configuration.

Rendered files
──────────────
  repo_maps.conf
      Originally a single nginx `map` block body that mapped the compound
      GeoIP key ("CC-SUBDIV") → a per-repo decision token, for inclusion
      inside the existing map block in nginx.conf.  The clean renderer now
      writes only a placeholder comment here, because the region matching
      is done inline in repo_vars.conf.

  repo_vars.conf
      Per repo: one `map` block that translates the GeoIP region key to the
      final "$geoblock_<var>" variable value ("" = allow, or "status:body"),
      plus two `map` blocks that split that value into a status and a body.

  repo_locations.conf
      One `location` block per repo.  When the variable is non-empty the
      block immediately returns the configured status + body; otherwise the
      request is proxied to the Forgejo upstream.
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import re
|
|
import signal
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import docker
|
|
import yaml
|
|
from watchdog.events import FileSystemEventHandler
|
|
from watchdog.observers import Observer
|
|
|
|
# Log everything at INFO and above to stdout with an ISO-like timestamp.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%Y-%m-%dT%H:%M:%S",
    format="%(asctime)s [watcher] %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

# Input rules file and the directory the rendered snippets are written to.
RULES_FILE = Path("/app/geo_rules.yml")
OUTPUT_DIR = Path("/app/geoblock")

# Name used to look up the nginx container; overridable via the environment.
NGINX_CONTAINER = os.getenv("NGINX_CONTAINER_NAME", "nginx")
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _var_name(repo_path: str) -> str:
    """Derive the nginx variable name for a repo path.

    Leading and trailing slashes are dropped and every character outside
    ``[a-zA-Z0-9]`` becomes an underscore, e.g.
    ``/alice/my-repo`` → ``geoblock_alice_my_repo``.

    NOTE(review): distinct paths can collapse to the same name
    (``/a/my-repo`` and ``/a/my_repo``); confirm that is acceptable.
    """
    trimmed = repo_path.strip("/")
    identifier = re.sub(r"[^a-zA-Z0-9]", "_", trimmed)
    return "geoblock_" + identifier
|
|
|
|
|
|
def _escape_body(body: str) -> str:
    """Escape *body* so it can sit inside a quoted nginx config string.

    Backslashes, double quotes and single quotes are prefixed with a
    backslash; newlines are replaced by a single space.

    NOTE(review): ``$`` is left untouched — presumably nginx will treat
    ``$name`` in the body as a variable reference; confirm whether bodies
    may contain ``$``.
    """
    # A single translate pass is equivalent to the chained replacements:
    # each source character maps independently, so the added backslashes are
    # never escaped a second time.
    replacements = str.maketrans(
        {
            "\\": "\\\\",
            '"': '\\"',
            "'": "\\'",
            "\n": " ",
        }
    )
    return body.translate(replacements)
|
|
|
|
|
|
def _token(repo_index: int, rule_index: int) -> str:
    """Build the short ``repo<i>_rule<j>`` token identifying one rule of one repo."""
    return "repo" + str(repo_index) + "_rule" + str(rule_index)
|
|
|
|
|
|
# ── Renderer ──────────────────────────────────────────────────────────────────
|
|
|
|
def _legacy_map_entries(locales: list[str], encoded: str, seen: set[str]) -> list[str]:
    """Return the nginx map entry lines sending each locale to *encoded*.

    Locales already present in *seen* are skipped (and new ones recorded)
    so the same key is never emitted twice inside one map block.
    """
    entries: list[str] = []
    for locale in locales:
        if locale in seen:
            continue
        seen.add(locale)
        if "-" in locale:
            # State-level: exact match on "CC-SUBDIV"
            entries.append(f' "{locale}" "{encoded}";')
        else:
            # Country-level: match any subdivision of this country …
            entries.append(f' ~^{re.escape(locale)}- "{encoded}";')
            # … and also the key with an absent subdivision ("CC-")
            entries.append(f' "{locale}-" "{encoded}";')
    return entries


def render(rules_data: dict[str, Any]) -> tuple[str, str, str]:
    """
    Legacy incremental renderer (apply_rules uses render_clean instead).

    Returns (repo_maps_conf, repo_vars_conf, repo_locations_conf) as strings.

    For every repo in ``rules_data["repos"]`` this emits:
      * one map per rule (``$<var>_r<N>``) and one aggregated decision map
        (``$<var>``), both keyed on ``$geoip2_region_key``;
      * a status map and a body map keyed on the decision variable;
      * a ``location ^~ <path>`` block that returns the configured response
        when the decision variable matches, else proxies to Forgejo.

    Fixes relative to the previous implementation:
      * the status/body maps are written directly instead of emitting a stub
        and patching it afterwards, which left an unterminated ``map`` header
        in the output;
      * the location block uses a literal status code per distinct response
        rather than ``return $variable …``; nginx's ``return`` expects a
        numeric code;
      * a locale listed more than once is emitted once (first rule wins);
      * null ``repos`` / ``rules`` / ``locales`` values are treated as empty.

    NOTE(review): ``location ^~ <path>`` is a plain prefix match, so it
    presumably also catches sibling repos whose name merely starts with the
    same characters — confirm this is intended.
    """
    banner = "# Generated by geoblock_watcher — do not edit manually."
    maps_lines: list[str] = [
        banner,
        "# Included inside the map block in nginx.conf.",
        "",
    ]
    vars_lines: list[str] = [banner, ""]
    loc_lines: list[str] = [banner, ""]

    for repo in rules_data.get("repos") or []:
        path: str = repo["path"].rstrip("/")
        var: str = _var_name(path)
        rules: list[dict] = repo.get("rules") or []

        # [(locales, "status:escapedBody"), …] in rule order.
        aggregated = [
            ([locale.strip() for locale in locales or []], encoded)
            for locales, encoded in _aggregate_locales(rules)
        ]
        # Distinct responses, first occurrence first.
        distinct = list(dict.fromkeys(encoded for _, encoded in aggregated))

        # ── Per-rule maps: region key → "status:body" ────────────────────────
        for rj, (locales, encoded) in enumerate(aggregated):
            vars_lines.append(f"# {path} — rule {rj}: {locales}")
            vars_lines.append(f"map $geoip2_region_key ${var}_r{rj} {{")
            vars_lines.append(' default "";')
            vars_lines.extend(_legacy_map_entries(locales, encoded, set()))
            vars_lines.append("}")
            vars_lines.append("")

        # ── Final per-repo decision map (first rule naming a locale wins) ────
        vars_lines.append(f"# Final decision variable for {path}")
        vars_lines.append(f"map $geoip2_region_key ${var} {{")
        vars_lines.append(' default "";')
        seen_locales: set[str] = set()
        for locales, encoded in aggregated:
            vars_lines.extend(_legacy_map_entries(locales, encoded, seen_locales))
        vars_lines.append("}")
        vars_lines.append("")

        # ── Location block ───────────────────────────────────────────────────
        # Intercept /<owner>/<repo> and any sub-paths with a case-sensitive
        # prefix match; /<owner>/<repo>.git shares the prefix.
        loc_lines.append(f"# Geo-block for {path}")
        loc_lines.append(f"location ^~ {path} {{")
        for encoded in distinct:
            # encoded is "<int status>:<escaped body>", so the first ":"
            # always separates the two parts.
            status, _, body = encoded.partition(":")
            loc_lines.append(f' if (${var} = "{encoded}") {{')
            loc_lines.append(f' return {status} "{body}";')
            loc_lines.append(" }")
        loc_lines.extend(
            [
                " # No block — fall through to main proxy",
                " proxy_pass http://forgejo:3000;",
                " proxy_set_header Host $host;",
                " proxy_set_header X-Real-IP $remote_addr;",
                " proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;",
                " proxy_set_header X-Forwarded-Proto $scheme;",
                " client_max_body_size 512m;",
                " proxy_request_buffering off;",
                " proxy_buffering off;",
                " proxy_read_timeout 600s;",
                " proxy_send_timeout 600s;",
                "}",
                "",
            ]
        )

        # ── Status / body split maps ─────────────────────────────────────────
        vars_lines.append(f"# Status / body split maps for {path}")
        vars_lines.append(f"map ${var} ${var}_status {{")
        vars_lines.append(" default 403;")
        for encoded in distinct:
            status, _, _ = encoded.partition(":")
            vars_lines.append(f' "{encoded}" {status};')
        vars_lines.append("}")
        vars_lines.append("")

        vars_lines.append(f"map ${var} ${var}_body {{")
        vars_lines.append(' default "Blocked";')
        for encoded in distinct:
            _, _, body = encoded.partition(":")
            vars_lines.append(f' "{encoded}" "{body}";')
        vars_lines.append("}")
        vars_lines.append("")

    return (
        "\n".join(maps_lines),
        "\n".join(vars_lines),
        "\n".join(loc_lines),
    )
|
|
|
|
|
|
def _aggregate_locales(rules: list[dict]) -> list[tuple[list[str], str]]:
    """Pair each rule's locale list with its encoded ``'status:body'`` response.

    The status is coerced with ``int``; the body defaults to ``"Blocked"``
    and is passed through ``_escape_body``.  Output order follows *rules*.
    """

    def _encode(rule: dict) -> str:
        code = int(rule["status"])
        text = _escape_body(str(rule.get("body", "Blocked")))
        return f"{code}:{text}"

    return [(rule.get("locales", []), _encode(rule)) for rule in rules]
|
|
|
|
|
|
def _replace_split_maps(vars_lines: list[str], var: str, rules: list[dict]) -> list[str]:
    """
    Replace the incomplete split-map stub with complete status + body maps.

    The tail of *vars_lines* is scanned backwards over lines that look like
    the inside of a map block (indented lines, "}" or "").  If the stub
    header ``map $<var> $<var>_status`` is found, it and everything after it
    are removed.  If it is not found, nothing is removed.

    Two fixes over the previous version:
      * the old prefix test (``map $<var>_status``) could never match the
        header that is actually emitted, so the stub header was left in
        place and a second ``map`` header was appended after it;
      * lines are no longer popped when there is no stub to remove.

    The list is modified in place and also returned.

    Args:
        vars_lines: rendered lines for repo_vars.conf so far.
        var: nginx variable name of the current repo (without the ``$``).
        rules: the repo's rule dicts (``status``, optional ``body``).
    """
    stub_header = f"map ${var} ${var}_status"
    idx = len(vars_lines) - 1
    while idx >= 0:
        line = vars_lines[idx]
        if line.startswith(stub_header):
            del vars_lines[idx:]
            break
        if not (line.startswith(" ") or line in ("}", "")):
            # Reached unrelated content without seeing a stub — leave as is.
            break
        idx -= 1

    # Distinct encoded responses in rule order: encoded → (status, body).
    responses: dict[str, tuple[int, str]] = {}
    for rule in rules:
        status = int(rule["status"])
        body = _escape_body(str(rule.get("body", "Blocked")))
        responses.setdefault(f"{status}:{body}", (status, body))

    # Status map
    vars_lines.append(f"map ${var} ${var}_status {{")
    vars_lines.append(" default 403;")
    for encoded, (status, _body) in responses.items():
        vars_lines.append(f' "{encoded}" {status};')
    vars_lines.append("}")
    vars_lines.append("")

    # Body map
    vars_lines.append(f"map ${var} ${var}_body {{")
    vars_lines.append(' default "Blocked";')
    for encoded, (_status, body) in responses.items():
        vars_lines.append(f' "{encoded}" "{body}";')
    vars_lines.append("}")

    return vars_lines
|
|
|
|
|
|
# ── Clean renderer (replaces the incremental one above) ───────────────────────
|
|
|
|
def render_clean(rules_data: dict[str, Any]) -> tuple[str, str, str]:
    """
    Cleanly render all three config files.

    Returns (repo_maps_conf, repo_vars_conf, repo_locations_conf).

    For every entry of ``rules_data["repos"]`` (each needs a ``path`` and may
    carry ``rules`` with ``status``, optional ``body`` and ``locales``):

      * repo_vars.conf gets a decision map from ``$geoip2_region_key`` to
        ``"status:escapedBody"`` (``""`` = allow) plus a status map and a
        body map keyed on that decision variable;
      * repo_locations.conf gets a ``location ^~ <path>`` block that returns
        the configured response when the decision variable matches and
        otherwise proxies to ``http://forgejo:3000``.

    repo_maps.conf only receives a placeholder comment.

    Changes relative to the previous implementation:
      * the location block emits one ``if`` per distinct response with a
        literal status code; the earlier ``return $<var>_status "…"`` used a
        variable where nginx's ``return`` expects a numeric code;
      * a locale listed more than once is emitted once (first rule wins), so
        a map block never contains the same key twice;
      * null ``repos`` / ``rules`` / ``locales`` values are treated as empty;
      * each rule's status/body is computed once instead of four times.

    NOTE(review): ``location ^~ <path>`` is a plain prefix match, so it
    presumably also catches sibling repos whose name starts with the same
    characters — confirm this is intended.

    Raises:
        KeyError / ValueError / TypeError: on malformed rule data (missing
        ``path`` or ``status``, non-numeric status, non-string locale).
    """
    header = "# Generated by geoblock_watcher — do not edit manually.\n\n"

    vars_blocks: list[str] = []
    loc_blocks: list[str] = []

    for repo in rules_data.get("repos") or []:
        path: str = repo["path"].rstrip("/")
        var: str = _var_name(path)

        # Single pass over the rules.  Dicts preserve insertion order, so the
        # rendered order matches the rule order and setdefault() makes the
        # first rule naming a locale win.
        state_entries: dict[str, str] = {}      # "CC-SUBDIV" → encoded
        country_entries: dict[str, str] = {}    # "CC"        → encoded
        responses: dict[str, tuple[int, str]] = {}  # encoded → (status, body)
        for rule in repo.get("rules") or []:
            status = int(rule["status"])
            body = _escape_body(str(rule.get("body", "Blocked")))
            encoded = f"{status}:{body}"
            responses.setdefault(encoded, (status, body))
            for locale in rule.get("locales") or []:
                locale = locale.strip()
                bucket = state_entries if "-" in locale else country_entries
                bucket.setdefault(locale, encoded)

        # ── Per-repo decision map ────────────────────────────────────────────
        # Maps the compound GeoIP region key to "status:escapedBody" or "".
        vars_blocks.append(f"# Decision map for {path}")
        vars_blocks.append(f"map $geoip2_region_key ${var} {{")
        vars_blocks.append(' default "";')
        # State-level rules first (more specific → rendered first)
        for locale, encoded in state_entries.items():
            vars_blocks.append(f' "{locale}" "{encoded}";')
        # Country-level rules second: regex matching "CC-<anything>" and "CC"
        for locale, encoded in country_entries.items():
            vars_blocks.append(f' "~^{re.escape(locale)}(-|$)" "{encoded}";')
        vars_blocks.append("}")
        vars_blocks.append("")

        # ── Status split map ─────────────────────────────────────────────────
        vars_blocks.append(f"map ${var} ${var}_status {{")
        vars_blocks.append(" default 403;")
        for encoded, (status, _body) in responses.items():
            vars_blocks.append(f' "{encoded}" {status};')
        vars_blocks.append("}")
        vars_blocks.append("")

        # ── Body split map ───────────────────────────────────────────────────
        vars_blocks.append(f"map ${var} ${var}_body {{")
        vars_blocks.append(' default "Blocked";')
        for encoded, (_status, body) in responses.items():
            vars_blocks.append(f' "{encoded}" "{body}";')
        vars_blocks.append("}")
        vars_blocks.append("")

        # ── Location block ───────────────────────────────────────────────────
        loc_blocks.append(f"# Geo-block for {path}")
        loc_blocks.append(f"location ^~ {path} {{")
        for encoded, (status, body) in responses.items():
            # Literal status code per response; the decision variable is only
            # compared, never used as the return code.
            loc_blocks.append(f' if (${var} = "{encoded}") {{')
            loc_blocks.append(f' return {status} "{body}";')
            loc_blocks.append(" }")
        loc_blocks.extend(
            [
                " proxy_pass http://forgejo:3000;",
                " proxy_set_header Host $host;",
                " proxy_set_header X-Real-IP $remote_addr;",
                " proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;",
                " proxy_set_header X-Forwarded-Proto $scheme;",
                " client_max_body_size 512m;",
                " proxy_request_buffering off;",
                " proxy_buffering off;",
                " proxy_read_timeout 600s;",
                " proxy_send_timeout 600s;",
                "}",
                "",
            ]
        )

    # repo_maps.conf is now empty (we use inline regex maps in repo_vars.conf)
    maps_conf = header + "# (Region key mapping is now done inline in repo_vars.conf)\n"
    vars_conf = header + "\n".join(vars_blocks)
    locs_conf = header + "\n".join(loc_blocks)

    return maps_conf, vars_conf, locs_conf
|
|
|
|
|
|
# ── Writer & nginx reload ─────────────────────────────────────────────────────
|
|
|
|
# SHA-256 of the rules file content that was last applied successfully.
_last_hash: str = ""


def _file_hash(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the bytes stored at *path*."""
    digest = hashlib.sha256()
    digest.update(path.read_bytes())
    return digest.hexdigest()
|
|
|
|
|
|
def apply_rules(force: bool = False) -> None:
    """Render the nginx snippets from the rules file and reload nginx.

    The rules file is read once; the same bytes are hashed and parsed, so
    the recorded hash always describes the content that was rendered.
    Nothing happens when the hash equals the last applied one unless
    *force* is true.  Any failure (unreadable file, YAML error, non-mapping
    document, render error, write error) is logged and aborts this attempt
    without reloading nginx and without updating the stored hash, so the
    next call retries.

    Args:
        force: render and reload even if the file content is unchanged.
    """
    global _last_hash

    try:
        raw = RULES_FILE.read_bytes()
    except FileNotFoundError:
        log.warning("Rules file not found: %s — skipping.", RULES_FILE)
        return
    except OSError as exc:
        # e.g. the path is a directory or is not readable
        log.error("Cannot read %s: %s — skipping reload.", RULES_FILE, exc)
        return

    current_hash = hashlib.sha256(raw).hexdigest()
    if not force and current_hash == _last_hash:
        log.debug("Rules file unchanged — nothing to do.")
        return

    log.info("Rules file changed — re-rendering nginx config snippets.")

    try:
        # safe_load accepts bytes and reports undecodable input as YAMLError.
        rules_data = yaml.safe_load(raw) or {}
    except yaml.YAMLError as exc:
        log.error("YAML parse error in %s: %s — skipping reload.", RULES_FILE, exc)
        return

    if not isinstance(rules_data, dict):
        log.error(
            "Top-level YAML value in %s must be a mapping, got %s — skipping reload.",
            RULES_FILE,
            type(rules_data).__name__,
        )
        return

    try:
        maps_conf, vars_conf, locs_conf = render_clean(rules_data)
    except Exception as exc:  # noqa: BLE001
        log.error("Render error: %s — skipping reload.", exc, exc_info=True)
        return

    snippets = (
        ("repo_maps.conf", maps_conf),
        ("repo_vars.conf", vars_conf),
        ("repo_locations.conf", locs_conf),
    )
    try:
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        for filename, content in snippets:
            # Explicit UTF-8: the generated header contains a non-ASCII dash,
            # so the result must not depend on the process locale.
            (OUTPUT_DIR / filename).write_text(content, encoding="utf-8")
    except OSError as exc:
        log.error(
            "Failed to write config snippets to %s: %s — skipping reload.",
            OUTPUT_DIR,
            exc,
        )
        return

    log.info("Config snippets written to %s.", OUTPUT_DIR)

    _reload_nginx()
    _last_hash = current_hash
|
|
|
|
|
|
def _reload_nginx() -> None:
    """Send SIGHUP to the nginx container to trigger a graceful config reload.

    Containers are looked up with the Docker ``name`` filter, which matches
    by substring, so several containers may be returned.  A container whose
    name equals ``NGINX_CONTAINER`` exactly is preferred; otherwise the
    first match is used, as before, with a warning if the choice was
    ambiguous.  The Docker client is always closed afterwards.  All errors
    are logged and swallowed so a failed reload never stops the watcher.
    """
    client = None
    try:
        client = docker.from_env()
        candidates = client.containers.list(filters={"name": NGINX_CONTAINER})
        if not candidates:
            log.warning("nginx container '%s' not found — skipping reload.", NGINX_CONTAINER)
            return

        exact = [c for c in candidates if c.name == NGINX_CONTAINER]
        container = exact[0] if exact else candidates[0]
        if not exact and len(candidates) > 1:
            log.warning(
                "Several containers match '%s' and none matches exactly — using '%s'.",
                NGINX_CONTAINER,
                container.name,
            )

        container.kill(signal="HUP")
        log.info("Sent SIGHUP to nginx container '%s'.", container.name)
    except Exception as exc:  # noqa: BLE001
        log.error("Failed to reload nginx: %s", exc, exc_info=True)
    finally:
        if client is not None:
            # Release the connection pool opened by from_env().
            client.close()
|
|
|
|
|
|
# ── Watchdog ──────────────────────────────────────────────────────────────────
|
|
|
|
class RulesHandler(FileSystemEventHandler):
    """Watchdog handler that re-applies the rules when RULES_FILE changes.

    Events for other files in the watched directory are ignored.  Handled
    events: modification, creation, and a move/rename whose destination is
    the rules file (write-to-temp-then-rename replacement, which the
    previous handler missed).
    """

    def _maybe_apply(self, changed_path) -> None:
        """Re-apply the rules if *changed_path* refers to RULES_FILE."""
        if Path(os.fsdecode(changed_path)).resolve() == RULES_FILE.resolve():
            log.info("Detected change in %s.", RULES_FILE)
            time.sleep(0.2)  # debounce
            apply_rules()

    def on_modified(self, event):
        """Handle an in-place modification of the rules file."""
        self._maybe_apply(event.src_path)

    # on_created handles the case where the file is (re)created in place
    on_created = on_modified

    def on_moved(self, event):
        """Handle another file being renamed onto the rules file."""
        self._maybe_apply(event.dest_path)
|
|
|
|
|
|
def main() -> None:
    """Run the watcher: initial render, file watching, periodic re-check.

    The rules are applied once at startup, then a watchdog observer watches
    the directory containing RULES_FILE while the main thread re-checks the
    file every 60 seconds.  SIGTERM and SIGINT exit with status 0.

    The observer is stopped *and* joined in the ``finally`` clause.
    Previously it was only stopped by the signal handler, so any other
    exception reached ``observer.join()`` with the observer still running
    and the process hung instead of exiting.
    """
    log.info("geoblock_watcher starting. Watching %s", RULES_FILE)

    # Initial render on startup
    apply_rules(force=True)

    observer = Observer()
    observer.schedule(RulesHandler(), str(RULES_FILE.parent), recursive=False)
    observer.start()

    def _shutdown(signum, frame):  # noqa: ANN001
        log.info("Shutting down.")
        # SystemExit unwinds through the finally clause below, which stops
        # and joins the observer.
        sys.exit(0)

    try:
        signal.signal(signal.SIGTERM, _shutdown)
        signal.signal(signal.SIGINT, _shutdown)

        while True:
            time.sleep(60)
            apply_rules()  # Periodic re-check (catches missed inotify events)
    finally:
        observer.stop()
        observer.join()


if __name__ == "__main__":
    main()
|