99 lines
3.8 KiB
Python
99 lines
3.8 KiB
Python
import pytest
|
|
from unittest.mock import MagicMock, patch, call
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
from sensorpajen.sensor_reader import SensorReader, Measurement
|
|
from sensorpajen import config
|
|
|
|
@pytest.fixture
|
|
def mock_sensor_config():
|
|
cfg = MagicMock(spec=config.SensorConfig)
|
|
cfg.sensors = {"AA:BB:CC:DD:EE:FF": "Living Room"}
|
|
cfg.get_name.side_effect = lambda mac: cfg.sensors.get(mac.upper(), mac.upper())
|
|
return cfg
|
|
|
|
@pytest.fixture
|
|
def mock_discovery_manager():
|
|
return MagicMock()
|
|
|
|
def test_measurement_dataclass():
|
|
m = Measurement(temperature=22.5, humidity=45, voltage=3.0, battery=100, sensor_name="Test")
|
|
assert m.temperature == 22.5
|
|
assert m.humidity == 45
|
|
assert m.voltage == 3.0
|
|
assert m.battery == 100
|
|
assert m.sensor_name == "Test"
|
|
|
|
def test_sensor_reader_init(mock_sensor_config, mock_discovery_manager):
|
|
on_measurement = MagicMock()
|
|
reader = SensorReader(mock_sensor_config, mock_discovery_manager, on_measurement)
|
|
assert reader.sensor_config == mock_sensor_config
|
|
assert reader.on_measurement == on_measurement
|
|
assert reader.interface == 0
|
|
|
|
@patch("sensorpajen.sensor_reader.toggle_device")
|
|
@patch("sensorpajen.sensor_reader.enable_le_scan")
|
|
@patch("sensorpajen.sensor_reader.parse_le_advertising_events")
|
|
def test_sensor_reader_start(mock_parse, mock_enable, mock_toggle, mock_sensor_config, mock_discovery_manager):
|
|
on_measurement = MagicMock()
|
|
reader = SensorReader(mock_sensor_config, mock_discovery_manager, on_measurement)
|
|
|
|
# Mock bluez.hci_open_dev where it's used in sensor_reader
|
|
with patch("sensorpajen.sensor_reader.bluez.hci_open_dev", return_value=123):
|
|
# We need to stop the blocking call to parse_le_advertising_events
|
|
mock_parse.side_effect = KeyboardInterrupt()
|
|
|
|
reader.start()
|
|
|
|
mock_toggle.assert_called_with(0, True)
|
|
mock_enable.assert_called_with(123, filter_duplicates=False)
|
|
mock_parse.assert_called_once()
|
|
|
|
@patch("sensorpajen.sensor_reader.raw_packet_to_str")
|
|
def test_handle_ble_packet_known_sensor(mock_raw_to_str, mock_sensor_config, mock_discovery_manager):
|
|
on_measurement = MagicMock()
|
|
reader = SensorReader(mock_sensor_config, mock_discovery_manager, on_measurement)
|
|
|
|
# Mock data
|
|
mac = "AA:BB:CC:DD:EE:FF"
|
|
data = b"\x00" * 20
|
|
# ATC packet format: ... 1A18 AABBCCDDEEFF ...
|
|
# data_str[6:10] == "1A18"
|
|
# data_str[10:22] == "AABBCCDDEEFF"
|
|
mock_raw_to_str.return_value = "0000001A18AABBCCDDEEFF000000000000"
|
|
|
|
# Mock _parse_atc_data
|
|
with patch.object(reader, "_parse_atc_data") as mock_parse_atc:
|
|
mock_parse_atc.return_value = (22.5, 45, 100, 3.0, "123")
|
|
|
|
reader._handle_ble_packet(mac, 0, data, -70)
|
|
|
|
on_measurement.assert_called_once()
|
|
measurement = on_measurement.call_args[0][0]
|
|
assert measurement.temperature == 22.5
|
|
assert measurement.humidity == 45
|
|
assert measurement.sensor_name == "Living Room"
|
|
|
|
@patch("sensorpajen.sensor_reader.raw_packet_to_str")
|
|
def test_handle_ble_packet_unknown_sensor(mock_raw_to_str, mock_sensor_config, mock_discovery_manager):
|
|
on_measurement = MagicMock()
|
|
reader = SensorReader(mock_sensor_config, mock_discovery_manager, on_measurement)
|
|
|
|
# Mock data for unknown sensor
|
|
mac = "11:22:33:44:55:66"
|
|
data = b"\x00" * 20
|
|
mock_raw_to_str.return_value = "0000001A18112233445566000000000000"
|
|
|
|
with patch.object(reader, "_parse_atc_data") as mock_parse_atc:
|
|
mock_parse_atc.return_value = (20.0, 50, 80, 2.8, "456")
|
|
|
|
with patch.object(reader, "_handle_unknown_sensor") as mock_handle_unknown:
|
|
reader._handle_ble_packet(mac, 0, data, -80)
|
|
|
|
mock_handle_unknown.assert_called_once_with(
|
|
"11:22:33:44:55:66", -80, 20.0, 50, 80, 2.8
|
|
)
|
|
on_measurement.assert_not_called()
|