Dear All
Also, a simple fix is to create a Sentry.io account, create a Node.js project in it, and add the SENTRY variables to the .env file. Then recreate the Docker containers.
As for our self-hosted makeshift solution, the code is as follows.
app.py
from flask import Flask, request, jsonify, render_template, Response
from flask_sqlalchemy import SQLAlchemy
from flask.cli import with_appcontext
from datetime import datetime, date
from urllib.parse import urlparse
from collections import Counter
import io
import base64
import json
import click
import logging
from logging.handlers import TimedRotatingFileHandler
import os
# --- Application, database and logging setup (runs at import time) ---
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///csp_reports.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

# exist_ok=True removes the check-then-create race of the former
# "if not exists: makedirs" pair when several processes start together.
os.makedirs('logs', exist_ok=True)

# File log: rotated at midnight, seven rotated files kept.
log_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler = TimedRotatingFileHandler('logs/app.log', when='midnight', interval=1, backupCount=7)
log_handler.setFormatter(log_formatter)
log_handler.setLevel(logging.INFO)
app.logger.addHandler(log_handler)
app.logger.setLevel(logging.INFO)
app.logger.info("CSP Logger app starting up...")
class CSPReport(db.Model):
    """One received CSP report request: request metadata plus the raw payload."""

    id = db.Column(db.Integer, primary_key=True)
    # Stored as text, not as a DateTime column; the dashboard filters it with LIKE.
    timestamp = db.Column(db.String, nullable=False)
    client_ip = db.Column(db.String)
    user_agent = db.Column(db.String)
    # The full submitted JSON document, serialized to text.
    json_data = db.Column(db.Text)
    # One-to-many link to CSPViolation; also exposes `violation.report` via the backref.
    violations = db.relationship('CSPViolation', backref='report', lazy=True)
class CSPViolation(db.Model):
    """Individual fields extracted from the 'csp-report' object of one report."""

    id = db.Column(db.Integer, primary_key=True)
    # Owning CSPReport row (table name 'csp_report').
    report_id = db.Column(db.Integer, db.ForeignKey('csp_report.id'), nullable=False)
    # domain / uri_path are the two halves produced by split_document_uri().
    domain = db.Column(db.String)
    uri_path = db.Column(db.String)
    violated_directive = db.Column(db.String)
    effective_directive = db.Column(db.String)
    blocked_uri = db.Column(db.String)
    referrer = db.Column(db.String)
    source_file = db.Column(db.String)
    line_number = db.Column(db.Integer)
    column_number = db.Column(db.Integer)
    script_sample = db.Column(db.Text)
@app.cli.command("init-db")
@with_appcontext
def init_db_command():
    # CLI entry point: `flask init-db`.
    # WARNING: destructive - every table is dropped before being recreated,
    # so all previously stored reports and violations are lost.
    db.drop_all()
    db.create_all()
    click.echo("Database initialized.")
def get_client_ip():
    """Return the most likely client IP address for the current request.

    Lookup order:
      1. first entry of ``X-Forwarded-For``;
      2. first ``for=`` parameter of the ``Forwarded`` header;
      3. ``request.remote_addr``.

    NOTE: both headers are supplied by the caller unless a trusted reverse
    proxy overwrites them, so the result is informational only and must not
    be used for access control.
    """
    # An empty or whitespace-only header falls through instead of returning ''.
    first_hop = request.headers.get('X-Forwarded-For', '').split(',')[0].strip()
    if first_hop:
        return first_hop

    forwarded = request.headers.get('Forwarded', '')
    # Elements are comma-separated and their parameters semicolon-separated;
    # flattening both keeps header order, so the first "for" is the one
    # closest to the client.
    for pair in forwarded.replace(';', ',').split(','):
        # partition() tolerates values that themselves contain '='.
        name, sep, value = pair.partition('=')
        if sep and name.strip().lower() == 'for':
            value = value.strip().strip('"')
            if value:
                return value

    return request.remote_addr
