"""Statistical analysis of HTTP benchmark results."""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, cast
import numpy as np
import pandas as pd
from net_benchmark.dns_benchmark.core import QueryStatus
from net_benchmark.http_bench.core import HTTPProtocol, HTTPResult
[docs]
@dataclass
class TargetStats:
"""Statistics for a single HTTP target URL.
Field layout mirrors ResolverStats:
target ← resolver_name (identity)
method ← (no DNS equivalent — HTTP-specific)
total_requests ← total_queries
successful_requests ← successful_queries
success_rate ← success_rate
min/max/avg/... ← same latency stat fields, same formulas
http2_rate ← dnssec_validation_rate (protocol quality signal)
"""
target: str
method: str
total_requests: int
successful_requests: int
success_rate: float
# latency — identical field names and formulas as ResolverStats
min_latency: float
max_latency: float
avg_latency: float
median_latency: float
std_latency: float
p95_latency: float
p99_latency: float
jitter: float = 0.0
consistency_score: float = 0.0
# HTTP-specific timing
avg_ttfb_ms: float = 0.0
p95_ttfb_ms: float = 0.0
# protocol
http2_rate: float = 0.0 # requests that negotiated HTTP/2
redirect_rate: float = 0.0 # requests with at least one redirect
# response
avg_response_size_bytes: float = 0.0
avg_dns_ms: float = 0.0
avg_tcp_ms: float = 0.0
avg_tls_ms: float = 0.0
avg_compressed_size_bytes: float = 0.0
avg_redirect_time_ms: float = 0.0
http2_downgrade_rate: float = 0.0
cache_control_present: int = 0
etag_present: int = 0
last_modified_present: int = 0
age_present: int = 0
# security signals — counts across all requests for this target
hsts_present: int = 0
csp_present: int = 0
cdn_fingerprint: Optional[str] = None # most common CDN for this target
server_header: Optional[str] = None # most common server header
cert_expiry_days_min: Optional[int] = None # worst cert seen across requests
alt_svc: Optional[str] = None
ip_version: Optional[str] = None # most common across requests
[docs]
class HTTPAnalyzer:
"""Analyse HTTP benchmark results and compute statistics.
Mirrors BenchmarkAnalyzer structure exactly — same __init__, same
_create_dataframe pattern, same public method signatures.
"""
def __init__(self, results: List[HTTPResult]) -> None:
self.results = results
self.df = self._create_dataframe()
def _create_dataframe(self) -> pd.DataFrame:
"""Convert HTTPResult list to DataFrame.
Column mapping mirrors dns analysis.py _create_dataframe:
latency_ms → total_ms
completed → status == SUCCESS (no DNSSEC_FAILED equivalent)
resolver_name → target
protocol.value → protocol
"""
data = []
for r in self.results:
data.append(
{
"target": r.target,
"method": r.method,
"total_ms": r.total_ms,
"ttfb_ms": r.ttfb_ms,
"dns_resolve_ms": r.dns_resolve_ms,
"dns_resolver_ip": r.dns_resolver_ip,
"tcp_connect_ms": r.tcp_connect_ms,
"tls_handshake_ms": r.tls_handshake_ms,
"status": r.status.value,
"completed": r.status == QueryStatus.SUCCESS,
"http_status_code": r.http_status_code,
"protocol": r.protocol.value,
"alpn_negotiated": r.alpn_negotiated or "",
"http2": r.protocol == HTTPProtocol.HTTP2,
"redirect_count": r.redirect_count,
"response_size_bytes": r.response_size_bytes or 0,
"compressed": r.compressed,
"compressed_size_bytes": r.compressed_size_bytes,
"redirect_timings": r.redirect_timings,
"http2_downgraded": r.http2_downgraded,
"hsts": r.security_headers.get("strict-transport-security")
is not None,
"csp": r.security_headers.get("content-security-policy")
is not None,
"cdn_fingerprint": r.cdn_fingerprint or "",
"server_header": r.server_header or "",
"cert_expiry_days": r.cert_expiry_days,
"alt_svc": r.alt_svc or "",
"ip_version": r.ip_version or "",
"error_message": r.error_message or "",
"attempt_number": r.attempt_number,
"iteration": r.iteration,
"query_id": r.query_id,
"start_time": r.start_time,
"cache_control": r.cache_control or "",
"etag": r.etag or "",
"last_modified": r.last_modified or "",
"age": r.age or "",
"assertion_results": r.assertion_results,
}
)
return pd.DataFrame(data)
[docs]
def get_target_statistics(self) -> List[TargetStats]:
"""Compute per-target statistics. Mirrors get_resolver_statistics."""
stats_list = []
for target in self.df["target"].unique():
td = self.df[self.df["target"] == target]
method = td["method"].iloc[0]
total = len(td)
successful = int(td["completed"].sum())
success_rate = (successful / total * 100) if total > 0 else 0.0
latencies = td[td["completed"]]["total_ms"]
ttfb_vals = td[td["completed"] & td["ttfb_ms"].notna()]["ttfb_ms"]
# Timing breakdown averages
dns_vals = td[td["completed"] & td["dns_resolve_ms"].notna()][
"dns_resolve_ms"
]
tcp_vals = td[td["completed"] & td["tcp_connect_ms"].notna()][
"tcp_connect_ms"
]
tls_vals = td[td["completed"] & td["tls_handshake_ms"].notna()][
"tls_handshake_ms"
]
avg_dns = float(dns_vals.mean()) if len(dns_vals) > 0 else 0.0
avg_tcp = float(tcp_vals.mean()) if len(tcp_vals) > 0 else 0.0
avg_tls = float(tls_vals.mean()) if len(tls_vals) > 0 else 0.0
if len(latencies) > 0:
arr = latencies.values
min_l = float(latencies.min())
max_l = float(latencies.max())
avg_l = float(latencies.mean())
med_l = float(latencies.median())
std_l = float(latencies.std())
p95_l = float(latencies.quantile(0.95))
p99_l = float(latencies.quantile(0.99))
if len(arr) == 1:
jitter = 0.0
consistency = 100.0
else:
jitter = float(np.std(np.diff(arr)))
cv = std_l / avg_l if avg_l > 0 else 0.0
consistency = max(0.0, 100.0 - cv * 100.0)
else:
min_l = max_l = avg_l = med_l = std_l = float("nan")
p95_l = p99_l = 0.0
jitter = 0.0
consistency = 0.0
avg_ttfb = float(ttfb_vals.mean()) if len(ttfb_vals) > 0 else 0.0
p95_ttfb = float(ttfb_vals.quantile(0.95)) if len(ttfb_vals) > 0 else 0.0
# protocol signals
http2_rate = (
float(td[td["completed"]]["http2"].mean() * 100)
if successful > 0
else 0.0
)
redirect_rate = float((td["redirect_count"] > 0).mean() * 100)
avg_size = (
float(td[td["completed"]]["response_size_bytes"].mean())
if successful > 0
else 0.0
)
# security signals
hsts_count = int(td["hsts"].sum())
csp_count = int(td["csp"].sum())
# Cache header presence (count non‑empty values among completed requests)
cache_control_count = td[
td["completed"] & (td["cache_control"] != "")
].shape[0]
etag_count = td[td["completed"] & (td["etag"] != "")].shape[0]
last_modified_count = td[
td["completed"] & (td["last_modified"] != "")
].shape[0]
age_count = td[td["completed"] & (td["age"] != "")].shape[0]
# Compressed size average
comp_vals = td[td["completed"] & td["compressed_size_bytes"].notna()][
"compressed_size_bytes"
]
avg_comp = float(comp_vals.mean()) if len(comp_vals) > 0 else 0.0
# Average redirect time (flatten all hop timings)
redirect_times = []
for _, row in td[td["completed"]].iterrows():
for hop in row.get("redirect_timings", []):
redirect_times.append(hop["duration_ms"])
avg_redirect = (
sum(redirect_times) / len(redirect_times) if redirect_times else 0.0
)
# HTTP/2 downgrade rate
downgrade_count = int(
td[(td["completed"]) & (td["http2_downgraded"] == True)].shape[0]
)
http2_downgrade_rate = (
(downgrade_count / successful * 100) if successful > 0 else 0.0
)
# most common CDN and server header (mode, ignoring empty strings)
cdn_vals = td[td["cdn_fingerprint"] != ""]["cdn_fingerprint"]
cdn = str(cdn_vals.mode().iloc[0]) if len(cdn_vals) > 0 else None
srv_vals = td[td["server_header"] != ""]["server_header"]
srv = str(srv_vals.mode().iloc[0]) if len(srv_vals) > 0 else None
# worst (minimum) cert expiry seen for this target
cert_days_series = td["cert_expiry_days"].dropna()
cert_min = (
int(cert_days_series.min()) if len(cert_days_series) > 0 else None
)
alt_svc_vals = td[td["alt_svc"] != ""]["alt_svc"]
alt_svc = (
str(alt_svc_vals.mode().iloc[0]) if len(alt_svc_vals) > 0 else None
)
ip_vals = td[td["ip_version"] != ""]["ip_version"]
ip_version = str(ip_vals.mode().iloc[0]) if len(ip_vals) > 0 else None
stats_list.append(
TargetStats(
target=target,
method=method,
total_requests=total,
successful_requests=successful,
success_rate=success_rate,
min_latency=min_l,
max_latency=max_l,
avg_latency=avg_l,
median_latency=med_l,
std_latency=std_l,
p95_latency=p95_l,
p99_latency=p99_l,
jitter=jitter,
consistency_score=consistency,
avg_ttfb_ms=avg_ttfb,
p95_ttfb_ms=p95_ttfb,
http2_rate=http2_rate,
redirect_rate=redirect_rate,
avg_response_size_bytes=avg_size,
avg_dns_ms=avg_dns,
avg_tcp_ms=avg_tcp,
avg_tls_ms=avg_tls,
avg_compressed_size_bytes=avg_comp,
avg_redirect_time_ms=avg_redirect,
http2_downgrade_rate=http2_downgrade_rate,
cache_control_present=cache_control_count,
etag_present=etag_count,
last_modified_present=last_modified_count,
age_present=age_count,
hsts_present=hsts_count,
csp_present=csp_count,
cdn_fingerprint=cdn,
server_header=srv,
cert_expiry_days_min=cert_min,
alt_svc=alt_svc,
ip_version=ip_version,
)
)
return stats_list
[docs]
def get_overall_statistics(self) -> Dict[str, Any]:
"""Overall benchmark statistics. Mirrors BenchmarkAnalyzer.get_overall_statistics."""
total = len(self.df)
successful = int(self.df["completed"].sum())
success_rate = (successful / total * 100) if total > 0 else 0.0
latencies = self.df[self.df["completed"]]["total_ms"]
ttfb_vals = self.df[self.df["completed"] & self.df["ttfb_ms"].notna()][
"ttfb_ms"
]
avg_l = float(latencies.mean()) if len(latencies) > 0 else 0.0
med_l = float(latencies.median()) if len(latencies) > 0 else 0.0
avg_ttfb = float(ttfb_vals.mean()) if len(ttfb_vals) > 0 else 0.0
target_stats = self.get_target_statistics()
ranked = sorted(
[s for s in target_stats if s.successful_requests > 0],
key=lambda s: s.avg_latency,
)
http2_rate = (
float(self.df[self.df["completed"]]["http2"].mean() * 100)
if successful > 0
else 0.0
)
hsts_targets = sum(1 for s in target_stats if s.hsts_present > 0)
resolver_ip = self.results[0].dns_resolver_ip if self.results else None
assertion_pass_count = (
sum(
1
for r in self.results
if r.status == QueryStatus.SUCCESS and all(r.assertion_results.values())
)
if self.results
else 0
)
assertion_pass_rate = (assertion_pass_count / total * 100) if total > 0 else 0.0
return {
"total_requests": total,
"successful_requests": successful,
"overall_success_rate": success_rate,
"overall_avg_latency": avg_l,
"overall_median_latency": med_l,
"overall_avg_ttfb": avg_ttfb,
"fastest_target": ranked[0].target if ranked else "N/A",
"slowest_target": ranked[-1].target if ranked else "N/A",
"target_count": len(target_stats),
"http2_rate": http2_rate,
"hsts_coverage": (
(hsts_targets / len(target_stats) * 100) if target_stats else 0.0
),
"dns_resolver_ip": resolver_ip,
"assertion_pass_rate": assertion_pass_rate,
}
[docs]
def get_ttfb_statistics(self) -> List[Dict[str, Any]]:
"""Per-target TTFB breakdown. Mirrors get_domain_statistics."""
result = []
for target in self.df["target"].unique():
td = self.df[self.df["target"] == target]
vals = td[td["completed"] & td["ttfb_ms"].notna()]["ttfb_ms"]
result.append(
{
"target": target,
"avg_ttfb_ms": float(vals.mean()) if len(vals) > 0 else 0.0,
"median_ttfb_ms": float(vals.median()) if len(vals) > 0 else 0.0,
"p95_ttfb_ms": float(vals.quantile(0.95)) if len(vals) > 0 else 0.0,
"p99_ttfb_ms": float(vals.quantile(0.99)) if len(vals) > 0 else 0.0,
"min_ttfb_ms": float(vals.min()) if len(vals) > 0 else 0.0,
"max_ttfb_ms": float(vals.max()) if len(vals) > 0 else 0.0,
}
)
return result
[docs]
def get_protocol_distribution(self) -> List[Dict[str, Any]]:
"""HTTP/1.1 vs HTTP/2 breakdown. Mirrors get_protocol_statistics."""
result = []
for proto in self.df["protocol"].unique():
pd_ = self.df[self.df["protocol"] == proto]
total = len(pd_)
successful = int(pd_["completed"].sum())
latencies = pd_[pd_["completed"]]["total_ms"]
result.append(
{
"protocol": proto,
"total_requests": total,
"successful_requests": successful,
"success_rate": (successful / total * 100) if total > 0 else 0.0,
"avg_latency": (
float(latencies.mean()) if len(latencies) > 0 else 0.0
),
"median_latency": (
float(latencies.median()) if len(latencies) > 0 else 0.0
),
"p95_latency": (
float(latencies.quantile(0.95)) if len(latencies) > 0 else 0.0
),
}
)
return result
[docs]
def get_security_summary(self) -> Dict[str, Any]:
"""Aggregate security signal counts across all results.
Mirrors get_dnssec_statistics — the protocol-quality signal for HTTP.
"""
total = len(self.df)
completed = self.df[self.df["completed"]]
# per-header presence counts
header_counts: Dict[str, int] = {}
for r in self.results:
for h, v in r.security_headers.items():
if v is not None:
header_counts[h] = header_counts.get(h, 0) + 1
# CDN distribution
cdn_vals = self.df[self.df["cdn_fingerprint"] != ""]["cdn_fingerprint"]
cdn_dist = cdn_vals.value_counts().to_dict() if len(cdn_vals) > 0 else {}
# server header leak count (present = potential info disclosure)
server_leak_count = int((self.df["server_header"] != "").sum())
# cert expiry — worst across all results
cert_series = self.df["cert_expiry_days"].dropna()
cert_min = int(cert_series.min()) if len(cert_series) > 0 else None
return {
"security_header_counts": header_counts,
"cdn_distribution": cdn_dist,
"server_header_leak_count": server_leak_count,
"cert_expiry_days_min": cert_min,
"total_requests": total,
"completed_requests": int(completed["completed"].sum()),
}
[docs]
def get_status_code_distribution(self) -> List[Dict[str, Any]]:
"""HTTP status code breakdown. No DNS equivalent — HTTP-only."""
codes = self.df["http_status_code"].dropna().astype(int)
dist = codes.value_counts().rename_axis("status_code").reset_index(name="count")
dist["pct"] = (dist["count"] / len(self.df) * 100).round(2)
return cast(List[Dict[str, Any]], dist.to_dict(orient="records"))
[docs]
def get_error_statistics(self) -> Dict[str, int]:
"""Error message counts. Mirrors BenchmarkAnalyzer.get_error_statistics."""
errors = self.df[~self.df["completed"]]["error_message"]
return cast(Dict[str, int], errors.value_counts().to_dict())