Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
"request": "launch",
"program": "/venv/bin/virtac",
"console": "integratedTerminal",
"args": [
"-v"
],
"env": {
"EPICS_CA_SERVER_PORT": "8064",
"EPICS_CA_REPEATER_PORT": "8065",
Expand Down
48 changes: 38 additions & 10 deletions src/atip/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class ATSimulator:
for new changes to the AT
lattice and recalculate the
physics data upon a change.
_new_data_lock (asyncio.Lock): A lock which can be taken
to stop new caput callbacks
being added to the queue while
held.
"""

_loop: asyncio.BaseEventLoop
Expand All @@ -105,6 +109,7 @@ class ATSimulator:
_quit_thread: asyncio.Event
_up_to_date: asyncio.Event
_calculation_task: asyncio.Task
_new_data_lock: asyncio.Lock

@classmethod
async def create(cls, at_lattice, callback=None, disable_emittance=False):
Expand Down Expand Up @@ -146,6 +151,7 @@ async def create(cls, at_lattice, callback=None, disable_emittance=False):
self._quit_thread = asyncio.Event()
self._up_to_date = asyncio.Event()
self._up_to_date.set()
self._new_data_lock = asyncio.Lock()

self._calculation_task = asyncio.create_task(
self._recalculate_phys_data(callback)
Expand All @@ -160,11 +166,13 @@ async def queue_set(self, func, field, value):
field (str): The field to be changed.
value (float): The value to be set.
"""
await self._queue.put((func, field, value))
# If this flag gets cleared while we are recalculating, then it can cause
# everything to lock, so we setup a lock between this function and the
# recalculate function
logging.debug(f"Added task to async queue. qsize={self._queue.qsize()}")
async with self._new_data_lock:
await self._queue.put((func, field, value))
# If this flag gets cleared while we are recalculating, then it can cause
# everything to lock, so we setup a lock between this function and the
# recalculate function
self._up_to_date.clear()
logging.debug(f"Added task to async queue. qsize={self._queue.qsize()}")

async def _gather_one_sample(self):
"""If the queue is empty Wait() yields until an item is added. When the
Expand Down Expand Up @@ -234,19 +242,39 @@ async def _recalculate_phys_data(self, callback):
# update lattice data. TODO: We currently update the pvs anyway
# but this wont do anything, so could be improved
warn(at.AtWarning(e), stacklevel=1)
# try:
# self._lattice_data = calculate_optics(
# self._at_lat,
# self._rp,
# self._disable_emittance,
# )
# except Exception as e:
# # If an error is found while doing the calculations we dont
# # update lattice data. TODO: We currently update the pvs anyway
# # but this wont do anything, so could be improved
# warn(at.AtWarning(e), stacklevel=1)

# Signal the up to date flag since the physics data is now up to
# date. We do this before the callback is executed in case the
# callback checks the flag.
self._up_to_date.set()
logging.debug("Simulation up to date.")
if callback is not None:
logging.debug(f"Executing callback function: {callback.__name__}")
await callback()
logging.debug("Callback completed.")
async with self._new_data_lock:
if callback is not None:
logging.debug(
f"Executing callback function: {callback.__name__}"

@ptsOSL ptsOSL May 1, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lock stops incoming caputs from clearing the _up_to_date flag while we are trying to output the new data to our pvs.

When we set the pytac simulated lattice, we always wait for the up_to_date flag to be set, but if we recalculate data and then a caput comes in, we still want to update our pvs with the calculations we just did. This way our pvs are in sync with our pytac lattice, but they may not be in sync with caputs coming in. This is because we accept these caputs immediately but only process the data later.

If we did process the data and recalculate the data immediately and then output it to the pvs, each caput would take 500ms which isnt feasible.

Checking for new data immediately after recalculating and before updating the pvs and throwing away the new data if there is a new caput is also not feasible due to the frequency of incoming caputs.

Outputting data to the pvs should be very quick, the only reason it needs to be async is because we need to await on a flag to be set, but when we are setting pytac lattices from this function, that flag is guarenteed to be set due to the lock, so this lock shouldnt cause any noticeable performance issues as it is used here.

)
# For Virtac this function calls update_pvs() which gets data
# from the pytac datasource to update the softioc pvs with. The
# data source is sim_data_sources.py and its get_value()
# function waits on the wait_for_calculation() function which waits for the
# up_to_date flag which currently will always be set, so this
# process is pointless.
await callback()
logging.debug("Callback completed.")
# After this point we assume new setpoints have made the data stale. We
# cant clear this flag in queue_set() as the callbacks can depend on
# this being set.
self._up_to_date.clear()

def toggle_calculations(self):
"""Pause or unpause the physics calculations by setting or clearing the
Expand Down