-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy path_lambda_geological_feature.py
More file actions
181 lines (160 loc) · 6.71 KB
/
_lambda_geological_feature.py
File metadata and controls
181 lines (160 loc) · 6.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""
Geological features
"""
from LoopStructural.utils.maths import regular_tetraherdron_for_points, gradient_from_tetrahedron
from ...modelling.features import BaseFeature
from ...utils import getLogger
from ...modelling.features import FeatureType
import numpy as np
from typing import Callable, Optional
from ...utils import LoopValueError
logger = getLogger(__name__)
class LambdaGeologicalFeature(BaseFeature):
def __init__(
self,
function: Optional[Callable[[np.ndarray], np.ndarray]] = None,
name: str = "unnamed_lambda",
gradient_function: Optional[Callable[[np.ndarray], np.ndarray]] = None,
model=None,
regions: Optional[list] = None,
faults: Optional[list] = None,
builder=None,
):
"""A lambda geological feature is a wrapper for a geological
feature that has a function at the base. This can be then used
in place of a geological feature.
Parameters
----------
function : _type_, optional
_description_, by default None
name : str, optional
_description_, by default "unnamed_lambda"
gradient_function : _type_, optional
_description_, by default None
model : _type_, optional
_description_, by default None
regions : list, optional
_description_, by default []
faults : list, optional
_description_, by default []
builder : _type_, optional
_description_, by default None
"""
BaseFeature.__init__(self, name, model, faults if faults is not None else [], regions if regions is not None else [], builder)
self.type = FeatureType.LAMBDA
self.function = function
self.gradient_function = gradient_function
self.regions = regions if regions is not None else []
def evaluate_value(self, pos: np.ndarray, ignore_regions=False) -> np.ndarray:
"""_summary_
Parameters
----------
xyz : np.ndarray
_description_
Returns
-------
np.ndarray
_description_
"""
v = np.zeros((pos.shape[0]))
v[:] = np.nan
# Precompute each fault's scalar value (gx = fault.__getitem__(0).evaluate_value)
# once and reuse for both the region mask check and fault application.
# FaultSegment.evaluate_value(pos) == self.__getitem__(0).evaluate_value(pos) == gx.
# apply_to_points also needs gx to determine the displacement mask — passing it
# avoids the duplicate evaluation on the np.copy of pos.
precomputed_gx = {id(f): f.evaluate_value(pos) for f in self.faults}
# Region mask: pass precomputed gx so PositiveRegion/NegativeRegion skip re-evaluation
mask = np.ones(pos.shape[0], dtype=bool)
if not ignore_regions:
for r in self.regions:
pre = precomputed_gx.get(id(getattr(r, 'feature', None)))
mask = np.logical_and(mask, r(pos, precomputed_val=pre))
# Apply faults: pass precomputed gx so apply_to_points skips one evaluate_value call
if self.faults_enabled:
for f in self.faults:
pos = f.apply_to_points(pos, precomputed_gx=precomputed_gx.get(id(f)))
if self.function is not None:
v[mask] = self.function(pos[mask,:])
return v
def evaluate_gradient(self, pos: np.ndarray, ignore_regions=False,element_scale_parameter=None) -> np.ndarray:
"""_summary_
Parameters
----------
xyz : np.ndarray
_description_
Returns
-------
np.ndarray
_description_
"""
if pos.shape[1] != 3:
raise LoopValueError("Need Nx3 array of xyz points to evaluate gradient")
logger.info(f'Calculating gradient for {self.name}')
if element_scale_parameter is None:
if self.model is not None:
element_scale_parameter = np.min(self.model.bounding_box.step_vector) / 10
else:
element_scale_parameter = 1
else:
try:
element_scale_parameter = float(element_scale_parameter)
except ValueError:
logger.error("element_scale_parameter must be a float")
element_scale_parameter = 1
v = np.zeros((pos.shape[0], 3))
v = np.zeros(pos.shape)
v[:] = np.nan
mask = self._calculate_mask(pos, ignore_regions=ignore_regions)
# evaluate the faults on the nodes of the faulted feature support
# then evaluate the gradient at these points
if len(self.faults) > 0:
# generate a regular tetrahedron for each point
# we will then move these points by the fault and then recalculate the gradient.
# this should work...
resolved = False
tetrahedron = regular_tetraherdron_for_points(pos, element_scale_parameter)
while resolved:
for f in self.faults:
v = (
f[0]
.evaluate_value(tetrahedron.reshape(-1, 3), fillnan='nearest')
.reshape(tetrahedron.shape[0], 4)
)
flag = np.logical_or(np.all(v > 0, axis=1), np.all(v < 0, axis=1))
if np.any(~flag):
logger.warning(
f"Points are too close to fault {f[0].name}. Refining the tetrahedron"
)
element_scale_parameter *= 0.5
tetrahedron = regular_tetraherdron_for_points(pos, element_scale_parameter)
resolved = True
tetrahedron_faulted = self._apply_faults(np.array(tetrahedron.reshape(-1, 3))).reshape(
tetrahedron.shape
)
values = self.function(tetrahedron_faulted.reshape(-1, 3)).reshape(
(-1, 4)
)
v[mask, :] = gradient_from_tetrahedron(tetrahedron[mask, :, :], values[mask])
return v
if self.gradient_function is None:
v[:, :] = np.nan
else:
v[:, :] = self.gradient_function(pos)
return v
def get_data(self, value_map: Optional[dict] = None):
return
def copy(self, name: Optional[str] = None):
return LambdaGeologicalFeature(
self.function,
name if name is not None else f'{self.name}_copy',
self.gradient_function,
self.model,
self.regions,
self.faults,
self.builder,
)
def is_valid(self):
if self.function is None and self.gradient_function is None:
return False
return True