Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions src/azul/field_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
)

from more_itertools import (
first,
one,
)

Expand Down Expand Up @@ -429,15 +428,27 @@ def from_index(self, value: str) -> str | None:
null_datetime: NullableDateTime = NullableDateTime(str, str)


class Nested(PassThrough[JSON]):
class Nested(FieldType[JSON, JSON]):
allow_sorting_by_empty_lists = False
es_type = 'nested'
properties: Mapping[str, FieldType]
agg_property: str

def __init__(self, **properties):
super().__init__(JSON, es_type='nested')
self.agg_property = first(properties.keys())
super().__init__(JSON, JSON)
self.properties = properties

def to_index(self, value: JSON) -> JSON:
return {
field: field_type.to_index(value[field])
for field, field_type in self.properties.items()
}

def from_index(self, value: JSON) -> JSON:
return {
field: field_type.from_index(value[field])
for field, field_type in self.properties.items()
}
Comment on lines +440 to +450
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment was generated by Claude Code.

With these two overrides, does it still make sense for Nested to inherit PassThrough?

The defining characteristic of PassThrough is that to_index and from_index are identity operations. After this change, Nested overrides both with non-trivial logic that delegates to per-property field types — the values are no longer "passed through" unchanged.

What Nested still gets from PassThrough:

  1. The es_type property (trivial — stores and returns _es_type)
  2. allow_sorting_by_empty_lists = False
  3. The __init__ convenience of setting both native_form and index_form to the same type (JSON)

All three are easy to replicate by inheriting FieldType[JSON, JSON] directly.


def api_filter_values_schema(self, operator: str, mode: Mode) -> JSON:
assert operator == 'is'
schema = super().api_filter_values_schema(operator, mode)
Expand Down
23 changes: 22 additions & 1 deletion src/azul/indexer/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
FieldTypes,
FieldTypes1,
Nested,
pass_thru_bool,
)
from azul.indexer.document import (
Aggregate,
Expand Down Expand Up @@ -96,7 +97,6 @@
if isinstance(field_types, Nested):
element = next(elements, None)
if element is not None:
assert element == field_types.agg_property, (element, field_types)
field_types = field_types.properties[element]
assert isinstance(field_types, FieldType), (path, field_types)
element = next(elements, None)
Expand All @@ -122,6 +122,27 @@
# does not undergo translation
)

@cache

Check warning

Code scanning / CodeQL

Use of the return value of a procedure Warning

The result of
cache
is used even though it is always None.
def field_types_by_name(self, catalog: CatalogName) -> Mapping[str, FieldType]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: rename method, use FieldName instead of str.

"""
Returns the field type for each supported sort and filter field, using
the name of the field as provided by clients. Unlike field_types(), this
is a flat mapping and includes the synthetic field 'accessible' that has
no entry in the plugin's field_mapping.

:return: dict with field names as keys and each field's type as value
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:return: dict with field names as keys and each field's type as value
:return: a mapping from each field's name to its type

"""
plugin = self.metadata_plugin(catalog)
result = {}
for field, path in plugin.field_mapping.items():
field_type = self.field_type(catalog, path)
if isinstance(field_type, FieldType):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: investigate in which case field_type is not of type FieldType

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned value from DocumentService.field_type() is always a FieldType, and there is an assert in that method for that.

result[field] = field_type
accessible_field = plugin.special_fields.accessible.name
Comment thread
dsotirho-ucsc marked this conversation as resolved.
assert accessible_field not in result, result
result[accessible_field] = pass_thru_bool
return result

