opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
utils
➕ New
📤 Upload
✎ Editing:
net_transport.py
← Back
"""Networking transport helpers for urllib. This module provides a small abstraction on top of urllib.request so that callers can keep using urllib.request.Request, but routing of connections can be customized: - hostname resolution is handled in user code; - selected IP may be randomized or chosen using any complex logic; - for HTTPS: connects to a chosen IP but keeps correct SNI and certificate hostname validation for the original hostname (NOT the IP). Examples: Default behavior (plain urllib): from defence360agent.utils.net_transport import UrlTransport transport = UrlTransport() req = urllib.request.Request( "https://files.imunify360.com/static/sigs/v1/description.json" ) with transport.open(req, timeout=10) as resp: body = resp.read() Randomize target IP on each connection (A/AAAA -> random choice): from defence360agent.utils.net_transport import UrlTransport, RandomIpChooser chooser = RandomIpChooser() transport = UrlTransport(ip_chooser=chooser) req = urllib.request.Request( "https://files.imunify360.com/static/sigs/v1/description.json" ) with transport.open(req, timeout=10) as resp: body = resp.read() Notes: - HTTPS: connects to the chosen IP but keeps SNI/cert checks against original hostname. - HTTP: Host header stays original hostname because urllib builds it from the URL. """ import http.client import ipaddress import random import socket import threading import time import urllib.request from abc import ABC, abstractmethod from logging import getLogger from typing import Dict, Optional, Tuple, TYPE_CHECKING if TYPE_CHECKING: import ssl logger = getLogger(__name__) #: default cache TTL for DNS responses _DNS_DEFAULT_TTL_SECONDS = 300.0 def _is_ipv4(ip: str) -> bool: """Return True if *ip* is an IPv4 address string. Implementation relies solely on ipaddress.ip_address for correctness. """ try: return isinstance(ipaddress.ip_address(ip), ipaddress.IPv4Address) except ValueError: return False class IpChooser(ABC): """Select an IP address to connect to for a given hostname and port. Implementations may be stateful and can keep caches/metrics inside. """ @abstractmethod def choose(self, hostname: str, port: int) -> str: """Return an IP address (v4 or v6) for *hostname*:*port*.""" raise NotImplementedError def __call__(self, hostname: str, port: int) -> str: return self.choose(hostname, port) class DnsCacheResolver: """DNS cache for socket.getaddrinfo() results. It caches per (hostname, port, family). This is intentionally small and local: it is meant only to avoid excessive getaddrinfo() calls. """ def __init__( self, *, family: int = socket.AF_UNSPEC, ttl_seconds: float = _DNS_DEFAULT_TTL_SECONDS, ): self._family = family self._ttl_seconds = ttl_seconds self._cache: Dict[ Tuple[str, int, int], Tuple[float, Tuple[str, ...]] ] = {} self._lock = threading.Lock() def get_ips(self, hostname: str, port: int) -> Tuple[str, ...]: key = (hostname, port, self._family) now = time.time() with self._lock: cached = self._cache.get(key) if cached is not None: expires_at, ips = cached if now < expires_at: logger.debug( "DnsCacheResolver cache hit for %s:%s (family=%s)", hostname, port, self._family, ) return ips logger.debug( "DnsCacheResolver cache miss/expired for %s:%s (family=%s)", hostname, port, self._family, ) infos = socket.getaddrinfo( hostname, port, self._family, socket.SOCK_STREAM, ) ips = [] for _, _, _, _, sockaddr in infos: ip = sockaddr[0] if ip not in ips: ips.append(ip) if not ips: raise OSError("No IPs resolved for {}:{}".format(hostname, port)) ips_t = tuple(ips) with self._lock: self._cache[key] = (now + self._ttl_seconds, ips_t) logger.debug( "DnsCacheResolver resolved %s:%s (family=%s) to %s", hostname, port, self._family, ips_t, ) return ips_t class RandomIpChooserWithIPv6Toggle(IpChooser): """Resolve hostname and select a random IP. IPv6 selection can be enabled/disabled at runtime: - when IPv6 is enabled: choose from IPv4 + IPv6 candidates - when IPv6 is disabled: choose from IPv4-only candidates """ def __init__( self, *, resolver: Optional[DnsCacheResolver] = None, rng: Optional[random.Random] = None, ipv6_enabled: bool = True, ): self._resolver = resolver or DnsCacheResolver() self._rng = rng or random.Random() self._ipv6_enabled = ipv6_enabled self._last_ip: Optional[str] = None def enable_ipv6(self) -> None: self._ipv6_enabled = True def disable_ipv6(self) -> None: self._ipv6_enabled = False def is_ipv6_enabled(self) -> bool: return self._ipv6_enabled def last_ip(self) -> Optional[str]: return self._last_ip def last_ip_was_ipv6(self) -> bool: return bool(self._last_ip) and (":" in self._last_ip) def choose(self, hostname: str, port: int) -> str: ips = self._resolver.get_ips(hostname, port) if self._ipv6_enabled: chosen = self._rng.choice(ips) self._last_ip = chosen logger.debug( "RandomIpChooserWithIPv6Toggle selected IP %s for %s:%s " "(IPv6 enabled)", chosen, hostname, port, ) return chosen ipv4_ips = tuple(ip for ip in ips if _is_ipv4(ip)) if not ipv4_ips: raise OSError( "No IPv4 IPs resolved for {}:{}".format(hostname, port) ) chosen = self._rng.choice(ipv4_ips) self._last_ip = chosen logger.debug( "RandomIpChooserWithIPv6Toggle selected IPv4 IP %s for %s:%s " "(IPv6 disabled)", chosen, hostname, port, ) return chosen class RandomIpChooser(IpChooser): """Resolve hostname and select a random IP from resolved candidates.""" def __init__( self, *, resolver: Optional[DnsCacheResolver] = None, rng: Optional[random.Random] = None, ): self._resolver = resolver or DnsCacheResolver() self._rng = rng or random.Random() def choose(self, hostname: str, port: int) -> str: ips = self._resolver.get_ips(hostname, port) chosen = self._rng.choice(ips) logger.debug( "RandomIpChooser selected IP %s for %s:%s", chosen, hostname, port, ) return chosen class ForcedIPHTTPConnection(http.client.HTTPConnection): """HTTPConnection that connects to a chosen IP. Important: urllib builds the request URL with the original hostname, therefore the Host header stays correct. """ def __init__( self, hostname: str, port: Optional[int] = None, *, ip_chooser: IpChooser, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None, ): super().__init__( hostname, port=port, timeout=timeout, source_address=source_address, ) self._ip_chooser = ip_chooser def connect(self) -> None: port = self.port or 80 ip = self._ip_chooser.choose(self.host, port) logger.debug( "ForcedIPHTTPConnection connecting to %s:%s for hostname %s", ip, port, self.host, ) self.sock = socket.create_connection( (ip, port), self.timeout, self.source_address, ) class ForcedIPHTTPSConnection(http.client.HTTPSConnection): """HTTPSConnection that connects to a chosen IP. TLS details: - Uses original hostname for SNI (server_hostname in wrap_socket) - Certificate hostname validation is performed for the original hostname """ def __init__( self, hostname: str, port: Optional[int] = None, *, ip_chooser: IpChooser, context: "ssl.SSLContext", timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None, ): super().__init__( hostname, port=port, context=context, timeout=timeout, source_address=source_address, ) self._ip_chooser = ip_chooser def connect(self) -> None: port = self.port or 443 ip = self._ip_chooser.choose(self.host, port) logger.debug( "ForcedIPHTTPSConnection connecting to %s:%s for hostname %s", ip, port, self.host, ) raw_sock = socket.create_connection( (ip, port), self.timeout, self.source_address, ) if self._tunnel_host: self.sock = raw_sock self._tunnel() raw_sock = self.sock self.sock = self._context.wrap_socket( raw_sock, server_hostname=self.host, ) class ForcedIPHTTPHandler(urllib.request.HTTPHandler): """urllib handler that creates ForcedIPHTTPConnection.""" def __init__(self, *, ip_chooser: IpChooser): super().__init__() self._ip_chooser = ip_chooser def http_open(self, req) -> http.client.HTTPResponse: def factory(host, **kwargs): return ForcedIPHTTPConnection( host, ip_chooser=self._ip_chooser, timeout=kwargs.get("timeout"), ) return self.do_open(factory, req) class ForcedIPHTTPSHandler(urllib.request.HTTPSHandler): """urllib handler that creates ForcedIPHTTPSConnection.""" def __init__(self, *, ip_chooser: IpChooser, context: "ssl.SSLContext"): super().__init__(context=context) self._ip_chooser = ip_chooser self._context = context def https_open(self, req) -> http.client.HTTPResponse: def factory(host, **kwargs): return ForcedIPHTTPSConnection( host, ip_chooser=self._ip_chooser, context=self._context, timeout=kwargs.get("timeout"), ) return self.do_open(factory, req) class UrlTransport: """Single entrypoint for opening urllib requests. If *ip_chooser* is provided, the transport will connect to the selected IP address, while keeping correct Host/SNI/cert validation for the original hostname. If *ip_chooser* is not provided, it behaves like plain urllib. """ def __init__( self, *, ip_chooser: Optional[IpChooser] = None, ssl_context: Optional["ssl.SSLContext"] = None, ): import ssl as _ssl self._ssl_context = ssl_context or _ssl.create_default_context() if ip_chooser is None: self._opener = urllib.request.build_opener() else: self._opener = urllib.request.build_opener( ForcedIPHTTPHandler(ip_chooser=ip_chooser), ForcedIPHTTPSHandler( ip_chooser=ip_chooser, context=self._ssl_context, ), ) def open( self, req: urllib.request.Request, *, timeout: Optional[float] = None, ) -> http.client.HTTPResponse: if timeout is None: return self._opener.open(req) return self._opener.open(req, timeout=timeout)
💾 Save Changes
Cancel
📤 Upload File
×
Select File
Upload
Cancel
➕ Create New
×
Type
📄 File
📁 Folder
Name
Create
Cancel
✎ Rename Item
×
Current Name
New Name
Rename
Cancel
🔐 Change Permissions
×
Target File
Permission (e.g., 0755, 0644)
0755
0644
0777
Apply
Cancel