feat: add network scanner module with nmap discovery, config merge, and ping
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
78
backend/modules/network_scanner.py
Normal file
78
backend/modules/network_scanner.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import asyncio
|
||||
import subprocess
|
||||
from typing import Any
|
||||
|
||||
|
||||
class NetworkScanner:
|
||||
def __init__(self, subnet: str):
|
||||
self.subnet = subnet
|
||||
|
||||
def scan(self) -> dict[str, Any]:
|
||||
import nmap
|
||||
nm = nmap.PortScanner()
|
||||
nm.scan(hosts=self.subnet, arguments="-sn")
|
||||
return {"scan": dict(nm._scan_result.get("scan", {}))}
|
||||
|
||||
def parse_scan_results(self, scan_data: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
nodes = []
|
||||
for ip, data in scan_data.get("scan", {}).items():
|
||||
hostnames = data.get("hostnames", [{}])
|
||||
hostname = hostnames[0].get("name", "") if hostnames else ""
|
||||
vendors = data.get("vendor", {})
|
||||
vendor = next(iter(vendors.values()), "") if vendors else ""
|
||||
nodes.append({
|
||||
"ip": ip,
|
||||
"hostname": hostname,
|
||||
"status": data.get("status", {}).get("state", "unknown"),
|
||||
"vendor": vendor,
|
||||
})
|
||||
nodes.sort(key=lambda n: tuple(int(p) for p in n["ip"].split(".")))
|
||||
return nodes
|
||||
|
||||
def merge_with_config(
|
||||
self,
|
||||
discovered: list[dict[str, Any]],
|
||||
configured: list[dict[str, Any]],
|
||||
) -> list[dict[str, Any]]:
|
||||
config_by_ip = {n["ip"]: n for n in configured}
|
||||
merged = []
|
||||
seen_ips = set()
|
||||
|
||||
for node in discovered:
|
||||
ip = node["ip"]
|
||||
seen_ips.add(ip)
|
||||
if ip in config_by_ip:
|
||||
entry = {**config_by_ip[ip], **{"status": node["status"], "auto_discovered": True}}
|
||||
else:
|
||||
entry = {
|
||||
"name": node["hostname"] or ip,
|
||||
"ip": ip,
|
||||
"status": node["status"],
|
||||
"vendor": node.get("vendor", ""),
|
||||
"icon": "device",
|
||||
"auto_discovered": True,
|
||||
"connections": [],
|
||||
}
|
||||
merged.append(entry)
|
||||
|
||||
for node in configured:
|
||||
if node["ip"] not in seen_ips:
|
||||
merged.append({**node, "status": "unknown", "auto_discovered": False})
|
||||
|
||||
return merged
|
||||
|
||||
async def ping_host(self, ip: str) -> bool:
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
"ping", "-c", "1", "-W", "1", ip,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
code = await proc.wait()
|
||||
return code == 0
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def ping_all(self, ips: list[str]) -> dict[str, bool]:
|
||||
results = await asyncio.gather(*[self.ping_host(ip) for ip in ips])
|
||||
return dict(zip(ips, results))
|
||||
Reference in New Issue
Block a user