def split_document_uri(document_uri):
    """Split a report's document URI into ``(domain, path_with_query)``.

    Args:
        document_uri: The ``document-uri`` value from a CSP report. It comes
            from an untrusted client, so it may be missing, malformed, or
            not a string at all.

    Returns:
        A ``(domain, uri_path)`` tuple. ``domain`` is the host name, or
        ``'N/A'`` when the URI has none. ``uri_path`` is the path (``'/'``
        when empty) with ``?query`` appended when a query string is present.
        ``('N/A', 'N/A')`` is returned for empty, non-string, or unparseable
        input.
    """
    if not document_uri or not isinstance(document_uri, str):
        return 'N/A', 'N/A'

    try:
        parsed = urlparse(document_uri)
        domain = parsed.hostname or 'N/A'
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unclosed "[" in
        # the host part); treat those like a missing URI.
        return 'N/A', 'N/A'

    uri_path = parsed.path or '/'
    if parsed.query:
        uri_path = f"{uri_path}?{parsed.query}"
    return domain, uri_path
@app.route('/csp-report', methods=['POST'])
def csp_report():
    """Receive one CSP violation report and persist it.

    Expects a JSON body of the form ``{"csp-report": {...}}``. A CSPReport
    row (request metadata + raw JSON) and a CSPViolation row (extracted
    fields) are written in a single transaction.

    Returns:
        200 with ``{"status": ...}`` on success, 400 with ``{"error": ...}``
        when the body is not the expected JSON shape, 500 with a generic
        error when storing the report fails.
    """
    # force=True: browsers send violation reports as
    # "application/csp-report", which request.is_json does not accept.
    # silent=True: a body that is not valid JSON yields None instead of raising.
    payload = request.get_json(force=True, silent=True)
    if payload is None:
        return jsonify({"error": "Invalid request: JSON expected"}), 400
    if not isinstance(payload, dict) or 'csp-report' not in payload:
        return jsonify({"error": "Missing 'csp-report' key in JSON"}), 400
    report_data = payload['csp-report']
    if not isinstance(report_data, dict):
        return jsonify({"error": "Invalid 'csp-report' value: object expected"}), 400

    try:
        domain, uri_path = split_document_uri(report_data.get('document-uri'))
    except (ValueError, TypeError, AttributeError):
        # An unparseable document-uri must not discard the whole report.
        domain, uri_path = 'N/A', 'N/A'

    try:
        report = CSPReport(
            timestamp=datetime.utcnow().isoformat(),
            client_ip=get_client_ip(),
            user_agent=request.headers.get('User-Agent', 'N/A'),
            json_data=json.dumps(payload, indent=2),
        )
        db.session.add(report)
        # flush() assigns report.id without committing, so the report and its
        # violation are stored atomically (no orphan report if the second
        # insert fails).
        db.session.flush()
        violation = CSPViolation(
            report_id=report.id,
            domain=domain,
            uri_path=uri_path,
            violated_directive=report_data.get('violated-directive'),
            effective_directive=report_data.get('effective-directive'),
            blocked_uri=report_data.get('blocked-uri'),
            referrer=report_data.get('referrer'),
            source_file=report_data.get('source-file'),
            line_number=report_data.get('line-number'),
            column_number=report_data.get('column-number'),
            script_sample=report_data.get('script-sample'),
        )
        db.session.add(violation)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        app.logger.error("Error in /csp-report: %s", e, exc_info=True)
        # The exception text stays in the log only; this endpoint is
        # unauthenticated and must not expose internals to the caller.
        return jsonify({"error": "Failed to process CSP report"}), 500

    return jsonify({"status": "CSP report logged"}), 200
