Skip to content

Commit 07fc9aa

Browse files
authored
B #7211 Fix parsing VM requirements (#3766)
Signed-off-by: Mirko Stojiljkovic <mstojiljkovic@opennebula.io>
1 parent fe67119 commit 07fc9aa

File tree

3 files changed

+131
-86
lines changed

3 files changed

+131
-86
lines changed

src/schedm_mad/remotes/one_drs/lib/mapper/model.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ class DStoreRequirement:
9999
size: int
100100
# Whether a local disk of the assigned host can be used.
101101
allow_host_dstores: bool = True
102-
# The IDs of the matching host datastores.
102+
# The IDs of the matching host disks.
103103
# Dict {host ID: list of IDs of the matching disks}. If `None`, all
104104
# host disks are considered matching.
105105
host_dstore_ids: Optional[dict[int, list[int]]] = None
@@ -113,7 +113,8 @@ class DStoreMatches:
113113
vm_id: int
114114
# The ID or index of the datastore requirement.
115115
requirement: int
116-
# The IDs of the hosts (keys) with suitable storage (values).
116+
# The IDs of the hosts (keys), each with the list of IDs of suitable
117+
# disks (values).
117118
host_dstores: dict[int, list[int]]
118119
# The IDs of the shared datastores with suitable storage.
119120
shared_dstores: list[int]

src/schedm_mad/remotes/one_drs/lib/optimizer_parser.py

Lines changed: 101 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# limitations under the License. #
1616
# -------------------------------------------------------------------------- #
1717

18+
from collections import defaultdict
1819
import io
1920
import sys
2021
from dataclasses import replace
@@ -218,58 +219,59 @@ def build_optimizer(self) -> ILPOptimizer:
218219

