Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 105 additions & 53 deletions tests/components/history/test_websocket_api.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
"""The tests the History component websocket_api."""

import asyncio
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from datetime import timedelta
from typing import Any
from unittest.mock import ANY, patch

from freezegun import freeze_time
import pytest

from homeassistant.components import history
from homeassistant.components.history import websocket_api
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, STATE_OFF, STATE_ON
from homeassistant.const import (
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_STATE_CHANGED,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
Expand All @@ -35,6 +43,27 @@ def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]:
}


@contextmanager
def assert_no_listener_leak(hass: HomeAssistant) -> Iterator[None]:
"""Capture bus listeners on entry, assert no leak on exit.

EVENT_STATE_CHANGED is excluded because unrelated components can
asynchronously add or remove state_changed listeners during a test.
"""
excluded = {EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_STATE_CHANGED}

Comment thread
bdraco marked this conversation as resolved.
def _snapshot() -> dict[str, int]:
return {
key: value
for key, value in hass.bus.async_listeners().items()
if key not in excluded
}

before = _snapshot()
yield
assert _snapshot() == before


@pytest.mark.usefixtures("hass_history")
def test_setup() -> None:
"""Test setup method of history."""
Expand Down Expand Up @@ -1571,7 +1600,28 @@ async def test_overflow_queue(
"""Test overflowing the history stream queue."""
now = dt_util.utcnow()
wanted_entities = ["sensor.two", "sensor.four", "sensor.one"]
with patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5):

unsub_calls = 0

def spy_track_state_change_event(*args: Any, **kwargs: Any) -> Callable[[], None]:
nonlocal unsub_calls
real_unsub = async_track_state_change_event(*args, **kwargs)

def wrapped_unsub() -> None:
nonlocal unsub_calls
unsub_calls += 1
real_unsub()

return wrapped_unsub

with (
patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5),
patch.object(
websocket_api,
"async_track_state_change_event",
spy_track_state_change_event,
),
):
await async_setup_component(
hass,
"history",
Expand All @@ -1595,61 +1645,63 @@ async def test_overflow_queue(
await async_wait_recording_done(hass)

client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()

await client.send_json(
{
with assert_no_listener_leak(hass):
await client.send_json(
{
"id": 1,
"type": "history/stream",
"entity_ids": wanted_entities,
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"

response = await client.receive_json()
first_end_time = sensor_two_last_updated_timestamp

assert response == {
"event": {
"end_time": pytest.approx(first_end_time),
"start_time": pytest.approx(now.timestamp()),
"states": {
"sensor.one": [
{
"lu": pytest.approx(sensor_one_last_updated_timestamp),
"s": "on",
}
],
"sensor.two": [
{
"lu": pytest.approx(sensor_two_last_updated_timestamp),
"s": "off",
}
],
},
},
"id": 1,
"type": "history/stream",
"entity_ids": wanted_entities,
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
"type": "event",
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"

response = await client.receive_json()
first_end_time = sensor_two_last_updated_timestamp

assert response == {
"event": {
"end_time": pytest.approx(first_end_time),
"start_time": pytest.approx(now.timestamp()),
"states": {
"sensor.one": [
{
"lu": pytest.approx(sensor_one_last_updated_timestamp),
"s": "on",
}
],
"sensor.two": [
{
"lu": pytest.approx(sensor_two_last_updated_timestamp),
"s": "off",
}
],
},
},
"id": 1,
"type": "event",
}

await async_recorder_block_till_done(hass)
# Overflow the queue
for val in range(10):
hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"})
hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"})
await async_recorder_block_till_done(hass)

assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
await async_recorder_block_till_done(hass)
# Overflow the queue
for val in range(10):
hass.states.async_set(
"sensor.one", str(val), attributes={"any": "attr"}
)
hass.states.async_set(
"sensor.two", str(val), attributes={"any": "attr"}
)
await async_recorder_block_till_done(hass)

assert unsub_calls == 1


@pytest.mark.usefixtures("recorder_mock")
Expand Down