"""Core DNS benchmarking functionality."""
import asyncio
import ipaddress
import json
import ssl
import struct
import time
import uuid
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, cast
import click
import dns.asyncresolver
import dns.exception
import dns.flags
import dns.message
import dns.name
import dns.rdatatype
import httpx
import idna
from net_benchmark.utils.messages import error, warning
[docs]
class QueryStatus(Enum):
SUCCESS = "success"
TIMEOUT = "timeout"
NXDOMAIN = "nxdomain"
SERVFAIL = "servfail"
CONNECTION_REFUSED = "connection_refused"
UNKNOWN_ERROR = "unknown_error"
DNSSEC_FAILED = "dnssec_failed"
TLS_ERROR = "tls_error"
[docs]
class QueryProtocol(Enum):
PLAIN = "plain" # traditional DNS over UDP/TCP, dnspython will handle protocol selection and fallback
DOH = "doh"
DOT = "dot"
[docs]
@dataclass
class DNSQueryResult:
"""Result of a single DNS query."""
resolver_ip: str
resolver_name: str
domain: str
record_type: str
start_time: float
end_time: float
latency_ms: float
status: QueryStatus
answers: List[str]
ttl: Optional[int]
error_message: Optional[str] = None
attempt_number: int = 1
cache_hit: bool = False
dnssec_validated: bool = False
protocol: QueryProtocol = QueryProtocol.PLAIN
iteration: int = 1 # which iteration this query belongs to
query_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
[docs]
def to_dict(self) -> Dict[str, Any]:
d = asdict(self)
d["status"] = self.status.value
d["protocol"] = self.protocol.value
return d
[docs]
class DNSQueryEngine:
"""Async DNS query engine with rate limiting and retry logic."""
def __init__(
self,
max_concurrent_queries: int = 100,
timeout: float = 5.0,
max_retries: int = 2,
enable_cache: bool = False,
retry_backoff_multiplier: float = 0.1,
retry_backoff_base: float = 2.0,
enable_dnssec: bool = False,
enforce_dnssec: bool = False, # True when --dnssec-validate passed
) -> None:
self.max_concurrent_queries = max_concurrent_queries
self.timeout = timeout
self.max_retries = max_retries
# lazy-init async primitives to avoid creating them outside an event loop
self.semaphore: Optional[asyncio.Semaphore] = None
self._lock: Optional[asyncio.Lock] = None
self.progress_callback: Optional[Callable[[int, int], None]] = None
self.query_counter = 0
self.total_queries = 0
self.enable_cache = enable_cache
self.cache: Dict[str, DNSQueryResult] = {}
self.retry_backoff_multiplier = retry_backoff_multiplier
self.retry_backoff_base = retry_backoff_base
self.failed_resolvers: Dict[str, int] = defaultdict(int)
self.enable_dnssec = enable_dnssec
self.enforce_dnssec = enforce_dnssec
# Shared DoH clients and DoT connections, one per resolver IP.
# Reusing these avoids repeated TLS handshakes — biggest latency win
# for encrypted protocols. Cleaned up via engine.close().
# NOT thread-safe — safe only because asyncio is single-threaded.
# Do not access from threads without adding locks.
self._doh_clients: Dict[str, httpx.AsyncClient] = {}
self._dot_connections: Dict[
str, Tuple[asyncio.StreamReader, asyncio.StreamWriter]
] = {}
[docs]
def set_progress_callback(self, callback: Callable[[int, int], None]) -> None:
"""Set callback for progress updates with completed/total counts."""
self.progress_callback = callback
def _get_cache_key(self, resolver_ip: str, domain: str, record_type: str) -> str:
"""Generate cache key for query."""
return f"{resolver_ip}:{domain}:{record_type}"
def _validate_resolver(self, resolver: Dict[str, str]) -> None:
"""Validate resolver configuration."""
if "ip" not in resolver:
raise ValueError(f"Resolver missing 'ip' key: {resolver}")
if "name" not in resolver:
raise ValueError(f"Resolver missing 'name' key: {resolver}")
async def _ensure_async_primitives(self) -> None:
"""Create asyncio primitives when running inside an event loop."""
if self.semaphore is None:
self.semaphore = asyncio.Semaphore(self.max_concurrent_queries)
if self._lock is None:
self._lock = asyncio.Lock()
async def _update_progress(self) -> None:
"""Thread-safe progress update."""
await self._ensure_async_primitives()
assert self._lock is not None
async with self._lock:
self.query_counter += 1
if self.progress_callback:
self.progress_callback(self.query_counter, self.total_queries)
async def _get_doh_client(self, resolver_ip: str) -> httpx.AsyncClient:
"""Return cached AsyncClient for this resolver, creating if needed."""
if resolver_ip not in self._doh_clients:
self._doh_clients[resolver_ip] = httpx.AsyncClient(
http2=True,
timeout=self.timeout,
verify=True,
)
return self._doh_clients[resolver_ip]
async def _get_dot_connection(
self,
resolver_ip: str,
port: int = 853,
) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
"""Return a cached DoT connection for this resolver, creating if needed.
If the cached connection is dead (writer closing), it is evicted and
a fresh connection is opened. This avoids repeated TLS handshakes
across queries to the same resolver.
"""
existing = self._dot_connections.get(resolver_ip)
if existing:
reader, writer = existing
if not writer.is_closing():
return reader, writer
# Dead connection — evict and fall through to reconnect
del self._dot_connections[resolver_ip]
ssl_ctx = ssl.create_default_context()
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
ssl_ctx.check_hostname = True
reader, writer = await asyncio.wait_for(
asyncio.open_connection(resolver_ip, port, ssl=ssl_ctx),
timeout=self.timeout,
)
self._dot_connections[resolver_ip] = (reader, writer)
return reader, writer
[docs]
async def close(self) -> None:
"""Close all shared DoH clients and DoT connections.
Must be awaited after run_benchmark completes — especially important
in FastAPI where connections are reused across requests.
"""
for client in self._doh_clients.values():
await client.aclose()
self._doh_clients.clear()
for _reader, writer in self._dot_connections.values():
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
self._dot_connections.clear()
[docs]
async def query_single(
self,
resolver_ip: str,
resolver_name: str,
domain: str,
record_type: str = "A",
use_cache: bool = True,
iteration: int = 1,
) -> DNSQueryResult:
"""Execute a single DNS query with retry logic and caching."""
await self._ensure_async_primitives()
assert self.semaphore is not None
# Check cache if enabled
if self.enable_cache and use_cache:
cache_key = self._get_cache_key(resolver_ip, domain, record_type)
if cache_key in self.cache:
cached_result = self.cache[cache_key]
# Create new instance preserving original data but marking as cache hit
result = DNSQueryResult(
**{
**asdict(cached_result),
"cache_hit": True,
"iteration": iteration,
}
)
await self._update_progress()
return result
start_time = time.time() # fallback; overwritten inside semaphore per attempt
for attempt in range(self.max_retries + 1):
try:
async with self.semaphore:
start_time = time.time()
resolver = dns.asyncresolver.Resolver()
resolver.nameservers = [resolver_ip]
resolver.timeout = self.timeout
resolver.lifetime = self.timeout
if self.enable_dnssec:
resolver.use_edns(0, dns.flags.DO, 1232)
response = await resolver.resolve(
domain, record_type, raise_on_no_answer=False
)
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
answers = (
[str(rdata) for rdata in response.rrset]
if response.rrset
else []
)
ttl = response.rrset.ttl if response.rrset else None
# DNSSEC: always read AD flag, enforce only if requested
ad_flag = False
try:
ad_flag = bool(response.response.flags & dns.flags.AD)
except AttributeError:
pass
dnssec_status = QueryStatus.SUCCESS
if self.enforce_dnssec and not ad_flag:
dnssec_status = QueryStatus.DNSSEC_FAILED
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=latency_ms,
status=dnssec_status,
answers=answers,
ttl=ttl,
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
dnssec_validated=ad_flag,
protocol=QueryProtocol.PLAIN,
)
# Cache successful result
if self.enable_cache:
cache_key = self._get_cache_key(
resolver_ip, domain, record_type
)
self.cache[cache_key] = result
await self._update_progress()
return result
except dns.exception.Timeout:
if attempt == self.max_retries:
end_time = time.time()
assert self._lock is not None
async with self._lock:
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.TIMEOUT,
answers=[],
ttl=None,
error_message="Query timeout",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
)
await self._update_progress()
return result
# Exponential backoff with configurable base
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
except dns.resolver.NXDOMAIN:
# NXDOMAIN is a definitive answer, no retry needed
end_time = time.time()
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.NXDOMAIN,
answers=[],
ttl=None,
error_message="Non-existent domain",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
)
await self._update_progress()
return result
except dns.resolver.NoAnswer:
# Blocked/sinkholed domains (e.g. 0.0.0.0 from AdGuard/Pi-hole)
# return NOERROR with an empty rrset. This is a valid fast response,
# not a failure — do not retry.
end_time = time.time()
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.SUCCESS,
answers=[],
ttl=None,
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
)
await self._update_progress()
return result
except dns.resolver.NoNameservers:
# Server failure, no retry needed
end_time = time.time()
assert self._lock is not None
async with self._lock:
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.SERVFAIL,
answers=[],
ttl=None,
error_message="Server failure",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
)
await self._update_progress()
return result
except Exception as e:
if attempt == self.max_retries:
end_time = time.time()
error_status = QueryStatus.UNKNOWN_ERROR
if "refused" in str(e).lower():
error_status = QueryStatus.CONNECTION_REFUSED
assert self._lock is not None # mypy now knows _lock is Lock
async with self._lock:
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=error_status,
answers=[],
ttl=None,
error_message=str(e),
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
)
await self._update_progress()
return result
# Exponential backoff with configurable base
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
# This should never be reached due to loop logic, but provide fallback
# for safety
end_time = time.time()
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.UNKNOWN_ERROR,
answers=[],
ttl=None,
error_message="Unexpected error: exhausted all retries without return",
cache_hit=False,
iteration=iteration,
)
await self._update_progress()
return result
[docs]
async def query_single_doh(
self,
resolver_ip: str,
resolver_name: str,
domain: str,
doh_url: str,
record_type: str = "A",
iteration: int = 1,
) -> DNSQueryResult:
"""Execute a single DNS-over-HTTPS query."""
await self._ensure_async_primitives()
assert self.semaphore is not None
start_time = time.time()
client = await self._get_doh_client(resolver_ip)
for attempt in range(self.max_retries + 1):
try:
async with self.semaphore:
start_time = time.time()
qname = dns.name.from_text(domain)
rdtype = dns.rdatatype.from_text(record_type)
request = dns.message.make_query(qname, rdtype)
if self.enable_dnssec:
request.use_edns(ednsflags=dns.flags.DO)
wire = request.to_wire()
response_raw = await client.post(
doh_url,
content=wire,
headers={
"Content-Type": "application/dns-message",
"Accept": "application/dns-message",
},
)
response_raw.raise_for_status()
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
dns_response = dns.message.from_wire(response_raw.content)
answers = [
str(rdata) for rrset in dns_response.answer for rdata in rrset
]
ttl = dns_response.answer[0].ttl if dns_response.answer else None
ad_flag = bool(dns_response.flags & dns.flags.AD)
dnssec_status = QueryStatus.SUCCESS
if self.enforce_dnssec and not ad_flag:
dnssec_status = QueryStatus.DNSSEC_FAILED
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=latency_ms,
status=dnssec_status,
answers=answers,
ttl=ttl,
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
dnssec_validated=ad_flag,
protocol=QueryProtocol.DOH,
)
await self._update_progress()
return result
except httpx.TimeoutException:
if attempt == self.max_retries:
end_time = time.time()
async with self._lock: # type: ignore[union-attr]
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.TIMEOUT,
answers=[],
ttl=None,
error_message="DoH timeout",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOH,
)
await self._update_progress()
return result
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
except httpx.HTTPStatusError as e:
if attempt == self.max_retries:
end_time = time.time()
async with self._lock: # type: ignore[union-attr]
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.SERVFAIL,
answers=[],
ttl=None,
error_message=f"HTTP {e.response.status_code}",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOH,
)
await self._update_progress()
return result
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
except Exception as e:
if attempt == self.max_retries:
end_time = time.time()
async with self._lock: # type: ignore[union-attr]
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.UNKNOWN_ERROR,
answers=[],
ttl=None,
error_message=str(e),
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOH,
)
await self._update_progress()
return result
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
# unreachable fallback
return DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=time.time(),
latency_ms=0.0,
status=QueryStatus.UNKNOWN_ERROR,
answers=[],
ttl=None,
error_message="Exhausted retries",
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOH,
)
[docs]
async def query_single_dot(
self,
resolver_ip: str,
resolver_name: str,
domain: str,
record_type: str = "A",
port: int = 853,
iteration: int = 1,
) -> DNSQueryResult:
"""Execute a single DNS-over-TLS query.
Reuses a pooled TLS connection per resolver to avoid handshake overhead
on every query. Connection is evicted from the pool on any error so the
next query gets a fresh connection.
"""
await self._ensure_async_primitives()
assert self.semaphore is not None
start_time = time.time()
for attempt in range(self.max_retries + 1):
try:
async with self.semaphore:
start_time = time.time()
qname = dns.name.from_text(domain)
rdtype = dns.rdatatype.from_text(record_type)
request = dns.message.make_query(qname, rdtype)
if self.enable_dnssec:
request.use_edns(ednsflags=dns.flags.DO)
wire = request.to_wire()
# 2-byte length prefix required by RFC 7858
prefixed = struct.pack("!H", len(wire)) + wire
# Reuse pooled connection — no TLS handshake if already open
reader, writer = await self._get_dot_connection(resolver_ip, port)
writer.write(prefixed)
await writer.drain()
# Read 2-byte length prefix then full message
raw_len = await asyncio.wait_for(
reader.readexactly(2), timeout=self.timeout
)
msg_len = struct.unpack("!H", raw_len)[0]
raw_msg = await asyncio.wait_for(
reader.readexactly(msg_len), timeout=self.timeout
)
# Do NOT close writer — connection is pooled and reused
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
dns_response = dns.message.from_wire(raw_msg)
answers = [
str(rdata) for rrset in dns_response.answer for rdata in rrset
]
ttl = dns_response.answer[0].ttl if dns_response.answer else None
ad_flag = bool(dns_response.flags & dns.flags.AD)
dnssec_status = QueryStatus.SUCCESS
if self.enforce_dnssec and not ad_flag:
dnssec_status = QueryStatus.DNSSEC_FAILED
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=latency_ms,
status=dnssec_status,
answers=answers,
ttl=ttl,
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
dnssec_validated=ad_flag,
protocol=QueryProtocol.DOT,
)
await self._update_progress()
return result
except asyncio.TimeoutError:
# Evict connection — may be in a bad state after timeout
self._dot_connections.pop(resolver_ip, None)
if attempt == self.max_retries:
end_time = time.time()
async with self._lock: # type: ignore[union-attr]
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.TIMEOUT,
answers=[],
ttl=None,
error_message="DoT timeout",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOT,
)
await self._update_progress()
return result
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
except ssl.SSLError as e:
# SSL errors are not retryable — evict and return immediately
self._dot_connections.pop(resolver_ip, None)
end_time = time.time()
async with self._lock: # type: ignore[union-attr]
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.TLS_ERROR,
answers=[],
ttl=None,
error_message=f"TLS error: {e}",
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOT,
)
await self._update_progress()
return result
except Exception as e:
# Evict connection on any unknown error before retrying
self._dot_connections.pop(resolver_ip, None)
if attempt == self.max_retries:
end_time = time.time()
async with self._lock: # type: ignore[union-attr]
self.failed_resolvers[resolver_ip] += 1
result = DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=end_time,
latency_ms=(end_time - start_time) * 1000,
status=QueryStatus.UNKNOWN_ERROR,
answers=[],
ttl=None,
error_message=str(e),
attempt_number=attempt + 1,
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOT,
)
await self._update_progress()
return result
await asyncio.sleep(
self.retry_backoff_base**attempt * self.retry_backoff_multiplier
)
# unreachable fallback
return DNSQueryResult(
resolver_ip=resolver_ip,
resolver_name=resolver_name,
domain=domain,
record_type=record_type,
start_time=start_time,
end_time=time.time(),
latency_ms=0.0,
status=QueryStatus.UNKNOWN_ERROR,
answers=[],
ttl=None,
error_message="Exhausted retries",
cache_hit=False,
iteration=iteration,
protocol=QueryProtocol.DOT,
)
[docs]
async def run_benchmark(
self,
resolvers: List[Dict[str, str]],
domains: List[str],
record_types: Optional[List[str]] = None,
iterations: int = 1,
warmup: bool = False,
warmup_fast: bool = False,
use_cache: bool = False,
protocol: QueryProtocol = QueryProtocol.PLAIN,
doh_urls: Optional[Dict[str, str]] = None, # resolver_ip -> doh_url
) -> List[DNSQueryResult]:
"""Run benchmark across all resolvers and domains.
Args:
resolvers: List of resolver dicts with 'ip' and 'name' keys
domains: List of domain names to query
record_types: List of DNS record types (default: ["A"])
iterations: Number of times to repeat the benchmark
warmup: Run full warmup (all resolvers × all domains × all record types)
warmup_fast: Run fast warmup (one probe per resolver, overrides warmup)
use_cache: Allow cache usage across iterations
Returns:
List of DNSQueryResult objects
"""
# Validate resolvers
for resolver in resolvers:
self._validate_resolver(resolver)
if not record_types:
record_types = ["A"]
# Warmup uses same protocol as benchmark so connection overhead is
# representative. warmup_fast takes precedence over warmup.
if warmup_fast:
warmup_results = await self._run_fast_warmup(resolvers, protocol, doh_urls)
elif warmup:
warmup_results = await self._run_warmup(
resolvers, domains, record_types, protocol, doh_urls
)
else:
warmup_results = []
# Report warmup failures
for r in warmup_results:
if r.status != QueryStatus.SUCCESS:
click.echo(
warning(
f"Warmup failed: {r.resolver_name} ({r.resolver_ip}) → {r.status.value}"
)
)
# Reset counters after warmup so progress tracks benchmark queries only
self.query_counter = 0
self.total_queries = (
len(resolvers) * len(domains) * len(record_types) * iterations
)
tasks = []
for iteration in range(iterations):
for resolver in resolvers:
for domain in domains:
for record_type in record_types:
if protocol == QueryProtocol.DOH:
url = (doh_urls or {}).get(resolver["ip"], "")
if not url:
click.echo(
error(
f"No DoH URL configured for resolver {resolver['ip']} ({resolver['name']})"
)
)
task = self.query_single_doh(
resolver_ip=resolver["ip"],
resolver_name=resolver["name"],
domain=domain,
doh_url=url,
record_type=record_type,
iteration=iteration + 1,
)
elif protocol == QueryProtocol.DOT:
task = self.query_single_dot(
resolver_ip=resolver["ip"],
resolver_name=resolver["name"],
domain=domain,
record_type=record_type,
iteration=iteration + 1,
)
else:
task = self.query_single(
resolver_ip=resolver["ip"],
resolver_name=resolver["name"],
domain=domain,
record_type=record_type,
use_cache=use_cache,
iteration=iteration + 1,
)
tasks.append(task)
results = await asyncio.gather(*tasks)
return list(results)
async def _run_warmup(
self,
resolvers: List[Dict[str, str]],
domains: List[str],
record_types: List[str],
protocol: QueryProtocol = QueryProtocol.PLAIN,
doh_urls: Optional[Dict[str, str]] = None,
) -> List[DNSQueryResult]:
"""Run full warmup queries (all combinations).
Does not update progress counters or cache results.
"""
tasks = []
for resolver in resolvers:
for domain in domains:
for record_type in record_types:
if protocol == QueryProtocol.DOH:
url = (doh_urls or {}).get(resolver["ip"], "")
task = self.query_single_doh(
resolver_ip=resolver["ip"],
resolver_name=resolver["name"],
domain=domain,
doh_url=url,
record_type=record_type,
iteration=0, # Mark as warmup
)
elif protocol == QueryProtocol.DOT:
task = self.query_single_dot(
resolver_ip=resolver["ip"],
resolver_name=resolver["name"],
domain=domain,
record_type=record_type,
iteration=0,
)
else:
task = self.query_single(
resolver_ip=resolver["ip"],
resolver_name=resolver["name"],
domain=domain,
record_type=record_type,
use_cache=False,
iteration=0,
)
tasks.append(task)
return await asyncio.gather(*tasks)
async def _run_fast_warmup(
self,
resolvers: List[Dict[str, str]],
protocol: QueryProtocol = QueryProtocol.PLAIN,
doh_urls: Optional[Dict[str, str]] = None,
probe_domain: str = "example.com",
record_type: str = "A",
) -> List[DNSQueryResult]:
"""Lightweight warmup: one query per resolver.
Uses a known-good domain to verify resolver connectivity.
Respects the active protocol so warmup overhead matches benchmark overhead.
Does not update progress counters or cache results.
"""
tasks = []
for r in resolvers:
if protocol == QueryProtocol.DOH:
url = (doh_urls or {}).get(r["ip"], "")
task = self.query_single_doh(
resolver_ip=r["ip"],
resolver_name=r["name"],
domain=probe_domain,
doh_url=url,
record_type=record_type,
iteration=0, # Mark as warmup
)
elif protocol == QueryProtocol.DOT:
task = self.query_single_dot(
resolver_ip=r["ip"],
resolver_name=r["name"],
domain=probe_domain,
record_type=record_type,
iteration=0,
)
else:
task = self.query_single(
resolver_ip=r["ip"],
resolver_name=r["name"],
domain=probe_domain,
record_type=record_type,
use_cache=False,
iteration=0,
)
tasks.append(task)
return await asyncio.gather(*tasks)
[docs]
def clear_cache(self) -> None:
"""Clear the query cache."""
self.cache.clear()
[docs]
def get_cache_stats(self) -> Dict[str, int]:
"""Get cache statistics."""
return {
"cached_entries": len(self.cache),
"cache_enabled": self.enable_cache,
}
[docs]
def get_failed_resolvers(self) -> Dict[str, int]:
"""Get resolvers with failure counts."""
return dict(self.failed_resolvers)
[docs]
class ResolverManager:
"""Manage DNS resolver configurations with comprehensive database."""
# Comprehensive resolver database
RESOLVERS_DATABASE = [
{
"name": "Cloudflare",
"provider": "Cloudflare",
"ip": "1.1.1.1",
"ipv6": "2606:4700:4700::1111",
"type": "public",
"category": "privacy",
"features": ["DNSSEC", "Filtering", "Anycast", "DoH", "DoT"],
"description": "Fast privacy-focused DNS with malware protection",
"country": "Global",
"doh_url": "https://cloudflare-dns.com/dns-query",
},
{
"name": "Cloudflare Family",
"provider": "Cloudflare",
"ip": "1.1.1.3",
"ipv6": "2606:4700:4700::1113",
"type": "public",
"category": "family",
"features": ["Malware Blocking", "Adult Content Blocking", "DNSSEC"],
"description": "Family-friendly DNS with malware and adult content blocking",
"country": "Global",
"doh_url": "https://family.cloudflare-dns.com/dns-query",
},
{
"name": "Google",
"provider": "Google",
"ip": "8.8.8.8",
"ipv6": "2001:4860:4860::8888",
"type": "public",
"category": "performance",
"features": ["Anycast", "Global Infrastructure", "DoH"],
"description": "Google's public DNS with global anycast network",
"country": "Global",
"doh_url": "https://dns.google/dns-query",
},
{
"name": "Quad9",
"provider": "Quad9",
"ip": "9.9.9.9",
"ipv6": "2620:fe::fe",
"type": "public",
"category": "security",
"features": ["Malware Blocking", "Phishing Protection", "DNSSEC"],
"description": "Security-focused DNS with threat intelligence",
"country": "Global",
"doh_url": "https://dns.quad9.net/dns-query",
},
{
"name": "OpenDNS",
"provider": "Cisco",
"ip": "208.67.222.222",
"ipv6": "2620:119:35::35",
"type": "public",
"category": "security",
"features": ["Content Filtering", "Phishing Protection", "Customizable"],
"description": "Cisco's secure DNS with content filtering",
"country": "Global",
"doh_url": "https://doh.opendns.com/dns-query",
},
{
"name": "OpenDNS Family",
"provider": "Cisco",
"ip": "208.67.222.123",
"ipv6": "2620:119:35::123",
"type": "public",
"category": "family",
"features": ["Adult Content Blocking", "Malware Protection"],
"description": "FamilyShield with pre-configured adult content blocking",
"country": "Global",
"doh_url": "https://doh.familyshield.opendns.com/dns-query",
},
{
"name": "AdGuard",
"provider": "AdGuard",
"ip": "94.140.14.14",
"ipv6": "2a10:50c0::ad1:ff",
"type": "public",
"category": "privacy",
"features": ["Ad Blocking", "Tracker Blocking", "Malware Protection"],
"description": "Privacy-focused DNS with ad and tracker blocking",
"country": "Cyprus",
"doh_url": "https://dns.adguard.com/dns-query",
},
{
"name": "AdGuard Family",
"provider": "AdGuard",
"ip": "94.140.14.15",
"ipv6": "2a10:50c0::ad2:ff",
"type": "public",
"category": "family",
"features": ["Ad Blocking", "Adult Content Blocking", "Safe Search"],
"description": "Family protection with ad blocking and safe search",
"country": "Cyprus",
"doh_url": "https://dns-family.adguard.com/dns-query",
},
{
"name": "CleanBrowsing",
"provider": "CleanBrowsing",
"ip": "185.228.168.9",
"ipv6": "2a0d:2a00:1::",
"type": "public",
"category": "family",
"features": ["Adult Content Blocking", "Safe Search", "Malware Protection"],
"description": "Content filtering DNS for families",
"country": "USA",
"doh_url": "https://doh.cleanbrowsing.org/doh/family-filter/",
},
{
"name": "Yandex",
"provider": "Yandex",
"ip": "77.88.8.8",
"ipv6": "2a02:6b8::feed:0ff",
"type": "public",
"category": "regional",
"features": ["Regional Optimization", "Safe Search"],
"description": "Yandex DNS optimized for Russian and CIS regions",
"country": "Russia",
"doh_url": "https://dns.yandex.net/dns-query",
},
{
"name": "Neustar",
"provider": "Neustar",
"ip": "156.154.70.1",
"ipv6": "2610:a1:1018::1",
"type": "public",
"category": "security",
"features": ["Malware Protection", "Phishing Protection", "Performance"],
"description": "Neustar's security-focused recursive DNS",
"country": "USA",
"doh_url": "https://dns.neustar/dns-query",
},
{
"name": "SafeDNS",
"provider": "SafeDNS",
"ip": "195.46.39.39",
"ipv6": "",
"type": "public",
"category": "security",
"features": ["Content Filtering", "Malware Protection"],
"description": "SafeDNS with content filtering capabilities",
"country": "UK",
"doh_url": "https://doh.safedns.com/dns-query",
},
{
"name": "ControlD",
"provider": "ControlD",
"ip": "76.76.2.0",
"ipv6": "2606:1a40::",
"type": "public",
"category": "customizable",
"features": ["Custom Filtering", "Analytics", "DoH"],
"description": "Customizable DNS with extensive filtering options",
"country": "Canada",
"doh_url": "https://freedns.controld.com/p0",
},
{
"name": "Alternate DNS",
"provider": "Alternate",
"ip": "76.76.19.19",
"ipv6": "",
"type": "public",
"category": "privacy",
"features": ["Ad Blocking", "Tracker Blocking"],
"description": "Alternative DNS focused on privacy and ad blocking",
"country": "USA",
"doh_url": "https://dns.alternate-dns.com/dns-query",
},
{
"name": "CZ.NIC",
"provider": "CZ.NIC",
"ip": "193.17.47.1",
"ipv6": "2001:148f:ffff::1",
"type": "public",
"category": "regional",
"features": ["DNSSEC", "Local Optimization"],
"description": "Czech NIC's public DNS service",
"country": "Czech Republic",
"doh_url": "https://odvr.nic.cz/doh",
},
{
"name": "Mullvad",
"provider": "Mullvad",
"ip": "194.242.2.2",
"ipv6": "2a07:e340::2",
"type": "public",
"category": "privacy",
"features": [
"No Logging",
"DNSSEC",
"Ad/Tracker Blocking (optional)",
"DoH",
],
"description": "VPN provider's public DNS with strong privacy and optional ad-blocking",
"country": "Sweden",
"doh_url": "https://dns.mullvad.net/dns-query",
},
{
"name": "LibreDNS",
"provider": "LibreDNS (FDN)",
"ip": "116.202.176.26",
"ipv6": "2a01:4f8:1c1c:6c12::1",
"type": "public",
"category": "privacy",
"features": ["No Logging", "DNSSEC", "Open Source", "DoH"],
"description": "French association's privacy-respecting DNS, no filtering",
"country": "France",
"doh_url": "https://doh.libredns.gr/dns-query",
},
{
"name": "dns0.eu",
"provider": "dns0.eu",
"ip": "193.110.81.0",
"ipv6": "2a0f:fc80::",
"type": "public",
"category": "privacy",
"features": ["GDPR-compliant", "No Logging", "Malware Blocking", "DoH"],
"description": "European privacy-first DNS with malware protection",
"country": "Italy/EU",
"doh_url": "https://dns0.eu/dns-query",
},
{
"name": "CIRA Canadian Shield",
"provider": "CIRA",
"ip": "149.112.121.10",
"ipv6": "2620:10a:80aa::10",
"type": "public",
"category": "privacy",
"features": ["Privacy Focused", "Malware Blocking", "DNSSEC", "DoH"],
"description": "Canadian Internet Registry's private DNS with threat protection",
"country": "Canada",
"doh_url": "https://private.canadianshield.cira.ca/dns-query",
},
# China: DNSPod (Tencent)
{
"name": "DNSPod",
"provider": "Tencent",
"ip": "119.29.29.29",
"ipv6": "2402:4e00::",
"type": "public",
"category": "performance",
"features": ["Anycast", "DNSSEC", "DDoS Protection", "DoH"],
"description": "Tencent's public DNS with strong domestic performance.",
"country": "China",
"doh_url": "https://doh.pub/dns-query",
},
# China: AliDNS (Alibaba)
{
"name": "AliDNS",
"provider": "Alibaba",
"ip": "223.5.5.5",
"ipv6": "2400:3200::1",
"type": "public",
"category": "performance",
"features": ["Anycast", "DNSSEC", "Global Infrastructure", "DoH"],
"description": "Alibaba's public DNS integrated with global CDN.",
"country": "China",
"doh_url": "https://dns.alidns.com/dns-query",
},
# Russia: Comss DNS (security‑focused)
{
"name": "Comss DNS",
"provider": "Comss.one",
"ip": "77.75.236.66",
"ipv6": "",
"type": "public",
"category": "security",
"features": ["Malware Blocking", "Phishing Protection", "DoH"],
"description": "Russian cybersecurity company's public DNS with threat protection.",
"country": "Russia",
"doh_url": "https://dns.comss.one/dns-query",
},
# --- NO DoH support. If you know a valid endpoint, please open a Pull Request ---
# {
# "name": "Comodo Secure",
# "provider": "Comodo",
# "ip": "8.26.56.26",
# "ipv6": "",
# "type": "public",
# "category": "security",
# "features": ["Malware Protection", "Phishing Protection"],
# "description": "Comodo's secure DNS with threat protection",
# "country": "USA",
# },
# {
# "name": "Verisign",
# "provider": "Verisign",
# "ip": "64.6.64.6",
# "ipv6": "2620:74:1b::1:1",
# "type": "public",
# "category": "reliability",
# "features": ["Stability", "DNSSEC", "Anycast"],
# "description": "Verisign public DNS focused on stability and reliability",
# "country": "USA",
# },
# {
# "name": "DNS.WATCH",
# "provider": "DNS.WATCH",
# "ip": "84.200.69.80",
# "ipv6": "2001:1608:10:25::1c04:b12f",
# "type": "public",
# "category": "privacy",
# "features": ["No Filtering", "No Logging", "Net Neutrality"],
# "description": "German DNS provider with no filtering and strong privacy",
# "country": "Germany",
# },
# {
# "name": "Level3",
# "provider": "CenturyLink",
# "ip": "4.2.2.1",
# "ipv6": "",
# "type": "public",
# "category": "legacy",
# "features": ["Reliability", "Long History"],
# "description": "One of the original public DNS services",
# "country": "USA",
# },
# {
# "name": "Norton ConnectSafe",
# "provider": "Norton",
# "ip": "199.85.126.10",
# "ipv6": "",
# "type": "public",
# "category": "security",
# "features": ["Malware Protection", "Phishing Protection"],
# "description": "Norton's security-focused DNS service (discontinued)",
# "country": "USA",
# },
]
@staticmethod
def _match_resolver_name(resolver: Dict[str, Any], target: str) -> bool:
"""Check if resolver name matches target (handles both str and Sequence[str])."""
name_val = resolver.get("name", "")
if isinstance(name_val, str):
return name_val.lower() == target.lower()
elif isinstance(name_val, (list, tuple)) and name_val:
first_item = name_val[0]
if isinstance(first_item, str):
return first_item.lower() == target.lower()
return False
[docs]
@staticmethod
def parse_resolver_string(resolver_string: str) -> List[Dict[str, str]]:
"""
Parse comma-separated resolver IPs/names into resolver list.
Supports:
- IP addresses: "1.1.1.1,8.8.8.8"
- IPv6 addresses: "2606:4700:4700::1111,2001:4860:4860::8888"
- Named resolvers from database: "Cloudflare,Google"
- Mixed: "1.1.1.1,Cloudflare"
Args:
resolver_string: Comma-separated resolver identifiers
Returns:
List of resolver dictionaries with 'name' and 'ip' keys
"""
resolvers = []
# Split by comma and clean whitespace
parts: List[str] = [
part.strip() for part in resolver_string.split(",") if part.strip()
]
for part in parts:
# Try IP detection (IPv4 or IPv6)
try:
ip_obj = ipaddress.ip_address(part)
resolvers.append({"name": part, "ip": str(ip_obj)})
continue
except ValueError:
pass
# Try name lookup in database
match = next(
(
r
for r in ResolverManager.RESOLVERS_DATABASE
if ResolverManager._match_resolver_name(r, part)
),
None,
)
if match:
name_val = match["name"]
ip_val = match["ip"]
# Handle case where values might be sequences
name_str = (
name_val[0]
if isinstance(name_val, (list, tuple))
else str(name_val)
)
ip_str = ip_val[0] if isinstance(ip_val, (list, tuple)) else str(ip_val)
resolvers.append({"name": name_str, "ip": ip_str})
continue
# Fallback: treat as hostname or custom label
resolvers.append({"name": part, "ip": part})
return resolvers
[docs]
@staticmethod
def get_default_resolvers() -> List[Dict[str, str]]:
"""Get a list of commonly used public resolvers."""
return [
{"name": "Cloudflare", "ip": "1.1.1.1"},
{"name": "Google", "ip": "8.8.8.8"},
{"name": "Quad9", "ip": "9.9.9.9"},
{"name": "OpenDNS", "ip": "208.67.222.222"},
# No doh endpoints for these, so commented out for now.
# If you know valid DoH URLs,
# please open a Pull Request to add them back in.
# {"name": "Comodo", "ip": "8.26.56.26"},
]
[docs]
@staticmethod
def load_resolvers_from_file(file_path: str) -> List[Dict[str, str]]:
"""Load resolvers from JSON file."""
with open(file_path, "r") as f:
data: Dict[str, Any] = json.load(f)
return cast(List[Dict[str, str]], data.get("resolvers", []))
[docs]
@staticmethod
def get_all_resolvers() -> List[Dict[str, Any]]:
"""Get all available resolvers with detailed information."""
return ResolverManager.RESOLVERS_DATABASE
[docs]
@staticmethod
def get_resolvers_by_category(category: str) -> List[Dict[str, Any]]:
"""Get resolvers filtered by category."""
return [
r
for r in ResolverManager.RESOLVERS_DATABASE
if r.get("category") == category
]
[docs]
@staticmethod
def get_categories() -> List[str]:
"""Get all available resolver categories."""
categories: set[str] = {
str(r["category"]) for r in ResolverManager.RESOLVERS_DATABASE
}
return sorted(categories)
[docs]
class DomainManager:
"""Manage domain lists with comprehensive database."""
# Comprehensive domain database
DOMAINS_DATABASE: List[Dict[str, Any]] = [
{
"domain": "google.com",
"category": "search",
"description": "World's most popular search engine",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "youtube.com",
"category": "video",
"description": "Video sharing platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "facebook.com",
"category": "social",
"description": "Social networking service",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "amazon.com",
"category": "ecommerce",
"description": "E-commerce and cloud computing",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "twitter.com",
"category": "social",
"description": "Social media and microblogging",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "instagram.com",
"category": "social",
"description": "Photo and video sharing platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "linkedin.com",
"category": "professional",
"description": "Professional networking",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "wikipedia.org",
"category": "reference",
"description": "Free online encyclopedia",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "microsoft.com",
"category": "tech",
"description": "Software and technology company",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "apple.com",
"category": "tech",
"description": "Consumer electronics and software",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "netflix.com",
"category": "streaming",
"description": "Video streaming service",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "github.com",
"category": "tech",
"description": "Code hosting and collaboration",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "stackoverflow.com",
"category": "tech",
"description": "Programming Q&A community",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "reddit.com",
"category": "social",
"description": "Social news aggregation",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "whatsapp.com",
"category": "messaging",
"description": "Instant messaging platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "cloudflare.com",
"category": "infrastructure",
"description": "CDN and security services",
"country": "USA",
"dnssec_signed": True,
},
{
"domain": "isc.org",
"category": "infrastructure",
"description": "Internet Systems Consortium (BIND, DHCP)",
"country": "USA",
"dnssec_signed": True,
},
{
"domain": "nlnetlabs.nl",
"category": "dns",
"description": "NLnet Labs (NSD, Unbound, ldns)",
"country": "Netherlands",
"dnssec_signed": True,
},
{
"domain": "dnssec-tools.org",
"category": "security",
"description": "DNSSEC tools and test suite",
"country": "USA",
"dnssec_signed": True,
},
{
"domain": "ietf.org",
"category": "standards",
"description": "Internet Engineering Task Force",
"country": "USA",
"dnssec_signed": True,
},
{
"domain": "baidu.com",
"category": "search",
"description": "Chinese search engine",
"country": "China",
"dnssec_signed": False,
},
{
"domain": "taobao.com",
"category": "ecommerce",
"description": "Chinese online shopping",
"country": "China",
"dnssec_signed": False,
},
{
"domain": "qq.com",
"category": "portal",
"description": "Chinese web portal",
"country": "China",
"dnssec_signed": False,
},
{
"domain": "tmall.com",
"category": "ecommerce",
"description": "Chinese B2C online retail",
"country": "China",
"dnssec_signed": False,
},
{
"domain": "yahoo.com",
"category": "portal",
"description": "Web services portal",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "bing.com",
"category": "search",
"description": "Microsoft's search engine",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "live.com",
"category": "email",
"description": "Microsoft email and services",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "office.com",
"category": "productivity",
"description": "Microsoft Office suite",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "zoom.us",
"category": "communication",
"description": "Video conferencing platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "slack.com",
"category": "communication",
"description": "Business communication platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "dropbox.com",
"category": "storage",
"description": "Cloud storage service",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "adobe.com",
"category": "creative",
"description": "Creative software suite",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "paypal.com",
"category": "finance",
"description": "Online payments system",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "wordpress.com",
"category": "publishing",
"description": "Blogging and website platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "medium.com",
"category": "publishing",
"description": "Online publishing platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "quora.com",
"category": "qna",
"description": "Question and answer platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "imdb.com",
"category": "entertainment",
"description": "Movie and TV database",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "bbc.com",
"category": "news",
"description": "British broadcasting news",
"country": "UK",
"dnssec_signed": False,
},
{
"domain": "cnn.com",
"category": "news",
"description": "Cable news network",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "nytimes.com",
"category": "news",
"description": "New York Times newspaper",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "weather.com",
"category": "weather",
"description": "Weather forecasting service",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "espn.com",
"category": "sports",
"description": "Sports news and coverage",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "craigslist.org",
"category": "classifieds",
"description": "Classified advertisements",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "ebay.com",
"category": "ecommerce",
"description": "Online auction and shopping",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "aliexpress.com",
"category": "ecommerce",
"description": "Chinese online retail",
"country": "China",
"dnssec_signed": False,
},
{
"domain": "walmart.com",
"category": "ecommerce",
"description": "Multinational retail corporation",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "target.com",
"category": "ecommerce",
"description": "Retail corporation",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "bestbuy.com",
"category": "ecommerce",
"description": "Consumer electronics retailer",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "hulu.com",
"category": "streaming",
"description": "Video streaming service",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "spotify.com",
"category": "music",
"description": "Music streaming platform",
"country": "Sweden",
"dnssec_signed": False,
},
{
"domain": "soundcloud.com",
"category": "music",
"description": "Audio distribution platform",
"country": "Germany",
"dnssec_signed": False,
},
{
"domain": "deezer.com",
"category": "music",
"description": "Music streaming service",
"country": "France",
"dnssec_signed": False,
},
{
"domain": "twitch.tv",
"category": "gaming",
"description": "Live streaming for gamers",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "steampowered.com",
"category": "gaming",
"description": "Digital game distribution",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "epicgames.com",
"category": "gaming",
"description": "Video game and software developer",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "ubuntu.com",
"category": "tech",
"description": "Linux distribution",
"country": "UK",
"dnssec_signed": False,
},
{
"domain": "docker.com",
"category": "tech",
"description": "Container platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "kubernetes.io",
"category": "tech",
"description": "Container orchestration",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "gitlab.com",
"category": "tech",
"description": "DevOps platform",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "atlassian.com",
"category": "tech",
"description": "Software development tools",
"country": "Australia",
"dnssec_signed": False,
},
{
"domain": "notion.so",
"category": "productivity",
"description": "Note-taking and collaboration",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "figma.com",
"category": "design",
"description": "Collaborative design tool",
"country": "USA",
"dnssec_signed": False,
},
{
"domain": "canva.com",
"category": "design",
"description": "Graphic design platform",
"country": "Australia",
"dnssec_signed": False,
},
]
[docs]
@staticmethod
def parse_domain_string(domain_string: str) -> List[str]:
"""
Parse comma-separated domain names into domain list.
Supports:
- Domain names: "google.com,github.com,example.com"
- Domains with whitespace: "google.com, github.com, example.com"
- Mixed case: "Google.com,GitHub.com"
Args:
domain_string: Comma-separated domain names
Returns:
List of domain strings (lowercased and cleaned)
"""
domains = []
seen = set()
# Split by comma and clean whitespace
parts = [
part.strip().lower() for part in domain_string.split(",") if part.strip()
]
for part in parts:
# Remove trailing dot (FQDN normalization)
part = part.rstrip(".")
# Normalize IDN → punycode if possible
try:
part = idna.encode(part).decode("ascii")
except idna.IDNAError:
pass # keep original if invalid IDN
# Basic permissive validation (keep even if odd-looking)
if "." in part and len(part) > 3:
cleaned = part
else:
cleaned = part # still include; DNS resolver will decide
# Deduplicate while preserving order
if cleaned not in seen:
seen.add(cleaned)
domains.append(cleaned)
return domains
[docs]
@staticmethod
def parse_domains_input(input_value: Optional[str]) -> List[str]:
"""
Smart parser that handles both file paths and comma-separated inline values.
Detection logic:
1. If input contains comma -> treat as comma-separated
2. If file exists at path -> load from file
3. If input looks like single domain -> treat as single domain
4. Otherwise -> try as file path (will raise FileNotFoundError if not found)
Args:
input_value: File path or comma-separated domain string
Returns:
List of domain strings
Raises:
FileNotFoundError: If file path is invalid
"""
if not input_value:
raise ValueError("Domain input cannot be empty")
input_value = input_value.strip()
# Check if it's a file path that exists
path = Path(input_value)
if path.exists() and path.is_file():
return DomainManager.load_domains_from_file(str(path))
# Check if it contains comma (likely inline list)
if "," in input_value:
return DomainManager.parse_domain_string(input_value)
# Check if it looks like a single domain (has a dot)
if "." in input_value:
return [input_value.lower().strip()]
# Last resort: try as file path (will raise FileNotFoundError)
return DomainManager.load_domains_from_file(input_value)
[docs]
@staticmethod
def get_sample_domains() -> List[str]:
"""Get a list of sample domains for testing."""
return [
# Signed domains with DNSSEC
"cloudflare.com",
"isc.org",
"nlnetlabs.nl",
"dnssec-tools.org",
"ietf.org",
# Non-signed domains
"google.com",
"github.com",
"stackoverflow.com",
"wikipedia.org",
"reddit.com",
"twitter.com",
"linkedin.com",
"microsoft.com",
"apple.com",
"amazon.com",
]
[docs]
@staticmethod
def load_domains_from_file(file_path: str) -> List[str]:
"""Load domains from text file (one per line)."""
with open(file_path, "r") as f:
domains = [
line.strip() for line in f if line.strip() and not line.startswith("#")
]
return domains
[docs]
@staticmethod
def get_all_domains() -> List[Dict[str, Any]]:
"""Get all available domains with detailed information."""
return DomainManager.DOMAINS_DATABASE
[docs]
@staticmethod
def get_domains_by_category(category: str) -> List[Dict[str, Any]]:
"""Get domains filtered by category."""
return [
d for d in DomainManager.DOMAINS_DATABASE if d.get("category") == category
]
[docs]
@staticmethod
def get_categories() -> List[str]:
"""Get all available domain categories."""
categories = set(d["category"] for d in DomainManager.DOMAINS_DATABASE)
return sorted(list(categories))