From 2ee436ff4cf46d28c876e889fbb80a52553b35e4 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Fri, 29 May 2026 17:38:09 -0400 Subject: [PATCH 1/7] Add tests for UQ assign_intermediate_uncertainties --- test/rmgpy/tools/uncertaintyTest.py | 75 +++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 9 deletions(-) diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 35ac31049a..69e3badde7 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -169,16 +169,73 @@ def test_uncertainty_assignment(self): thermo_unc = self.uncertainty.thermo_input_uncertainties kinetic_unc = self.uncertainty.kinetic_input_uncertainties - np.testing.assert_allclose( - thermo_unc, - [1.5, 1.5, 2.61966, 2.51994, 2.23886, 1.5, 2.30761, 2.41611, 2.61966, 2.51994, 2.61966, 2.51994, 2.61966, 1.5, 2.23886, 2.30761, 1.5, 2.61966, 2.07366, 2.19376, 2.19376, 2.30761, 1.94616, 2.07366, 2.07366], - rtol=1e-4, - ) - np.testing.assert_allclose( - kinetic_unc, - [0.5, 1.118, 1.9783, 1.9783, 1.5363, 0.5, 2.0, 1.5363, 1.5363, 0.5], - rtol=1e-4 + expected_uncorrelated_thermo_uncertainties = np.array([1.5, 1.5, 2.61966, 2.51994, 2.23886, 1.5, 2.30761, 2.41611, 2.61966, 2.51994, 2.61966, 2.51994, 2.61966, 1.5, 2.23886, 2.30761, 1.5, 2.61966, 2.07366, 2.19376, 2.19376, 2.30761, 1.94616, 2.07366, 2.07366]) + expected_uncorrelated_kinetic_uncertainties = np.array([0.5, 1.118, 1.9783, 1.9783, 1.5363, 0.5, 2.0, 1.5363, 1.5363, 0.5]) + np.testing.assert_allclose(thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) + np.testing.assert_allclose(kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) + + # ---------------------------- Now repeat for assign_intermediate_uncertainties ----------------------------- + # uncorrelated + self.uncertainty.assign_intermediate_uncertainties(correlated=False) + intermediate_thermo_unc = self.uncertainty.thermo_intermediate_uncertainties + intermediate_kinetic_unc = self.uncertainty.kinetic_intermediate_uncertainties + np.testing.assert_allclose(intermediate_thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) + np.testing.assert_allclose(intermediate_kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) + + # correlated + self.uncertainty.assign_intermediate_uncertainties(correlated=True) + + # do a spot check on some of the intermediates (dG/dq) these are derivatives, not uncertainties + # Thermo library example + assert self.uncertainty.thermo_intermediate_uncertainties[0].keys() == {'Library O(0)'} + assert self.uncertainty.thermo_intermediate_uncertainties[0]['Library O(0)'] == 1 + + # Thermo GAV example + assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[2].keys())) == ('Estimation HO2(2)', 'Group(group) O2s-OsH', 'Group(other) R', 'Group(radical) HOOJ') + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Estimation HO2(2)'] == 1 + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(group) O2s-OsH'] == 2 + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(other) R'] == 2 + assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(radical) HOOJ'] == 1 + + # Thermo library + GAV + assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[14].keys())) == ('Estimation CH3(14)', 'Group(radical) CH3', 'Library CH4(16)') + assert self.uncertainty.thermo_intermediate_uncertainties[14]['Estimation CH3(14)'] == 1 + assert self.uncertainty.thermo_intermediate_uncertainties[14]['Group(radical) CH3'] == 1 + assert self.uncertainty.thermo_intermediate_uncertainties[14]['Library CH4(16)'] == 1 + + # Kinetics library + assert self.uncertainty.kinetic_intermediate_uncertainties[0].keys() == {'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)'} + + # Rate rule (exact) + assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[1].keys())) == ('Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', 'Rate Rule H_Abstraction C/H3/Cs;C_methyl') + assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)'] == 1 + assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Rate Rule H_Abstraction C/H3/Cs;C_methyl'] == 1 + + # Rate rule (non-exact, multiple rule weights) + assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[3].keys())) == ( + 'Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', + 'Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs', + 'Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad', ) + assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'] == 1 + assert np.isclose(self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'], 0.4771212547, rtol=1e-4) + assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs'] == 0.5 + assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad'] == 0.5 + + # Training reaction + assert self.uncertainty.kinetic_intermediate_uncertainties[5].keys() == {'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'} + assert self.uncertainty.kinetic_intermediate_uncertainties[5]['Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'] == 1 + + # PDEP + assert self.uncertainty.kinetic_intermediate_uncertainties[6].keys() == {'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'} + assert self.uncertainty.kinetic_intermediate_uncertainties[6]['PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'] == 1 + + # correlated uncertainties should match uncorrelated, so check diagonal of covariance matrix + thermo_covariance = np.sqrt(self.uncertainty.get_thermo_covariance_matrix().diagonal()) + kinetic_covariance = np.sqrt(self.uncertainty.get_kinetic_covariance_matrix().diagonal()) + assert np.isclose(thermo_covariance, expected_uncorrelated_thermo_uncertainties, rtol=1e-4).all() + assert np.isclose(kinetic_covariance, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4).all() def test_local_analysis(self): """ From 33b5fa0826691c9cfcd95ea37217db26907414a6 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Wed, 27 May 2026 12:06:23 -0400 Subject: [PATCH 2/7] Add uncertainty tests for correlated library match Add some tests to make sure the correct index is referenced when assembling thermo sources that use libraries. For example, if CH3 is estimated from a CH4 library + a radical correction, we want to make sure the uncertainty source points to CH4 as the index for the library value used (as opposed to CH3) --- test/rmgpy/tools/uncertaintyTest.py | 46 +++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 69e3badde7..b3772ebecf 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -237,6 +237,52 @@ def test_uncertainty_assignment(self): assert np.isclose(thermo_covariance, expected_uncorrelated_thermo_uncertainties, rtol=1e-4).all() assert np.isclose(kinetic_covariance, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4).all() + def test_source_correlations(self): + # Check some examples of different species containing the same sources + + # ------------------------------------------------------------------------------ + # Make sure CH3 (Library + Radical) has a library index/value in common with CH4 + i_CH4 = rmgpy.tools.uncertainty.get_i_thing(rmgpy.species.Species(smiles='C'), self.uncertainty.species_list) + assert i_CH4 >= 0 + + i_CH3 = rmgpy.tools.uncertainty.get_i_thing(rmgpy.species.Species(smiles='[CH3]'), self.uncertainty.species_list) + assert i_CH3 >= 0 + + self.uncertainty.extract_sources_from_model() + self.uncertainty.assign_parameter_uncertainties(correlated=True) + + src1 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH4]] # CH4 + src2 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH3]] # CH3 + + assert 'Library' in src1 + assert 'Library' in src2 + assert 'GAV' in src2 + assert src1['Library'] == src2['Library'] # make sure they refer to the same library source + + # ----------------------------------------------------------------------------- + # Make sure CH3X (Library + GAV + Adsorption Correction) has a library index/value in common with CH4 (Library) + i_CH3X = rmgpy.tools.uncertainty.get_i_thing(rmgpy.species.Species(smiles='C*'), self.uncertainty.species_list) + assert i_CH3X == -1 + # This is not in the model, so add it to the species list + CH3X = rmgpy.species.Species(smiles='C*') + CH3X.thermo = self.uncertainty.database.thermo.get_thermo_data(CH3X) + self.uncertainty.species_list.append(CH3X) + i_CH3X = rmgpy.tools.uncertainty.get_i_thing(CH3X, self.uncertainty.species_list) + assert i_CH3X >= 0 + + self.uncertainty.extract_sources_from_model() + self.uncertainty.assign_parameter_uncertainties(correlated=True) + + src1 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH4]] # CH4 + src2 = self.uncertainty.species_sources_dict[self.uncertainty.species_list[i_CH3X]] # CH3X + + assert 'Library' in src1 + assert 'Library' in src2 + assert 'ADS' in src2 + assert 'GAV' in src2 + assert src1['Library'] == src2['Library'] # make sure they refer to the same library source + self.uncertainty.species_list.pop() # remove the extra species so it doesn't affect other tests + def test_local_analysis(self): """ Test to run uncorrelated and then correlated local_analysis and make sure the results are expected From 6f5a3ead77047608f45e2ee31dc6a88ba7308c7e Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Thu, 28 May 2026 09:44:42 -0400 Subject: [PATCH 3/7] refactor thermo extract_source_from_comments This refactors the extract_source_from_comments function and changes library sources so that a tuple of the library name and the library entry object is returned instead of just the library name. It also returns a tuple of QM method and molecule used for the case of QM sources. --- rmgpy/data/thermo.py | 272 +++++++++++++++++++++++----------- test/rmgpy/data/thermoTest.py | 117 +++++---------- 2 files changed, 227 insertions(+), 162 deletions(-) diff --git a/rmgpy/data/thermo.py b/rmgpy/data/thermo.py index 5a699ae223..3dd817dab9 100644 --- a/rmgpy/data/thermo.py +++ b/rmgpy/data/thermo.py @@ -1643,7 +1643,7 @@ def correct_binding_energy(self, thermo, species, metal_to_scale_from=None, meta thermo.comment += f" Binding energy corrected by LSR ({'+'.join(comments)}) from {metal_to_scale_from} (H={change_in_binding_energy/1e3:+.0f}kJ/mol)" return thermo - def get_thermo_data_for_surface_species(self, species): + def get_thermo_data_for_surface_species(self, species, return_resonance_data=False): """ Get the thermo data for an adsorbed species, by desorbing it, finding the thermo of the gas-phase @@ -1652,6 +1652,9 @@ def get_thermo_data_for_surface_species(self, species): Does not apply linear scaling relationship. Returns a :class:`ThermoData` object, with no Cp0 or CpInf + + option to return resonance data if return_resonance_data is True, + which is useful for identifying the specific molecule chosen """ # define the comparison function to find the lowest energy @@ -1741,7 +1744,7 @@ def species_enthalpy(species): atom.label = label # a tuple of molecule and its thermo - resonance_data.append((molecule, thermo)) + resonance_data.append((molecule, thermo, gas_phase_species)) # Get the enthalpy of formation of every adsorbate at 298 K and # determine the resonance structure with the lowest enthalpy of formation. @@ -1751,7 +1754,7 @@ def species_enthalpy(species): resonance_data = sorted(resonance_data, key=lambda x: x[1].H298.value_si) # reorder the resonance structures (molecules) by their H298 - species.molecule = [mol for mol, thermo in resonance_data] + species.molecule = [mol for mol, thermo, gas_phase_species in resonance_data] thermo = resonance_data[0][1] @@ -1760,6 +1763,9 @@ def species_enthalpy(species): find_cp0_and_cpinf(species, thermo) + if return_resonance_data is True: # is True makes this a little more error-proof in case user accidentally provides another argument that evaluates to True + return thermo, resonance_data + return thermo def _add_adsorption_correction(self, adsorption_thermo, adsorption_groups, molecule, surface_sites): @@ -2854,82 +2860,9 @@ def get_ring_groups_from_comments(self, thermo_data): return ring_groups, polycyclic_groups - def extract_source_from_comments(self, species): - """ - `species`: A species object containing thermo data and thermo data comments - - Parses the verbose string of comments from the thermo data of the species object, - and extracts the thermo sources. - - Returns a dictionary with keys of 'Library', 'QM', 'ADS', and/or 'GAV'. - Commonly, species thermo are estimated using only one of these sources. - However, a radical can be estimated with more than one type of source, for - instance a saturated library value and a GAV HBI correction, or a QM saturated value - and a GAV HBI correction. Adsorbates can be estimated using a single library - for the adsorbate or a combination of a gas phase library for the - gas phase portion and an adsorption correction. - - source = {'Library': String_Name_of_Library_Used, - 'QM': String_of_Method_Used, - 'GAV': Dictionary_of_Groups_Used, - 'ADS': Dictionary_of_Adsorption_Group_Used, - } - - The Dictionary_of_Groups_Used looks like - {'groupType':[List of tuples containing (Entry, Weight)] - """ - comment = species.thermo.comment - tokens = comment.split() - - source = {} - - if comment.startswith('Thermo library'): - # Store name of the library source, which is the 3rd token in the comments - source['Library'] = tokens[2] - - elif comment.startswith('QM'): - # Store the level of the calculation, which is the 2nd token in the comments - source['QM'] = tokens[1] - - elif comment.startswith('Gas phase thermo'): - # Handle adsorption correction thermo data of the following format: - # Library example - # Gas phase thermo for C(T) from Thermo library: primaryThermoLibrary. - # Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(Cq*) - - # GAV example - # Gas phase thermo for [CH]CC from Thermo group additivity estimation: group(Cs-CsCsHH) + group(Cs-CsHHH) + group(Cs-CsHHH) + radical(CCJ2_triplet). - # Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C=*RCR3)" - - comment = comment.replace(r'\n', ' ') - comment = comment.replace('\n', ' ') - if 'Adsorption correction:' not in comment: - raise ValueError(f'adsorption correction in unrecognized format {comment}') - - # Handle the gas-phase portion first - gas_comment = comment.split('Adsorption correction: + ')[0].strip() - if gas_comment.endswith('.'): - gas_comment = gas_comment[:-1] # delete the . at the end if it exists - gas_comment = gas_comment[gas_comment.find('from ', len('Gas phase thermo for ')) + len('from '):] - dummy_gas_phase_species = Species() - dummy_gas_phase_species.thermo = NASA() - dummy_gas_phase_species.thermo.comment = gas_comment - source = self.extract_source_from_comments(dummy_gas_phase_species) - - # This is an adsorption correction - # comment is split into two parts: the gas phase, and the surface adsorption correction - ads_correction_comment = comment.split('Adsorption correction: +')[-1].strip() - dummy_adsorption_correction_species = Species() - dummy_adsorption_correction_species.thermo = NASA() - dummy_adsorption_correction_species.thermo.comment = ads_correction_comment - source['ADS'] = self.extract_source_from_comments(dummy_adsorption_correction_species)['GAV'] - - return source - - # Check for group additivity contributions to the thermo in this species - - # The contribution of the groups can be either additive or subtracting - # after changes to the polycyclic algorithm + def _parse_gav_groups(self, comment): + """Extract the groups from the comment""" + groups = {} comment = comment.replace(' + ', ' +') comment = comment.replace(' - ', ' -') @@ -2939,10 +2872,12 @@ def extract_source_from_comments(self, species): # groups are still split by spaces comment = comment.replace(')\n+', ') +') comment = comment.replace(')\n-', ') -') + # `Thermo group additivity estimation:\nadsorptionPt111(...)` shows up in + # adsorbate comments - keep the trailing colon separated from the group token. + comment = comment.replace(':\n', ': ') comment = comment.replace('\n', '') tokens = comment.split(' ') - groups = {} group_types = list(self.groups.keys()) regex = r"\((.*)\)" # only hit outermost parentheses @@ -2970,14 +2905,185 @@ def extract_source_from_comments(self, species): if groups: # Indicate that group additivity is used when it is either an HBI correction - # onto a thermo library or QM value, or if the entire molecule is estimated using group additivity + # onto a thermo library or QM value, or if the entire molecule is estimated using group additivity # Save the groups into the source dictionary - # Convert groups back into tuples + # Convert groups back into tuples for groupType, groupDict in groups.items(): groups[groupType] = list(groupDict.items()) - source['GAV'] = groups + return groups + + def _parse_library_source(self, comment, library_species): + # handle the library source comment, which looks like "Thermo library: library_name" + # we then need to retrieve the specific library entry given the species + # but may have unfortunate line breaks in the middle + + # trim the comment down to just the library portion so it starts with Thermo library: + split_loc = comment.find('Thermo library:') + if split_loc == -1: + raise ValueError(f"Expected 'Thermo library:' in comment, got {comment}") + + comment = comment[split_loc:] + + # library name is the token that comes immediately after 'Thermo library:' + assert 'Thermo library:' in comment, f"Expected 'Thermo library:' in comment, got {comment}" + tokens = comment.split() + library_name = tokens[2] + if library_name not in self.libraries: + raise DatabaseError(f"Thermo library '{library_name}' referenced in comment is not loaded. Make sure libraries match input file used to generate thermo.") + + results = self.get_thermo_data_from_library(library_species, self.libraries[library_name]) + if results is None: + raise DatabaseError(f"Could not find a library match for {library_species} in library {library_name}") + + data, thermo_library, library_entry = results + return (library_name, library_entry) + + def _parse_adsorption_correction(self, comment): + # handle the adsorption correction comment, which looks like + # "Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C-XR2CR3)" + # but may have unfortunate line breaks in the middle + + # check that the number of tokens matches our expectation for an adsorption correction + # should be 8, maybe 9 if there was a weird line break + tokens = comment.split() + if len(tokens) not in [8, 9]: + raise ValueError(f"Expected 8 or 9 tokens in adsorption correction comment, got {len(tokens)}: {comment}") + + ADS = self._parse_gav_groups(comment) + + if len(ADS) > 1: + raise ValueError("Only adsorption corrections should be present in the adsorption correction portion of the comment. Found: {}".format(ADS)) + + return ADS + + def extract_source_from_comments(self, species): + """ + `species`: A species object containing thermo data and thermo data comments + + Parses the verbose string of comments from the thermo data of the species object, + and extracts the thermo sources. + + Returns a dictionary with keys of 'Library', 'QM', 'ADS', and/or 'GAV'. + Commonly, species thermo are estimated using only one of these sources. + However, a radical can be estimated with more than one type of source, for + instance a saturated library value and a GAV HBI correction, or a QM saturated value + and a GAV HBI correction. Adsorbates can be estimated using a single library + for the adsorbate or a combination of a gas phase library for the + gas phase portion and an adsorption correction. + + source = {'Library': (String_Name_of_Library_Used, Library_Entry_Used), + 'QM': (String_of_Method_Used, Species_Used_for_QM), + 'GAV': Dictionary_of_Groups_Used, + 'ADS': Dictionary_of_Adsorption_Group_Used, + } + + The Dictionary_of_Groups_Used looks like + {'groupType':[List of tuples containing (Entry, Weight)] + """ + + # TODO: solvent, electrocat, LSR + source = {} + + comment = species.thermo.comment + tokens = comment.split() + + ads_correction = 'Gas phase thermo' in comment and 'Adsorption correction:' in comment + library = 'Thermo library' in comment + QM = 'QM' in tokens + GAV = 'Thermo group additivity estimation:' in comment # ambiguous since ads correction looks identical to group + + # the biggest thing to split on first is the adsorption correction + if ads_correction: + # The source options here are: + # (Library(gas-phase species), Adsorption correction) + # (QM(gas-phase species), Adsorption correction) + # (GAV(gas-phase species), Adsorption correction) + # (Library(gas-phase species), GAV(radical correction), Adsorption correction) + # (QM(gas-phase species), GAV(radical correction), Adsorption correction) + + # split the comment into the gas phase thermo portion and the adsorption correction portion + split_loc = comment.find('Adsorption correction:') + if split_loc == -1: + raise ValueError(f"Expected 'Adsorption correction:' in comment, got {comment}") + gas_comment = comment[:split_loc].strip() + if gas_comment.endswith('.'): + gas_comment = gas_comment[:-1] # the period that closed the gas-phase sentence + ads_correction_comment = comment[split_loc:].strip() + source['ADS'] = self._parse_adsorption_correction(ads_correction_comment) + + # get the desorbed gas species + species_copy = deepcopy(species) + thermo, resonance_data = self.get_thermo_data_for_surface_species(species_copy, return_resonance_data=True) + desorbed_gas_species = resonance_data[0][2] + + groups = self._parse_gav_groups(gas_comment) + if groups: # (Library/QM + GAV + ADS) or (GAV + ADS) + source['GAV'] = groups # handle the GAV portion + + if library or QM: # (Library/QM + GAV + ADS) + # this means the library/QM species is the desorbed, saturated gas-phase version of the adsorbate + + assert desorbed_gas_species.molecule[0].is_radical(), "Method only valid for radicals." + molecule = desorbed_gas_species.molecule[0] # no need to deepcopy again since get_desorbed_molecules already does deepcopy + molecule.saturate_radicals() # note, this returns a dictionary instead of the Molecule object, but it modifies the molecule in place, so we can just ignore the returned dictionary + saturated_desorbed_gas_species = Species(molecule=[molecule]) + saturated_desorbed_gas_species.generate_resonance_structures() + + if library: # (Library(gas-phase species), GAV(radical correction), Adsorption correction) + source['Library'] = self._parse_library_source(gas_comment, saturated_desorbed_gas_species) + if QM: # (QM(gas-phase species), GAV(radical correction), Adsorption correction) + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], saturated_desorbed_gas_species) + + else: + # no groups, so this is (Library/QM + ADS) + # in this case, the library/QM species is the desorbed gas-phase molecule of the adsorbate + if library: + source['Library'] = self._parse_library_source(gas_comment, desorbed_gas_species) + if QM: + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], desorbed_gas_species) + + else: + # gas phase only, source options are: + # (Library) + # (QM) + # (GAV) + # (Library, GAV) + # (QM, GAV) + groups = self._parse_gav_groups(comment) + GAV = 'Thermo group additivity estimation:' in comment + if GAV and not groups: + raise ValueError("No groups were found in the comments but 'Thermo group additivity estimation:' was in the comment. Comment: {}".format(comment)) + elif not GAV and groups: + if 'radical' not in groups.keys(): + raise ValueError("Groups were found in the comments but 'Thermo group additivity estimation:' was not in the comment. Comment: {}".format(comment)) + + if groups: + # Get groups first + source['GAV'] = groups + if library or QM: # (Library/QM, GAV) + # get the saturated species for the library source + if 'radical' not in groups.keys(): + raise ValueError("Method only valid for radicals, but no radical groups were found. Comment: {}".format(comment)) + molecule = deepcopy(species.molecule[0]) + assert molecule.is_radical(), "Method only valid for radicals." + molecule.saturate_radicals() # note, this returns a dictionary instead of the Molecule object, but it modifies the molecule in place, so we can just ignore the returned dictionary + saturated_species = Species(molecule=[molecule]) + saturated_species.generate_resonance_structures() + if library: # (Library, GAV) + source['Library'] = self._parse_library_source(comment, saturated_species) + if QM: # (QM, GAV) + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], saturated_species) + else: # (Library) or (QM) + if library: + source['Library'] = self._parse_library_source(comment, species) + if QM: + # whatever token comes immediately after 'QM' is the method used + source['QM'] = (tokens[tokens.index('QM') + 1], species) # Perform a sanity check that this molecule is estimated by at least one method if not list(source.keys()): diff --git a/test/rmgpy/data/thermoTest.py b/test/rmgpy/data/thermoTest.py index 656f4b8554..2d87e22de8 100644 --- a/test/rmgpy/data/thermoTest.py +++ b/test/rmgpy/data/thermoTest.py @@ -289,96 +289,50 @@ def test_parse_thermo_comments(self): assert len(source["GAV"]["longDistanceInteraction_cyclic"]) == 1 assert len(source["GAV"]["ring"]) == 1 - # Pure library thermo - dipk = Species( + propane = Species( index=1, - label="DIPK", + label="propane", thermo=NASA( polynomials=[ - NASAPolynomial( - coeffs=[ - 3.35002, - 0.017618, - -2.46235e-05, - 1.7684e-08, - -4.87962e-12, - 35555.7, - 5.75335, - ], - Tmin=(100, "K"), - Tmax=(888.28, "K"), - ), - NASAPolynomial( - coeffs=[ - 6.36001, - 0.00406378, - -1.73509e-06, - 5.05949e-10, - -4.49975e-14, - 35021, - -8.41155, - ], - Tmin=(888.28, "K"), - Tmax=(5000, "K"), - ), + NASAPolynomial(coeffs=[0.4987,0.0308692,-7.94064e-06,-5.75209e-09,3.10575e-12,-14122,21.6027], Tmin=(298,'K'), Tmax=(1036.31,'K')), + NASAPolynomial(coeffs=[2.0491,0.0302287,-1.47485e-05,3.60335e-09,-3.51534e-13,-14730.3,12.6832], Tmin=(1036.31,'K'), Tmax=(3000,'K')) ], - Tmin=(100, "K"), - Tmax=(5000, "K"), - comment="""Thermo library: DIPK""", + Tmin=(298,'K'), Tmax=(3000,'K'), E0=(-119.854,'kJ/mol'), Cp0=(33.2579,'J/(mol*K)'), CpInf=(249.434,'J/(mol*K)'), + label="""C3H8""", + comment="""Thermo library: DFT_QCI_thermo""" ), - molecule=[Molecule(smiles="CC(C)C(=O)C(C)C")], + molecule=[Molecule(smiles="CCC")], ) - source = self.database.extract_source_from_comments(dipk) - assert "Library" in source + CCC_source = self.database.extract_source_from_comments(propane) + assert "Library" in CCC_source + assert CCC_source["Library"][0] == "DFT_QCI_thermo" # Mixed library and HBI thermo - dipk_rad = Species( + propane_rad = Species( index=4, - label="R_tert", + label="propane_rad", thermo=NASA( polynomials=[ - NASAPolynomial( - coeffs=[ - 2.90061, - 0.0298018, - -7.06268e-05, - 6.9636e-08, - -2.42414e-11, - 54431, - 5.44492, - ], - Tmin=(100, "K"), - Tmax=(882.19, "K"), - ), - NASAPolynomial( - coeffs=[ - 6.70999, - 0.000201027, - 6.65617e-07, - -7.99543e-11, - 4.08721e-15, - 54238.6, - -9.73662, - ], - Tmin=(882.19, "K"), - Tmax=(5000, "K"), - ), + NASAPolynomial(coeffs=[0.599885,0.0320439,-2.63211e-05,1.15072e-08,-2.01662e-12,66381.4,20.2736], Tmin=(298,'K'), Tmax=(1331.58,'K')), + NASAPolynomial(coeffs=[6.43539,0.0145141,-6.57382e-06,1.62046e-09,-1.60381e-13,64827.3,-9.55036], Tmin=(1331.58,'K'), Tmax=(3000,'K')) ], - Tmin=(100, "K"), - Tmax=(5000, "K"), - comment="""Thermo library: DIPK + radical(C2CJCHO)""", + Tmin=(298,'K'), Tmax=(3000,'K'), E0=(549.676,'kJ/mol'), Cp0=(33.2579,'J/(mol*K)'), CpInf=(249.434,'J/(mol*K)'), + comment="""Thermo library: DFT_QCI_thermo + radical(CJ3)""" ), molecule=[ - Molecule(smiles="C[C](C)C(=O)C(C)C"), - Molecule(smiles="CC(C)=C([O])C(C)C"), + Molecule(smiles="CC[C]") ], ) - source = self.database.extract_source_from_comments(dipk_rad) - assert "Library" in source - assert "GAV" in source - assert len(source["GAV"]["radical"]) == 1 + CCC_rad_source = self.database.extract_source_from_comments(propane_rad) + assert "Library" in CCC_rad_source + assert "GAV" in CCC_rad_source + assert len(CCC_rad_source["GAV"]["radical"]) == 1 + + assert CCC_source["Library"][0] == CCC_rad_source["Library"][0], "The library source should be the same for the radical and non-radical species since they come from the same library entry." + assert CCC_source["Library"][1].item.is_isomorphic(CCC_rad_source["Library"][1].item), "The library source should be the same molecule for the radical and non-radical species since they come from the same library entry." + # Pure QM thermo cineole = Species( @@ -422,6 +376,7 @@ def test_parse_thermo_comments(self): source = self.database.extract_source_from_comments(cineole) assert "QM" in source + assert source["QM"][0] == "MopacMolPM3" # Mixed QM and HBI thermo cineole_rad = Species( @@ -465,6 +420,7 @@ def test_parse_thermo_comments(self): source = self.database.extract_source_from_comments(cineole_rad) assert "QM" in source + assert source["QM"][0] == "MopacMolPM3" assert "GAV" in source assert len(source["GAV"]["radical"]) == 1 @@ -604,9 +560,12 @@ def test_parse_thermo_comments(self): OX = rmgpy.species.Species(smiles="O=*") OX.thermo = rmgpy.thermo.NASA() OX.thermo.comment = 'Thermo library: surfaceThermoPt111' - source = self.database.extract_source_from_comments(OX) + # load the surfaceThermoPt111 library into databaseWithNoLibraries for this one example since it's not worth the ~7s it takes to make a separate test database + self.databaseWithoutLibraries.load_libraries(path=os.path.join(settings["database.directory"], "thermo", "libraries"), libraries=["surfaceThermoPt111"]) + source = self.databaseWithoutLibraries.extract_source_from_comments(OX) + self.databaseWithoutLibraries.unload_libraries('surfaceThermoPt111') # unload the library again so it doesn't interfere with other tests assert "Library" in source - assert source["Library"] == "surfaceThermoPt111" + assert source["Library"][0] == "surfaceThermoPt111" # Gas library + adsorption correction CH2X = rmgpy.species.Species(smiles="[CH2]=*") @@ -614,7 +573,7 @@ def test_parse_thermo_comments(self): CH2X.thermo.comment = 'Gas phase thermo for CH2(T) from Thermo library: primaryThermoLibrary. Adsorption correction: + Thermo group additivity estimation:\nadsorptionPt111(C=XR2)' source = self.database.extract_source_from_comments(CH2X) assert "Library" in source - assert source["Library"] == "primaryThermoLibrary" + assert source["Library"][0] == "primaryThermoLibrary" assert "ADS" in source assert source['ADS']['adsorptionPt111'][0][0].label == 'C=XR2' assert source['ADS']['adsorptionPt111'][0][1] == 1 # weight should be 1 @@ -637,10 +596,10 @@ def test_parse_thermo_comments(self): # Gas library + radical for HBI + adsorption correction CHOX = rmgpy.species.Species(smiles="O=[CH]*") CHOX.thermo = rmgpy.thermo.NASA() - CHOX.thermo.comment = 'Gas phase thermo for [CH]=O from Thermo library: primaryThermoLibrary + radical(HCdsJO). Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C-XR3)' + CHOX.thermo.comment = 'Gas phase thermo for [CH]=O from Thermo library: DFT_QCI_thermo + radical(HCdsJO). Adsorption correction: + Thermo group additivity estimation: adsorptionPt111(C-XR3)' source = self.database.extract_source_from_comments(CHOX) assert "Library" in source - assert source["Library"] == "primaryThermoLibrary" + assert source["Library"][0] == "DFT_QCI_thermo" assert "GAV" in source assert source["GAV"]["radical"][0][0].label == "HCdsJO" assert source["GAV"]["radical"][0][1] == 1 # weight should be 1 @@ -656,7 +615,7 @@ def test_parse_thermo_comments(self): OX.thermo.comment = OX_comment # set the comment to be the generated comment source = self.database.extract_source_from_comments(OX) assert "Library" in source - assert source["Library"] == "primaryThermoLibrary" + assert source["Library"][0] == "primaryThermoLibrary" assert 'ADS' in source assert source['ADS']['adsorptionPt111'][0][0].label == 'OX' assert source['ADS']['adsorptionPt111'][0][1] == 1 # weight should be 1 @@ -667,7 +626,7 @@ def test_parse_thermo_comments(self): CH2CHCH_ads.thermo = self.database.get_thermo_data(CH2CHCH_ads) source = self.database.extract_source_from_comments(CH2CHCH_ads) assert "Library" in source - assert source["Library"] == "DFT_QCI_thermo" + assert source["Library"][0] == "DFT_QCI_thermo" assert "GAV" in source assert source["GAV"]["radical"][0][0].label == "AllylJ2_triplet" assert source["GAV"]["radical"][0][1] == 1 # weight should be 1 From 210f585cc218662199fad23b31b5be3cbb58b71b Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Thu, 28 May 2026 10:52:22 -0400 Subject: [PATCH 4/7] return kinetic library entry when parsing comments Before this, the kinetics version of extract_source_from_comment returned the library name, but now it returns a tuple of the library name and the library entry. This makes things much easier downstream in the uncertainty tool. --- rmgpy/data/kinetics/database.py | 22 ++++++++++++++++++++-- test/rmgpy/data/kinetics/kineticsTest.py | 2 +- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/rmgpy/data/kinetics/database.py b/rmgpy/data/kinetics/database.py index e92b3df3c9..12f474fad5 100644 --- a/rmgpy/data/kinetics/database.py +++ b/rmgpy/data/kinetics/database.py @@ -695,7 +695,7 @@ def extract_source_from_comments(self, reaction): A reaction can only be estimated using one of these methods. source = {'RateRules': (Family_Label, OriginalTemplate, RateRules), - 'Library': String_Name_of_Library_Used, + 'Library': (String_Name_of_Library_Used, Library_Entry), 'PDep': Network_Index, 'Training': (Family_Label, Training_Reaction_Entry), } @@ -715,7 +715,25 @@ def extract_source_from_comments(self, reaction): source['Rate Rules'] = data_source elif isinstance(reaction, LibraryReaction): # This reaction comes from a reaction library or seed mechanism - source['Library'] = reaction.library + if reaction.library not in self.libraries: + raise ValueError(f'Library {reaction.library} not found in kinetics database. Make sure libraries match input file used to generate kinetics.') + library_reactions = self.generate_reactions_from_library( + library=self.libraries[reaction.library], + reactants=reaction.reactants, + products=reaction.products + ) + if not library_reactions: + raise ValueError(f'Could not find reaction {reaction} in library {reaction.library} when trying to extract source data from comments.') + entry = self.libraries[reaction.library].entries[library_reactions[0].entry.index] + if len(library_reactions) > 1: + # Library contains multiple reactions of the same type, so pick the one with matching kinetics + for rxn in library_reactions: + if rxn.entry.data.is_similar_to(reaction.kinetics): # units might have been changed, so use is_similar_to instead of direct comparison + entry = self.libraries[reaction.library].entries[rxn.entry.index] + break + else: + raise ValueError(f'Found multiple instances of reaction {reaction} in library {reaction.library}, but none of them have kinetics identical to the reaction kinetics.') + source['Library'] = (reaction.library, entry) elif isinstance(reaction, PDepReaction): # This reaction is a pressure-dependent reaction diff --git a/test/rmgpy/data/kinetics/kineticsTest.py b/test/rmgpy/data/kinetics/kineticsTest.py index 36f21e1085..2f89918b7e 100644 --- a/test/rmgpy/data/kinetics/kineticsTest.py +++ b/test/rmgpy/data/kinetics/kineticsTest.py @@ -672,7 +672,7 @@ def test_parse_kinetics(self): # ------------------------------------------------------------------- # # Source 0 comes from a kinetics library assert "Library" in sources[0] - assert sources[0]["Library"] == "GRI-Mech3.0" + assert sources[0]["Library"][0] == "GRI-Mech3.0" reconstructed_kinetics = self.database.kinetics.reconstruct_kinetics_from_source(reactions[0], sources[0], fix_barrier_height=True) A = reconstructed_kinetics.A.value_si From 004d62c9d64def32e8b26d98ed628f63f2907d39 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Thu, 28 May 2026 13:46:50 -0400 Subject: [PATCH 5/7] update uncertainty tool to use library entry objects instead of names When the library sources only included the name of the library, the uncertainty code had to compensate by adding extra species that are not in the model. Now that we're using library entry objects we are able to remove the complexity of the uncertainty.extra_species object. --- rmgpy/tools/uncertainty.py | 237 ++++++++-------------------- test/rmgpy/tools/uncertaintyTest.py | 9 +- 2 files changed, 74 insertions(+), 172 deletions(-) diff --git a/rmgpy/tools/uncertainty.py b/rmgpy/tools/uncertainty.py index 965df1db6d..368a5eae40 100644 --- a/rmgpy/tools/uncertainty.py +++ b/rmgpy/tools/uncertainty.py @@ -69,9 +69,9 @@ def get_uncertainty_value(self, source): varG += self.dG_library * self.dG_library if 'Surface_Library' in source: surf_lib_varG = self.dG_surf_lib * self.dG_surf_lib - # covariance libraries should overrule the default uncertainties when available + # covariance libraries should overrule the default uncertainties when available, if self.other_covariances is not None: - label = f'Surface_Library {source["Surface_Library"]}' # match the covariance dict label format + label = source["Surface_Library"][2] # match the covariance dict label format cov_label = (label, label) if cov_label in self.other_covariances: surf_lib_varG = self.other_covariances[cov_label] @@ -92,26 +92,29 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non """ Obtain the partial uncertainty dG/dG_corr*dG_corr, where dG_corr is the correlated parameter - `corr_param` is the parameter identifier itself, which is a integer for QM and library parameters, or a string for group values + `corr_param` is the parameter identifier itself, which is + (method_name, species) for QM + (library_name, library_entry, entry_label) for library parameters + or a string for group values `corr_source_type` is a string, being either 'Library', 'QM', 'GAV', 'ADS', or 'Estimation' `corr_group_type` is a string used only when the source type is 'GAV' and indicates grouptype """ if corr_source_type == 'Library': - if 'Library' in source: - if source['Library'] == corr_param: + if 'Library' in source: # check if same library and index match, could do isomorphism check on the entries, but this is faster + if source['Library'][0] == corr_param[0] and source['Library'][1].index == corr_param[1].index: # Correlated parameter is a source of the overall parameter return self.dG_library elif corr_source_type == 'Surface_Library': if 'Surface_Library' in source: - if source['Surface_Library'] == corr_param: + if source['Surface_Library'][0] == corr_param[0] and source['Surface_Library'][1].index == corr_param[1].index: # Correlated parameter is a source of the overall parameter return self.dG_surf_lib elif corr_source_type == 'QM': - if 'QM' in source: - if source['QM'] == corr_param: + if 'QM' in source: # here we do an isomorphism check since no index is saved for QM + if source['QM'][0] == corr_param[0] and source['QM'][1].is_isomorphic(corr_param[1]): # Correlated parameter is a source of the overall parameter return self.dG_QM @@ -439,9 +442,6 @@ def __init__(self, species_list=None, reaction_list=None, output_directory='', t self.thermo_covariance_groups = thermo_covariance_groups self.thermo_covariances_dict = {} # dictionary to store covariances from covariance libraries - # For extra species needed for correlated analysis but not in model - self.extra_species = [] - # Make output directory if it does not yet exist: if not os.path.exists(self.output_directory): try: @@ -500,33 +500,6 @@ def load_model(self, chemkin_path, dictionary_path, transport_path=None, surface transport_path=transport_path, surface_path=surface_path) - def retrieve_saturated_species_from_list(self, species): - """ - Given a radical `species`, this function retrieves the saturated species objects from a list of species objects - and returns the saturated species object along with a boolean that indicates if the species is not part of the model - (True->not in the model, False->in the model) - """ - - molecule = species.molecule[0] - assert molecule.is_radical(), "Method only valid for radicals." - saturated_struct = molecule.copy(deep=True) - saturated_struct.saturate_radicals() - for otherSpecies in self.species_list: - if otherSpecies.is_isomorphic(saturated_struct): - return otherSpecies, False - - # couldn't find saturated species in the model, try libraries - new_spc = Species(molecule=[saturated_struct]) - new_spc.generate_resonance_structures() - thermo = self.database.thermo.get_thermo_data_from_libraries(new_spc) - - if thermo is not None: - new_spc.thermo = thermo - self.species_list.append(new_spc) - return new_spc, True - else: - raise Exception('Could not retrieve saturated species form of {0} from the species list'.format(species)) - def load_thermo_covariances_from_libraries(self): """ This function populates the self.thermo_covariances_dict with covariance data (in units of (kcal/mol)^2) from the given covariance libraries @@ -537,7 +510,7 @@ def load_thermo_covariances_from_libraries(self): See the RMG-database/scripts/compile_BEEF_cov.ipynb Jupyter notebook for more details on how to generate these covariance libraries. - This function only adds covariance data for species that are actually in the model, (or in the extra_species as in the case of the radical/HBI correction) + This function only adds covariance data for species that are actually in the model, (which may still include a subset of library species that are not in the model as in the case of the radical/HBI correction) and only for the thermo source associated with that library. The goal is to keep the dictionary as small as possible because the lookups scale badly. Note: the covariance.npy matrix is in units of (kJ/mol)^2, but gets converted to (kcal/mol)^2 in this function to match the rest of the analysis @@ -596,7 +569,8 @@ def load_thermo_covariances_from_libraries(self): try: label = f'{surface_prefix}Library {self.species_list[i_sp].to_chemkin()}' except IndexError: - label = f'{surface_prefix}Library {self.extra_species[i_sp - len(self.species_list)].to_chemkin()}' + # uses InChI now + label = f'{surface_prefix}Library {self.species_list[i_sp].molecule[0].to_inchi()}' lib_species.label = label subset_indices.append(i_lib) @@ -626,7 +600,7 @@ def load_thermo_covariances_from_groups(self): See the RMG-database/scripts/compile_BEEF_cov.ipynb Jupyter notebook for more details on how to generate these covariance libraries. - This function only adds covariance data for groups and species that are actually in the model, (or in the extra_species as in the case of the radical/HBI correction) + This function only adds covariance data for groups and species that are actually in the model, (which may still include a subset of library entries for species not in the model as in the case of the radical/HBI correction) and only for the thermo source associated with that group/library. The goal is to keep the dictionary as small as possible because the lookups scale badly. Note: the covariance.npy matrix is in units of (kJ/mol)^2, but gets converted to (kcal/mol)^2 in this function to match the rest of the analysis @@ -752,7 +726,7 @@ def load_thermo_covariances_from_groups(self): try: label = f'{surface_prefix}Library {self.species_list[i_sp].to_chemkin()}' except IndexError: - label = f'{surface_prefix}Library {self.extra_species[i_sp - len(self.species_list)].to_chemkin()}' + label = f'{surface_prefix}Library {self.species_list[i_sp].molecule[0].to_inchi()}' lib_species.label = label subset_species_indices.append(i_lib) @@ -772,105 +746,52 @@ def extract_sources_from_model(self): Must be done after loading model and database to work. """ self.species_sources_dict = {} - self.extra_species = [] allowed_source_keys = {'Library', 'QM', 'GAV', 'ADS'} for species in self.species_list: - if species not in self.extra_species: - source = self.database.thermo.extract_source_from_comments(species) - unexpected_source_keys = set(source.keys()) - allowed_source_keys - if unexpected_source_keys: - raise ValueError( - f'Source of thermo must be either Library, QM, GAV, or ADS; ' - f'got unexpected source keys {unexpected_source_keys} for species {species.label}' - ) - - # Now prep the source data - # Do not alter the GAV information, but reassign QM and Library sources to the species indices that they came from - # Also specify the source as a Surface Library (if it has surface sites and comes from a library), for better differentiation when assigning uncertainties - if len(source) == 1: - # The thermo came from a single source, so we know it comes from a value describing the exact species - if 'Library' in source: - # Use just the species index in self.species_list, for better shorter printouts when debugging - source['Library'] = self.species_list.index(species) - if species.contains_surface_site(): - source['Surface_Library'] = source.pop('Library') - if 'QM' in source: - source['QM'] = self.species_list.index(species) - - elif len(source) == 2: - # The thermo has two sources, which indicates it's an HBI correction on top of a library or QM value... - # OR it is an adsorption correction with gas-phase thermo from Library/QM/GAV (no need to edit GAV source) - if 'ADS' in source: - # Need to retrieve the gas-phase molecule that the adsorption correction was applied to, and assign the source of the thermo to be that molecule instead of the surface species - if not species.contains_surface_site(): - raise ValueError('Species uses adsorption correction but does not contain any surface sites') - dummy_gas_species = Species() - dummy_gas_species.molecule = species.molecule[0].get_desorbed_molecules() - # add to species list if it's not already there, so we can reference it in the source dictionary - for spc in self.species_list: - if spc.is_isomorphic(dummy_gas_species): - dummy_gas_species = spc - break - else: - dummy_gas_species.thermo = self.database.thermo.get_thermo_data(dummy_gas_species) - self.species_list.append(dummy_gas_species) - self.extra_species.append(dummy_gas_species) - - if 'Library' in source: - # Use just the species index in self.species_list, for better shorter printouts when debugging - source['Library'] = self.species_list.index(dummy_gas_species) - if 'QM' in source: - source['QM'] = self.species_list.index(dummy_gas_species) - else: - # We must retrieve the original saturated molecule's thermo instead of using the radical species as the source of thermo - saturated_species, ignore_spc = self.retrieve_saturated_species_from_list(species) - - if ignore_spc: # this is saturated species that isn't in the actual model - self.extra_species.append(saturated_species) - - if 'Library' in source: - source['Library'] = self.species_list.index(saturated_species) - - if saturated_species.contains_surface_site(): - source['Surface_Library'] = source.pop('Library') # surface species library + radical correction - if 'QM' in source: - source['QM'] = self.species_list.index(saturated_species) - elif len(source) == 3: - # combination of adsorption correction, GAV (radical), and Library/ML - - if not species.contains_surface_site(): - raise ValueError( - f'Only surface species should have 3 thermo sources (adsorption correction, GAV, and library/QM); ' - f'got species={species.label}, source={source}' - ) - - # retrieve the desorbed version of the surface species-- the thing the adsorption correction was applied to during thermo estimation - dummy_gas_species = Species() - dummy_gas_species.molecule = species.molecule[0].get_desorbed_molecules() - saturated_species, ignore_spc = self.retrieve_saturated_species_from_list(dummy_gas_species) - - if ignore_spc: # this is saturated species that isn't in the actual model - self.extra_species.append(saturated_species) - - if 'Library' in source: - source['Library'] = self.species_list.index(saturated_species) - if 'QM' in source: - source['QM'] = self.species_list.index(saturated_species) + source = self.database.thermo.extract_source_from_comments(species) + unexpected_source_keys = set(source.keys()) - allowed_source_keys + if unexpected_source_keys: + raise ValueError( + f'Source of thermo must be either Library, QM, GAV, or ADS; ' + f'got unexpected source keys {unexpected_source_keys} for species {species.label}' + ) + + # Prep the source data: + # extend the library/QM sources to include species labels specific to the model + # Specify the source as a Surface Library (if it has surface sites and comes from a library), for better differentiation when assigning uncertainties + if 'Library' in source: + lib_species = rmgpy.species.Species(molecule=[source["Library"][1].item]) + label = self.get_species_label(lib_species) + if lib_species.contains_surface_site(): + extended_source = source['Library'] + (f'Surface_Library {label}',) + source['Surface_Library'] = extended_source + source.pop('Library') else: - raise Exception('Source of thermo should not use more than three sources out of ADS, QM, Library, or GAV.') + extended_source = source['Library'] + (f'Library {label}',) + source['Library'] = extended_source - self.species_sources_dict[species] = source + if 'QM' in source: + qm_species = source["QM"][1] + label = self.get_species_label(qm_species) + extended_source = source['QM'] + (f'QM {label}',) + source['QM'] = extended_source + + self.species_sources_dict[species] = source self.reaction_sources_dict = {} for reaction in self.reaction_list: source = self.database.kinetics.extract_source_from_comments(reaction) # Prep the source data # Consider any library or PDep reaction to be an independent parameter for now - # and assign the source to the index of the reaction within self.reaction_list if 'Library' in source: - source['Library'] = self.reaction_list.index(reaction) + # add a label to the end of the tuple with the official name to be used to match covariances if reaction.is_surface_reaction(): - source['Surface_Library'] = source.pop('Library') + extended_source = source['Library'] + (f'Surface_Library {reaction.to_chemkin(self.species_list, kinetics=False)}',) + source['Surface_Library'] = extended_source + source.pop('Library') + else: + extended_source = source['Library'] + (f'Library {reaction.to_chemkin(self.species_list, kinetics=False)}',) + source['Library'] = extended_source elif 'PDep' in source: source['PDep'] = self.reaction_list.index(reaction) elif 'Training' in source: @@ -882,9 +803,6 @@ def extract_sources_from_model(self): raise Exception('Source of kinetics must be either Library, PDep, Training, or Rate Rules') self.reaction_sources_dict[reaction] = source - for spc in self.extra_species: - self.species_list.remove(spc) - # -------------------- load covariance libraries ------------------------# self.load_thermo_covariances_from_libraries() self.load_thermo_covariances_from_groups() @@ -979,6 +897,13 @@ def compile_all_sources(self): for family_label in all_kinetic_sources['Training'].keys(): self.all_kinetic_sources['Training'][family_label] = list(all_kinetic_sources['Training'][family_label]) + def get_species_label(self, species): + i_sp = get_i_thing(species, self.species_list) + if i_sp >= 0: + return self.species_list[i_sp].to_chemkin() + else: + return species.to_chemkin() + def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False): """ Assign uncertainties based on the sources of the species thermo and reaction kinetics. @@ -993,36 +918,22 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non for species in self.species_list: if not correlated: - entry = self.species_sources_dict[species] - if 'Surface_Library' in entry: # preconditioning for covariance - # this is an ugly workaround to handle covariances: because get_uncertainty_value needs the species chemkin string to get the covariance - # but the source dictionary only has the index of the surface library entry - entry_copy = entry.copy() - entry_copy['Surface_Library'] = self.species_list[entry_copy['Surface_Library']].to_chemkin() - dG = g_param_engine.get_uncertainty_value(entry_copy) - else: - dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) + dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) self.thermo_input_uncertainties.append(dG) else: source = self.species_sources_dict[species] dG = {} if 'Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Library', corr_param=source['Library']) - try: - label = 'Library {}'.format(self.species_list[source['Library']].to_chemkin()) - except IndexError: - label = 'Library {}'.format(self.extra_species[source['Library'] - len(self.species_list)].to_chemkin()) + label = source['Library'][2] # the label we added to the end of the library source tuple in extract_sources_from_model dG[label] = pdG if 'Surface_Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', corr_param=source['Surface_Library']) - try: - label = 'Surface_Library {}'.format(self.species_list[source['Surface_Library']].to_chemkin()) - except IndexError: - label = 'Surface_Library {}'.format(self.extra_species[source['Surface_Library'] - len(self.species_list)].to_chemkin()) + label = source['Surface_Library'][2] dG[label] = pdG if 'QM' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'QM', corr_param=source['QM']) - label = 'QM {}'.format(self.species_list[source['QM']].to_chemkin()) + label = source['QM'][2] dG[label] = pdG if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): @@ -1089,12 +1000,12 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non elif 'Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Library', source['Library']) - label = 'Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + label = source['Library'][2] dlnk[label] = dplnk elif 'Surface_Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', source['Surface_Library']) - label = 'Surface_Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) + label = source['Surface_Library'][2] dlnk[label] = dplnk elif 'Training' in source: @@ -1134,33 +1045,19 @@ def assign_intermediate_uncertainties(self, g_param_engine=None, k_param_engine= for species in self.species_list: if not correlated: - entry = self.species_sources_dict[species] - if 'Surface_Library' in entry: # preconditioning for covariance - # this is an ugly workaround to handle covariances: because get_uncertainty_value needs the species chemkin string to get the covariance - # but the source dictionary only has the index of the surface library entry - entry_copy = entry.copy() - entry_copy['Surface_Library'] = self.species_list[entry_copy['Surface_Library']].to_chemkin() - dG = g_param_engine.get_uncertainty_value(entry_copy) - else: - dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) + dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) self.thermo_intermediate_uncertainties.append(dG) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty else: source = self.species_sources_dict[species] dGdq = {} if 'Library' in source: - try: - label = 'Library {}'.format(self.species_list[source['Library']].to_chemkin()) - except IndexError: - label = 'Library {}'.format(self.extra_species[source['Library'] - len(self.species_list)].to_chemkin()) + label = source['Library'][2] dGdq[label] = 1 # dG/dG_lib = 1, because the parameter is never scaled by anything other than 1 when it is used if 'Surface_Library' in source: - try: - label = 'Surface_Library {}'.format(self.species_list[source['Surface_Library']].to_chemkin()) - except IndexError: - label = 'Surface_Library {}'.format(self.extra_species[source['Surface_Library'] - len(self.species_list)].to_chemkin()) + label = source['Surface_Library'][2] dGdq[label] = 1 # dG/dG_surf = 1, because the parameter is never scaled by anything other than 1 when it is used if 'QM' in source: - label = 'QM {}'.format(self.species_list[source['QM']].to_chemkin()) + label = source['QM'][2] dGdq[label] = 1 if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index b3772ebecf..573239ce48 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -137,7 +137,12 @@ def test_uncertainty_assignment(self): assert grp == grp_expected assert rad == rad_expected assert other == other_expected - assert sorted(self.uncertainty.all_thermo_sources["Library"]) == [0, 1, 5, 13, 16] + + library_indices = [] + for source in self.uncertainty.all_thermo_sources["Library"]: + sp = rmgpy.species.Species(molecule=[source[1].item]) + library_indices.append(rmgpy.tools.uncertainty.get_i_thing(sp, self.uncertainty.species_list)) + assert sorted(library_indices) == [0, 1, 5, 13, 16] assert not self.uncertainty.all_thermo_sources["QM"] # Check kinetics sources @@ -160,7 +165,7 @@ def test_uncertainty_assignment(self): rr = set([e.label for e in self.uncertainty.all_kinetic_sources["Rate Rules"]["H_Abstraction"]]) assert rr == H_Abstraction_rr_expected assert set(self.uncertainty.all_kinetic_sources["Training"].keys()) == {"Disproportionation", "H_Abstraction"} - assert self.uncertainty.all_kinetic_sources["Library"] == [0] + assert self.uncertainty.all_kinetic_sources["Library"][0][1].item.is_isomorphic(self.uncertainty.reaction_list[0]) assert self.uncertainty.all_kinetic_sources["PDep"] == [6] # Step 3: assign and propagate uncertainties From 0ecb7e114f32d4bbb7d739970c8201ca1e95724e Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Fri, 29 May 2026 18:33:21 -0400 Subject: [PATCH 6/7] Merge assign_parameter_uncertainties and intermediate_uncertainties There's a lot of repeated code between the old UQ's assign_parameter_uncertainties and the new UQ's assign_intermediate_uncertainties, so this combines them into a single function in order to reuse a lot of that repeated code and make it easier to keep them consistent. --- rmgpy/tools/uncertainty.py | 220 ++++++++-------------------- test/rmgpy/tools/uncertaintyTest.py | 72 ++++----- 2 files changed, 98 insertions(+), 194 deletions(-) diff --git a/rmgpy/tools/uncertainty.py b/rmgpy/tools/uncertainty.py index 368a5eae40..f470b9a4d9 100644 --- a/rmgpy/tools/uncertainty.py +++ b/rmgpy/tools/uncertainty.py @@ -427,10 +427,10 @@ def __init__(self, species_list=None, reaction_list=None, output_directory='', t self.reaction_sources_dict = None self.all_thermo_sources = None self.all_kinetic_sources = None - self.thermo_input_uncertainties = None # previous formulation thermo parameter uncertainties - self.kinetic_input_uncertainties = None # previous formulation kinetic parameter uncertainties - self.thermo_intermediate_uncertainties = None # new formulation thermo parameter uncertainties - can be dependent on each other - self.kinetic_intermediate_uncertainties = None # new formulation kinetic parameter uncertainties - can be dependent on each other + self.thermo_input_uncertainties = None # thermo parameter uncertainties + self.kinetic_input_uncertainties = None # kinetic parameter uncertainties + self.dG_dqs = None # derivatives of Gibbs free energy with respect to underlying thermo parameters + self.dlnk_dqs = None # derivatives of ln(k) with respect to underlying kinetic parameters self.thermo_covariance_matrix = None # covariance matrix of all species thermo uncertainties self.kinetic_covariance_matrix = None # covariance matrix of all reaction kinetic uncertainties self.Sigma_ww_thermo = None # covariance matrix of all underlying thermo parameter uncertainties @@ -916,6 +916,9 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non self.thermo_input_uncertainties = [] self.kinetic_input_uncertainties = [] + self.dG_dqs = [] # a list of dictionaries to store the intermediate derivatives dG_i/dq for each parameter q that contributes to the uncertainty of species i's G + self.dlnk_dqs = [] # a list of dictionaries to store the intermediate derivatives dlnk_i/dq for each parameter q that contributes to the uncertainty of reaction i's k + for species in self.species_list: if not correlated: dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) @@ -923,36 +926,46 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non else: source = self.species_sources_dict[species] dG = {} + dG_dq = {} if 'Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Library', corr_param=source['Library']) label = source['Library'][2] # the label we added to the end of the library source tuple in extract_sources_from_model dG[label] = pdG + dG_dq[label] = 1 # derivative of G with respect to the library parameter is 1, since it's a direct contribution if 'Surface_Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', corr_param=source['Surface_Library']) label = source['Surface_Library'][2] dG[label] = pdG + dG_dq[label] = 1 # derivative of G with respect to the surface library parameter is 1, since it's a direct contribution if 'QM' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'QM', corr_param=source['QM']) label = source['QM'][2] dG[label] = pdG + dG_dq[label] = 1 # derivative of G with respect to the QM parameter is 1, since it's a direct contribution if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): for group, weight in groupList: pdG = g_param_engine.get_partial_uncertainty_value(source, 'ADS', group, adsGroupType) label = 'AdsorptionCorrection({}) {}'.format(adsGroupType, group.label) dG[label] = pdG + if weight != 1: + raise ValueError('Weight for adsorption group contribution to thermo should be 1, but got weight={weight} for {adsGroupType} in species {species}'.format(weight=weight, adsGroupType=adsGroupType, species=species)) + dG_dq[label] = weight # This should be 1 because there's only one group contribution per adsorption correction if 'GAV' in source: for groupType, groupList in source['GAV'].items(): for group, weight in groupList: pdG = g_param_engine.get_partial_uncertainty_value(source, 'GAV', group, groupType) label = 'Group({}) {}'.format(groupType, group.label) dG[label] = pdG + dG_dq[label] = weight # the derivative of G with respect to the group contribution is the weight of that group contribution to the overall G # We also know if there is group additivity used, there will be uncorrelated estimation error est_pdG = g_param_engine.get_partial_uncertainty_value(source, 'Estimation') if est_pdG: label = 'Estimation {}'.format(species.to_chemkin()) dG[label] = est_pdG + dG_dq[label] = 1 # the derivative of G with respect to the estimation error is 1, since we add the term directly self.thermo_input_uncertainties.append(dG) + self.dG_dqs.append(dG_dq) for reaction in self.reaction_list: if not correlated: @@ -961,6 +974,7 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non else: source = self.reaction_sources_dict[reaction] dlnk = {} + dlnk_dq = {} if 'Rate Rules' in source: family = source['Rate Rules'][0] source_dict = source['Rate Rules'][1] @@ -975,166 +989,55 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non corr_family=family) label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk - + dlnk_dq[label] = weight # the derivative of ln(k) with respect to the rate rule contribution for ruleEntry, trainingEntry, weight in training: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Rate Rules', corr_param=ruleEntry, corr_family=family) label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk + dlnk_dq[label] = weight # the derivative of ln(k) with respect to the training rule contribution + + N = len(source_dict['rules']) + len(source_dict['training']) # There is also estimation error if rate rules are used (nonexact and family contribute to this) nonexact_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Nonexact', corr_family=family) if nonexact_dplnk: label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = nonexact_dplnk + dlnk_dq[label] = np.log10(N + 1) # the derivative of ln(k) with respect to the nonexact estimation error family_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Family', corr_family=family) if family_dplnk: label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = family_dplnk - + dlnk_dq[label] = 1 # the derivative of ln(k) with respect to the family estimation error elif 'PDep' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'PDep', source['PDep']) label = 'PDep {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = dplnk + dlnk_dq[label] = 1 elif 'Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Library', source['Library']) label = source['Library'][2] dlnk[label] = dplnk + dlnk_dq[label] = 1 elif 'Surface_Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', source['Surface_Library']) label = source['Surface_Library'][2] dlnk[label] = dplnk + dlnk_dq[label] = 1 elif 'Training' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Training', source['Training']) family = source['Training'][0] label = 'Training {} {}'.format(family, reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = dplnk + dlnk_dq[label] = 1 self.kinetic_input_uncertainties.append(dlnk) - - def assign_intermediate_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False): - """ - Assign uncertainties to the intermediate parameters based on the sources of the species thermo and reaction kinetics. - - This fills out the class variables thermo_intermediate_uncertainties and kinetic_intermediate_uncertainties - these are each list of dictionaries. For every species or reaction, it lists all the intermediate sources contributing to that parameter's uncertainty. - - So for example, thermo_intermediate_uncertainties might look something like this: - - thermo_intermediate_uncertainties = [ - {'Group(group) Cds-CdsHH': 2.0, 'Group(radical) CCJ': 1.0, 'Estimation CH(4)': 1.0}, - {'Library CH2(5)': 1.0}, - ] - The keys of the dictionaries are the label names for the intermediate parameters. - and the values are partial derivatives dG_i/dq_w, how the species i Gibbs uncertainty changes with the intermediate parameter w. - - This function is the new formulation's equivalent to assign_parameter_uncertainties and similarly handles both correlated and uncorrelated cases. - But instead of assuming all underlying parameters are independent, here we can allow for dependence as long as we have the covariance - """ - if g_param_engine is None: - g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict) - if k_param_engine is None: - k_param_engine = KineticParameterUncertainty() - - self.thermo_intermediate_uncertainties = [] # store the intermediate dG_i/dq for each parameter q that contributes to the uncertainty of G_i, for use in correlated uncertainty analysis - self.kinetic_intermediate_uncertainties = [] - - for species in self.species_list: - if not correlated: - dG = g_param_engine.get_uncertainty_value(self.species_sources_dict[species]) - self.thermo_intermediate_uncertainties.append(dG) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty - else: - source = self.species_sources_dict[species] - dGdq = {} - if 'Library' in source: - label = source['Library'][2] - dGdq[label] = 1 # dG/dG_lib = 1, because the parameter is never scaled by anything other than 1 when it is used - if 'Surface_Library' in source: - label = source['Surface_Library'][2] - dGdq[label] = 1 # dG/dG_surf = 1, because the parameter is never scaled by anything other than 1 when it is used - if 'QM' in source: - label = source['QM'][2] - dGdq[label] = 1 - if 'ADS' in source: - for adsGroupType, groupList in source['ADS'].items(): - for group, weight in groupList: - label = 'AdsorptionCorrection({}) {}'.format(adsGroupType, group.label) - if weight != 1: - raise ValueError('Weight for adsorption group contribution to thermo should be 1, but got weight={weight} for {adsGroupType} in species {species}'.format(weight=weight, adsGroupType=adsGroupType, species=species)) - dGdq[label] = weight # This should be 1 - if 'GAV' in source: - for groupType, groupList in source['GAV'].items(): - for group, weight in groupList: - label = 'Group({}) {}'.format(groupType, group.label) - dGdq[label] = weight # dG/dG_group = weight, because the group contribution is scaled by the weight when it is used in the thermo estimation - # We also know if there is group additivity used, there will be uncorrelated estimation error - label = 'Estimation {}'.format(species.to_chemkin()) - dGdq[label] = 1 # dG/dG_est = 1, because the estimation error is added on top of the group additivity value, so it is never scaled by anything other than 1 when it is used - - self.thermo_intermediate_uncertainties.append(dGdq) - - for reaction in self.reaction_list: - if not correlated: - dlnk = k_param_engine.get_uncertainty_value(self.reaction_sources_dict[reaction]) - self.kinetic_intermediate_uncertainties.append(dlnk) # in the uncorrelated case, the intermediate is just the uncertainty value itself, since there is only one parameter that contributes to the uncertainty - else: - source = self.reaction_sources_dict[reaction] - dlnkdq = {} - if 'Rate Rules' in source: - family = source['Rate Rules'][0] - source_dict = source['Rate Rules'][1] - rules = source_dict['rules'] - training = source_dict['training'] - exact = source_dict['exact'] - surface_prefix = '' - if reaction.is_surface_reaction(): - surface_prefix = 'Surface ' - for ruleEntry, weight in rules: - label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) - dlnkdq[label] = weight # dlnk/dlnk_rule = weight, because the rate rule is scaled by the weight when it is used in the kinetics estimation - - for ruleEntry, trainingEntry, weight in training: - # TODO - test that training reactions in a tree are correlated with the exact match kind of training reaction - # for now, we follow the old convention of treating these as rate rules - label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) # ruleEntry should probably be the reaction equation itself - dlnkdq[label] = weight # dlnk/dlnk_training = weight, because the training entry is scaled by the weight when it is used in the kinetics estimation - - # There is also estimation error if rate rules are used - # Record dlnk/dlnk_family, the derivative with respect to the family estimation uncertainty - label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1 # dlnk/dlnk_family = 1, because the family estimation uncertainty is added on top of the rate rule values, so it is never scaled by anything other than 1 when it is used - - # Record the non-exact estimation error if not an exact match for a rate rule - if not exact: - N = len(source_dict['rules']) + len(source_dict['training']) - label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = np.log10(N + 1) - - elif 'PDep' in source: - label = 'PDep {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 # dlnk/dlnk_PDep = 1, because the PDep kinetics is never scaled by anything other than 1 when it is used - - elif 'Library' in source: - label = 'Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 # dlnk/dlnk_lib = 1, because the library kinetics is never scaled by anything other than 1 when it is used - - elif 'Surface_Library' in source: - label = 'Surface_Library {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 # dlnk/dlnk_surf_lib = 1, because the surface library kinetics is never scaled by anything other than 1 when it is used - - elif 'Training' in source: - family = source['Training'][0] - surface_prefix = '' - if reaction.is_surface_reaction(): - surface_prefix = 'Surface ' - label = '{}Training {} {}'.format(surface_prefix, family, reaction.to_chemkin(self.species_list, kinetics=False)) - dlnkdq[label] = 1.0 - - self.kinetic_intermediate_uncertainties.append(dlnkdq) + self.dlnk_dqs.append(dlnk_dq) def sensitivity_analysis(self, initial_mole_fractions, sensitive_species, T, P, termination_time, sensitivity_threshold=1e-3, number=10, fileformat='.png', initial_surface_coverages=None, @@ -1434,14 +1337,14 @@ def get_thermo_covariance_matrix(self, g_param_engine=None): NxN square matrix where N is the number of species in the model, with the covariance between species i and j in the ith row and jth column. Units are in (kcal/mol)^2. - Must call assign_intermediate_uncertainties first to populate the source dictionaries. + Must call assign_parameter_uncertainties first to populate the source dictionaries. TODO speed this up with sparse matrix multiplication? """ - assert self.thermo_intermediate_uncertainties is not None, 'Must call assign_intermediate_uncertainties first' - assert len(self.thermo_intermediate_uncertainties) > 0, 'No thermodynamic parameters found' - if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): - self.thermo_covariance_matrix = np.float_power(np.diag(self.thermo_intermediate_uncertainties), 2.0) + assert self.thermo_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' + assert len(self.thermo_input_uncertainties) > 0, 'No thermodynamic parameters found' + if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case + self.thermo_covariance_matrix = np.float_power(np.diag(self.thermo_input_uncertainties), 2.0) return self.thermo_covariance_matrix self.thermo_covariance_matrix = np.zeros((len(self.species_list), len(self.species_list))) @@ -1451,10 +1354,10 @@ def get_thermo_covariance_matrix(self, g_param_engine=None): for i in range(len(self.species_list)): for j in range((len(self.species_list))): - for q in self.thermo_intermediate_uncertainties[i].keys(): - dG_i_dq = self.thermo_intermediate_uncertainties[i][q] - for r in self.thermo_intermediate_uncertainties[j].keys(): - dG_j_dr = self.thermo_intermediate_uncertainties[j][r] + for q in self.dG_dqs[i].keys(): + dG_i_dq = self.dG_dqs[i][q] + for r in self.dG_dqs[j].keys(): + dG_j_dr = self.dG_dqs[j][r] self.thermo_covariance_matrix[i, j] += dG_i_dq * g_param_engine._get_covariance_qq(q, r) * dG_j_dr return self.thermo_covariance_matrix @@ -1465,14 +1368,14 @@ def get_kinetic_covariance_matrix(self, k_param_engine=None): MxM square matrix where M is the number of reactions in the model, with the covariance between reaction i and j in the ith row and jth column. Units are in (ln(k))^2. - Must call assign_intermediate_uncertainties first to populate the source dictionaries. + Must call assign_parameter_uncertainties first to populate the source dictionaries. TODO speed this up with sparse matrix multiplication? """ - assert self.kinetic_intermediate_uncertainties is not None, 'Must call assign_intermediate_uncertainties first' - assert len(self.kinetic_intermediate_uncertainties) > 0, 'No kinetic parameters found' - if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): - self.kinetic_covariance_matrix = np.float_power(np.diag(self.kinetic_intermediate_uncertainties), 2.0) + assert self.kinetic_input_uncertainties is not None, 'Must call assign_parameter_uncertainties first' + assert len(self.kinetic_input_uncertainties) > 0, 'No kinetic parameters found' + if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case + self.kinetic_covariance_matrix = np.float_power(np.diag(self.kinetic_input_uncertainties), 2.0) return self.kinetic_covariance_matrix if k_param_engine is None: @@ -1482,10 +1385,10 @@ def get_kinetic_covariance_matrix(self, k_param_engine=None): for i in range(len(self.reaction_list)): for j in range(len(self.reaction_list)): - for q in self.kinetic_intermediate_uncertainties[i].keys(): - dlnk_i_dq = self.kinetic_intermediate_uncertainties[i][q] - for r in self.kinetic_intermediate_uncertainties[j].keys(): - dlnk_j_dr = self.kinetic_intermediate_uncertainties[j][r] + for q in self.dlnk_dqs[i].keys(): + dlnk_i_dq = self.dlnk_dqs[i][q] + for r in self.dlnk_dqs[j].keys(): + dlnk_j_dr = self.dlnk_dqs[j][r] self.kinetic_covariance_matrix[i, j] += dlnk_i_dq * k_param_engine._get_covariance_qq(q, r) * dlnk_j_dr return self.kinetic_covariance_matrix @@ -1494,7 +1397,7 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset """ Make an explicit covariance matrix of all the qs (intermediate thermo parameters, like specific groups or library entries) - Requires calling assign_intermediate_uncertainties first + Requires calling assign_parameter_uncertainties first if subset_indices is None, computes the full matrix. Otherwise, only computes the matrices relevant to the species indicated by subset_indices @@ -1502,8 +1405,8 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset if subset_indices is None: subset_indices = np.arange(len(self.species_list)) - if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): - self.Sigma_ww_thermo = np.diag(np.float_power([self.thermo_intermediate_uncertainties[i] for i in subset_indices], 2.0)) + if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case + self.Sigma_ww_thermo = np.diag(np.float_power([self.thermo_input_uncertainties[i] for i in subset_indices], 2.0)) return self.Sigma_ww_thermo if g_param_engine is None: @@ -1511,7 +1414,7 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset self.all_thermo_intermediates = set() for sp_idx in subset_indices: - for q in self.thermo_intermediate_uncertainties[sp_idx].keys(): + for q in self.dG_dqs[sp_idx].keys(): self.all_thermo_intermediates.add(q) self.all_thermo_intermediates = list(self.all_thermo_intermediates) W = len(self.all_thermo_intermediates) @@ -1529,7 +1432,7 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs """ Make an explicit covariance matrix of all the qs (intermediate kinetic parameters, like specific rate rules or libraries entries) - Requires calling assign_intermediate_uncertainties first + Requires calling assign_parameter_uncertainties first if subset_indices is None, computes the full matrix. Otherwise, only computes the matrices relevant to the reactions indicated by subset_indices @@ -1537,9 +1440,8 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs if subset_indices is None: subset_indices = np.arange(len(self.reaction_list)) - if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): - # TODO this might have to be squared - self.Sigma_ww_kinetics = np.diag(np.float_power([self.kinetic_intermediate_uncertainties[i] for i in subset_indices], 2.0)) + if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case + self.Sigma_ww_kinetics = np.diag(np.float_power([self.kinetic_input_uncertainties[i] for i in subset_indices], 2.0)) return self.Sigma_ww_kinetics if k_param_engine is None: @@ -1547,7 +1449,7 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs self.all_kinetics_intermediates = set() for rxn_idx in subset_indices: - for q in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + for q in self.dlnk_dqs[rxn_idx].keys(): self.all_kinetics_intermediates.add(q) self.all_kinetics_intermediates = list(self.all_kinetics_intermediates) W = len(self.all_kinetics_intermediates) @@ -1578,15 +1480,15 @@ def _get_dG_dq_matrix(self, subset_indices=None): subset_indices = np.arange(len(self.species_list)) # return a square identity matrix if uncorrelated - if isinstance(self.thermo_intermediate_uncertainties[0], np.float64): + if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case return np.eye(len(subset_indices)) dGdq = np.zeros((len(subset_indices), len(self.all_thermo_intermediates))) for i, sp_idx in enumerate(subset_indices): - for key in self.thermo_intermediate_uncertainties[sp_idx].keys(): + for key in self.dG_dqs[sp_idx].keys(): q_index = self.all_thermo_intermediates.index(key) - dGdq[i, q_index] = self.thermo_intermediate_uncertainties[sp_idx][key] + dGdq[i, q_index] = self.dG_dqs[sp_idx][key] return dGdq @@ -1603,15 +1505,15 @@ def _get_dlnk_dq_matrix(self, subset_indices=None): subset_indices = np.arange(len(self.reaction_list)) # return a square identity matrix if uncorrelated - if isinstance(self.kinetic_intermediate_uncertainties[0], np.float64): + if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case return np.eye(len(subset_indices)) dlnkdq = np.zeros((len(subset_indices), len(self.all_kinetics_intermediates))) for i, rxn_idx in enumerate(subset_indices): - for key in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + for key in self.dlnk_dqs[rxn_idx].keys(): q_index = self.all_kinetics_intermediates.index(key) - dlnkdq[i, q_index] = self.kinetic_intermediate_uncertainties[rxn_idx][key] + dlnkdq[i, q_index] = self.dlnk_dqs[rxn_idx][key] return dlnkdq diff --git a/test/rmgpy/tools/uncertaintyTest.py b/test/rmgpy/tools/uncertaintyTest.py index 573239ce48..51eb7953ef 100644 --- a/test/rmgpy/tools/uncertaintyTest.py +++ b/test/rmgpy/tools/uncertaintyTest.py @@ -179,62 +179,64 @@ def test_uncertainty_assignment(self): np.testing.assert_allclose(thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) np.testing.assert_allclose(kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) - # ---------------------------- Now repeat for assign_intermediate_uncertainties ----------------------------- + # ---------------------------- Now repeat for new formulation ----------------------------- # uncorrelated - self.uncertainty.assign_intermediate_uncertainties(correlated=False) - intermediate_thermo_unc = self.uncertainty.thermo_intermediate_uncertainties - intermediate_kinetic_unc = self.uncertainty.kinetic_intermediate_uncertainties - np.testing.assert_allclose(intermediate_thermo_unc, expected_uncorrelated_thermo_uncertainties, rtol=1e-4) - np.testing.assert_allclose(intermediate_kinetic_unc, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4) + self.uncertainty.assign_parameter_uncertainties(correlated=False) + # use covariance matrices as test + thermo_covariance = np.sqrt(self.uncertainty.get_thermo_covariance_matrix().diagonal()) + kinetic_covariance = np.sqrt(self.uncertainty.get_kinetic_covariance_matrix().diagonal()) + assert np.isclose(thermo_covariance, expected_uncorrelated_thermo_uncertainties, rtol=1e-4).all() + assert np.isclose(kinetic_covariance, expected_uncorrelated_kinetic_uncertainties, rtol=1e-4).all() + # correlated - self.uncertainty.assign_intermediate_uncertainties(correlated=True) + self.uncertainty.assign_parameter_uncertainties(correlated=True) # do a spot check on some of the intermediates (dG/dq) these are derivatives, not uncertainties # Thermo library example - assert self.uncertainty.thermo_intermediate_uncertainties[0].keys() == {'Library O(0)'} - assert self.uncertainty.thermo_intermediate_uncertainties[0]['Library O(0)'] == 1 + assert self.uncertainty.dG_dqs[0].keys() == {'Library O(0)'} + assert self.uncertainty.dG_dqs[0]['Library O(0)'] == 1 # Thermo GAV example - assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[2].keys())) == ('Estimation HO2(2)', 'Group(group) O2s-OsH', 'Group(other) R', 'Group(radical) HOOJ') - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Estimation HO2(2)'] == 1 - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(group) O2s-OsH'] == 2 - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(other) R'] == 2 - assert self.uncertainty.thermo_intermediate_uncertainties[2]['Group(radical) HOOJ'] == 1 + assert tuple(sorted(self.uncertainty.dG_dqs[2].keys())) == ('Estimation HO2(2)', 'Group(group) O2s-OsH', 'Group(other) R', 'Group(radical) HOOJ') + assert self.uncertainty.dG_dqs[2]['Estimation HO2(2)'] == 1 + assert self.uncertainty.dG_dqs[2]['Group(group) O2s-OsH'] == 2 + assert self.uncertainty.dG_dqs[2]['Group(other) R'] == 2 + assert self.uncertainty.dG_dqs[2]['Group(radical) HOOJ'] == 1 # Thermo library + GAV - assert tuple(sorted(self.uncertainty.thermo_intermediate_uncertainties[14].keys())) == ('Estimation CH3(14)', 'Group(radical) CH3', 'Library CH4(16)') - assert self.uncertainty.thermo_intermediate_uncertainties[14]['Estimation CH3(14)'] == 1 - assert self.uncertainty.thermo_intermediate_uncertainties[14]['Group(radical) CH3'] == 1 - assert self.uncertainty.thermo_intermediate_uncertainties[14]['Library CH4(16)'] == 1 + assert tuple(sorted(self.uncertainty.dG_dqs[14].keys())) == ('Estimation CH3(14)', 'Group(radical) CH3', 'Library CH4(16)') + assert self.uncertainty.dG_dqs[14]['Estimation CH3(14)'] == 1 + assert self.uncertainty.dG_dqs[14]['Group(radical) CH3'] == 1 + assert self.uncertainty.dG_dqs[14]['Library CH4(16)'] == 1 # Kinetics library - assert self.uncertainty.kinetic_intermediate_uncertainties[0].keys() == {'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)'} + assert self.uncertainty.dlnk_dqs[0].keys() == {'Library O(0)+H2O2(3)<=>OH(1)+HO2(2)'} # Rate rule (exact) - assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[1].keys())) == ('Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', 'Rate Rule H_Abstraction C/H3/Cs;C_methyl') - assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)'] == 1 - assert self.uncertainty.kinetic_intermediate_uncertainties[1]['Rate Rule H_Abstraction C/H3/Cs;C_methyl'] == 1 + assert tuple(sorted(self.uncertainty.dlnk_dqs[1].keys())) == ('Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)', 'Rate Rule H_Abstraction C/H3/Cs;C_methyl') + assert self.uncertainty.dlnk_dqs[1]['Estimation Family CH3(14)+PC3H7(15)<=>CH4(16)+CH2CH2CH2(17)'] == 1 + assert self.uncertainty.dlnk_dqs[1]['Rate Rule H_Abstraction C/H3/Cs;C_methyl'] == 1 # Rate rule (non-exact, multiple rule weights) - assert tuple(sorted(self.uncertainty.kinetic_intermediate_uncertainties[3].keys())) == ( + assert tuple(sorted(self.uncertainty.dlnk_dqs[3].keys())) == ( 'Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', 'Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)', 'Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs', 'Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad', ) - assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'] == 1 - assert np.isclose(self.uncertainty.kinetic_intermediate_uncertainties[3]['Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'], 0.4771212547, rtol=1e-4) - assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs'] == 0.5 - assert self.uncertainty.kinetic_intermediate_uncertainties[3]['Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad'] == 0.5 + assert self.uncertainty.dlnk_dqs[3]['Estimation Family C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'] == 1 + assert np.isclose(self.uncertainty.dlnk_dqs[3]['Estimation Nonexact C2H3(20)+C3H8(19)<=>C2H4(11)+PC3H7(15)'], 0.4771212547, rtol=1e-4) + assert self.uncertainty.dlnk_dqs[3]['Rate Rule H_Abstraction C/H3/Cs\\H2\\Cs|O;Cd_Cd\\H2_rad/Cs'] == 0.5 + assert self.uncertainty.dlnk_dqs[3]['Rate Rule H_Abstraction C/H3/Cs\\H3;Cd_Cd\\H2_pri_rad'] == 0.5 # Training reaction - assert self.uncertainty.kinetic_intermediate_uncertainties[5].keys() == {'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'} - assert self.uncertainty.kinetic_intermediate_uncertainties[5]['Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'] == 1 + assert self.uncertainty.dlnk_dqs[5].keys() == {'Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'} + assert self.uncertainty.dlnk_dqs[5]['Training H_Abstraction CH3(14)+C2H6(18)<=>CH4(16)+C2H5(12)'] == 1 # PDEP - assert self.uncertainty.kinetic_intermediate_uncertainties[6].keys() == {'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'} - assert self.uncertainty.kinetic_intermediate_uncertainties[6]['PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'] == 1 + assert self.uncertainty.dlnk_dqs[6].keys() == {'PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'} + assert self.uncertainty.dlnk_dqs[6]['PDep HCCO(10)(+M)<=>O(0)+C2H(8)(+M)'] == 1 # correlated uncertainties should match uncorrelated, so check diagonal of covariance matrix thermo_covariance = np.sqrt(self.uncertainty.get_thermo_covariance_matrix().diagonal()) @@ -410,7 +412,7 @@ def test_local_analysis(self): # -------------------- repeat the exact same test for new formulation -------------------------- # uncorrelated analysis first - self.uncertainty.assign_intermediate_uncertainties() + self.uncertainty.assign_parameter_uncertainties(correlated=False) output = self.uncertainty.local_analysis_intermediate(sensitive_species=sensitive_species) total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] assert np.isclose(total_variance, expected_uncorrelated_total_variance) @@ -431,7 +433,7 @@ def test_local_analysis(self): assert sorted_thermo_names == expected_uncorrelated_thermo_labels # now repeat for correlated analysis - self.uncertainty.assign_intermediate_uncertainties(correlated=True) + self.uncertainty.assign_parameter_uncertainties(correlated=True) output = self.uncertainty.local_analysis_intermediate(sensitive_species=sensitive_species, correlated=True) total_variance, kinetic_uncertainty, thermo_uncertainty = output[sensitive_species[0]] assert np.isclose(total_variance, expected_correlated_total_variance) @@ -467,11 +469,11 @@ def test_covariance_matrices(self): uncorrelated_thermo_inputs = np.array(self.uncertainty.thermo_input_uncertainties) uncorrelated_kinetic_inputs = np.array(self.uncertainty.kinetic_input_uncertainties) - self.uncertainty.assign_intermediate_uncertainties(correlated=False) + self.uncertainty.assign_parameter_uncertainties(correlated=False) uncorrelated_thermo_covariance = self.uncertainty.get_thermo_covariance_matrix() uncorrelated_kinetic_covariance = self.uncertainty.get_kinetic_covariance_matrix() - self.uncertainty.assign_intermediate_uncertainties(correlated=True) + self.uncertainty.assign_parameter_uncertainties(correlated=True) correlated_thermo_covariance = self.uncertainty.get_thermo_covariance_matrix() correlated_kinetic_covariance = self.uncertainty.get_kinetic_covariance_matrix() Sigma_ww_thermo = self.uncertainty._get_intermediate_thermo_covariance_matrix() From 35dab7f7d40e13102376645a6f54450f3456a4a5 Mon Sep 17 00:00:00 2001 From: Sevy Harris Date: Fri, 29 May 2026 10:40:00 -0400 Subject: [PATCH 7/7] implement rank-based UQ The RMG uncertainty tool assigns a single uncertainty value to all training reactions, but RMG has the ability to keep track of rank https://reactionmechanismgenerator.github.io/RMG-Py/users/rmg/kinetics.html so this enables the UQ to look up the rank for any entry in the model that has it and assign a more customized/specific uncertainty --- rmgpy/tools/uncertainty.py | 430 +++++++++++++++++++++++++------------ 1 file changed, 289 insertions(+), 141 deletions(-) diff --git a/rmgpy/tools/uncertainty.py b/rmgpy/tools/uncertainty.py index f470b9a4d9..bc753a7aa4 100644 --- a/rmgpy/tools/uncertainty.py +++ b/rmgpy/tools/uncertainty.py @@ -39,18 +39,59 @@ from rmgpy.tools.plot import parse_csv_data, plot_sensitivity, ReactionSensitivityPlot, ThermoSensitivityPlot +# This represents the ratio k_max/k_0 for a reaction if using a uniform distribution for uncertainty +# or the the ratio k_95/k_0 where k_95 is the 95% confidence bound for a lognormal distribution. +KINETIC_RANK_MULTIPLIERS = { + 1: 1.2, # Experiment/FCI + 2: 1.5, # W4/HEAT with very good (2-d if necessary) rotors + 3: 2.0, # CCSD(T)-F12/cc-PVnZ with n>2 or CCSD(T)-F12/CBS with good (2-d if necessary) rotors + 4: 3.0, # CCSD(T)-F12/DZ, with good (2-d if necessary) rotors + 5: 5.0, # CBS-QB3 with 1-d rotors + 6: 7.5, # Double-hybrid DFT with 1-D rotors + 7: 10.0, # Hybrid DFT (w/ dispersion) (rotors if necessary) + 8: 30, # B3LYP & lower DFT (rotors if necessary) + 9: 100, # Direct Estimate/Guess + 10: 1000, # Average of Rates + 11: 10000, # General Estimate (Never used in generation) + 0: 10000, # General Estimate (Never used in generation) +} + +# Use this for uniform distribution +KINETIC_RANK_UNCERTAINTIES = {k: np.log(v) / np.sqrt(3.0) for k, v in KINETIC_RANK_MULTIPLIERS.items()} + +# Use this for lognormal distribution +KINETIC_RANK_UNCERTAINTIES = {k: np.log(v) / 1.96 for k, v in KINETIC_RANK_MULTIPLIERS.items()} + +THERMO_RANK_UNCERTAINTIES = { # THESE ARE FILLER. PLEASE UPDATE WITH BETTER UNCERTAINTIES BASED ON DATA ANALYSIS + 1: 0.1, # Experiment/FCI + 2: 0.2, # W4/HEAT with very good (2-d if necessary) rotors + 3: 0.3, # CCSD(T)-F12/cc-PVnZ with n>2 or CCSD(T)-F12/CBS with good (2-d if necessary) rotors + 4: 0.5, # CCSD(T)-F12/DZ, with good (2-d if necessary) rotors + 5: 0.8, # CBS-QB3 with 1-d rotors + 6: 1.0, # Double-hybrid DFT with 1-D rotors + 7: 1.5, # Hybrid DFT (w/ dispersion) (rotors if necessary) + 8: 2.0, # B3LYP & lower DFT (rotors if necessary) + 9: 2.5, # Direct Estimate/Guess + 10: 3.0, # Average of Rates + 11: 3.0, # General Estimate (Never used in generation) + 0: 3.0, # General Estimate (Never used in generation) +} + + class ThermoParameterUncertainty(object): """ This class is an engine that generates the species uncertainty based on its thermo sources. """ - def __init__(self, dG_library=1.5, dG_QM=3.0, dG_GAV=1.5, dG_group=0.7159, dG_ADS_correction=6.918, dG_surf_lib=6.918, other_covariances=None): + def __init__(self, dG_library=1.5, dG_QM=3.0, dG_GAV=1.5, dG_group=0.7159, dG_ADS_correction=6.918, dG_surf_lib=6.918, other_covariances=None, rank_dictionary=True): """ Initialize the different uncertainties dG_library, dG_QM, dG_GAV, and dG_other with set values in units of kcal/mol. We expect a uniform distribution for some species free energy G in [Gmin, Gmax]. dG = (Gmax-Gmin)/2 + + rank_dictionary is set to True or False. If True, uses the default RMG ranks in THERMO_RANK_UNCERTAINTIES to assign rank-based uncertainties. """ self.dG_library = dG_library self.dG_QM = dG_QM @@ -59,6 +100,21 @@ def __init__(self, dG_library=1.5, dG_QM=3.0, dG_GAV=1.5, dG_group=0.7159, dG_AD self.dG_ADS_correction = dG_ADS_correction self.dG_surf_lib = dG_surf_lib self.other_covariances = other_covariances # storage of covariances as a dict. Keys are sorted tuples of parameter labels and values are covariances + if rank_dictionary: # default is to use default rank dictionary + self.rank_dictionary = THERMO_RANK_UNCERTAINTIES + else: # ignore rank + self.rank_dictionary = None + + def _get_rank_uncertainty(self, entry, default_uncertainty): + """Helper function for returning the rank-based uncertainty for a given item, + which may be a training reaction or a rate rule entry. + If the item does not have a rank attribute or if the rank is not in the rank dictionary, return the default uncertainty.""" + if not self.rank_dictionary: + return default_uncertainty + rank = getattr(entry, 'rank', None) + if rank is None or rank not in self.rank_dictionary: + return default_uncertainty + return self.rank_dictionary[rank] def get_uncertainty_value(self, source): """ @@ -66,9 +122,16 @@ def get_uncertainty_value(self, source): """ varG = 0.0 if 'Library' in source: - varG += self.dG_library * self.dG_library + library_entry = source['Library'][1] + default_uncertainty = self.dG_library + dG_library = self._get_rank_uncertainty(library_entry, default_uncertainty) + varG += dG_library * dG_library if 'Surface_Library' in source: - surf_lib_varG = self.dG_surf_lib * self.dG_surf_lib + surf_lib_entry = source['Surface_Library'][1] + default_uncertainty = self.dG_surf_lib + dG_surf_lib = self._get_rank_uncertainty(surf_lib_entry, default_uncertainty) + surf_lib_varG = dG_surf_lib * dG_surf_lib + # covariance libraries should overrule the default uncertainties when available, if self.other_covariances is not None: label = source["Surface_Library"][2] # match the covariance dict label format @@ -104,13 +167,17 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non if 'Library' in source: # check if same library and index match, could do isomorphism check on the entries, but this is faster if source['Library'][0] == corr_param[0] and source['Library'][1].index == corr_param[1].index: # Correlated parameter is a source of the overall parameter - return self.dG_library + library_entry = source['Library'][1] + dG_library = self._get_rank_uncertainty(library_entry, self.dG_library) + return dG_library elif corr_source_type == 'Surface_Library': if 'Surface_Library' in source: if source['Surface_Library'][0] == corr_param[0] and source['Surface_Library'][1].index == corr_param[1].index: # Correlated parameter is a source of the overall parameter - return self.dG_surf_lib + surf_lib_entry = source['Surface_Library'][1] + dG_surf_lib = self._get_rank_uncertainty(surf_lib_entry, self.dG_surf_lib) + return dG_surf_lib elif corr_source_type == 'QM': if 'QM' in source: # here we do an isomorphism check since no index is saved for QM @@ -144,6 +211,30 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non # If we get here, it means the correlated parameter was not found return None + def get_intermediate_uncertainty_value(self, source_type, source): + """ + Get the uncertainty of an intermediate parameter, which is an underlying parameter that contributes to the overall uncertainty but is not directly a source parameter. For example, this could be the uncertainty of a specific group contribution value that contributes to the overall GAV uncertainty for a species. + + `source_type` is a string indicating the type of intermediate parameter, such as 'Library', 'Surface_Library', 'QM', 'GAV', or 'ADS' + `source` is the identifier for the intermediate parameter, such as (method_name, species) for QM or (library_name, library_entry, entry_label) for library parameters + """ + if source_type == 'Library': + library_entry = source[1] + return self._get_rank_uncertainty(library_entry, self.dG_library) + elif source_type == 'Surface_Library': + surf_lib_entry = source[1] + return self._get_rank_uncertainty(surf_lib_entry, self.dG_surf_lib) + elif source_type == 'QM': + return self.dG_QM + elif source_type == 'GAV': + return self.dG_group + elif source_type == 'ADS': + return self.dG_ADS_correction + elif source_type == 'Estimation': + return self.dG_GAV + else: + raise Exception('Thermo intermediate parameter source must be Library, Surface_Library, QM, GAV, or ADS') + def get_uncertainty_factor(self, source): """ Retrieve the uncertainty factor f in kcal/mol when the source of the thermo of a species is given. @@ -154,47 +245,26 @@ def get_uncertainty_factor(self, source): f = np.sqrt(3) * dG return f - def _get_covariance_qq(self, q_label1, q_label2): + def _get_covariance_qq(self, q_label1, q_label2, dG_source1=None): """ - Gets the covariance between two intermediate sources q1 and q2 - Where q_label1 and q_label2 are both labels for a given thermo source + Gets the covariance between two intermediate sources q1 and q2 in units of (kcal/mol)^2 + Where q_label1 and q_label2 are both labels for a given thermo source and dG_source1 is the partial uncertainty of source 1 The possible intermediate parameter types are: Library, Surface_Library, QM, Estimation, AdsorptionCorrection, or Group - """ - intermediate_parameters = { - 'Library': self.dG_library, - 'Surface_Library': self.dG_surf_lib, - 'QM': self.dG_QM, - 'Estimation': self.dG_GAV, - 'AdsorptionCorrection': self.dG_ADS_correction, - 'Group': self.dG_group, - } - - # figure out the type of each correlated parameter from its label - corr_type1 = None - corr_type2 = None - - for intermediate_type in intermediate_parameters.keys(): - if q_label1.startswith(intermediate_type): - corr_type1 = intermediate_type - if q_label2.startswith(intermediate_type): - corr_type2 = intermediate_type - - if corr_type1 is None or corr_type2 is None: - raise ValueError(f'Could not determine the type of the correlated parameters from their labels {q_label1} and {q_label2}') + Sources are assumed to be independent unless the labels match exactly or an explicit covariance exists for the two + """ + # Explicit covariance value overrides all other possibilities if self.other_covariances is not None: # check if covariance is specified in other_covariances dict sorted_labels = tuple(sorted([q_label1, q_label2])) if sorted_labels in self.other_covariances: return self.other_covariances[sorted_labels] - if corr_type1 != corr_type2: - return 0 - elif q_label1 == q_label2: + if q_label1 == q_label2: # If the two correlated parameters are exactly the same, return the variance of that parameter - return intermediate_parameters[corr_type1] ** 2.0 + return dG_source1 * dG_source1 return 0 @@ -204,13 +274,14 @@ class KineticParameterUncertainty(object): """ def __init__(self, dlnk_library=0.5, dlnk_training=0.5, dlnk_pdep=2.0, dlnk_family=1.0, dlnk_nonexact=3.5, - dlnk_rule=0.5, dlnk_surf_library=2.659, dlnk_surf_training=2.659, dlnk_surf_rule=2.659): + dlnk_rule=0.5, dlnk_surf_library=2.659, dlnk_surf_training=2.659, dlnk_surf_rule=2.659, rank_dictionary=True): """ Initialize the different uncertainties dlnk We expect a uniform distribution for some reaction kinetics about ln(k0) in [ln(kmin), ln(kmax)]. dlnk = (ln(kmax)-ln(kmin))/2 - """ + + rank_dictionary is set to True or False. If True, uses the default RMG ranks in KINETIC_RANK_UNCERTAINTIES to assign rank-based uncertainties.""" self.dlnk_library = dlnk_library self.dlnk_training = dlnk_training self.dlnk_pdep = dlnk_pdep @@ -220,6 +291,21 @@ def __init__(self, dlnk_library=0.5, dlnk_training=0.5, dlnk_pdep=2.0, dlnk_fami self.dlnk_surf_library = dlnk_surf_library self.dlnk_surf_training = dlnk_surf_training self.dlnk_surf_rule = dlnk_surf_rule + if rank_dictionary: + self.rank_dictionary = KINETIC_RANK_UNCERTAINTIES # use default rank dictionary + else: # ignore rank + self.rank_dictionary = None + + def _get_rank_uncertainty(self, entry, default_uncertainty): + """Helper function for returning the rank-based uncertainty for a given item, + which may be a training reaction or a rate rule entry. + If the item does not have a rank attribute or if the rank is not in the rank dictionary, return the default uncertainty.""" + if not self.rank_dictionary: + return default_uncertainty + rank = getattr(entry, 'rank', None) + if rank is None or rank not in self.rank_dictionary: + return default_uncertainty + return self.rank_dictionary[rank] def get_uncertainty_value(self, source): """ @@ -228,10 +314,16 @@ def get_uncertainty_value(self, source): varlnk = 0.0 if 'Library' in source: # Should be a single library reaction source - varlnk += self.dlnk_library * self.dlnk_library + library_entry = source['Library'][1] + default_uncertainty = self.dlnk_library + dlnk_library = self._get_rank_uncertainty(library_entry, default_uncertainty) + varlnk += dlnk_library * dlnk_library elif 'Surface_Library' in source: # Should be a single library reaction source - varlnk += self.dlnk_surf_library * self.dlnk_surf_library + surface_library_entry = source['Surface_Library'][1] + default_uncertainty = self.dlnk_surf_library + dlnk_surf_library = self._get_rank_uncertainty(surface_library_entry, default_uncertainty) + varlnk += dlnk_surf_library * dlnk_surf_library elif 'PDep' in source: # Should be a single pdep reaction source varlnk += self.dlnk_pdep * self.dlnk_pdep @@ -239,10 +331,13 @@ def get_uncertainty_value(self, source): # Should be a single training reaction # Although some training entries may be used in reverse, # We still consider the kinetics to be directly dependent - if 'surface' in source['Training'][0].lower(): - varlnk += self.dlnk_surf_training * self.dlnk_surf_training + training_entry = source['Training'][1] + if training_entry.item.is_surface_reaction(): + default_uncertainty = self.dlnk_surf_training else: - varlnk += self.dlnk_training * self.dlnk_training + default_uncertainty = self.dlnk_training + dlnk_training = self._get_rank_uncertainty(training_entry, default_uncertainty) + varlnk += dlnk_training * dlnk_training elif 'Rate Rules' in source: family_label = source['Rate Rules'][0] source_dict = source['Rate Rules'][1] @@ -250,6 +345,15 @@ def get_uncertainty_value(self, source): rule_weights = [ruleTuple[-1] for ruleTuple in source_dict['rules']] training_weights = [trainingTuple[-1] for trainingTuple in source_dict['training']] + default_rule_uncertainty = self.dlnk_surf_rule if 'surface' in family_label.lower() else self.dlnk_rule + # Note that even if these source from training reactions, we actually + # use the uncertainty for rate rules, since these are now approximations + # of the original reaction. We consider these to be independent of original the training + # parameters because the rate rules may be reversing the training reactions, + # which leads to more complicated dependence + rule_uncertainties = [self._get_rank_uncertainty(ruleEntry, default_rule_uncertainty) for ruleEntry, weight in source_dict['rules']] + training_uncertainties = [self._get_rank_uncertainty(trainingEntry, default_rule_uncertainty) for ruleEntry, trainingEntry, weight in source_dict['training']] + varlnk += self.dlnk_family * self.dlnk_family N = len(rule_weights) + len(training_weights) @@ -257,19 +361,11 @@ def get_uncertainty_value(self, source): # nonexactness contribution increases as N increases varlnk += (np.log10(N + 1) * self.dlnk_nonexact) * (np.log10(N + 1) * self.dlnk_nonexact) - if 'surface' in family_label.lower(): - varlnk += np.sum([weight * weight * self.dlnk_surf_rule * self.dlnk_surf_rule for weight in rule_weights]) - varlnk += np.sum([weight * weight * self.dlnk_surf_training * self.dlnk_surf_training for weight in training_weights]) - else: - # Add the contributions from rules - varlnk += np.sum([weight * weight * self.dlnk_rule * self.dlnk_rule for weight in rule_weights]) - # Add the contributions from training - # Even though these source from training reactions, we actually - # use the uncertainty for rate rules, since these are now approximations - # of the original reaction. We consider these to be independent of original the training - # parameters because the rate rules may be reversing the training reactions, - # which leads to more complicated dependence - varlnk += np.sum([weight * weight * self.dlnk_rule * self.dlnk_rule for weight in training_weights]) + # Add the contributions from rules + varlnk += np.sum([weight * weight * rule_uncertainty * rule_uncertainty for weight, rule_uncertainty in zip(rule_weights, rule_uncertainties)]) + + # Add the contributions from training + varlnk += np.sum([weight * weight * training_uncertainty * training_uncertainty for weight, training_uncertainty in zip(training_weights, training_uncertainties)]) return np.sqrt(varlnk) @@ -294,16 +390,30 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non for ruleEntry, weight in rules: if corr_param == ruleEntry: if surface_family: - return weight * self.dlnk_surf_rule + dlnk_rule = self.dlnk_surf_rule else: - return weight * self.dlnk_rule + dlnk_rule = self.dlnk_rule + + # get rank-based uncertainty if rank_dictionary is being used + dlnk_rule = self._get_rank_uncertainty(ruleEntry, dlnk_rule) + + return weight * dlnk_rule if training: for ruleEntry, trainingEntry, weight in training: if corr_param == ruleEntry: + # We use the uncertainty for rate rules, since these are now approximations + # of the original reaction. We consider these to be independent of original the training + # parameters because the rate rules may be reversing the training reactions, + # which leads to more complicated dependence if surface_family: - return weight * self.dlnk_surf_rule + dlnk_rule = self.dlnk_surf_rule else: - return weight * self.dlnk_rule + dlnk_rule = self.dlnk_rule + + # get rank-based uncertainty if rank_dictionary is being used + dlnk_rule = self._get_rank_uncertainty(ruleEntry, dlnk_rule) + + return weight * dlnk_rule # Writing it this way in the function is not the most efficient, but makes it easy to use, and # testing a few if statements is not too costly @@ -311,12 +421,14 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non if 'Library' in source: if corr_param == source['Library']: # Should be a single library reaction source - return self.dlnk_library + library_entry = source['Library'][1] + return self._get_rank_uncertainty(library_entry, self.dlnk_library) elif corr_source_type == 'Surface_Library': if 'Surface_Library' in source: if corr_param == source['Surface_Library']: # Should be a single library reaction source - return self.dlnk_surf_library + surface_library_entry = source['Surface_Library'][1] + return self._get_rank_uncertainty(surface_library_entry, self.dlnk_surf_library) elif corr_source_type == 'PDep': if 'PDep' in source: if corr_param == source['PDep']: @@ -325,12 +437,11 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non if 'Training' in source: # Should be a unique single training reaction if corr_param == source['Training']: - if 'surface' in source['Training'][0].lower(): - return self.dlnk_surf_training + training_entry = source['Training'][1] + if training_entry.item.is_surface_reaction(): + return self._get_rank_uncertainty(training_entry, self.dlnk_surf_training) else: - return self.dlnk_training - - + return self._get_rank_uncertainty(training_entry, self.dlnk_training) elif corr_source_type == 'Estimation Nonexact': # Return the uncorrelated uncertainty associated with using a non-exact rate rule if 'Rate Rules' in source: @@ -350,6 +461,31 @@ def get_partial_uncertainty_value(self, source, corr_source_type, corr_param=Non # If we get here, it means that we did not find the correlated parameter in the source return None + def get_intermediate_uncertainty_value(self, source_type, source): + """ + Get the uncertainty of an intermediate parameter, which is an underlying parameter that contributes to the overall uncertainty but is not directly a source parameter. For example, this could be the uncertainty of a specific rate rule that contributes to the overall uncertainty for a reaction. + + `source_type` is a string indicating the type of intermediate parameter, such as 'Rate Rules', 'Library', 'PDep', or 'Training' + `source` is the identifier for the intermediate parameter, such as the string identifier of the rate rule + """ + if source_type == 'Rate Rules': + return self._get_rank_uncertainty(source, self.dlnk_rule) + elif source_type == 'Library': + return self._get_rank_uncertainty(source, self.dlnk_library) + elif source_type == 'Surface_Library': + return self._get_rank_uncertainty(source, self.dlnk_surf_library) + elif source_type == 'PDep': + return self.dlnk_pdep + elif source_type == 'Training': + return self._get_rank_uncertainty(source, self.dlnk_training) + elif source_type == 'Estimation Nonexact': + return self.dlnk_nonexact + elif source_type == 'Estimation Family': + return self.dlnk_family + else: + raise Exception('Kinetic intermediate parameter source must be Rate Rules, Library, PDep, or Training') + + def get_uncertainty_factor(self, source): """ Retrieve the uncertainty factor f when the source of the reaction kinetics are given. @@ -360,48 +496,19 @@ def get_uncertainty_factor(self, source): f = np.sqrt(3) / np.log(10) * dlnk return f - def _get_covariance_qq(self, q_label1, q_label2): + def _get_covariance_qq(self, q_label1, q_label2, dlnk_source1=None): """ Gets the covariance between two intermediate sources q1 and q2 - Where q_label1 and q_label2 are both labels for a given kinetics source + Where q_label1 and q_label2 are both labels for a given kinetics source and dlnk_source1 is the partial uncertainty of source 1 The possible intermediate parameter types are: Library, Surface_Library, PDEP, Training, Rate Rule, Estimation Family, Estimation Nonexact, Surface Training, and Surface Rate Rule """ - - intermediate_parameters = { - 'Library': self.dlnk_library, - 'Surface_Library': self.dlnk_surf_library, - 'PDep': self.dlnk_pdep, - 'Training': self.dlnk_training, - 'Rate Rule': self.dlnk_rule, - 'Estimation Family': self.dlnk_family, - 'Estimation Nonexact': self.dlnk_nonexact, - 'Surface Training': self.dlnk_surf_training, - 'Surface Rate Rule': self.dlnk_surf_rule, - } - - corr_type1 = None - corr_type2 = None - - # figure out the intermediate parameter type - for intermediate_type in intermediate_parameters.keys(): - if q_label1.startswith(intermediate_type): - corr_type1 = intermediate_type - if q_label2.startswith(intermediate_type): - corr_type2 = intermediate_type - - if corr_type1 is None or corr_type2 is None: - raise ValueError(f'Could not determine the type of the correlated parameters from their labels {q_label1} and {q_label2}') - - if corr_type1 != corr_type2: - # If the two correlated parameters are of different types, we consider them to be uncorrelated - return 0 - - elif q_label1 == q_label2: + + if q_label1 == q_label2: # If the two correlated parameters are exactly the same, return the variance of that parameter - return intermediate_parameters[corr_type1] ** 2.0 + return dlnk_source1 * dlnk_source1 return 0 @@ -429,20 +536,21 @@ def __init__(self, species_list=None, reaction_list=None, output_directory='', t self.all_kinetic_sources = None self.thermo_input_uncertainties = None # thermo parameter uncertainties self.kinetic_input_uncertainties = None # kinetic parameter uncertainties + self.thermo_intermediate_uncertainties = None # list of dictionaries of intermediate parameter uncertainties for each species + self.kinetic_intermediate_uncertainties = None # list of dictionaries of intermediate parameter uncertainties for each reaction self.dG_dqs = None # derivatives of Gibbs free energy with respect to underlying thermo parameters self.dlnk_dqs = None # derivatives of ln(k) with respect to underlying kinetic parameters self.thermo_covariance_matrix = None # covariance matrix of all species thermo uncertainties self.kinetic_covariance_matrix = None # covariance matrix of all reaction kinetic uncertainties self.Sigma_ww_thermo = None # covariance matrix of all underlying thermo parameter uncertainties self.Sigma_ww_kinetics = None # covariance matrix of all underlying kinetics parameter uncertainties - self.all_thermo_intermediates = None # list of labels of underlying thermo parameters - self.all_kinetics_intermediates = None # list of labels of underlying kinetic parameters + self.thermo_intermediates = None # list of labels of underlying thermo parameters (may be truncated to relevant parameters based on sensitivity) + self.kinetic_intermediates = None # list of labels of underlying kinetic parameters (may be truncated to relevant parameters based on sensitivity) self.output_directory = output_directory if output_directory else os.getcwd() self.thermo_covariance_libraries = thermo_covariance_libraries self.thermo_covariance_groups = thermo_covariance_groups self.thermo_covariances_dict = {} # dictionary to store covariances from covariance libraries - - # Make output directory if it does not yet exist: + self.used_rank = False # keep track of whether we apply rank-based uncertainties if not os.path.exists(self.output_directory): try: os.makedirs(self.output_directory) @@ -904,17 +1012,25 @@ def get_species_label(self, species): else: return species.to_chemkin() - def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False): + def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=None, correlated=False, use_rank=False): """ Assign uncertainties based on the sources of the species thermo and reaction kinetics. + + use_rank: whether to apply the rank-based uncertainties for library entries """ if g_param_engine is None: - g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict) + g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict, rank_dictionary=use_rank) if k_param_engine is None: - k_param_engine = KineticParameterUncertainty() + k_param_engine = KineticParameterUncertainty(rank_dictionary=use_rank) + self.used_rank = use_rank # keep track of whether we applied rank-based uncertainties in case it's needed for later analysis or plotting + # reset variables self.thermo_input_uncertainties = [] self.kinetic_input_uncertainties = [] + self.thermo_intermediate_uncertainties = [] + self.kinetic_intermediate_uncertainties = [] + self.thermo_intermediates = None + self.kinetic_intermediates = None self.dG_dqs = [] # a list of dictionaries to store the intermediate derivatives dG_i/dq for each parameter q that contributes to the uncertainty of species i's G self.dlnk_dqs = [] # a list of dictionaries to store the intermediate derivatives dlnk_i/dq for each parameter q that contributes to the uncertainty of reaction i's k @@ -925,6 +1041,7 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non self.thermo_input_uncertainties.append(dG) else: source = self.species_sources_dict[species] + intermediate_source = {} dG = {} dG_dq = {} if 'Library' in source: @@ -932,16 +1049,19 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non label = source['Library'][2] # the label we added to the end of the library source tuple in extract_sources_from_model dG[label] = pdG dG_dq[label] = 1 # derivative of G with respect to the library parameter is 1, since it's a direct contribution + intermediate_source[label] = g_param_engine.get_intermediate_uncertainty_value('Library', source['Library']) if 'Surface_Library' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', corr_param=source['Surface_Library']) label = source['Surface_Library'][2] dG[label] = pdG dG_dq[label] = 1 # derivative of G with respect to the surface library parameter is 1, since it's a direct contribution + intermediate_source[label] = g_param_engine.get_intermediate_uncertainty_value('Surface_Library', source['Surface_Library']) if 'QM' in source: pdG = g_param_engine.get_partial_uncertainty_value(source, 'QM', corr_param=source['QM']) label = source['QM'][2] dG[label] = pdG dG_dq[label] = 1 # derivative of G with respect to the QM parameter is 1, since it's a direct contribution + intermediate_source[label] = g_param_engine.get_intermediate_uncertainty_value('QM', source['QM']) if 'ADS' in source: for adsGroupType, groupList in source['ADS'].items(): for group, weight in groupList: @@ -951,6 +1071,7 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non if weight != 1: raise ValueError('Weight for adsorption group contribution to thermo should be 1, but got weight={weight} for {adsGroupType} in species {species}'.format(weight=weight, adsGroupType=adsGroupType, species=species)) dG_dq[label] = weight # This should be 1 because there's only one group contribution per adsorption correction + intermediate_source[label] = g_param_engine.get_intermediate_uncertainty_value('ADS', source['ADS']) if 'GAV' in source: for groupType, groupList in source['GAV'].items(): for group, weight in groupList: @@ -958,14 +1079,17 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non label = 'Group({}) {}'.format(groupType, group.label) dG[label] = pdG dG_dq[label] = weight # the derivative of G with respect to the group contribution is the weight of that group contribution to the overall G + intermediate_source[label] = g_param_engine.get_intermediate_uncertainty_value('GAV', source['GAV']) # We also know if there is group additivity used, there will be uncorrelated estimation error est_pdG = g_param_engine.get_partial_uncertainty_value(source, 'Estimation') if est_pdG: label = 'Estimation {}'.format(species.to_chemkin()) dG[label] = est_pdG dG_dq[label] = 1 # the derivative of G with respect to the estimation error is 1, since we add the term directly + intermediate_source[label] = g_param_engine.get_intermediate_uncertainty_value('Estimation', source['GAV']) self.thermo_input_uncertainties.append(dG) self.dG_dqs.append(dG_dq) + self.thermo_intermediate_uncertainties.append(intermediate_source) for reaction in self.reaction_list: if not correlated: @@ -975,6 +1099,7 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non source = self.reaction_sources_dict[reaction] dlnk = {} dlnk_dq = {} + intermediate_source = {} if 'Rate Rules' in source: family = source['Rate Rules'][0] source_dict = source['Rate Rules'][1] @@ -990,12 +1115,14 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk dlnk_dq[label] = weight # the derivative of ln(k) with respect to the rate rule contribution + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Rate Rules', source['Rate Rules']) for ruleEntry, trainingEntry, weight in training: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Rate Rules', corr_param=ruleEntry, corr_family=family) label = '{}Rate Rule {} {}'.format(surface_prefix, family, ruleEntry) dlnk[label] = dplnk dlnk_dq[label] = weight # the derivative of ln(k) with respect to the training rule contribution + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Rate Rules', source['Rate Rules']) N = len(source_dict['rules']) + len(source_dict['training']) @@ -1005,29 +1132,34 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non label = 'Estimation Nonexact {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = nonexact_dplnk dlnk_dq[label] = np.log10(N + 1) # the derivative of ln(k) with respect to the nonexact estimation error + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Estimation Nonexact', source['Rate Rules']) family_dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Estimation Family', corr_family=family) if family_dplnk: label = 'Estimation Family {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = family_dplnk dlnk_dq[label] = 1 # the derivative of ln(k) with respect to the family estimation error + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Estimation Family', source['Rate Rules']) elif 'PDep' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'PDep', source['PDep']) label = 'PDep {}'.format(reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = dplnk dlnk_dq[label] = 1 + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('PDep', source['PDep']) elif 'Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Library', source['Library']) label = source['Library'][2] dlnk[label] = dplnk dlnk_dq[label] = 1 + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Library', source['Library']) elif 'Surface_Library' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Surface_Library', source['Surface_Library']) label = source['Surface_Library'][2] dlnk[label] = dplnk dlnk_dq[label] = 1 + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Surface_Library', source['Surface_Library']) elif 'Training' in source: dplnk = k_param_engine.get_partial_uncertainty_value(source, 'Training', source['Training']) @@ -1035,9 +1167,11 @@ def assign_parameter_uncertainties(self, g_param_engine=None, k_param_engine=Non label = 'Training {} {}'.format(family, reaction.to_chemkin(self.species_list, kinetics=False)) dlnk[label] = dplnk dlnk_dq[label] = 1 + intermediate_source[label] = k_param_engine.get_intermediate_uncertainty_value('Training', source['Training']) self.kinetic_input_uncertainties.append(dlnk) self.dlnk_dqs.append(dlnk_dq) + self.kinetic_intermediate_uncertainties.append(intermediate_source) def sensitivity_analysis(self, initial_mole_fractions, sensitive_species, T, P, termination_time, sensitivity_threshold=1e-3, number=10, fileformat='.png', initial_surface_coverages=None, @@ -1294,26 +1428,26 @@ def local_analysis_intermediate(self, sensitive_species, reaction_system_index=0 for i in range(len(thermo_contributions)): if thermo_contributions[i] < 0: - print(f'Warning: negative contribution to variance from {self.all_thermo_intermediates[i]} of {thermo_contributions[i]}. Setting contribution to 0 for plotting purposes.') + print(f'Warning: negative contribution to variance from {self.thermo_intermediates[i]} of {thermo_contributions[i]}. Setting contribution to 0 for plotting purposes.') thermo_contributions[i] = 0 total_variance = np.sum(thermo_contributions) + np.sum(kinetic_contributions) # 5. ------------------------- Make plots -------------------------- - # define labels if they're not already listed in all_thermo/kinetics_intermediates - if self.all_thermo_intermediates is None or len(self.all_thermo_intermediates) != N_q_thermo: - self.all_thermo_intermediates = [f'dln[{sens_species.to_chemkin()}]/dG[{self.species_list[sp_idx].to_chemkin()}]' for sp_idx in species_used] - if self.all_kinetics_intermediates is None or len(self.all_kinetics_intermediates) != N_q_kinetics: - self.all_kinetics_intermediates = ['k' + str(self.reaction_list[rxn_idx].index) + ': ' + self.reaction_list[rxn_idx].to_chemkin(kinetics=False) for rxn_idx in reactions_used] - + # define labels if they're not already listed in all_thermo/kinetic_intermediates + if self.thermo_intermediates is None or len(self.thermo_intermediates) != N_q_thermo: + self.thermo_intermediates = [f'dln[{sens_species.to_chemkin()}]/dG[{self.species_list[sp_idx].to_chemkin()}]' for sp_idx in species_used] + if self.kinetic_intermediates is None or len(self.kinetic_intermediates) != N_q_kinetics: + self.kinetic_intermediates = ['k' + str(self.reaction_list[rxn_idx].index) + ': ' + self.reaction_list[rxn_idx].to_chemkin(kinetics=False) for rxn_idx in reactions_used] + # append all data points thermo_plotting_data = [] kinetics_plotting_data = [] for i in range(N_q_thermo): - label = self.all_thermo_intermediates[i] + label = self.thermo_intermediates[i] thermo_plotting_data.append(GenericData(data=[np.sqrt(thermo_contributions[i])], uncertainty=1.0, label=label, species='dummy')) for i in range(N_q_kinetics): - label = self.all_kinetics_intermediates[i] + label = self.kinetic_intermediates[i] kinetics_plotting_data.append(GenericData(data=[np.sqrt(kinetic_contributions[i])], uncertainty=1.0, label=label, reaction='dummy')) # set up the folders and filenames for plotting @@ -1350,15 +1484,16 @@ def get_thermo_covariance_matrix(self, g_param_engine=None): self.thermo_covariance_matrix = np.zeros((len(self.species_list), len(self.species_list))) if g_param_engine is None: - g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict) + g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict, rank_dictionary=self.used_rank) for i in range(len(self.species_list)): for j in range((len(self.species_list))): for q in self.dG_dqs[i].keys(): dG_i_dq = self.dG_dqs[i][q] + q_uncertainty = self.thermo_intermediate_uncertainties[i][q] for r in self.dG_dqs[j].keys(): dG_j_dr = self.dG_dqs[j][r] - self.thermo_covariance_matrix[i, j] += dG_i_dq * g_param_engine._get_covariance_qq(q, r) * dG_j_dr + self.thermo_covariance_matrix[i, j] += dG_i_dq * g_param_engine._get_covariance_qq(q, r, q_uncertainty) * dG_j_dr return self.thermo_covariance_matrix @@ -1379,7 +1514,7 @@ def get_kinetic_covariance_matrix(self, k_param_engine=None): return self.kinetic_covariance_matrix if k_param_engine is None: - k_param_engine = KineticParameterUncertainty() + k_param_engine = KineticParameterUncertainty(rank_dictionary=self.used_rank) self.kinetic_covariance_matrix = np.zeros((len(self.reaction_list), len(self.reaction_list))) @@ -1387,9 +1522,10 @@ def get_kinetic_covariance_matrix(self, k_param_engine=None): for j in range(len(self.reaction_list)): for q in self.dlnk_dqs[i].keys(): dlnk_i_dq = self.dlnk_dqs[i][q] + q_uncertainty = self.kinetic_intermediate_uncertainties[i][q] for r in self.dlnk_dqs[j].keys(): dlnk_j_dr = self.dlnk_dqs[j][r] - self.kinetic_covariance_matrix[i, j] += dlnk_i_dq * k_param_engine._get_covariance_qq(q, r) * dlnk_j_dr + self.kinetic_covariance_matrix[i, j] += dlnk_i_dq * k_param_engine._get_covariance_qq(q, r, q_uncertainty) * dlnk_j_dr return self.kinetic_covariance_matrix @@ -1406,25 +1542,31 @@ def _get_intermediate_thermo_covariance_matrix(self, g_param_engine=None, subset subset_indices = np.arange(len(self.species_list)) if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case + self.thermo_intermediates = None # in the uncorrelated case, we don't have specific intermediate parameters to track, so we can set this to None self.Sigma_ww_thermo = np.diag(np.float_power([self.thermo_input_uncertainties[i] for i in subset_indices], 2.0)) return self.Sigma_ww_thermo if g_param_engine is None: - g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict) + g_param_engine = ThermoParameterUncertainty(other_covariances=self.thermo_covariances_dict, rank_dictionary=self.used_rank) - self.all_thermo_intermediates = set() + + # the goal is to construct a list of unique tuples (q_label, q_partial_uncertainty) + # but first, find the unique list of intermediate thermo sources used + self.thermo_intermediates = {} for sp_idx in subset_indices: - for q in self.dG_dqs[sp_idx].keys(): - self.all_thermo_intermediates.add(q) - self.all_thermo_intermediates = list(self.all_thermo_intermediates) - W = len(self.all_thermo_intermediates) + for q_label in self.thermo_intermediate_uncertainties[sp_idx].keys(): + self.thermo_intermediates[q_label] = self.thermo_intermediate_uncertainties[sp_idx][q_label] + q_uncertainties = [uncertainty for q_label, uncertainty in self.thermo_intermediates.items()] + self.thermo_intermediates = [q_label for q_label in self.thermo_intermediates.keys()] + W = len(self.thermo_intermediates) self.Sigma_ww_thermo = np.zeros((W, W)) for i in range(W): - q_i = self.all_thermo_intermediates[i] + q_i_label = self.thermo_intermediates[i] + q_i_uncertainty = q_uncertainties[i] for j in range(i + 1): - q_j = self.all_thermo_intermediates[j] - self.Sigma_ww_thermo[i, j] = g_param_engine._get_covariance_qq(q_i, q_j) + q_j_label = self.thermo_intermediates[j] + self.Sigma_ww_thermo[i, j] = g_param_engine._get_covariance_qq(q_i_label, q_j_label, q_i_uncertainty) self.Sigma_ww_thermo[j, i] = self.Sigma_ww_thermo[i, j] # symmetric matrix return self.Sigma_ww_thermo @@ -1441,25 +1583,31 @@ def _get_intermediate_kinetics_covariance_matrix(self, k_param_engine=None, subs subset_indices = np.arange(len(self.reaction_list)) if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case + self.kinetic_intermediates = None # in the uncorrelated case, we don't have specific intermediate parameters to track, so we can set this to None self.Sigma_ww_kinetics = np.diag(np.float_power([self.kinetic_input_uncertainties[i] for i in subset_indices], 2.0)) return self.Sigma_ww_kinetics if k_param_engine is None: - k_param_engine = KineticParameterUncertainty() + k_param_engine = KineticParameterUncertainty(rank_dictionary=self.used_rank) - self.all_kinetics_intermediates = set() + # the goal is to construct a list of unique tuples (q_label, q_partial_uncertainty) + # but first, find the unique list of intermediate kinetic sources used + self.kinetic_intermediates = {} for rxn_idx in subset_indices: - for q in self.dlnk_dqs[rxn_idx].keys(): - self.all_kinetics_intermediates.add(q) - self.all_kinetics_intermediates = list(self.all_kinetics_intermediates) - W = len(self.all_kinetics_intermediates) + for q in self.kinetic_intermediate_uncertainties[rxn_idx].keys(): + self.kinetic_intermediates[q] = self.kinetic_intermediate_uncertainties[rxn_idx][q] + q_uncertainties = [uncertainty for q_label, uncertainty in self.kinetic_intermediates.items()] + self.kinetic_intermediates = [q_label for q_label in self.kinetic_intermediates.keys()] + + W = len(self.kinetic_intermediates) self.Sigma_ww_kinetics = np.zeros((W, W)) for i in range(W): - q_i = self.all_kinetics_intermediates[i] + q_i_label = self.kinetic_intermediates[i] + q_i_uncertainty = q_uncertainties[i] for j in range(i + 1): - q_j = self.all_kinetics_intermediates[j] - self.Sigma_ww_kinetics[i, j] = k_param_engine._get_covariance_qq(q_i, q_j) + q_j_label = self.kinetic_intermediates[j] + self.Sigma_ww_kinetics[i, j] = k_param_engine._get_covariance_qq(q_i_label, q_j_label, q_i_uncertainty) self.Sigma_ww_kinetics[j, i] = self.Sigma_ww_kinetics[i, j] # symmetric matrix return self.Sigma_ww_kinetics @@ -1483,11 +1631,11 @@ def _get_dG_dq_matrix(self, subset_indices=None): if isinstance(self.thermo_input_uncertainties[0], np.float64): # uncorrelated case return np.eye(len(subset_indices)) - dGdq = np.zeros((len(subset_indices), len(self.all_thermo_intermediates))) + dGdq = np.zeros((len(subset_indices), len(self.thermo_intermediates))) for i, sp_idx in enumerate(subset_indices): for key in self.dG_dqs[sp_idx].keys(): - q_index = self.all_thermo_intermediates.index(key) + q_index = self.thermo_intermediates.index(key) dGdq[i, q_index] = self.dG_dqs[sp_idx][key] return dGdq @@ -1508,11 +1656,11 @@ def _get_dlnk_dq_matrix(self, subset_indices=None): if isinstance(self.kinetic_input_uncertainties[0], np.float64): # uncorrelated case return np.eye(len(subset_indices)) - dlnkdq = np.zeros((len(subset_indices), len(self.all_kinetics_intermediates))) + dlnkdq = np.zeros((len(subset_indices), len(self.kinetic_intermediates))) for i, rxn_idx in enumerate(subset_indices): for key in self.dlnk_dqs[rxn_idx].keys(): - q_index = self.all_kinetics_intermediates.index(key) + q_index = self.kinetic_intermediates.index(key) dlnkdq[i, q_index] = self.dlnk_dqs[rxn_idx][key] return dlnkdq