import pytest
import os
from pathlib import Path

from sensorpajen.discovery_manager import DiscoveryManager, DiscoveredSensor

# Values shared by the database-backed tests below.
_MAC = "AA:BB:CC:DD:EE:FF"
_NAME = "ATC_123456"
_DB_NAME = "sensors.db"


class _DummyCompletedProcess:
    """Bare-bones result object handed back by the fake ``subprocess.run``."""

    def __init__(self, returncode: int = 0, stderr: bytes = b""):
        self.stderr = stderr
        self.returncode = returncode


def _open_manager(tmp_path):
    """Build a ``DiscoveryManager`` backed by ``sensors.db`` under *tmp_path*."""
    return DiscoveryManager(str(tmp_path / _DB_NAME))


def _install_run_spy(monkeypatch, dm_mod):
    """Swap ``dm_mod.subprocess.run`` for a spy and return its call record.

    The returned dict's ``"run"`` entry flips to ``True`` as soon as the
    fake is invoked, whatever arguments it receives.
    """
    record = {"run": False}

    def _fake_run(*args, **kwargs):
        record["run"] = True
        return _DummyCompletedProcess(0)

    monkeypatch.setattr(dm_mod.subprocess, "run", _fake_run)
    return record


def _minimal_sensor(dm_mod):
    """Return a throwaway ``DiscoveredSensor`` with placeholder field values."""
    return dm_mod.DiscoveredSensor(
        mac="AA",
        name="N",
        rssi=-1,
        first_seen="now",
        last_seen="now",
        sample_reading={"temperature": 1, "humidity": 2, "battery_percent": 3},
    )


def test_discovery_manager_init(tmp_path):
    """Constructing a manager leaves a database file on disk."""
    db_path = tmp_path / _DB_NAME
    # Hold a reference so the manager stays alive while the file is checked.
    _manager = DiscoveryManager(str(db_path))
    assert db_path.exists()


def test_discovery_manager_add_new(tmp_path):
    """A first sighting shows up as a single pending sensor with count 1."""
    manager = _open_manager(tmp_path)
    manager.add_or_update(_MAC, _NAME, -70, 22.5, 45, 100, 3.0)

    found = manager.get_pending()
    assert len(found) == 1

    entry = found[0]
    assert entry.mac == _MAC
    assert entry.name == "ATC_123456"
    assert entry.sample_reading["temperature"] == 22.5
    assert entry.status == "pending"
    assert entry.count == 1


def test_discovery_manager_update_existing(tmp_path):
    """A second sighting of the same MAC updates the entry in place."""
    manager = _open_manager(tmp_path)
    manager.add_or_update(_MAC, _NAME, -70, 22.5, 45, 100, 3.0)
    # Update with new values
    manager.add_or_update(_MAC, _NAME, -60, 23.0, 40, 99, 2.9)

    found = manager.get_pending()
    assert len(found) == 1

    entry = found[0]
    reading = entry.sample_reading
    assert entry.rssi == -60
    assert reading["temperature"] == 23.0
    assert reading["humidity"] == 40.0
    assert entry.count == 2


def test_discovery_manager_persistence(tmp_path):
    """Pending sensors are visible to a second manager on the same DB file."""
    writer = _open_manager(tmp_path)
    writer.add_or_update(_MAC, _NAME, -70, 22.5, 45, 100, 3.0)

    # Create new manager and load from same DB
    reader = _open_manager(tmp_path)
    found = reader.get_pending()

    assert len(found) == 1
    first = found[0]
    assert first.mac == _MAC
    assert first.name == "ATC_123456"


def test_send_ntfy_notification_disabled(monkeypatch, tmp_path):
    """``subprocess.run`` is not invoked when ``NTFY_ENABLED`` is False."""
    from sensorpajen import discovery_manager as dm_mod

    monkeypatch.setattr(dm_mod.config, "NTFY_ENABLED", False)
    monkeypatch.setattr(dm_mod.config, "NTFY_TOKEN", "token")
    # Patch before building the manager so any call at all is recorded.
    record = _install_run_spy(monkeypatch, dm_mod)

    manager = dm_mod.DiscoveryManager(str(tmp_path / "dummy.db"))
    manager.send_ntfy_notification(_minimal_sensor(dm_mod))

    assert record["run"] is False


def test_send_ntfy_notification_missing_token(monkeypatch, tmp_path):
    """``subprocess.run`` is not invoked when ``NTFY_TOKEN`` is empty."""
    from sensorpajen import discovery_manager as dm_mod

    monkeypatch.setattr(dm_mod.config, "NTFY_ENABLED", True)
    monkeypatch.setattr(dm_mod.config, "NTFY_TOKEN", "")
    # Patch before building the manager so any call at all is recorded.
    record = _install_run_spy(monkeypatch, dm_mod)

    manager = dm_mod.DiscoveryManager(str(tmp_path / "dummy2.db"))
    manager.send_ntfy_notification(_minimal_sensor(dm_mod))

    assert record["run"] is False


def test_send_ntfy_notification_message_mentions_tui(monkeypatch, tmp_path):
    """The payload following ``-d`` in the command mentions ``sensorpajen-tui``."""
    from sensorpajen import discovery_manager as dm_mod

    ntfy_settings = (
        ("NTFY_ENABLED", True),
        ("NTFY_TOKEN", "token"),
        ("NTFY_URL", "https://ntfy.sh"),
        ("NTFY_TOPIC", "sensorpajen"),
    )
    for setting, value in ntfy_settings:
        monkeypatch.setattr(dm_mod.config, setting, value)

    seen = {"args": None}

    # The explicit signature is deliberate: it only accepts the keyword
    # arguments named here, so any other call shape raises TypeError.
    def _recording_run(args, capture_output=True, timeout=10):
        seen["args"] = args
        return _DummyCompletedProcess(0)

    monkeypatch.setattr(dm_mod.subprocess, "run", _recording_run)

    manager = dm_mod.DiscoveryManager(str(tmp_path / "sensors.db"))
    sensor = dm_mod.DiscoveredSensor(
        mac="AA:BB",
        name="ATC_123",
        rssi=-1,
        first_seen="2025-01-01T00:00:00",
        last_seen="2025-01-01T00:00:00",
        sample_reading={"temperature": 10, "humidity": 20, "battery_percent": 30},
    )
    manager.send_ntfy_notification(sensor)

    command = seen["args"]
    assert command is not None

    # curl args: [..., "-d", message, url]
    assert "-d" in command
    payload = command[command.index("-d") + 1]
    assert "sensorpajen-tui" in payload