Authentik to IP whitelist traefik (WIP)

Disclaimer: This document is not yet finished and is subject to change as long as WIP (Work in Progress) is in the title of this post.

For my homelab I am using, as you might already know, Authentik as a Single Sign-On solution. This Authentik instance sits behind a Traefik reverse proxy. I also have other applications behind that reverse proxy, such as Immich and Ollama. So I was thinking of hardening those applications by only allowing access from IP addresses on the Internet that have successfully authenticated with Authentik. This drastically reduces the attack surface, as only “trusted IPs” can reach the applications at all. Users still need to log in to each application (with Authentik or through other means).
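To illustrate the idea of “trusted IPs”, here is a minimal sketch of an allowlist check written with Python's standard ipaddress module. It is only an illustration of the concept, not Traefik's actual implementation; the function name is_allowed and the example addresses (taken from the documentation ranges) are assumptions.

```python
import ipaddress


def is_allowed(client_ip: str, source_ranges: list[str]) -> bool:
    """Return True if client_ip falls inside any of the allowed entries."""
    addr = ipaddress.ip_address(client_ip)
    for entry in source_ranges:
        # ip_network() accepts single addresses as well as CIDR ranges
        network = ipaddress.ip_network(entry, strict=False)
        # An address can only match a network of the same IP version
        if addr.version == network.version and addr in network:
            return True
    return False


# Hypothetical allowlist: one single address and one /24 range
allowed = ["203.0.113.7", "198.51.100.0/24"]
print(is_allowed("203.0.113.7", allowed))
print(is_allowed("198.51.100.42", allowed))
print(is_allowed("192.0.2.1", allowed))
```

The first two calls match an entry in the list, the third does not, so that request would be rejected before it ever reaches the application.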

Prerequisites

Let’s first define what is already configured before starting with this project.

  • A Docker stack with at least the following containers
    • Traefik
    • Authentik
    • A container (in my case Ollama) for test purposes
  • All the containers should be fully configured and operational with an FQDN (Fully Qualified Domain Name).
    • Traefik should be in front of all containers and should be able to manage Let’s Encrypt certificates (out of scope here, but good practice to have)
    • Authentik should be fully operational
    • Ollama should ideally be configured to use Authentik for Single Sign On (not mandatory).

The setup consists of 3 parts:

  1. A webhook script (running in its own Docker container) that receives webhook messages from Authentik and creates the whitelist
  2. The configuration in Authentik
  3. The configuration in Traefik
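To make the first part more concrete, the sketch below renders the kind of dynamic configuration file the webhook script hands to Traefik. The middleware name authentik-ip-allowlist and the ipAllowList/sourceRange keys are the ones used in main.py further down; the function render_allowlist and the example addresses are assumptions for illustration, and the real script uses PyYAML rather than string formatting.

```python
def render_allowlist(ips: list[str], name: str = "authentik-ip-allowlist") -> str:
    """Render a Traefik dynamic-config YAML document for an ipAllowList middleware."""
    lines = [
        "http:",
        "  middlewares:",
        f"    {name}:",
        "      ipAllowList:",
    ]
    unique_ips = sorted(set(ips))
    if not unique_ips:
        # Emit an explicit empty list instead of a null value
        lines.append("        sourceRange: []")
    else:
        lines.append("        sourceRange:")
        for ip in unique_ips:
            lines.append(f'          - "{ip}"')
    return "\n".join(lines) + "\n"


# Hypothetical example: one single address and one CIDR range
print(render_allowlist(["203.0.113.7", "198.51.100.0/24"]))
```

The entries are deduplicated and sorted as plain strings, which keeps the generated file stable between runs.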

Webhook script

This is the most extensive part and the part that contains most of the intelligence. For this setup I created one folder with the Docker Compose file etc. and one folder that contains non-volatile storage (the IP database). The file structure is as follows:

.../authentik-webhook/
      docker-compose.yml
      Dockerfile
      main.py

docker-compose.yml

version: "3.9"

services:
  authentik-webhook:
    build: .
    container_name: authentik-webhook
    networks:
      LocalNet:
        ipv4_address: <IP>
    volumes:
      - <app folder>:/app/db
      - <traefik-external-folder>:/traefik
    restart: unless-stopped
    environment:
      - STATIC_IPS=<comma-separated IPs, with or without netmask>
      - TTL_DAYS=<number of days IPs should be stored>

networks:
  LocalNet:
    external: true
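Since STATIC_IPS may contain entries with or without a netmask, it helps to see how such a value can be parsed. The following is a minimal sketch using the standard ipaddress module; the function name parse_static_ips is an assumption, and unlike main.py below (which keeps the strings as they were entered) it normalizes every entry to CIDR notation.

```python
import ipaddress


def parse_static_ips(raw: str) -> list[str]:
    """Split a comma-separated string and keep only valid addresses or CIDR ranges."""
    result = []
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate trailing commas and empty entries
        try:
            # strict=False tolerates host bits, e.g. "10.0.0.5/8" becomes "10.0.0.0/8"
            network = ipaddress.ip_network(item, strict=False)
        except ValueError:
            continue  # skip anything that is neither an IP nor a CIDR range
        result.append(str(network))
    return result


print(parse_static_ips("1.2.3.4, 10.0.0.5/8, not-an-ip,"))
# prints ['1.2.3.4/32', '10.0.0.0/8']
```

Note that ipaddress.ip_address() alone rejects entries such as 10.0.0.0/8, so a validation that has to accept netmasks needs ip_network().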

Dockerfile

FROM python:3.12-slim

WORKDIR /app
COPY main.py .

RUN pip install fastapi uvicorn aiosqlite pyyaml

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

main.py

from fastapi import FastAPI, Request
from datetime import datetime
import aiosqlite
import asyncio
import os
import ipaddress
import yaml
from tempfile import NamedTemporaryFile
import shutil

DB_PATH = "/app/db/ip_logins.db"

# Directory inside container where Traefik dynamic config is mounted
TRAEFIK_DIR = "/traefik"
WHITELIST_PATH = f"{TRAEFIK_DIR}/authentik-ips.yaml"

# TTL in days (default 30)
TTL_DAYS = int(os.getenv("TTL_DAYS", "30"))

# Static IPs from environment variable: "1.2.3.4,5.6.7.8"
STATIC_IPS = os.getenv("STATIC_IPS", "")
STATIC_IPS = [
    ip.strip()
    for ip in STATIC_IPS.split(",")
    if ip.strip()
]

app = FastAPI()
db = None


def is_valid_ip(value: str) -> bool:
    try:
        ipaddress.ip_address(value)
        return True
    except Exception:
        return False


async def write_traefik_whitelist():
    """Generate a Traefik dynamic config file with static + dynamic IPs."""
    cursor = await db.execute("SELECT ip FROM logins")
    rows = await cursor.fetchall()
    dynamic_ips = [ip for (ip,) in rows]

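    # Merge static + dynamic, validate, dedupe, sort.
    # ip_network(strict=False) accepts single addresses as well as CIDR
    # ranges, so static entries with a netmask are not dropped.
    all_ips = set()
    for ip in STATIC_IPS + dynamic_ips:
        try:
            ipaddress.ip_network(ip, strict=False)
        except ValueError:
            continue
        all_ips.add(ip)

    all_ips = sorted(all_ips)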

    config = {
        "http": {
            "middlewares": {
                "authentik-ip-allowlist": {
                    "ipAllowList": {
                        "sourceRange": all_ips
                    }
                }
            }
        }
    }

    # Ensure directory exists
    os.makedirs(TRAEFIK_DIR, exist_ok=True)

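    # Atomic write: the temp file lives in the target directory so that
    # os.replace() is a rename on the same filesystem
    with NamedTemporaryFile("w", dir=TRAEFIK_DIR, delete=False) as tmp:
        yaml.dump(config, tmp)
        temp_name = tmp.name

    os.replace(temp_name, WHITELIST_PATH)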
    print(f"[{datetime.utcnow().isoformat()}] Updated Traefik whitelist with {len(all_ips)} IPs")


@app.on_event("startup")
async def startup():
    global db
    db = await aiosqlite.connect(DB_PATH)
    await db.execute("""
        CREATE TABLE IF NOT EXISTS logins (
            ip TEXT PRIMARY KEY,
            last_seen INTEGER
        )
    """)
    await db.commit()

    # Keep a reference so the background task is not garbage-collected
    app.state.prune_task = asyncio.create_task(prune_expired_ips())


async def prune_expired_ips():
    while True:
        # datetime.now().timestamp() gives the correct epoch in any timezone
        now = int(datetime.now().timestamp())
        cutoff = now - (TTL_DAYS * 24 * 3600)

        await db.execute("DELETE FROM logins WHERE last_seen < ?", (cutoff,))
        await db.commit()

        print(f"[{datetime.utcnow().isoformat()}] Pruned IPs older than {TTL_DAYS} days")

        # Regenerate whitelist after pruning
        await write_traefik_whitelist()

        await asyncio.sleep(3600)


@app.post("/authentik/webhook")
async def authentik_webhook(request: Request):
    # Safely parse JSON (Authentik's test transport sends an empty body)
    try:
        payload = await request.json()
    except Exception:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}

    print("RAW PAYLOAD:", payload)

    # Timestamp: use the event's "created" field when it parses,
    # otherwise fall back to the current time
    created = payload.get("created")
    try:
        timestamp = int(datetime.fromisoformat(created).timestamp())
    except (TypeError, ValueError):
        timestamp = int(datetime.now().timestamp())

    # IP extraction
    client_ip = (
        payload.get("ip")
        or payload.get("client_ip")
        or None
    )

    # Validate IP
    if not client_ip or not is_valid_ip(client_ip):
        print(f"[{datetime.utcnow().isoformat()}] Ignored invalid IP: {client_ip}")
        return {"status": "ignored"}

    # Store valid IP
    await db.execute(
        "INSERT OR REPLACE INTO logins (ip, last_seen) VALUES (?, ?)",
        (client_ip, timestamp)
    )
    await db.commit()

    # Update Traefik whitelist
    await write_traefik_whitelist()

    print(f"[{datetime.utcnow().isoformat()}] Stored login from IP: {client_ip}")

    return {"status": "ok"}


@app.get("/ips")
async def list_ips():
    cursor = await db.execute("SELECT ip, last_seen FROM logins")
    rows = await cursor.fetchall()
    return [{"ip": ip, "last_seen": last_seen} for ip, last_seen in rows]
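The pruning step can be tried in isolation. The sketch below reuses the table layout and the cutoff arithmetic from main.py, but runs against an in-memory database with the synchronous sqlite3 module instead of aiosqlite; the helper name prune, the fixed timestamp and the example addresses are assumptions for illustration.

```python
import sqlite3


def prune(conn: sqlite3.Connection, now: int, ttl_days: int) -> list[str]:
    """Delete rows older than ttl_days and return the remaining IPs."""
    cutoff = now - ttl_days * 24 * 3600
    conn.execute("DELETE FROM logins WHERE last_seen < ?", (cutoff,))
    conn.commit()
    return [ip for (ip,) in conn.execute("SELECT ip FROM logins ORDER BY ip")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE logins (ip TEXT PRIMARY KEY, last_seen INTEGER)")

now = 1_700_000_000  # fixed "current time" so the example is reproducible
day = 24 * 3600
conn.executemany(
    "INSERT INTO logins (ip, last_seen) VALUES (?, ?)",
    [
        ("203.0.113.7", now - 5 * day),     # seen 5 days ago: kept
        ("198.51.100.42", now - 45 * day),  # seen 45 days ago: pruned
    ],
)

remaining = prune(conn, now, ttl_days=30)
print(remaining)
# prints ['203.0.113.7']
```

With a TTL of 30 days only the address seen 5 days ago survives, which is the behaviour the hourly loop in prune_expired_ips() relies on.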

Deployment

From the folder that contains the Dockerfile, docker-compose.yml and main.py, one command builds and starts the container and another one stops it:

docker compose up --build -d
docker compose down -v
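Once the container is up, the endpoint can be checked without involving Authentik. The sketch below builds a POST request for the /authentik/webhook route with a payload containing the "ip" key that main.py looks for. The base URL, the helper names build_test_request and send_test_login, and the example address are assumptions; adjust them to the address you gave the container.

```python
import json
import urllib.request


def build_test_request(base_url: str, ip: str) -> urllib.request.Request:
    """Build a POST request that mimics a login notification for the given IP."""
    payload = json.dumps({"ip": ip}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/authentik/webhook",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_test_login(base_url: str, ip: str) -> dict:
    """Send the request and return the decoded JSON response."""
    request = build_test_request(base_url, ip)
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read())


# Hypothetical usage, assuming the container is reachable on port 8000:
# send_test_login("http://127.0.0.1:8000", "203.0.113.7")
```

According to main.py, a valid address is answered with {"status": "ok"} and then shows up under /ips and in the generated whitelist file. Keep in mind that the test address stays allowed until it is pruned.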

Authentik configuration

Authentik needs to notify the webhook script when someone successfully logs in (or is already authenticated and interacts with Authentik). This has to be configured in several places.

Notification Transports