219220
def _parse_vm_requirements(self) -> dict[int, VMRequirements]:
220221
vm_requirements = {}
222+
vm_pool = {
223+
vm.id: vm for vm in self.scheduler_driver_action.vm_pool.vm
224+
}
221225
for vm_req in self.scheduler_driver_action.requirements.vm:
222-
for vm in self.scheduler_driver_action.vm_pool.vm:
223-
if vm.id == vm_req.id:
224-
storage = self._build_vm_storage(vm, vm_req)
225-
cpu_current = float(vm.monitoring.cpu or 0)
226-
cpu_forecast = float(vm.monitoring.cpu_forecast or 0)
227-
net_current = float(vm.monitoring.nettx_bw or 0) + float(
228-
vm.monitoring.netrx_bw or 0
229-
)
230-
net_forecast = float(vm.monitoring.nettx_bw_forecast or 0) + float(
231-
vm.monitoring.netrx_bw_forecast or 0
232-
)
233-
disk_current = float(vm.monitoring.diskrdbytes_bw or 0) + float(
234-
vm.monitoring.diskwrbytes_bw or 0
235-
)
236-
disk_forecast = float(
237-
vm.monitoring.diskrdbytes_bw_forecast or 0
238-
) + float(vm.monitoring.diskwrbytes_bw_forecast or 0)
239-
# Predictive factor only for 'optimize'
240-
cpu_usage = (
241-
self._apply_predictive_adjustment(cpu_current, cpu_forecast)
242-
if self.mode.upper() == "OPTIMIZE"
243-
else cpu_current
244-
)
245-
net_usage = (
246-
self._apply_predictive_adjustment(net_current, net_forecast)
247-
if self.mode.upper() == "OPTIMIZE"
248-
else net_current
249-
)
250-
disk_usage = (
251-
self._apply_predictive_adjustment(disk_current, disk_forecast)
252-
if self.mode.upper() == "OPTIMIZE"
253-
else disk_current
254-
)
255-
vm_requirements[int(vm_req.id)] = VMRequirements(
256-
id=int(vm_req.id),
257-
state=self._map_vm_state(vm.state, vm.lcm_state),
258-
memory=int(vm.template.memory),
259-
cpu_ratio=float(vm.template.cpu),
260-
cpu_usage=cpu_usage,
261-
storage=storage,
262-
disk_usage=disk_usage,
263-
pci_devices=self._build_pci_devices_requirements(
264-
vm.template.pci
265-
),
266-
host_ids=set(vm_req.hosts.id),
267-
share_vnets=not self.config["DIFFERENT_VNETS"],
268-
nic_matches={nic.id: nic.vnets.id for nic in vm_req.nic},
269-
net_usage=net_usage,
270-
)
271-
else:
272-
self._build_used_dstores(vm)
226+
if vm := vm_pool.get(vm_req.id):
227+
storage = self._build_vm_storage(vm, vm_req)
228+
cpu_current = float(vm.monitoring.cpu or 0)
229+
cpu_forecast = float(vm.monitoring.cpu_forecast or 0)
230+
net_current = float(vm.monitoring.nettx_bw or 0) + float(
231+
vm.monitoring.netrx_bw or 0
232+
)
233+
net_forecast = float(vm.monitoring.nettx_bw_forecast or 0) + float(
234+
vm.monitoring.netrx_bw_forecast or 0
235+
)
236+
disk_current = float(vm.monitoring.diskrdbytes_bw or 0) + float(
237+
vm.monitoring.diskwrbytes_bw or 0
238+
)
239+
disk_forecast = float(
240+
vm.monitoring.diskrdbytes_bw_forecast or 0
241+
) + float(vm.monitoring.diskwrbytes_bw_forecast or 0)
242+
# Predictive factor only for 'optimize'
243+
cpu_usage = (
244+
self._apply_predictive_adjustment(cpu_current, cpu_forecast)
245+
if self.mode.upper() == "OPTIMIZE"
246+
else cpu_current
247+
)
248+
net_usage = (
249+
self._apply_predictive_adjustment(net_current, net_forecast)
250+
if self.mode.upper() == "OPTIMIZE"
251+
else net_current
252+
)
253+
disk_usage = (
254+
self._apply_predictive_adjustment(disk_current, disk_forecast)
255+
if self.mode.upper() == "OPTIMIZE"
256+
else disk_current
257+
)
258+
vm_requirements[int(vm_req.id)] = VMRequirements(
259+
id=int(vm_req.id),
260+
state=self._map_vm_state(vm.state, vm.lcm_state),
261+
memory=int(vm.template.memory),
262+
cpu_ratio=float(vm.template.cpu),
263+
cpu_usage=cpu_usage,
264+
storage=storage,
265+
disk_usage=disk_usage,
266+
pci_devices=self._build_pci_devices_requirements(
267+
vm.template.pci
268+
),
269+
host_ids=set(vm_req.hosts.id),
270+
share_vnets=not self.config["DIFFERENT_VNETS"],
271+
nic_matches={nic.id: nic.vnets.id for nic in vm_req.nic},
272+
net_usage=net_usage,
273+
)
274+
self._build_used_dstores(vm)
273275
return vm_requirements
274276

275277
def _parse_vm_groups(self) -> list[VMGroup]:
@@ -453,6 +455,7 @@ def _parse_host_capacities(self) -> list[HostCapacity]:
453455
]
454456

455457
def _parse_datastore_capacities(self) -> list[DStoreCapacity]:
458+
shared_dstore_ids = self.get_ds_map()[0]
456459
return [
457460
DStoreCapacity(
458461
id=int(store.id),
@@ -470,6 +473,7 @@ def _parse_datastore_capacities(self) -> list[DStoreCapacity]:
470473
cluster_ids=store.clusters.id,
471474
)
472475
for store in self.scheduler_driver_action.datastore_pool.datastore
476+
if int(store.id) in shared_dstore_ids
473477
]
474478

475479
def _parse_vnet_capacities(self) -> list[VNetCapacity]:
@@ -602,7 +606,7 @@ def _sanity_check(value):
602606
return max(0, min(1, value))
603607