def _plot_and_encode(title, labels, top_n=20):
    """Render a horizontal bar chart of label frequencies as a base64 PNG.

    Args:
        title: Chart title.
        labels: Iterable of hashable labels; each occurrence counts once.
        top_n: Only the ``top_n`` most frequent labels are drawn, so the
            chart stays readable when there are many distinct values.

    Returns:
        Base64 text of the PNG, or None when ``labels`` is empty.
    """
    counts = Counter(labels).most_common(top_n)
    if not counts:
        return None
    # Figure is built directly rather than through pyplot: pyplot keeps
    # global figure state, which is unsuitable inside a web server.
    from matplotlib.figure import Figure
    fig = Figure()
    ax = fig.subplots()
    # barh draws bottom-up; reversing puts the most frequent label on top.
    ordered = counts[::-1]
    ax.barh([str(name) for name, _ in ordered], [count for _, count in ordered])
    ax.set_title(title)
    fig.tight_layout()
    buf = io.BytesIO()
    fig.savefig(buf, format='png')
    return base64.b64encode(buf.getvalue()).decode('utf-8')


@app.route('/csp-report/dash')
def csp_dashboard():
    """Render the dashboard: totals, top client IPs, paged lists and charts.

    Query parameters:
        report_page, violation_page: 1-based page numbers. Missing,
        non-numeric, zero or negative values are treated as page 1.
    """
    # type=int returns the default on a failed conversion (no 500 on
    # "?report_page=abc"); max() prevents a negative SQL offset.
    report_page = max(request.args.get('report_page', 1, type=int), 1)
    violation_page = max(request.args.get('violation_page', 1, type=int), 1)
    report_per_page = 50
    violation_per_page = 200

    # Timestamps are written with datetime.utcnow(), so "today" has to be
    # the UTC date as well, not the server's local date.
    today_prefix = f"{datetime.utcnow().date().isoformat()}%"
    todays_reports = CSPReport.query.filter(CSPReport.timestamp.like(today_prefix))

    total_reports = CSPReport.query.count()
    reports_today = todays_reports.count()
    reports = (CSPReport.query.order_by(CSPReport.timestamp.desc())
               .offset((report_page - 1) * report_per_page)
               .limit(report_per_page).all())
    violations = (CSPViolation.query.order_by(CSPViolation.id.desc())
                  .offset((violation_page - 1) * violation_per_page)
                  .limit(violation_per_page).all())

    # Each table is read once, and only the columns the statistics need.
    report_rows = db.session.query(CSPReport.client_ip, CSPReport.user_agent).all()
    violation_rows = db.session.query(
        CSPViolation.violated_directive,
        CSPViolation.effective_directive,
        CSPViolation.blocked_uri,
        CSPViolation.domain,
        CSPViolation.uri_path,
    ).all()

    top_ips = Counter(row.client_ip for row in report_rows).most_common(20)
    top_ips_today = Counter(
        row[0] for row in todays_reports.with_entities(CSPReport.client_ip).all()
    ).most_common(20)

    directives = [v.violated_directive for v in violation_rows if v.violated_directive]
    effective = [v.effective_directive for v in violation_rows if v.effective_directive]
    blocked = [v.blocked_uri for v in violation_rows if v.blocked_uri]
    domains = [f"{v.domain}{v.uri_path}" for v in violation_rows if v.domain and v.uri_path]
    user_agents = [r.user_agent for r in report_rows if r.user_agent]

    graphs = {
        'violated_directive': _plot_and_encode("Violated Directives", directives),
        'effective_directive': _plot_and_encode("Effective Directives", effective),
        'blocked_uri': _plot_and_encode("Blocked URIs", blocked),
        'document_uri': _plot_and_encode("Document URIs", domains),
        'user_agent': _plot_and_encode("User Agents", user_agents),
    }
    return render_template(
        'csp_dashboard.html',
        total_reports=total_reports, reports_today=reports_today,
        top_ips=top_ips, top_ips_today=top_ips_today,
        reports=reports, violations=violations,
        report_page=report_page, violation_page=violation_page,
        graphs=graphs,
    )