def catalogued_field_types(self) -> CataloguedFieldTypes:
return {
catalog: self.field_types(catalog)
Expand Down
9 changes: 8 additions & 1 deletion src/azul/plugins/metadata/hca/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,14 @@ def _project(self, project: api.Project) -> MutableJSON:
'accessions': list(map(self._accession, project.accessions)),
'is_tissue_atlas_project': any(bionetwork.atlas_project
for bionetwork in project.bionetworks),
'tissue_atlas': list(map(self._tissue_atlas, project.bionetworks)),
# We deduplicate the `tissue_atlas` field values since duplicate
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: replace custom deduplication with an existing mechanism, and investigate why this is necessary since a SetOfDictAccumulator is used during aggregation.

# values in a nested field would cause incorrect term facet totals.
'tissue_atlas': [
dict(d) for d in dict.fromkeys(
tuple(self._tissue_atlas(b).items())
for b in project.bionetworks
)
],
'bionetwork_name': json_sorted(bionetwork.name
for bionetwork in project.bionetworks),
'estimated_cell_count': project.estimated_cell_count,
Expand Down
23 changes: 17 additions & 6 deletions src/azul/plugins/metadata/hca/service/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
one,
)

from azul.field_type import (
Nested,
)
from azul.lib import (
cached_property,
)
Expand Down Expand Up @@ -558,9 +561,11 @@ def file_type_summary(aggregate_file: JSON) -> FileTypeSummaryForHit:
]
return summarized_hit

def make_terms(self, agg) -> Terms:
def choose_entry(_term):
if 'key_as_string' in _term:
def make_terms(self, field_type, agg) -> Terms:
def choose_entry(_term, nested_keys):
if nested_keys is not None:
return dict(zip(nested_keys, _term['key']))
elif 'key_as_string' in _term:
return _term['key_as_string']
elif (term_key := _term['key']) is None:
return None
Expand All @@ -571,10 +576,15 @@ def choose_entry(_term):
else:
return str(term_key)

if isinstance(field_type, Nested):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Rename nested_keys to nested_property_names. Remove nested_keys param from inner function choose_entry(). Populate nested_property_names with field_type.properties instead of agg['myTerms']['meta']['paths']

nested_keys = [path[-1] for path in agg['myTerms']['meta']['paths']]
else:
nested_keys = None
terms: list[Term] = []
for bucket in agg['myTerms']['buckets']:
term = Term(term=choose_entry(bucket),
count=bucket['doc_count'])
doc_count = bucket['doc_count']
term = Term(term=choose_entry(bucket, nested_keys),
count=doc_count)
try:
sub_agg = bucket['myProjectIds']
except KeyError:
Expand Down Expand Up @@ -605,8 +615,9 @@ def choose_entry(_term):
type='terms')

def make_facets(self, aggs: JSON) -> dict[str, Terms]:
field_types = self.service.field_types_by_name(self.catalog)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please familiarize yourself with what the "Pull up method" refactoring does. This change is not that.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: rename commit title

