import asyncio import subprocess from typing import Any class NetworkScanner: def __init__(self, subnet: str): self.subnet = subnet def scan(self) -> dict[str, Any]: import nmap nm = nmap.PortScanner() nm.scan(hosts=self.subnet, arguments="-sn") return {"scan": dict(nm._scan_result.get("scan", {}))} def parse_scan_results(self, scan_data: dict[str, Any]) -> list[dict[str, Any]]: nodes = [] for ip, data in scan_data.get("scan", {}).items(): hostnames = data.get("hostnames", [{}]) hostname = hostnames[0].get("name", "") if hostnames else "" vendors = data.get("vendor", {}) vendor = next(iter(vendors.values()), "") if vendors else "" nodes.append({ "ip": ip, "hostname": hostname, "status": data.get("status", {}).get("state", "unknown"), "vendor": vendor, }) nodes.sort(key=lambda n: tuple(int(p) for p in n["ip"].split("."))) return nodes def merge_with_config( self, discovered: list[dict[str, Any]], configured: list[dict[str, Any]], ) -> list[dict[str, Any]]: config_by_ip = {n["ip"]: n for n in configured} merged = [] seen_ips = set() for node in discovered: ip = node["ip"] seen_ips.add(ip) if ip in config_by_ip: entry = {**config_by_ip[ip], **{"status": node["status"], "auto_discovered": True}} else: entry = { "name": node["hostname"] or ip, "ip": ip, "status": node["status"], "vendor": node.get("vendor", ""), "icon": "device", "auto_discovered": True, "connections": [], } merged.append(entry) for node in configured: if node["ip"] not in seen_ips: merged.append({**node, "status": "unknown", "auto_discovered": False}) return merged async def ping_host(self, ip: str) -> bool: try: proc = await asyncio.create_subprocess_exec( "ping", "-c", "1", "-W", "1", ip, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) code = await proc.wait() return code == 0 except Exception: return False async def ping_all(self, ips: list[str]) -> dict[str, bool]: results = await asyncio.gather(*[self.ping_host(ip) for ip in ips]) return dict(zip(ips, results))