@app.route('/csp-report/reports.csv')
def export_reports_csv():
    """Stream every CSPReport row as a CSV attachment (reports.csv).

    The header line is unquoted; every data field is quoted. Fields are
    written by the csv module so embedded quotes, commas and newlines
    (the User-Agent is client-supplied) are escaped, and NULL columns
    become empty fields instead of the text "None".
    """
    import csv  # function-scope import: the module header does not import csv

    reports = CSPReport.query.all()

    def generate():
        header = ['ID', 'Timestamp', 'Client IP', 'User-Agent']
        yield ','.join(header) + '\n'
        # One reusable buffer; each row is encoded, yielded, then cleared.
        buf = io.StringIO()
        writer = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator='\n')
        for r in reports:
            writer.writerow([r.id, r.timestamp, r.client_ip, r.user_agent])
            yield buf.getvalue()
            buf.seek(0)
            buf.truncate(0)

    return Response(generate(), mimetype='text/csv', headers={"Content-Disposition": "attachment; filename=reports.csv"})
@app.route('/csp-report/violations.csv')
def export_violations_csv():
    """Stream every CSPViolation row as a CSV attachment (violations.csv).

    The header line is unquoted; every data field is quoted. Fields are
    written by the csv module so embedded quotes, commas and newlines
    (all values originate from client-submitted reports) are escaped, and
    NULL columns become empty fields instead of the text "None". A line or
    column number of 0 is exported as "0".
    """
    import csv  # function-scope import: the module header does not import csv

    violations = CSPViolation.query.all()

    def generate():
        header = ['ID', 'Report ID', 'Domain', 'URI Path', 'Violated Directive', 'Effective Directive', 'Blocked URI', 'Referrer', 'Source File', 'Line Number', 'Column Number', 'Script Sample']
        yield ','.join(header) + '\n'
        # One reusable buffer; each row is encoded, yielded, then cleared.
        buf = io.StringIO()
        writer = csv.writer(buf, quoting=csv.QUOTE_ALL, lineterminator='\n')
        for v in violations:
            writer.writerow([
                v.id, v.report_id, v.domain, v.uri_path,
                v.violated_directive, v.effective_directive, v.blocked_uri,
                v.referrer, v.source_file, v.line_number, v.column_number,
                v.script_sample,
            ])
            yield buf.getvalue()
            buf.seek(0)
            buf.truncate(0)

    return Response(generate(), mimetype='text/csv', headers={"Content-Disposition": "attachment; filename=violations.csv"})