604608
def _build_vm_storage(self, vm, vm_req):
605-
_, _, host_ds = self.get_ds_map()
609+
storage_map = self.get_ds_map()
606610
ds_req = {}
607611
for req_id, disk in enumerate(vm.template.disk):
608612
disk_attrs = {e.qname.upper(): e.text for e in disk.any_element}
@@ -617,7 +621,9 @@ def _build_vm_storage(self, vm, vm_req):
617621
else:
618622
clone_attr = disk_attrs.get("CLONE")
619623
if clone_attr is None:
620-
host_ds, share_ds, allow = self._build_dstores(vm_req, host_ds)
624+
host_ds, share_ds, allow = self._build_dstores(
625+
vm_req, storage_map
626+
)
621627
ds_req[req_id] = DStoreRequirement(
622628
id=req_id,
623629
vm_id=int(vm.id),
@@ -660,7 +666,9 @@ def _build_vm_storage(self, vm, vm_req):
660666
)
661667
# system DS req
662668
else:
663-
host_ds, share_ds, allow = self._build_dstores(vm_req, host_ds)
669+
host_ds, share_ds, allow = self._build_dstores(
670+
vm_req, storage_map
671+
)
664672
ds_req[req_id] = DStoreRequirement(
665673
id=req_id,
666674
vm_id=int(vm.id),
@@ -670,7 +678,7 @@ def _build_vm_storage(self, vm, vm_req):
670678
shared_dstore_ids=share_ds,
671679
)
672680
if not vm.template.disk and vm_req.datastores.id:
673-
host_ds, share_ds, allow = self._build_dstores(vm_req, host_ds)
681+
host_ds, share_ds, allow = self._build_dstores(vm_req, storage_map)
674682
ds_req[0] = DStoreRequirement(
675683
id=0,
676684
vm_id=int(vm.id),
@@ -681,40 +689,52 @@ def _build_vm_storage(self, vm, vm_req):
681689
)
682690
return ds_req
683691

684-
def _build_dstores(self, vm_req, host_ds):
685-
host_ds, share_ds, allow_host_dstores = {}, [], False
692+
def _build_dstores(self, vm_req, storage_map):
693+
host_disks, share_ds = defaultdict(set), []
694+
all_shared, _, host_ds = storage_map
686695
for _ds_id in vm_req.datastores.id:
687-
if _ds_id in host_ds.keys():
688-
(
689-
host_ds[host_ds[_ds_id]].append(_ds_id)
690-
if _ds_id in host_ds
691-
else host_ds[host_ds[_ds_id]]
692-
)
693-
allow_host_dstores = True
694-
else:
696+
if _ds_id in host_ds:
697+
host_ids = host_ds[_ds_id]
698+
for host_id in host_ids:
699+
host_disks[host_id].add(0)
700+
elif _ds_id in all_shared:
695701
share_ds.append(_ds_id)
696-
return host_ds, share_ds, allow_host_dstores
702+
# TODO: Formulate the logic to decide whether shared or local
703+
# datastores are allowed <MS 2025-08-25>.
704+
# NOTE: Currently, the original logic is kept, which assumes
705+
# allowing local (and forbidding shared) datastores if they are
706+
# given in the VM requirements <MS 2025-08-25>.
707+
host_disks_ = {
708+
host_id: list(disk_ids) for host_id, disk_ids in host_disks.items()
709+
}
710+
return host_disks_, share_ds, bool(host_disks)
697711

698712
def _build_disk_capacity(self, host):
699-
return {
700-
disk.id: Capacity(total=disk.total_mb, usage=disk.used_mb)
701-
for disk in getattr(host.host_share.datastores, "ds", [])
702-
if hasattr(disk, "id")
703-
}
713+
dstores = host.host_share.datastores
714+
used = dstores.used_disk
715+
free = dstores.free_disk
716+
return {0: Capacity(total=free + used, usage=used)}
704717

705718
def _build_used_dstores(self, vm):
706719
if vm_hist := vm.history_records.history:
707720
last_rec = max(vm_hist, key=lambda item: item.seq)
721+
# NOTE: We can take this from the current placement.
708722
ds_id = last_rec.ds_id
709723
# Host DS
710-
_, _, host_ds = self.get_ds_map()
724+
all_shared, _, host_ds = self.get_ds_map()
711725
if ds_id in host_ds.keys():
712-
self.used_host_dstores[vm.id, 0] = ds_id
726+
# NOTE: These are host disks, not datastores.
727+
self.used_host_dstores[vm.id, 0] = 0
713728
# Shared system ds or image ds
714-
else:
729+
elif ds_id in all_shared:
715730
self.used_shared_dstores[vm.id, 0] = ds_id
716731