facets = {}
for facet, agg in aggs.items():
if facet != '_project_agg': # Filter out project specific aggs
facets[facet] = self.make_terms(agg)
facets[facet] = self.make_terms(field_types[facet], agg)
return facets
18 changes: 1 addition & 17 deletions src/azul/service/query_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from azul.field_type import (
FieldType,
Mode,
pass_thru_bool,
)
from azul.lib import (
cache,
Expand Down Expand Up @@ -224,19 +223,4 @@ def _filter_schema_validator(self,

@cache
def _field_types(self, catalog: CatalogName) -> Mapping[str, FieldType]:
"""
Returns the field type for each supported sort and filter field, using
the name of the field as provided by clients.
"""
result = {}
plugin = self._metadata_plugin
for field, path in plugin.field_mapping.items():
field_type = self._service.field_type(catalog, path)
if isinstance(field_type, FieldType):
result[field] = field_type
# This field is a synthetic element of the response and will never be
# null. Including it here helps to streamline request validation.
accessible_field = plugin.special_fields.accessible.name
assert accessible_field not in result, result
result[accessible_field] = pass_thru_bool
return result
return self._service.field_types_by_name(catalog)
81 changes: 59 additions & 22 deletions src/azul/service/query_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from opensearchpy.helpers.aggs import (
Agg,
MultiTerms,
Terms,
)
from opensearchpy.helpers.query import (
Expand Down Expand Up @@ -327,21 +328,35 @@ def _prepare_aggregation(self, *, facet: str, facet_path: FieldPath) -> Agg:

field_type = self.service.field_type(self.catalog, facet_path)
if isinstance(field_type, Nested):
nested_agg = agg.bucket(name='nested',
agg_type='nested',
path=dotted(facet_path))
facet_path = dotted(facet_path, field_type.agg_property)
path = dotted(facet_path)
# A nested aggregation to aggregate on fields inside a nested field
agg.bucket(name='nested',
agg_type='nested',
path=path)
# A multi-terms aggregation to form composite keys made from the
# fields inside a nested field
agg.aggs.nested.bucket(name='myTerms',
agg_type='multi_terms',
terms=[
{'field': path + f'.{field}.keyword'}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

for field in field_type.properties
],
size=config.terms_aggregation_size)
# A filter aggregation to work around that we can't use a missing
# aggregation with a nested field.
# See https://github.com/elastic/elasticsearch/issues/9571
agg.bucket(name='untagged',
agg_type='filter',
filter=Q('bool', must_not=[
Q('nested', path=path, query=Q('exists', field=path))
]))
else:
nested_agg = agg
# Make an inner agg that will contain the terms in question
path = dotted(facet_path, 'keyword')
# FIXME: Approximation errors for terms aggregation are unchecked
# https://github.com/DataBiosphere/azul/issues/3413
nested_agg.bucket(name='myTerms',
agg_type='terms',
field=path,
size=config.terms_aggregation_size)
nested_agg.bucket('untagged', 'missing', field=path)
path = dotted(facet_path, 'keyword')
agg.bucket(name='myTerms',
agg_type='terms',
field=path,
size=config.terms_aggregation_size)
agg.bucket('untagged', 'missing', field=path)
return agg

def _annotate_aggs_for_translation(self, request: Search):
Expand All @@ -352,13 +367,23 @@ def _annotate_aggs_for_translation(self, request: Search):
"""

def annotate(agg: Agg):
if isinstance(agg, Terms):
path = agg.field.split('.')
if path[-1] == 'keyword':
path.pop()
if isinstance(agg, (Terms, MultiTerms)):
if not hasattr(agg, 'meta'):
agg.meta = {}
agg.meta['path'] = path
if isinstance(agg, Terms):
# A Terms agg is for a single field, so we only put one
# field path in `paths`.
path = agg.field.removesuffix('.keyword').split('.')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Remove `.removesuffix('.keyword'), instead split first with an existing method, assert last element is 'keyword' and remove.

agg.meta['paths'] = [path]
else:
# A MultiTerms agg contains multiple fields, so we need the
# path of each one. By storing these paths in the same order
# that the fields occur in `agg.terms`, we can later pair
# these paths to the values in the aggregation buckets.
agg.meta['paths'] = []
for term in agg.terms:
path = term['field'].removesuffix('.keyword').split('.')
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: remove duplication between if/else branches.

agg.meta['paths'].append(path)
if hasattr(agg, 'aggs'):
subs = agg.aggs
for sub_name in subs:
Expand Down Expand Up @@ -391,13 +416,25 @@ def translate(k, v: MutableJSON):
translate(k, v)
else:
try:
path = v['meta']['path']
# `paths` is a key we added to `meta` to have available here
# when processing the response. Each path is a list (e.g.
# ['contents', 'projects', 'document_id']) and `paths` will
# have only one path in the case of a Terms aggregation, or
# many paths in the case of a MultiTerms aggregation.
paths = v['meta']['paths']
except KeyError:
pass
else:
field_type = self.service.field_type(self.catalog, tuple(path))
for i, path in enumerate(paths):
field_type = self.service.field_type(self.catalog, tuple(path))
for bucket in buckets:
# If the bucket is from a MultiTemrms aggregation
if isinstance(bucket['key'], list):
bucket['key'][i] = field_type.from_index(bucket['key'][i])
# If the bucket is from a Terms aggregation
else:
bucket['key'] = field_type.from_index(bucket['key'])
for bucket in buckets:
bucket['key'] = field_type.from_index(bucket['key'])
translate(k, bucket)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PL to explain why

Copy link
Copy Markdown
Contributor Author

@dsotirho-ucsc dsotirho-ucsc Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mistaken in thinking that this is no longer needed. We still have occurrences of nested agg buckets such as in the HCA agg project.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't ignore my PL request.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided in PL to refactor the changes to the translation of aggregates (_translate_response_aggs()) in order to remove duplication. This will be done by using the existing meta field path when annotating both kinds of aggregates (MultiTerms and not MultiTerms), instead of adding a new meta field paths specifically for MultiTerms aggregates.


for k, v in aggs.items():
Expand Down
35 changes: 35 additions & 0 deletions test/indexer/test_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1746,6 +1746,41 @@ def test_organoid_priority(self):
self.assertEqual(inner_cell_suspensions_in_contributions + inner_cell_suspensions_in_aggregates,
inner_cell_suspensions)

def test_nested_field_aggregation(self):
bundles = [
# Bundles with the following tissue_atlas (atlas/version) values:
# [None/None (x2), Lung/None, Retina/v1.0, Blood/v1.0]
self.bundle_fqid(uuid='2c7d06b8-658e-4c51-9de4-a768322f84c5',
version='2021-09-21T17:27:23.898000Z'),
# [Blood/v1.0]
self.bundle_fqid(uuid='587d74b4-1075-4bbf-b96a-4d1ede0481b2',
version='2018-10-10T02:23:43.182000Z'),
# [] (none)
self.bundle_fqid(uuid='97f0cc83-f0ac-417a-8a29-221c77debde8',
version='2019-10-14T19:54:15.397406Z')
]
for bundle in bundles:
self._index_canned_bundle(bundle)
hits = self._get_all_hits()
expected = {
'50151324-f3ed-4358-98af-ec352a940a61': [
{'atlas': '~null', 'version': '~null'},
{'atlas': 'Lung', 'version': '~null'},
{'atlas': 'Retina', 'version': 'v1.0'},
{'atlas': 'Blood', 'version': 'v1.0'}
],
'6615efae-fca8-4dd2-a223-9cfcf30fe94d': [
{'atlas': 'Blood', 'version': 'v1.0'}
],
'4e6f083b-5b9a-4393-9890-2a83da8188f1': [
]
}
for hit in self._filter_hits(hits, DocumentType.aggregate, 'projects'):
contents = hit['_source']['contents']
project = cast(JSON, one(contents['projects']))
project_id = project['document_id']
self.assertEqual(expected[project_id], project['tissue_atlas'])

def test_accessions_fields(self):
bundle_fqid = self.bundle_fqid(uuid='fa5be5eb-2d64-49f5-8ed8-bd627ac9bc7a',
version='2019-02-14T19:24:38.034764Z')
Expand Down
2 changes: 1 addition & 1 deletion test/service/test_app_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def filter_body(organ: str) -> JSON:
elif debug == 1:
expected_log = f'… with a response body starting in {body[:prefix_len]}'
elif debug > 1:
expected_log = f'… with a response body of length 9137 being {body}'
expected_log = f'… with a response body of length 9163 being {body}'
else:
assert False
self.assertEqual(expected_log, body_log_message)
Expand Down
2 changes: 1 addition & 1 deletion test/service/test_request_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def test_create_aggregate(self):
'size': 99999
},
'meta': {
'path': ['path', 'to', 'foo']
'paths': [['path', 'to', 'foo']]
}
},
'untagged': {
Expand Down
Loading
Loading