Content Security Policy header causing IP Blocking in Crowdsec

Hello All
We have recently migrated our ODK Central server to a new domain hosted behind a reverse proxy. An interesting issue has arisen: client IPs are getting blocked.

We have traced this to a security scenario triggered by the Crowdsec agent on our reverse proxy, caused by too many 403 responses to POST requests to the https://myserve.company.com/csp-report endpoint of our ODK Central server.

I noticed that the default install of ODK Central has this CSP:

   default-src 'none'; 
   connect-src 'self'; 
   font-src 'self'; 
   frame-src 'self' https://getodk.github.io/central/news.html; 
   img-src * data:; 
   manifest-src 'none'; 
   media-src 'none'; 
   object-src 'none'; 
   script-src 'self'; 
   style-src 'self'; 
   style-src-attr 'unsafe-inline'; 
   report-uri /csp-report 

The /csp-report URI does not exist and causes 403 errors.
As an interim arrangement, I am trying to intercept this URI at the reverse proxy and handle the CSP reports in a custom Flask application, so that they do not hit the ODK Central server.

How can I modify the CSP for a default installation of ODK Central?
I would also like to move from a Report-Only policy to an enforced policy.
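
For reference, the two modes differ only in which response header carries the policy: Content-Security-Policy-Report-Only reports violations without blocking, while Content-Security-Policy also blocks them. Below is a minimal Python sketch for inspecting a policy string and a set of response headers. The helper names parse_csp and enforcement_mode are illustrative only and are not part of ODK Central.

```python
REPORT_ONLY = "Content-Security-Policy-Report-Only"
ENFORCED = "Content-Security-Policy"


def parse_csp(policy):
    """Split a CSP string into {directive: [sources]}.

    If a directive is repeated, the first occurrence is kept.
    """
    directives = {}
    for part in policy.split(";"):
        tokens = part.split()
        if tokens:
            directives.setdefault(tokens[0].lower(), tokens[1:])
    return directives


def enforcement_mode(headers):
    """Return 'enforced', 'report-only' or 'none' for a mapping of response headers."""
    names = {name.lower() for name in headers}
    if ENFORCED.lower() in names:
        return "enforced"
    if REPORT_ONLY.lower() in names:
        return "report-only"
    return "none"


if __name__ == "__main__":
    policy = "default-src 'none'; script-src 'self'; report-uri /csp-report"
    print(parse_csp(policy))
    print(enforcement_mode({REPORT_ONLY: policy}))
```

Under this reading, switching to an enforced policy would mean sending the same policy string under the Content-Security-Policy header name instead.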

Thanks
Dr Vivek

Hello,

We may achieve this by making changes to two configuration files within the ODK Central directory: files/nginx/odk.conf.template and files/nginx/common-headers.conf.

Step 1: Update files/nginx/odk.conf.template (documentation reference here).

  • Replace the existing /csp-report route:
location /csp-report {
    proxy_pass https://${SENTRY_ORG_SUBDOMAIN}.ingest.sentry.io/api/${SENTRY_PROJECT}/security/?sentry_key=${SENTRY_KEY};
}
  • With the following:
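location /csp-report {
    # default_type sets the Content-Type of the body produced by "return";
    # add_header would append a second Content-Type header instead.
    default_type text/plain;
    return 200 'CSP report discarded.';
}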
  • We may review and adjust the Content Security Policy (CSP) headers as needed.

Step 2: Check files/nginx/common-headers.conf

  • I am not sure, but depending on the version of ODK Central in use, additional CSP settings might also be defined in files/nginx/common-headers.conf. If so, update them accordingly.

  • Once the changes are made, run the following command to rebuild and restart all containers:

docker compose build && docker compose stop && docker compose up -d

That should complete the process. Hope this helps - great day!
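
One way to confirm the change took effect is to POST a sample report to the endpoint and look at the status code. The sketch below uses only the Python standard library. The base URL and the helper names build_report_request and post_sample_report are assumptions for illustration, and the sample report body is made up.

```python
import json
import urllib.error
import urllib.request


def build_report_request(base_url):
    """Build a POST request that mimics a browser CSP violation report."""
    body = json.dumps({
        "csp-report": {
            "document-uri": base_url,
            "violated-directive": "script-src",
            "blocked-uri": "https://example.invalid/script.js",
        }
    }).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/csp-report",
        data=body,
        # Browsers use this content type for report-uri reports.
        headers={"Content-Type": "application/csp-report"},
        method="POST",
    )


def post_sample_report(base_url, timeout=5.0):
    """Send the sample report and return the HTTP status code."""
    request = build_report_request(base_url)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses are raised as exceptions; the code is what we want.
        return err.code
```

Calling post_sample_report("https://myserve.company.com") before and after the rebuild should show whether the endpoint still answers with 403.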

Thanks
Will give those a try and update. For now, with some help from ChatGPT, a Flask app to receive POST requests for CSP reports, plus a minimal dashboard, is ready and will be deployed. Hopefully that will help me figure out the offending routes and policies.
Bests
Dr Vivek

Dear All
Another simple fix is to create a Sentry.io account, create a Node.js project in it, and add the SENTRY variables to the .env file. Then recreate the Docker containers.
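
A quick way to see whether a given .env already carries these values is sketched below in Python. The three variable names are taken from the proxy_pass line quoted earlier in this thread; that Central reads exactly these names from .env is an assumption, and parse_env and missing_sentry_vars are hypothetical helpers.

```python
SENTRY_VARS = ("SENTRY_ORG_SUBDOMAIN", "SENTRY_KEY", "SENTRY_PROJECT")


def parse_env(text):
    """Parse simple KEY=VALUE lines, skipping blanks and comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def missing_sentry_vars(env_text):
    """Return the Sentry variable names that are absent or empty."""
    values = parse_env(env_text)
    return [name for name in SENTRY_VARS if not values.get(name)]


if __name__ == "__main__":
    with open(".env", encoding="utf-8") as handle:
        print(missing_sentry_vars(handle.read()))
```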

For our self-hosted makeshift solution, the code is as follows.

app.py

from flask import Flask, request, jsonify, render_template, Response
from flask_sqlalchemy import SQLAlchemy
from flask.cli import with_appcontext
from datetime import datetime, date
from urllib.parse import urlparse
from collections import Counter
import io
import base64
import json
import click
import logging
from logging.handlers import TimedRotatingFileHandler
import os

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///csp_reports.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

if not os.path.exists('logs'):
    os.makedirs('logs')

log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler = TimedRotatingFileHandler('logs/app.log', when='midnight', interval=1, backupCount=7)
log_handler.setFormatter(log_formatter)
log_handler.setLevel(logging.INFO)
app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)
app.logger.info("CSP Logger app starting up...")

class CSPReport(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.String, nullable=False)
    client_ip = db.Column(db.String)
    user_agent = db.Column(db.String)
    json_data = db.Column(db.Text)
    violations = db.relationship('CSPViolation', backref='report', lazy=True)

class CSPViolation(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    report_id = db.Column(db.Integer, db.ForeignKey('csp_report.id'), nullable=False)
    domain = db.Column(db.String)
    uri_path = db.Column(db.String)
    violated_directive = db.Column(db.String)
    effective_directive = db.Column(db.String)
    blocked_uri = db.Column(db.String)
    referrer = db.Column(db.String)
    source_file = db.Column(db.String)
    line_number = db.Column(db.Integer)
    column_number = db.Column(db.Integer)
    script_sample = db.Column(db.Text)

@app.cli.command("init-db")
@with_appcontext
def init_db_command():
    db.drop_all()
    db.create_all()
    click.echo("Database initialized.")

def get_client_ip():
    if 'X-Forwarded-For' in request.headers:
        return request.headers['X-Forwarded-For'].split(',')[0].strip()
    elif 'Forwarded' in request.headers:
        forwarded = request.headers['Forwarded']
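        # Keys may carry whitespace or mixed case and values may be quoted; use the first "for=" entry.
        for part in forwarded.replace(';', ',').split(','):
            key, sep, value = part.strip().partition('=')
            if sep and key.lower() == 'for':
                return value.strip().strip('"')
        return request.remote_addr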
    else:
        return request.remote_addr

def split_document_uri(document_uri):
    if not document_uri:
        return 'N/A', 'N/A'
    parsed = urlparse(document_uri)
    domain = parsed.hostname or 'N/A'
    uri_path = parsed.path or '/'
    if parsed.query:
        uri_path += '?' + parsed.query
    return domain, uri_path

@app.route('/csp-report', methods=['POST'])
def csp_report():
    try:
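        # Browsers send report-uri reports as "application/csp-report", which
        # request.is_json rejects, so parse the body as JSON regardless of Content-Type.
        payload = request.get_json(force=True, silent=True)
        if not isinstance(payload, dict):
            return jsonify({"error": "Invalid request: JSON expected"}), 400
        if 'csp-report' not in payload:
            return jsonify({"error": "Missing 'csp-report' key in JSON"}), 400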

        timestamp = datetime.utcnow().isoformat()
        client_ip = get_client_ip()
        user_agent = request.headers.get('User-Agent', 'N/A')

        report = CSPReport(timestamp=timestamp, client_ip=client_ip, user_agent=user_agent, json_data=json.dumps(payload, indent=2))
        db.session.add(report)
        db.session.commit()

        report_data = payload['csp-report']
        domain, uri_path = split_document_uri(report_data.get('document-uri'))

        violation = CSPViolation(
            report_id=report.id,
            domain=domain,
            uri_path=uri_path,
            violated_directive=report_data.get('violated-directive'),
            effective_directive=report_data.get('effective-directive'),
            blocked_uri=report_data.get('blocked-uri'),
            referrer=report_data.get('referrer'),
            source_file=report_data.get('source-file'),
            line_number=report_data.get('line-number'),
            column_number=report_data.get('column-number'),
            script_sample=report_data.get('script-sample')
        )
        db.session.add(violation)
        db.session.commit()
        return jsonify({"status": "CSP report logged"}), 200

    except Exception as e:
        db.session.rollback()
        app.logger.error(f"Error in /csp-report: {e}", exc_info=True)
        return jsonify({"error": f"Failed to process CSP report: {e}"}), 400

@app.route('/csp-report/dash')
def csp_dashboard():
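    # type=int falls back to the default on non-numeric input instead of raising ValueError.
    report_page = max(1, request.args.get('report_page', 1, type=int))
    violation_page = max(1, request.args.get('violation_page', 1, type=int))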
    report_per_page = 50
    violation_per_page = 200

    # Timestamps are stored in UTC (see csp_report), so "today" must be the UTC date as well.
    today_prefix = f"{datetime.utcnow().date().isoformat()}%"
    total_reports = CSPReport.query.count()
    reports_today = CSPReport.query.filter(CSPReport.timestamp.like(today_prefix)).count()
    reports = CSPReport.query.order_by(CSPReport.timestamp.desc()).offset((report_page-1)*report_per_page).limit(report_per_page).all()
    violations = CSPViolation.query.order_by(CSPViolation.id.desc()).offset((violation_page-1)*violation_per_page).limit(violation_per_page).all()

    # Load each table once instead of re-querying it for every list below.
    all_reports = CSPReport.query.all()
    all_violations = CSPViolation.query.all()

    top_ips = Counter(r.client_ip for r in all_reports).most_common(20)
    ips_today = [r.client_ip for r in CSPReport.query.filter(CSPReport.timestamp.like(today_prefix)).all()]
    top_ips_today = Counter(ips_today).most_common(20)

    directives = [v.violated_directive for v in all_violations if v.violated_directive]
    effective = [v.effective_directive for v in all_violations if v.effective_directive]
    blocked = [v.blocked_uri for v in all_violations if v.blocked_uri]
    domains = [f"{v.domain}{v.uri_path}" for v in all_violations if v.domain and v.uri_path]
    user_agents = [r.user_agent for r in all_reports if r.user_agent]

    def plot_and_encode(title, labels):
        counts = Counter(labels)
        if not counts:
            return None
        import matplotlib
        matplotlib.use('Agg')  # non-interactive backend, suitable for a headless server
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        ax.barh(list(counts.keys()), list(counts.values()))
        ax.set_title(title)
        fig.tight_layout()
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
        plt.close(fig)
        buf.seek(0)
        return base64.b64encode(buf.read()).decode('utf-8')

    graphs = {
        'violated_directive': plot_and_encode("Violated Directives", directives),
        'effective_directive': plot_and_encode("Effective Directives", effective),
        'blocked_uri': plot_and_encode("Blocked URIs", blocked),
        'document_uri': plot_and_encode("Document URIs", domains),
        'user_agent': plot_and_encode("User Agents", user_agents)
    }

    return render_template('csp_dashboard.html', total_reports=total_reports, reports_today=reports_today,
                           top_ips=top_ips, top_ips_today=top_ips_today, reports=reports, violations=violations,
                           report_page=report_page, violation_page=violation_page, graphs=graphs)

@app.route('/csp-report/reports.csv')
def export_reports_csv():
    reports = CSPReport.query.all()
    def generate():
        header = ['ID', 'Timestamp', 'Client IP', 'User-Agent']
        yield ','.join(header) + '\n'
        for r in reports:
            row = [str(r.id), r.timestamp, r.client_ip, r.user_agent]
            yield ','.join([f'"{x}"' for x in row]) + '\n'
    return Response(generate(), mimetype='text/csv', headers={"Content-Disposition": "attachment; filename=reports.csv"})

@app.route('/csp-report/violations.csv')
def export_violations_csv():
    violations = CSPViolation.query.all()
    def generate():
        header = ['ID', 'Report ID', 'Domain', 'URI Path', 'Violated Directive', 'Effective Directive', 'Blocked URI', 'Referrer', 'Source File', 'Line Number', 'Column Number', 'Script Sample']
        yield ','.join(header) + '\n'
        for v in violations:
            row = [str(v.id), str(v.report_id), v.domain, v.uri_path, v.violated_directive, v.effective_directive, v.blocked_uri, v.referrer, v.source_file, str(v.line_number or ''), str(v.column_number or ''), v.script_sample or '']
            yield ','.join([f'"{x}"' for x in row]) + '\n'
    return Response(generate(), mimetype='text/csv', headers={"Content-Disposition": "attachment; filename=violations.csv"})
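
Note that the CSV export above wraps every value in double quotes but does not escape quotes already inside a value (a User-Agent string, for example), and it writes the text None for missing fields. Below is a minimal sketch that uses the standard csv module instead. csv_lines is a hypothetical helper and not part of the app above.

```python
import csv
import io


def csv_lines(header, rows):
    """Yield properly escaped CSV lines, one per row, for a streaming response."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in [header, *rows]:
        # Write missing values as empty cells rather than the text "None".
        writer.writerow(["" if value is None else value for value in row])
        yield buf.getvalue()
        # Reset the buffer so each yield contains exactly one line.
        buf.seek(0)
        buf.truncate(0)


if __name__ == "__main__":
    sample = [[1, 'Mozilla "x", y'], [2, None]]
    print("".join(csv_lines(["ID", "User-Agent"], sample)))
```

In the routes above, something like Response(csv_lines(header, rows), mimetype='text/csv') could stand in for the hand-built generator.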

templates/csp_dashboard.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>CSP Dashboard</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body class="container py-4">
    <h1 class="mb-4">CSP Dashboard</h1>

    <div class="mb-4">
        <p><strong>Total Reports:</strong> {{ total_reports }}</p>
        <p><strong>Reports Today:</strong> {{ reports_today }}</p>
        <a class="btn btn-sm btn-outline-primary" href="{{ url_for('export_reports_csv') }}">Download Reports CSV</a>
        <a class="btn btn-sm btn-outline-success" href="{{ url_for('export_violations_csv') }}">Download Violations CSV</a>
    </div>

    <div class="row">
        <div class="col-md-6">
            <h3>Top IPs (Overall)</h3>
            <ul class="list-group">
                {% for ip, count in top_ips %}
                <li class="list-group-item d-flex justify-content-between align-items-center">
                    {{ ip }} <span class="badge bg-primary">{{ count }}</span>
                </li>
                {% endfor %}
            </ul>
        </div>
        <div class="col-md-6">
            <h3>Top IPs (Today)</h3>
            <ul class="list-group">
                {% for ip, count in top_ips_today %}
                <li class="list-group-item d-flex justify-content-between align-items-center">
                    {{ ip }} <span class="badge bg-success">{{ count }}</span>
                </li>
                {% endfor %}
            </ul>
        </div>
    </div>

    <hr>

    <h3>Reports (Page {{ report_page }})</h3>
    <ul class="list-group mb-4">
        {% for r in reports %}
        <li class="list-group-item">
            <strong>ID:</strong> {{ r.id }} | <strong>Time:</strong> {{ r.timestamp }} | <strong>IP:</strong> {{ r.client_ip }} | <strong>User-Agent:</strong> {{ r.user_agent }}
        </li>
        {% endfor %}
    </ul>
    <a class="btn btn-sm btn-outline-secondary" href="?report_page={{ report_page + 1 }}&violation_page={{ violation_page }}">Next Reports &raquo;</a>

    <h3>Violations (Page {{ violation_page }})</h3>
    <ul class="list-group mb-4">
        {% for v in violations %}
        <li class="list-group-item">
            <strong>ID:</strong> {{ v.id }} | <strong>Page:</strong> {{ v.domain }}{{ v.uri_path }} | <strong>Directive:</strong> {{ v.violated_directive }} | <strong>Blocked:</strong> {{ v.blocked_uri }}
        </li>
        {% endfor %}
    </ul>
    <a class="btn btn-sm btn-outline-secondary" href="?report_page={{ report_page }}&violation_page={{ violation_page + 1 }}">Next Violations &raquo;</a>

    <hr>

    {% for name, graph in graphs.items() %}
        {% if graph %}
            <h3>{{ name.replace('_', ' ').title() }}</h3>
            <img src="data:image/png;base64,{{ graph }}" class="img-fluid mb-4">
        {% endif %}
    {% endfor %}
</body>
</html>


requirements.txt

Flask
Flask-SQLAlchemy
matplotlib
pytest
requests

tests.py

import pytest
import json
from app import app, db, CSPReport, CSPViolation

@pytest.fixture
def client():
    app.config['TESTING'] = True
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
    ctx = app.app_context()
    ctx.push()
    db.drop_all()
    db.create_all()
    yield app.test_client()
    db.session.remove()
    db.drop_all()
    ctx.pop()

def test_csp_report_submission(client):
    payload = {
        "csp-report": {
            "document-uri": "https://example.com/page",
            "violated-directive": "script-src-elem",
            "effective-directive": "script-src",
            "blocked-uri": "https://malicious.com/script.js",
            "referrer": "https://example.com",
            "source-file": "https://example.com/page",
            "line-number": 42,
            "column-number": 15,
            "script-sample": "alert('xss')"
        }
    }
    response = client.post('/csp-report', json=payload, headers={"User-Agent": "pytest-agent"})
    assert response.status_code == 200
    assert b"CSP report logged" in response.data

    reports = CSPReport.query.all()
    violations = CSPViolation.query.all()
    assert len(reports) == 1
    assert len(violations) == 1
    assert reports[0].user_agent == "pytest-agent"

def test_dashboard_load(client):
    response = client.get('/csp-report/dash')
    assert response.status_code == 200
    assert b"CSP Dashboard" in response.data or b"Reports" in response.data

def test_csv_exports(client):
    report = CSPReport(timestamp="2025-06-02T12:34:56", client_ip="127.0.0.1", user_agent="pytest", json_data="{}")
    db.session.add(report)
    db.session.commit()
    violation = CSPViolation(report_id=report.id, domain="example.com", uri_path="/page", violated_directive="script-src")
    db.session.add(violation)
    db.session.commit()

    r = client.get('/csp-report/reports.csv')
    assert r.status_code == 200
    assert b"ID,Timestamp,Client IP,User-Agent" in r.data

    v = client.get('/csp-report/violations.csv')
    assert v.status_code == 200
    assert b"ID,Report ID,Domain,URI Path" in v.data


send_csp_samples.py

import requests
import json
import random
import time

URL = "http://localhost:5000/csp-report"  # Adjust if your server runs on a different port

sample_reports = [
    {
        "csp-report": {
            "document-uri": "https://example.com/page1",
            "violated-directive": "script-src",
            "effective-directive": "script-src",
            "blocked-uri": "https://evil.com/script.js",
            "referrer": "https://referrer.com",
            "source-file": "https://example.com/page1",
            "line-number": 42,
            "column-number": 5,
            "script-sample": "alert('xss')"
        }
    },
    {
        "csp-report": {
            "document-uri": "https://another-site.com/login",
            "violated-directive": "style-src",
            "effective-directive": "style-src",
            "blocked-uri": "https://badcdn.com/styles.css",
            "referrer": "https://google.com",
            "source-file": "https://another-site.com/login",
            "line-number": 88,
            "column-number": 12,
            "script-sample": None
        }
    },
    {
        "csp-report": {
            "document-uri": "https://example.org/contact",
            "violated-directive": "img-src",
            "effective-directive": "img-src",
            "blocked-uri": "https://trackersite.com/pixel.png",
            "referrer": "https://search.com",
            "source-file": "https://example.org/contact",
            "line-number": 25,
            "column-number": 3,
            "script-sample": None
        }
    },
    {
        "csp-report": {
            "document-uri": "https://mysite.org/checkout",
            "violated-directive": "connect-src",
            "effective-directive": "connect-src",
            "blocked-uri": "https://api.malicious.com",
            "referrer": "https://mysite.org/home",
            "source-file": "https://mysite.org/checkout",
            "line-number": 14,
            "column-number": 7,
            "script-sample": None
        }
    },
]

def send_sample_report():
    payload = random.choice(sample_reports)
    response = requests.post(URL, json=payload, headers={"User-Agent": "Test-Agent"})
    print(f"Status: {response.status_code}, Response: {response.text}")

if __name__ == "__main__":
    for _ in range(5):
        send_sample_report()
        time.sleep(1)


STEPS

python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt

flask init-db
pytest tests.py

flask init-db
flask run

# New Terminal
source venv/bin/activate
python send_csp_samples.py

Visit http://127.0.0.1:5000/csp-report/dash

Bests
Vivek

I find this surprising. csp-report definitely exists in Central (and has existed for as long as I can remember). Also, Central doesn't return 403s for that endpoint.

Did you have any changes to your Central install (either in the source or upstream in the proxy) that would explain the missing URI or the 403?

Thanks Yanokwa. We did not make any changes to the install. Our .env is reproduced below

DOMAIN=dfdfdfdfdf.dfdfdfdfd.edu
SYSADMIN_EMAIL=dfdfdfdf@dfdfdfdf.edu
SSL_TYPE=upstream
HTTP_PORT=80
HTTPS_PORT=443
# Optional: configure a custom mail server
EMAIL_FROM= dfdfdf@dfdfdfdf.dfdfdf
EMAIL_HOST=dfdfdfdfdf.zoho.in
EMAIL_PORT=465
EMAIL_SECURE=true
EMAIL_IGNORE_TLS=false
EMAIL_USER=dfdfdf@dfdfdfdf.dfdfdfdf
EMAIL_PASSWORD=dfdfdfdfdfdfdfdfdf

The docker-compose file is the standard one from the GitHub repo.