From 47d21a72043a0bd23d1ea6f4d4fb526b41c946d4 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 21 May 2026 14:08:03 -0500 Subject: [PATCH 1/2] Fix flaky test_overflow_queue in history websocket tests Comparing the full bus listener dict can fail when unrelated components asynchronously add or remove EVENT_STATE_CHANGED listeners during the test; capture listeners before and after via a context manager that excludes state_changed. --- .../components/history/test_websocket_api.py | 132 +++++++++++------- 1 file changed, 80 insertions(+), 52 deletions(-) diff --git a/tests/components/history/test_websocket_api.py b/tests/components/history/test_websocket_api.py index 21bb7f6c385eb6..08cddd40338385 100644 --- a/tests/components/history/test_websocket_api.py +++ b/tests/components/history/test_websocket_api.py @@ -1,6 +1,8 @@ """The tests the History component websocket_api.""" import asyncio +from collections.abc import Iterator +from contextlib import contextmanager from datetime import timedelta from unittest.mock import ANY, patch @@ -9,7 +11,12 @@ from homeassistant.components import history from homeassistant.components.history import websocket_api -from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, STATE_OFF, STATE_ON +from homeassistant.const import ( + EVENT_HOMEASSISTANT_FINAL_WRITE, + EVENT_STATE_CHANGED, + STATE_OFF, + STATE_ON, +) from homeassistant.core import HomeAssistant, callback from homeassistant.helpers.event import async_track_state_change_event from homeassistant.setup import async_setup_component @@ -35,6 +42,27 @@ def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]: } +@contextmanager +def assert_no_listener_leak(hass: HomeAssistant) -> Iterator[None]: + """Capture bus listeners on entry, assert no leak on exit. + + EVENT_STATE_CHANGED is excluded because unrelated components can + asynchronously add or remove state_changed listeners during a test. + """ + excluded = {EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_STATE_CHANGED} + + def _snapshot() -> dict[str, int]: + return { + key: value + for key, value in hass.bus.async_listeners().items() + if key not in excluded + } + + before = _snapshot() + yield + assert _snapshot() == before + + @pytest.mark.usefixtures("hass_history") def test_setup() -> None: """Test setup method of history.""" @@ -1595,61 +1623,61 @@ async def test_overflow_queue( await async_wait_recording_done(hass) client = await hass_ws_client() - init_listeners = hass.bus.async_listeners() - await client.send_json( - { + with assert_no_listener_leak(hass): + await client.send_json( + { + "id": 1, + "type": "history/stream", + "entity_ids": wanted_entities, + "start_time": now.isoformat(), + "include_start_time_state": True, + "significant_changes_only": False, + "no_attributes": True, + "minimal_response": True, + } + ) + response = await client.receive_json() + assert response["success"] + assert response["id"] == 1 + assert response["type"] == "result" + + response = await client.receive_json() + first_end_time = sensor_two_last_updated_timestamp + + assert response == { + "event": { + "end_time": pytest.approx(first_end_time), + "start_time": pytest.approx(now.timestamp()), + "states": { + "sensor.one": [ + { + "lu": pytest.approx(sensor_one_last_updated_timestamp), + "s": "on", + } + ], + "sensor.two": [ + { + "lu": pytest.approx(sensor_two_last_updated_timestamp), + "s": "off", + } + ], + }, + }, "id": 1, - "type": "history/stream", - "entity_ids": wanted_entities, - "start_time": now.isoformat(), - "include_start_time_state": True, - "significant_changes_only": False, - "no_attributes": True, - "minimal_response": True, + "type": "event", } - ) - response = await client.receive_json() - assert response["success"] - assert response["id"] == 1 - assert response["type"] == "result" - - response = await client.receive_json() - first_end_time = sensor_two_last_updated_timestamp - assert response == { - "event": { - "end_time": pytest.approx(first_end_time), - "start_time": pytest.approx(now.timestamp()), - "states": { - "sensor.one": [ - { - "lu": pytest.approx(sensor_one_last_updated_timestamp), - "s": "on", - } - ], - "sensor.two": [ - { - "lu": pytest.approx(sensor_two_last_updated_timestamp), - "s": "off", - } - ], - }, - }, - "id": 1, - "type": "event", - } - - await async_recorder_block_till_done(hass) - # Overflow the queue - for val in range(10): - hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"}) - hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"}) - await async_recorder_block_till_done(hass) - - assert listeners_without_writes( - hass.bus.async_listeners() - ) == listeners_without_writes(init_listeners) + await async_recorder_block_till_done(hass) + # Overflow the queue + for val in range(10): + hass.states.async_set( + "sensor.one", str(val), attributes={"any": "attr"} + ) + hass.states.async_set( + "sensor.two", str(val), attributes={"any": "attr"} + ) + await async_recorder_block_till_done(hass) @pytest.mark.usefixtures("recorder_mock") From 974b5ece190c39ea11c5d28862b04d6598fe650e Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 21 May 2026 14:21:35 -0500 Subject: [PATCH 2/2] Verify state_changed unsubscribe is invoked on overflow Add a spy on async_track_state_change_event so the test directly asserts the live stream's unsubscribe callable is invoked when the queue overflows, instead of relying on the (unreliable) shared EVENT_STATE_CHANGED bus listener count. --- .../components/history/test_websocket_api.py | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/tests/components/history/test_websocket_api.py b/tests/components/history/test_websocket_api.py index 08cddd40338385..b8f3f78cefc961 100644 --- a/tests/components/history/test_websocket_api.py +++ b/tests/components/history/test_websocket_api.py @@ -1,9 +1,10 @@ """The tests the History component websocket_api.""" import asyncio -from collections.abc import Iterator +from collections.abc import Callable, Iterator from contextlib import contextmanager from datetime import timedelta +from typing import Any from unittest.mock import ANY, patch from freezegun import freeze_time @@ -1599,7 +1600,28 @@ async def test_overflow_queue( """Test overflowing the history stream queue.""" now = dt_util.utcnow() wanted_entities = ["sensor.two", "sensor.four", "sensor.one"] - with patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5): + + unsub_calls = 0 + + def spy_track_state_change_event(*args: Any, **kwargs: Any) -> Callable[[], None]: + nonlocal unsub_calls + real_unsub = async_track_state_change_event(*args, **kwargs) + + def wrapped_unsub() -> None: + nonlocal unsub_calls + unsub_calls += 1 + real_unsub() + + return wrapped_unsub + + with ( + patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5), + patch.object( + websocket_api, + "async_track_state_change_event", + spy_track_state_change_event, + ), + ): await async_setup_component( hass, "history", @@ -1679,6 +1701,8 @@ async def test_overflow_queue( ) await async_recorder_block_till_done(hass) + assert unsub_calls == 1 + @pytest.mark.usefixtures("recorder_mock") async def test_history_during_period_for_invalid_entity_ids(