"""Statistical analysis of DNS benchmark results."""
from dataclasses import dataclass
from typing import Any, Dict, List, cast
import numpy as np
import pandas as pd
from net_benchmark.dns_benchmark.core import DNSQueryResult, QueryStatus
@dataclass
class ResolverStats:
    """Aggregated benchmark statistics for a single DNS resolver.

    Instances are produced by ``BenchmarkAnalyzer.get_resolver_statistics``.
    The field order is part of the generated ``__init__`` signature, so it
    must not be rearranged.
    """

    # Resolver identity.
    resolver_name: str
    resolver_ip: str

    # Query counts. ``successful_queries`` is filled from the analyzer's
    # "completed" rows (SUCCESS or DNSSEC_FAILED), and ``success_rate`` is
    # that count expressed as a percentage of ``total_queries``.
    total_queries: int
    successful_queries: int
    success_rate: float

    # Latency summary, derived from the analyzer's ``latency_ms`` column.
    # The analyzer stores NaN in min/max/avg/median/std when a resolver has
    # no completed query at all.
    min_latency: float
    max_latency: float
    avg_latency: float
    median_latency: float
    std_latency: float
    p95_latency: float
    p99_latency: float

    # Stability indicators; both default to 0.0 when not supplied.
    jitter: float = 0.0
    consistency_score: float = 0.0

    # DNSSEC validation count and its percentage of ``total_queries``.
    dnssec_validated_queries: int = 0
    dnssec_validation_rate: float = 0.0
