feat: implement Textual TUI and SQLite database for sensor management

This commit is contained in:
2025-12-29 09:39:33 +01:00
parent 4213b6101a
commit cfa24d1fa5
22 changed files with 1734 additions and 723 deletions

View File

@@ -0,0 +1,76 @@
import pytest
from unittest.mock import MagicMock, patch, call
from sensorpajen.mqtt_publisher import MQTTPublisher
import sensorpajen.config as config
@pytest.fixture
def mock_config():
with patch("sensorpajen.config.MQTT_HOST", "localhost"), \
patch("sensorpajen.config.MQTT_PORT", 1883), \
patch("sensorpajen.config.MQTT_USER", "user"), \
patch("sensorpajen.config.MQTT_PASSWORD", "pass"), \
patch("sensorpajen.config.MQTT_CLIENT_ID", "test_client"), \
patch("sensorpajen.config.MQTT_TOPIC_PREFIX", "test"):
yield
@pytest.fixture
def mock_config():
with patch("sensorpajen.config.MQTT_HOST", "localhost"), \
patch("sensorpajen.config.MQTT_PORT", 1883), \
patch("sensorpajen.config.MQTT_USER", "user"), \
patch("sensorpajen.config.MQTT_PASSWORD", "pass"), \
patch("sensorpajen.config.MQTT_CLIENT_ID", "test_client"), \
patch("sensorpajen.config.MQTT_TOPIC_PREFIX", "test"):
yield
def test_mqtt_publisher_init(mock_config):
with patch("paho.mqtt.client.Client") as mock_client:
publisher = MQTTPublisher()
mock_client.assert_called_once()
publisher.client.username_pw_set.assert_called_with("user", "pass")
def test_mqtt_publisher_connect(mock_config):
with patch("paho.mqtt.client.Client") as mock_client:
publisher = MQTTPublisher()
publisher.connect()
publisher.client.connect.assert_called_with("localhost", 1883, keepalive=60)
publisher.client.loop_start.assert_called_once()
def test_mqtt_publisher_publish(mock_config):
with patch("paho.mqtt.client.Client") as mock_client:
publisher = MQTTPublisher()
publisher.connected = True
with patch("sensorpajen.config.ENABLE_BATTERY", True):
publisher.publish_measurement("living_room", 22.5, 45, 3.0, 100)
# Check if publish was called for each metric
calls = [
call("test/living_room/temp", "22.5"),
call("test/living_room/humidity", "45"),
call("test/living_room/batteryvoltage", "3.000"),
call("test/living_room/batterylevel", "100")
]
publisher.client.publish.assert_has_calls(calls, any_order=True)
def test_mqtt_publisher_publish_no_battery(mock_config):
with patch("paho.mqtt.client.Client") as mock_client:
publisher = MQTTPublisher()
publisher.connected = True
with patch("sensorpajen.config.ENABLE_BATTERY", False):
publisher.publish_measurement("living_room", 22.5, 45, 3.0, 100)
# Should only publish temp and humidity
assert publisher.client.publish.call_count == 2
publisher.client.publish.assert_any_call("test/living_room/temp", "22.5")
publisher.client.publish.assert_any_call("test/living_room/humidity", "45")
def test_mqtt_publisher_not_connected(mock_config):
with patch("paho.mqtt.client.Client") as mock_client:
publisher = MQTTPublisher()
publisher.connected = False
# Should not raise error, just log warning
publisher.publish_measurement("living_room", 22.5, 45)
publisher.client.publish.assert_not_called()