templates/csp_dashboard.html
<!DOCTYPE html>
{# Dashboard template. Context supplied by the csp_dashboard view:
   total_reports, reports_today, top_ips, top_ips_today, reports,
   violations, report_page, violation_page, graphs. #}
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <title>CSP Dashboard</title>
  <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet">
</head>
<body class="container py-4">
  <h1 class="mb-4">CSP Dashboard</h1>
  <div class="mb-4">
    <p><strong>Total Reports:</strong> {{ total_reports }}</p>
    <p><strong>Reports Today:</strong> {{ reports_today }}</p>
    <a class="btn btn-sm btn-outline-primary" href="{{ url_for('export_reports_csv') }}">Download Reports CSV</a>
    <a class="btn btn-sm btn-outline-success" href="{{ url_for('export_violations_csv') }}">Download Violations CSV</a>
  </div>
  <div class="row">
    <div class="col-md-6">
      <h3>Top IPs (Overall)</h3>
      <ul class="list-group">
        {% for ip, count in top_ips %}
        <li class="list-group-item d-flex justify-content-between align-items-center">
          {{ ip }} <span class="badge bg-primary">{{ count }}</span>
        </li>
        {% endfor %}
      </ul>
    </div>
    <div class="col-md-6">
      <h3>Top IPs (Today)</h3>
      <ul class="list-group">
        {% for ip, count in top_ips_today %}
        <li class="list-group-item d-flex justify-content-between align-items-center">
          {{ ip }} <span class="badge bg-success">{{ count }}</span>
        </li>
        {% endfor %}
      </ul>
    </div>
  </div>
  <hr>
  <h3>Reports (Page {{ report_page }})</h3>
  <ul class="list-group mb-4">
    {% for r in reports %}
    <li class="list-group-item">
      <strong>ID:</strong> {{ r.id }} | <strong>Time:</strong> {{ r.timestamp }} | <strong>IP:</strong> {{ r.client_ip }} | <strong>User-Agent:</strong> {{ r.user_agent }}
    </li>
    {% endfor %}
  </ul>
  {# A "previous" link is only meaningful past the first page. #}
  {% if report_page > 1 %}
  <a class="btn btn-sm btn-outline-secondary" href="?report_page={{ report_page - 1 }}&amp;violation_page={{ violation_page }}">« Previous Reports</a>
  {% endif %}
  <a class="btn btn-sm btn-outline-secondary" href="?report_page={{ report_page + 1 }}&amp;violation_page={{ violation_page }}">Next Reports »</a>
  <h3>Violations (Page {{ violation_page }})</h3>
  <ul class="list-group mb-4">
    {% for v in violations %}
    <li class="list-group-item">
      <strong>ID:</strong> {{ v.id }} | <strong>Page:</strong> {{ v.domain }}{{ v.uri_path }} | <strong>Directive:</strong> {{ v.violated_directive }} | <strong>Blocked:</strong> {{ v.blocked_uri }}
    </li>
    {% endfor %}
  </ul>
  {% if violation_page > 1 %}
  <a class="btn btn-sm btn-outline-secondary" href="?report_page={{ report_page }}&amp;violation_page={{ violation_page - 1 }}">« Previous Violations</a>
  {% endif %}
  <a class="btn btn-sm btn-outline-secondary" href="?report_page={{ report_page }}&amp;violation_page={{ violation_page + 1 }}">Next Violations »</a>
  <hr>
  {# graphs maps a chart key to base64 PNG text, or to None when there is no data. #}
  {% for name, graph in graphs.items() %}
  {% if graph %}
  <h3>{{ name.replace('_', ' ').title() }}</h3>
  <img src="data:image/png;base64,{{ graph }}" class="img-fluid mb-4" alt="{{ name.replace('_', ' ').title() }} bar chart">
  {% endif %}
  {% endfor %}
</body>
</html>
requirements.txt
Flask
Flask-SQLAlchemy
matplotlib
pytest
requests
tests.py
import pytest
import json
from app import app, db, CSPReport, CSPViolation
@pytest.fixture
def client():
    """Yield a Flask test client backed by freshly created tables.

    Tables are dropped and recreated before the test, and dropped again
    afterwards, all inside one application context.
    """
    # NOTE(review): the database URI is overridden after `db` was already
    # bound to the app at import time; depending on the Flask-SQLAlchemy
    # version this may come too late and the tests may hit the real
    # csp_reports.db - confirm.
    app.config.update(
        TESTING=True,
        SQLALCHEMY_DATABASE_URI='sqlite:///:memory:',
    )
    with app.app_context():
        db.drop_all()
        db.create_all()
        yield app.test_client()
        db.session.remove()
        db.drop_all()
def test_csp_report_submission(client):
    """Posting one well-formed report stores one report and one violation."""
    violation_fields = {
        "document-uri": "https://example.com/page",
        "violated-directive": "script-src-elem",
        "effective-directive": "script-src",
        "blocked-uri": "https://malicious.com/script.js",
        "referrer": "https://example.com",
        "source-file": "https://example.com/page",
        "line-number": 42,
        "column-number": 15,
        "script-sample": "alert('xss')",
    }
    response = client.post(
        '/csp-report',
        json={"csp-report": violation_fields},
        headers={"User-Agent": "pytest-agent"},
    )

    assert response.status_code == 200
    assert b"CSP report logged" in response.data

    stored_reports = CSPReport.query.all()
    stored_violations = CSPViolation.query.all()
    assert len(stored_reports) == 1
    assert len(stored_violations) == 1
    assert stored_reports[0].user_agent == "pytest-agent"
def test_dashboard_load(client):
    """The dashboard renders with an empty database."""
    response = client.get('/csp-report/dash')
    assert response.status_code == 200
    page = response.data
    # Either marker is enough to show the template was rendered.
    assert any(marker in page for marker in (b"CSP Dashboard", b"Reports"))
def test_csv_exports(client):
    """Both CSV endpoints answer 200 and begin with the expected header."""
    # Seed one report and one violation attached to it.
    seeded_report = CSPReport(
        timestamp="2025-06-02T12:34:56",
        client_ip="127.0.0.1",
        user_agent="pytest",
        json_data="{}",
    )
    db.session.add(seeded_report)
    db.session.commit()
    seeded_violation = CSPViolation(
        report_id=seeded_report.id,
        domain="example.com",
        uri_path="/page",
        violated_directive="script-src",
    )
    db.session.add(seeded_violation)
    db.session.commit()

    reports_response = client.get('/csp-report/reports.csv')
    assert reports_response.status_code == 200
    assert b"ID,Timestamp,Client IP,User-Agent" in reports_response.data

    violations_response = client.get('/csp-report/violations.csv')
    assert violations_response.status_code == 200
    assert b"ID,Report ID,Domain,URI Path" in violations_response.data
send_csp_samples.py
import requests
import json
import random
import time
URL = "http://localhost:5000/csp-report"  # Adjust if your server runs on a different port

# Compact description of the sample violations, one tuple per report:
# (document URI, directive, blocked URI, referrer, line, column, script sample).
# In every sample the source file equals the document URI and the effective
# directive equals the violated directive.
_SAMPLE_ROWS = (
    ("https://example.com/page1", "script-src", "https://evil.com/script.js",
     "https://referrer.com", 42, 5, "alert('xss')"),
    ("https://another-site.com/login", "style-src", "https://badcdn.com/styles.css",
     "https://google.com", 88, 12, None),
    ("https://example.org/contact", "img-src", "https://trackersite.com/pixel.png",
     "https://search.com", 25, 3, None),
    ("https://mysite.org/checkout", "connect-src", "https://api.malicious.com",
     "https://mysite.org/home", 14, 7, None),
)

# Expanded payloads in the {"csp-report": {...}} shape the endpoint expects.
sample_reports = [
    {
        "csp-report": {
            "document-uri": page,
            "violated-directive": directive,
            "effective-directive": directive,
            "blocked-uri": blocked,
            "referrer": referrer,
            "source-file": page,
            "line-number": line,
            "column-number": column,
            "script-sample": sample,
        }
    }
    for page, directive, blocked, referrer, line, column, sample in _SAMPLE_ROWS
]
def send_sample_report():
    """POST one randomly chosen sample report to URL and print the outcome.

    A network failure (server not running, timeout, ...) is printed rather
    than raised, so a run of several sends is not aborted by the first one.
    """
    payload = random.choice(sample_reports)
    try:
        # Without a timeout requests waits indefinitely on an unresponsive server.
        response = requests.post(
            URL,
            json=payload,
            headers={"User-Agent": "Test-Agent"},
            timeout=10,
        )
    except requests.RequestException as exc:
        print(f"Request failed: {exc}")
        return
    print(f"Status: {response.status_code}, Response: {response.text}")
if __name__ == "__main__":
    # Script entry point: send five random sample reports,
    # pausing one second after each.
    for _ in range(5):
        send_sample_report()
        time.sleep(1)
STEPS
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
flask init-db
pytest tests.py
flask init-db
flask run
# New Terminal
source venv/bin/activate
python send_csp_samples.py
Visit http://127.0.0.1:5000/csp-report/dash
Bests
Vivek