class BenchmarkAnalyzer:
    """Analyze DNS benchmark results and compute statistics.

    The raw results are flattened once into ``self.df`` (one row per query);
    every ``get_*`` method derives its figures from that frame.

    Two boolean columns describe the outcome of a query:

    * ``success``   -- the status is ``SUCCESS``.
    * ``completed`` -- the status is ``SUCCESS`` or ``DNSSEC_FAILED``; the
      query finished at network level, so its latency is meaningful.

    NOTE(review): every ``successful_queries`` count, success rate and latency
    figure in this class is based on ``completed``, while
    ``get_error_statistics`` is based on ``success``. A ``DNSSEC_FAILED``
    query is therefore counted as successful in the rates *and* listed among
    the errors. Confirm this is the intended reporting.
    """

    # Column schema of ``self.df``. It is passed to the DataFrame constructor
    # so that an empty result list still produces a frame with every column
    # (otherwise each column lookup below raises ``KeyError``).
    _COLUMNS = (
        "resolver_name",
        "resolver_ip",
        "domain",
        "record_type",
        "latency_ms",
        "status",
        "success",
        "completed",
        "answers_count",
        "ttl",
        "error_message",
        "attempt_number",
        "cache_hit",
        "iteration",
        "query_id",
        "protocol",
        "dnssec_validated",
    )

    def __init__(self, results: "List[DNSQueryResult]"):
        """Store *results* and build the per-query DataFrame from them."""
        self.results = results
        self.df = self._create_dataframe()

    def _create_dataframe(self) -> pd.DataFrame:
        """Convert ``self.results`` into a DataFrame with one row per query."""
        rows = [
            {
                "resolver_name": result.resolver_name,
                "resolver_ip": result.resolver_ip,
                "domain": result.domain,
                "record_type": result.record_type,
                "latency_ms": result.latency_ms,
                "status": result.status.value,
                # True for SUCCESS only.
                "success": result.status == QueryStatus.SUCCESS,
                # True for SUCCESS or DNSSEC_FAILED: the query completed at
                # network level, so its latency is valid for statistics.
                "completed": result.status
                in (QueryStatus.SUCCESS, QueryStatus.DNSSEC_FAILED),
                "answers_count": len(result.answers),
                "ttl": result.ttl or 0,
                "error_message": result.error_message or "",
                "attempt_number": result.attempt_number,
                "cache_hit": result.cache_hit,
                "iteration": result.iteration,
                "query_id": result.query_id,
                "protocol": result.protocol.value,
                "dnssec_validated": result.dnssec_validated,
            }
            for result in self.results
        ]
        return pd.DataFrame(rows, columns=list(self._COLUMNS))

    @staticmethod
    def _completed_rows(frame: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of *frame* whose ``completed`` flag is True."""
        return frame[frame["completed"].eq(True)]

    @staticmethod
    def _percent(part: int, whole: int) -> float:
        """Return *part* as a percentage of *whole*, or 0.0 if *whole* is 0."""
        return (part / whole) * 100 if whole > 0 else 0.0

    @staticmethod
    def _summarize(latencies: pd.Series) -> Dict[str, float]:
        """Return min/avg/median/max/p95 of *latencies*; all 0.0 when empty."""
        if len(latencies) == 0:
            return dict.fromkeys(("min", "avg", "median", "max", "p95"), 0.0)
        return {
            "min": float(latencies.min()),
            "avg": float(latencies.mean()),
            "median": float(latencies.median()),
            "max": float(latencies.max()),
            "p95": float(latencies.quantile(0.95)),
        }

    def get_resolver_statistics(self) -> "List[ResolverStats]":
        """Compute comprehensive statistics per resolver.

        Returns:
            One ``ResolverStats`` per distinct ``resolver_name``, in order of
            first appearance in the results. Counts and latency figures use
            the completed queries of that resolver.
        """
        resolver_stats = []
        for resolver_name in self.df["resolver_name"].unique():
            resolver_data = self.df[self.df["resolver_name"] == resolver_name]
            resolver_ip = resolver_data["resolver_ip"].iloc[0]

            total_queries = len(resolver_data)
            latencies = self._completed_rows(resolver_data)["latency_ms"]
            successful_queries = len(latencies)
            dnssec_validated_queries = int(resolver_data["dnssec_validated"].sum())

            if successful_queries > 0:
                min_latency = float(latencies.min())
                max_latency = float(latencies.max())
                avg_latency = float(latencies.mean())
                median_latency = float(latencies.median())
                # pandas computes the sample standard deviation (ddof=1), so
                # this is NaN when only one query completed.
                std_latency = float(latencies.std())
                p95_latency = float(latencies.quantile(0.95))
                p99_latency = float(latencies.quantile(0.99))

                # Jitter: standard deviation of the differences between
                # consecutive latencies, taken in row order.
                samples = latencies.to_numpy()
                jitter = float(np.std(np.diff(samples))) if len(samples) > 1 else 0.0

                # Consistency score: 100 minus the coefficient of variation
                # (as a percentage), floored at 0. A NaN ``std_latency``
                # also ends up as 0 because ``max(0.0, nan)`` returns 0.0.
                cv = std_latency / avg_latency if avg_latency > 0 else 0
                consistency_score = max(0.0, 100 - (cv * 100))
            else:
                # NOTE(review): with no completed query the original code
                # reports NaN for min/max/avg/median/std but 0.0 for the
                # percentiles, jitter and consistency score. Kept as-is
                # because consumers may rely on these sentinels.
                min_latency = max_latency = avg_latency = float("nan")
                median_latency = std_latency = float("nan")
                p95_latency = p99_latency = jitter = 0.0
                consistency_score = 0.0

            resolver_stats.append(
                ResolverStats(
                    resolver_name=resolver_name,
                    resolver_ip=resolver_ip,
                    total_queries=total_queries,
                    successful_queries=successful_queries,
                    success_rate=self._percent(successful_queries, total_queries),
                    min_latency=min_latency,
                    max_latency=max_latency,
                    avg_latency=avg_latency,
                    median_latency=median_latency,
                    std_latency=std_latency,
                    p95_latency=p95_latency,
                    p99_latency=p99_latency,
                    jitter=jitter,
                    consistency_score=consistency_score,
                    dnssec_validated_queries=dnssec_validated_queries,
                    dnssec_validation_rate=self._percent(
                        dnssec_validated_queries, total_queries
                    ),
                )
            )
        return resolver_stats

    def get_overall_statistics(self) -> Dict[str, Any]:
        """Get overall benchmark statistics.

        Returns:
            A dict with totals, overall latency figures (0.0 when no query
            completed), the fastest/slowest resolver by average latency
            ("N/A" when no resolver has a completed query), and the distinct
            domains, record types and protocols seen.
        """
        total_queries = len(self.df)
        latencies = self._completed_rows(self.df)["latency_ms"]
        successful_queries = len(latencies)

        if successful_queries > 0:
            overall_avg_latency = float(latencies.mean())
            overall_median_latency = float(latencies.median())
            overall_std_latency = float(latencies.std())
        else:
            overall_avg_latency = overall_median_latency = overall_std_latency = 0.0

        # Rank resolvers by average latency. Resolvers without any completed
        # query are left out because their average is NaN.
        resolver_stats = self.get_resolver_statistics()
        ranked_resolvers = sorted(
            (r for r in resolver_stats if r.successful_queries > 0),
            key=lambda r: r.avg_latency,
        )
        dnssec_validated_queries = int(self.df["dnssec_validated"].sum())

        return {
            "total_queries": total_queries,
            "successful_queries": successful_queries,
            "overall_success_rate": self._percent(successful_queries, total_queries),
            "overall_avg_latency": overall_avg_latency,
            "overall_median_latency": overall_median_latency,
            "overall_std_latency": overall_std_latency,
            "fastest_resolver": (
                ranked_resolvers[0].resolver_name if ranked_resolvers else "N/A"
            ),
            "slowest_resolver": (
                ranked_resolvers[-1].resolver_name if ranked_resolvers else "N/A"
            ),
            "resolver_count": len(resolver_stats),
            "domain_count": len(self.df["domain"].unique()),
            "record_types": list(self.df["record_type"].unique()),
            "protocols_used": list(self.df["protocol"].unique()),
            "dnssec_validated_queries": dnssec_validated_queries,
            "dnssec_validation_rate": self._percent(
                dnssec_validated_queries, total_queries
            ),
        }

    def get_domain_statistics(self) -> List[Dict[str, Any]]:
        """Compute statistics per domain across all resolvers.

        Returns:
            One dict per distinct domain, in order of first appearance.
            ``fastest_resolver`` / ``slowest_resolver`` name the resolver
            behind the single lowest / highest completed-query latency for
            that domain ("N/A" when no query completed).
        """
        domain_stats: List[Dict[str, Any]] = []
        for domain in self.df["domain"].unique():
            domain_rows = self.df[self.df["domain"] == domain]
            latencies = self._completed_rows(domain_rows)["latency_ms"]
            completed_count = len(latencies)

            if completed_count > 0:
                fastest_resolver = domain_rows.loc[latencies.idxmin(), "resolver_name"]
                slowest_resolver = domain_rows.loc[latencies.idxmax(), "resolver_name"]
            else:
                fastest_resolver = slowest_resolver = "N/A"

            summary = self._summarize(latencies)
            domain_stats.append(
                {
                    "domain": domain,
                    "total_queries": len(domain_rows),
                    "successful_queries": completed_count,
                    "success_rate": self._percent(completed_count, len(domain_rows)),
                    "min_latency": summary["min"],
                    "avg_latency": summary["avg"],
                    "median_latency": summary["median"],
                    "max_latency": summary["max"],
                    "p95_latency": summary["p95"],
                    "fastest_resolver": fastest_resolver,
                    "slowest_resolver": slowest_resolver,
                }
            )
        return domain_stats

    def get_record_type_statistics(self) -> List[Dict[str, Any]]:
        """Compute statistics per DNS record type across all resolvers/domains.

        Returns:
            One dict per distinct record type, in order of first appearance;
            latency figures are 0.0 when no query of that type completed.
        """
        rt_stats: List[Dict[str, Any]] = []
        for record_type in self.df["record_type"].unique():
            type_rows = self.df[self.df["record_type"] == record_type]
            latencies = self._completed_rows(type_rows)["latency_ms"]
            summary = self._summarize(latencies)
            rt_stats.append(
                {
                    "record_type": record_type,
                    "total_queries": len(type_rows),
                    "successful_queries": len(latencies),
                    "success_rate": self._percent(len(latencies), len(type_rows)),
                    "avg_latency": summary["avg"],
                    "p95_latency": summary["p95"],
                }
            )
        return rt_stats

    def get_error_statistics(self) -> Dict[str, int]:
        """Count errors by message across all failed queries.

        A query counts as failed when its ``success`` flag is False, which
        includes ``DNSSEC_FAILED`` queries. Queries without an error message
        are grouped under the empty string.
        """
        errors = self.df[self.df["success"].eq(False)]["error_message"]
        return cast(Dict[str, int], errors.value_counts().to_dict())

    def get_protocol_statistics(self) -> List[Dict[str, Any]]:
        """Compute statistics broken down by protocol (plain/doh/dot).

        Returns:
            One dict per distinct protocol value, in order of first
            appearance; latency figures are 0.0 when nothing completed.
        """
        proto_stats: List[Dict[str, Any]] = []
        for proto in self.df["protocol"].unique():
            proto_rows = self.df[self.df["protocol"] == proto]
            total = len(proto_rows)
            latencies = self._completed_rows(proto_rows)["latency_ms"]
            summary = self._summarize(latencies)
            dnssec_validated = int(proto_rows["dnssec_validated"].sum())
            proto_stats.append(
                {
                    "protocol": proto,
                    "total_queries": total,
                    "successful_queries": len(latencies),
                    "success_rate": self._percent(len(latencies), total),
                    "avg_latency": summary["avg"],
                    "median_latency": summary["median"],
                    "p95_latency": summary["p95"],
                    "dnssec_validated_queries": dnssec_validated,
                    "dnssec_validation_rate": self._percent(dnssec_validated, total),
                }
            )
        return proto_stats

    def get_dnssec_statistics(self) -> List[Dict[str, Any]]:
        """DNSSEC validation breakdown per resolver + protocol combination.

        Returns:
            One dict per (resolver, protocol) pair, ordered by resolver first
            appearance and then by protocol first appearance within that
            resolver.
        """
        dnssec_stats: List[Dict[str, Any]] = []
        for resolver_name in self.df["resolver_name"].unique():
            resolver_rows = self.df[self.df["resolver_name"] == resolver_name]
            for proto in resolver_rows["protocol"].unique():
                pair_rows = resolver_rows[resolver_rows["protocol"] == proto]
                total = len(pair_rows)
                validated = int(pair_rows["dnssec_validated"].sum())
                dnssec_stats.append(
                    {
                        "resolver_name": resolver_name,
                        "resolver_ip": pair_rows["resolver_ip"].iloc[0],
                        "protocol": proto,
                        "total_queries": total,
                        "dnssec_validated_queries": validated,
                        "dnssec_validation_rate": self._percent(validated, total),
                        # True only if ALL queries for this resolver+protocol
                        # validated.
                        "fully_validated": validated == total,
                    }
                )
        return dnssec_stats