diff --git a/backend/modules/network_scanner.py b/backend/modules/network_scanner.py new file mode 100644 index 0000000..f9c4d53 --- /dev/null +++ b/backend/modules/network_scanner.py @@ -0,0 +1,78 @@ +import asyncio +import subprocess +from typing import Any + + +class NetworkScanner: + def __init__(self, subnet: str): + self.subnet = subnet + + def scan(self) -> dict[str, Any]: + import nmap + nm = nmap.PortScanner() + nm.scan(hosts=self.subnet, arguments="-sn") + return {"scan": dict(nm._scan_result.get("scan", {}))} + + def parse_scan_results(self, scan_data: dict[str, Any]) -> list[dict[str, Any]]: + nodes = [] + for ip, data in scan_data.get("scan", {}).items(): + hostnames = data.get("hostnames", [{}]) + hostname = hostnames[0].get("name", "") if hostnames else "" + vendors = data.get("vendor", {}) + vendor = next(iter(vendors.values()), "") if vendors else "" + nodes.append({ + "ip": ip, + "hostname": hostname, + "status": data.get("status", {}).get("state", "unknown"), + "vendor": vendor, + }) + nodes.sort(key=lambda n: tuple(int(p) for p in n["ip"].split("."))) + return nodes + + def merge_with_config( + self, + discovered: list[dict[str, Any]], + configured: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + config_by_ip = {n["ip"]: n for n in configured} + merged = [] + seen_ips = set() + + for node in discovered: + ip = node["ip"] + seen_ips.add(ip) + if ip in config_by_ip: + entry = {**config_by_ip[ip], **{"status": node["status"], "auto_discovered": True}} + else: + entry = { + "name": node["hostname"] or ip, + "ip": ip, + "status": node["status"], + "vendor": node.get("vendor", ""), + "icon": "device", + "auto_discovered": True, + "connections": [], + } + merged.append(entry) + + for node in configured: + if node["ip"] not in seen_ips: + merged.append({**node, "status": "unknown", "auto_discovered": False}) + + return merged + + async def ping_host(self, ip: str) -> bool: + try: + proc = await asyncio.create_subprocess_exec( + "ping", "-c", "1", "-W", "1", ip, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + code = await proc.wait() + return code == 0 + except Exception: + return False + + async def ping_all(self, ips: list[str]) -> dict[str, bool]: + results = await asyncio.gather(*[self.ping_host(ip) for ip in ips]) + return dict(zip(ips, results)) diff --git a/backend/tests/test_network_scanner.py b/backend/tests/test_network_scanner.py new file mode 100644 index 0000000..a4367e8 --- /dev/null +++ b/backend/tests/test_network_scanner.py @@ -0,0 +1,70 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from modules.network_scanner import NetworkScanner + + +@pytest.fixture +def scanner(): + return NetworkScanner(subnet="192.168.1.0/24") + + +def test_parse_scan_results(scanner): + mock_scan_result = { + "scan": { + "192.168.1.1": { + "status": {"state": "up"}, + "hostnames": [{"name": "router.local", "type": "PTR"}], + "vendor": {"AA:BB:CC:DD:EE:FF": "Cisco"}, + }, + "192.168.1.10": { + "status": {"state": "up"}, + "hostnames": [{"name": "", "type": ""}], + "vendor": {}, + }, + } + } + + nodes = scanner.parse_scan_results(mock_scan_result) + assert len(nodes) == 2 + assert nodes[0]["ip"] == "192.168.1.1" + assert nodes[0]["hostname"] == "router.local" + assert nodes[0]["status"] == "up" + assert nodes[1]["ip"] == "192.168.1.10" + + +def test_merge_with_config(scanner): + discovered = [ + {"ip": "192.168.1.1", "hostname": "router.local", "status": "up", "vendor": "Cisco"}, + {"ip": "192.168.1.50", "hostname": "", "status": "up", "vendor": ""}, + ] + configured = [ + { + "name": "Router Principal", + "ip": "192.168.1.1", + "username": "admin", + "password": "pass", + "icon": "router", + "connections": [], + }, + ] + + merged = scanner.merge_with_config(discovered, configured) + # Router should have config data merged in + router = next(n for n in merged if n["ip"] == "192.168.1.1") + assert router["name"] == "Router Principal" + assert router["username"] == "admin" + assert router["status"] == "up" + # Unknown device keeps discovered data + unknown = next(n for n in merged if n["ip"] == "192.168.1.50") + assert unknown["name"] == "192.168.1.50" + assert unknown.get("username") is None + + +@pytest.mark.asyncio +async def test_ping_host(scanner): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) as mock_exec: + mock_proc = MagicMock() + mock_proc.wait = AsyncMock(return_value=0) + mock_exec.return_value = mock_proc + result = await scanner.ping_host("192.168.1.1") + assert result is True