717-
def get_ds_map(self) -> tuple[set[int], set[int], dict[int, int]]:
732+
def get_ds_map(self) -> tuple[set[int], set[int], dict[int, list[int]]]:
733+
# NOTE: Returned items:
734+
# [0]: `set` of the IDs of shared system datastores
735+
# [1]: `set` of the IDs of image datastores
736+
# [2]: `dict` of the IDs of shared local datastores (keys) and
737+
# `list` of the IDs of the corresponding hosts (values)
718738
shared_ds, image_ds = set(), set()
719739
for ds in self.scheduler_driver_action.datastore_pool.datastore:
720740
ds_attrs = {
@@ -725,13 +745,12 @@ def get_ds_map(self) -> tuple[set[int], set[int], dict[int, int]]:
725745
image_ds.add(int(ds.id))
726746
elif ds_attrs.get("SHARED") == "YES":
727747
shared_ds.add(int(ds.id))
728-
host_ds_dict = {
729-
int(host_ds.id): int(host.id)
730-
for host in self.scheduler_driver_action.host_pool.host
731-
for host_ds in getattr(host.host_share.datastores, "ds", [])
732-
if hasattr(host_ds, "id")
733-
}
734-
return shared_ds, image_ds, host_ds_dict
748+
host_ds_dict: defaultdict[int, list[int]] = defaultdict(list)
749+
for host in self.scheduler_driver_action.host_pool.host:
750+
host_id = int(host.id)
751+
for host_ds in host.host_share.datastores.ds:
752+
host_ds_dict[int(host_ds.id)].append(host_id)
753+
return shared_ds, image_ds, dict(host_ds_dict)
735754

736755
def get_system_ds(self, host_id):
737756
host = next(

src/schedm_mad/remotes/one_drs/lib/optimizer_serializer.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# limitations under the License. #
1616
# -------------------------------------------------------------------------- #
1717

18+
from collections import defaultdict
1819
from typing import Union
1920

2021
from xsdata.formats.dataclass.context import XmlContext
@@ -70,6 +71,7 @@ def build_optimizer_output(
7071
datastore_info = (
7172
f"using system datastore {ds_id}"
7273
if shared
74+
# TODO: Check if this works well when ID is zero.
7375
else (f"using host datastore {ds_id}" if ds_id else "without datastore")
7476
)
7577
logs.append(
@@ -83,7 +85,11 @@ def build_optimizer_output(
8385
return plan, logs
8486

8587
def _get_vm_ds(self, alloc) -> tuple[int, bool]:
86-
shared_ds, _, _ = self.parser.get_ds_map()
88+
# NOTE: It is probably redundant to call the method
89+
# ``OptimizerParser.get_ds_map`` once for each VM. It would
90+
# probably be enough calling it once, maybe just from the
91+
# parser.
92+
shared_ds, _, local = self.parser.get_ds_map()
8793
if alloc.shared_dstore_ids:
8894
# NOTE: This can contain both system shared datastores and
8995
# image datastores. We need the system shared datastore.
@@ -92,7 +98,26 @@ def _get_vm_ds(self, alloc) -> tuple[int, bool]:
9298
return _ds, True
9399
elif alloc.host_dstore_ids:
94100
# NOTE: The VM can only have one host datastore.
95-
return next(iter(alloc.dstore_ids)), False
101+
102+
# TODO: Optimize this so that mappings are built only once.
103+
# Mapping: host ID -> set of local dstore IDs.
104+
host_storage: defaultdict[int, set[int]] = defaultdict(set)
105+
for local_dstore_id, host_ids in local.items():
106+
for host_id in host_ids:
107+
host_storage[host_id].add(local_dstore_id)
108+
# Mapping: VM ID -> set of dstore IDs.
109+
vm_storage: dict[int, set[int]] = {}
110+
for vm_req in self.parser.scheduler_driver_action.requirements.vm:
111+
vm_storage[vm_req.id] = set(vm_req.datastores.id)
112+
# Matching datastore IDs:
113+
# TODO: Consider optimization in the case of large sets.
114+
# Iterate over the smaller and return the first element that
115+
# appears in the larger set (e.g.
116+
# `match_ = next(id for id in smaller if id in larger)`).
117+
matches = host_storage[alloc.host_id] & vm_storage[alloc.vm_id]
118+
match_ = matches.pop() # next(iter(matches))
119+
return match_, False
120+
96121
sys_ds = self.parser.get_system_ds(alloc.host_id)
97122
return sys_ds, False
98123

0 commit comments

Comments
 (0)