From 6413b96424059d73887fcefc8f3c101095c087f9 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 24 Jun 2026 17:16:44 +0800 Subject: [PATCH 1/5] fix null tag SIGSEGV. --- cpp/src/cwrapper/tsfile_cwrapper.cc | 15 ++- cpp/src/cwrapper/tsfile_cwrapper.h | 5 +- cpp/src/reader/filter/tag_filter.cc | 60 ++++++++-- cpp/src/reader/filter/tag_filter.h | 17 +++ cpp/test/reader/filter/tag_filter_test.cc | 83 +++++++++++++ python/tests/test_tsfile_dataset.py | 139 +++++++++++++++++++--- python/tsfile/__init__.py | 2 + python/tsfile/dataset/reader.py | 39 ++++-- python/tsfile/tag_filter.py | 23 +++- python/tsfile/tsfile_cpp.pxd | 2 + 10 files changed, 342 insertions(+), 43 deletions(-) diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 0934981f9..d9e19fb6b 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -1118,8 +1118,13 @@ int duplicate_ideviceid_to_device_fields(storage::IDeviceID* id, for (int i = 0; i < n; i++) { const std::string* ps = (static_cast(i) < segs.size()) ? segs[i] : nullptr; - const char* lit = (ps != nullptr) ? ps->c_str() : "null"; - seg_arr[i] = strdup(lit); + // A null tag segment is exposed as a NULL pointer so callers can + // distinguish a missing/null tag from the literal string "null". 
+ if (ps == nullptr) { + seg_arr[i] = nullptr; + continue; + } + seg_arr[i] = strdup(ps->c_str()); if (seg_arr[i] == nullptr) { for (int j = 0; j < i; j++) { free(seg_arr[j]); @@ -1627,6 +1632,12 @@ TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, case TAG_FILTER_NOT_REGEXP: filter = builder.not_reg_exp(column_name, value); break; + case TAG_FILTER_IS_NULL: + filter = builder.is_null(column_name); + break; + case TAG_FILTER_IS_NOT_NULL: + filter = builder.is_not_null(column_name); + break; default: *err_code = common::E_INVALID_ARG; return nullptr; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index ae3e28eed..4471da89e 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -875,6 +875,8 @@ typedef enum { TAG_FILTER_GTEQ = 5, TAG_FILTER_REGEXP = 6, TAG_FILTER_NOT_REGEXP = 7, + TAG_FILTER_IS_NULL = 8, + TAG_FILTER_IS_NOT_NULL = 9, } TagFilterOp; /** @@ -884,7 +886,8 @@ typedef enum { * index). * @param table_name [in] Table name whose schema defines the TAG columns. * @param column_name [in] Name of the TAG column to filter on. - * @param value [in] Comparison value (string). + * @param value [in] Comparison value (string). Ignored for + * TAG_FILTER_IS_NULL / TAG_FILTER_IS_NOT_NULL (may be NULL). * @param op [in] Comparison operator (TagFilterOp). * @param err_code [out] Error code. E_OK(0) on success. * @return TagFilterHandle on success; NULL on failure. 
diff --git a/cpp/src/reader/filter/tag_filter.cc b/cpp/src/reader/filter/tag_filter.cc index f92c9ef86..0115cc389 100644 --- a/cpp/src/reader/filter/tag_filter.cc +++ b/cpp/src/reader/filter/tag_filter.cc @@ -44,7 +44,8 @@ TagEq::TagEq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagEq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] == value_; } @@ -53,7 +54,8 @@ TagNeq::TagNeq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagNeq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] != value_; } @@ -62,7 +64,8 @@ TagLt::TagLt(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagLt::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] < value_; } @@ -71,7 +74,8 @@ TagLteq::TagLteq(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagLteq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] <= value_; } @@ -80,7 +84,8 @@ TagGt::TagGt(int col_idx, std::string tag_value) : TagFilter(col_idx, std::move(tag_value)) {} bool TagGt::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] > value_; } @@ -89,7 +94,8 @@ TagGteq::TagGteq(int col_idx, std::string tag_value) : TagFilter(col_idx, 
std::move(tag_value)) {} bool TagGteq::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; return *segments[col_idx_] >= value_; } @@ -105,7 +111,9 @@ TagRegExp::TagRegExp(int col_idx, std::string tag_value) } bool TagRegExp::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size() || !is_valid_pattern_) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr || + !is_valid_pattern_) + return false; try { return std::regex_search(*segments[col_idx_], pattern_); } catch (const std::regex_error&) { @@ -125,7 +133,9 @@ TagNotRegExp::TagNotRegExp(int col_idx, std::string tag_value) } bool TagNotRegExp::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size() || !is_valid_pattern_) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr || + !is_valid_pattern_) + return false; try { return !std::regex_search(*segments[col_idx_], pattern_); } catch (const std::regex_error&) { @@ -133,6 +143,22 @@ bool TagNotRegExp::satisfyRow(std::vector segments) const { } } +// TagIsNull implementation +TagIsNull::TagIsNull(int col_idx) : TagFilter(col_idx, "") {} + +bool TagIsNull::satisfyRow(std::vector segments) const { + // A tag is null when its segment is an explicit null pointer or when the + // device id omits the (trailing) segment entirely. 
+ return col_idx_ >= segments.size() || segments[col_idx_] == nullptr; +} + +// TagIsNotNull implementation +TagIsNotNull::TagIsNotNull(int col_idx) : TagFilter(col_idx, "") {} + +bool TagIsNotNull::satisfyRow(std::vector segments) const { + return col_idx_ < segments.size() && segments[col_idx_] != nullptr; +} + // TagBetween implementation TagBetween::TagBetween(int col_idx, std::string lower_value, std::string upper_value) @@ -141,7 +167,8 @@ TagBetween::TagBetween(int col_idx, std::string lower_value, } bool TagBetween::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; const std::string& segment_value = *segments[col_idx_]; return segment_value >= value_ && segment_value <= value2_; } @@ -154,7 +181,8 @@ TagNotBetween::TagNotBetween(int col_idx, std::string lower_value, } bool TagNotBetween::satisfyRow(std::vector segments) const { - if (col_idx_ >= segments.size()) return false; + if (col_idx_ >= segments.size() || segments[col_idx_] == nullptr) + return false; const std::string& segment_value = *segments[col_idx_]; return segment_value < value_ || segment_value > value2_; } @@ -254,6 +282,18 @@ Filter* TagFilterBuilder::not_reg_exp(const std::string& columnName, return new TagNotRegExp(idx, value); } +Filter* TagFilterBuilder::is_null(const std::string& columnName) { + auto idx = get_id_column_index(columnName); + if (idx < 0) return nullptr; + return new TagIsNull(idx); +} + +Filter* TagFilterBuilder::is_not_null(const std::string& columnName) { + auto idx = get_id_column_index(columnName); + if (idx < 0) return nullptr; + return new TagIsNotNull(idx); +} + Filter* TagFilterBuilder::between_and(const std::string& columnName, const std::string& lower, const std::string& upper) { diff --git a/cpp/src/reader/filter/tag_filter.h b/cpp/src/reader/filter/tag_filter.h index b858be8c9..6e84dcf50 100644 --- a/cpp/src/reader/filter/tag_filter.h 
+++ b/cpp/src/reader/filter/tag_filter.h @@ -106,6 +106,21 @@ class TagNotRegExp : public TagFilter { bool satisfyRow(std::vector segments) const override; }; +// IS NULL: tag column has no value for this device. An absent trailing +// segment (col_idx_ beyond the device's segment count) is also treated as null. +class TagIsNull : public TagFilter { + public: + explicit TagIsNull(int col_idx); + bool satisfyRow(std::vector segments) const override; +}; + +// IS NOT NULL: tag column has a concrete value for this device. +class TagIsNotNull : public TagFilter { + public: + explicit TagIsNotNull(int col_idx); + bool satisfyRow(std::vector segments) const override; +}; + // Range query [value_, value2_] class TagBetween : public TagFilter { public: @@ -171,6 +186,8 @@ class TagFilterBuilder { Filter* reg_exp(const std::string& columnName, const std::string& value); Filter* not_reg_exp(const std::string& columnName, const std::string& value); + Filter* is_null(const std::string& columnName); + Filter* is_not_null(const std::string& columnName); Filter* between_and(const std::string& columnName, const std::string& lower, const std::string& upper); Filter* not_between_and(const std::string& columnName, diff --git a/cpp/test/reader/filter/tag_filter_test.cc b/cpp/test/reader/filter/tag_filter_test.cc index 0274d2424..02ce64b85 100644 --- a/cpp/test/reader/filter/tag_filter_test.cc +++ b/cpp/test/reader/filter/tag_filter_test.cc @@ -414,4 +414,87 @@ TEST_F(TagFilterTest, TagRegExpEdgeCases) { delete invalid_filter; delete empty_filter; +} + +// A null tag segment must not crash comparison filters; a null value never +// satisfies a concrete-value predicate (SQL UNKNOWN -> not matched). +TEST_F(TagFilterTest, ComparisonFiltersTreatNullSegmentAsNoMatch) { + // "name" (col 1) is an explicit null pointer. 
+ std::vector segments = {nullptr, nullptr, + new std::string("25"), + new std::string("engineering")}; + + auto eq = builder_->eq("name", "john"); + auto neq = builder_->neq("name", "john"); + auto lt = builder_->lt("name", "z"); + auto gt = builder_->gt("name", "a"); + auto between = builder_->between_and("name", "a", "z"); + auto reg = builder_->reg_exp("name", ".*"); + + EXPECT_FALSE(eq->satisfyRow(0, segments)); + EXPECT_FALSE(neq->satisfyRow(0, segments)); + EXPECT_FALSE(lt->satisfyRow(0, segments)); + EXPECT_FALSE(gt->satisfyRow(0, segments)); + EXPECT_FALSE(between->satisfyRow(0, segments)); + EXPECT_FALSE(reg->satisfyRow(0, segments)); + + delete eq; + delete neq; + delete lt; + delete gt; + delete between; + delete reg; + delete segments[2]; + delete segments[3]; +} + +// IS NULL filter +TEST_F(TagFilterTest, TagIsNullFilter) { + auto filter = builder_->is_null("name"); + ASSERT_NE(filter, nullptr); + + // "name" is an explicit null pointer. + std::vector null_seg = {nullptr, nullptr, + new std::string("25"), + new std::string("engineering")}; + EXPECT_TRUE(filter->satisfyRow(0, null_seg)); + delete null_seg[2]; + delete null_seg[3]; + + // "name" has a concrete value. + auto present = createSegments("john", "25", "engineering"); + EXPECT_FALSE(filter->satisfyRow(0, present)); + cleanupSegments(present); + + // A trailing tag column omitted from the device id (segment count too + // small) is also treated as null. 
+ auto trailing = builder_->is_null("score"); // col_idx 5 + std::vector short_seg = {nullptr, new std::string("john")}; + EXPECT_TRUE(trailing->satisfyRow(0, short_seg)); + delete short_seg[1]; + + delete filter; + delete trailing; +} + +// IS NOT NULL filter +TEST_F(TagFilterTest, TagIsNotNullFilter) { + auto filter = builder_->is_not_null("name"); + ASSERT_NE(filter, nullptr); + + auto present = createSegments("john", "25", "engineering"); + EXPECT_TRUE(filter->satisfyRow(0, present)); + cleanupSegments(present); + + std::vector null_seg = {nullptr, nullptr}; + EXPECT_FALSE(filter->satisfyRow(0, null_seg)); + + // An omitted trailing tag is null, so IS NOT NULL is false. + auto trailing = builder_->is_not_null("score"); + std::vector short_seg = {nullptr, new std::string("john")}; + EXPECT_FALSE(trailing->satisfyRow(0, short_seg)); + delete short_seg[1]; + + delete filter; + delete trailing; } \ No newline at end of file diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index f79a6d466..c25ed76f3 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -290,6 +290,74 @@ def test_dataset_loc_aligns_timestamp_union_and_preserves_requested_order(tmp_pa assert aligned.values[2, 1] == 30.0 +def test_dataset_reads_nullable_tag_devices_in_isolation(tmp_path): + path = tmp_path / "nullable_tags.tsfile" + schema = TableSchema( + "sensors", + [ + ColumnSchema("region", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + # Non-trailing null: region IS NULL, device='alpha'. + null_region = pd.DataFrame( + { + "time": [0, 1, 2], + "region": [None, None, None], + "device": ["alpha", "alpha", "alpha"], + "temperature": [10.0, 11.0, 12.0], + } + ) + # Trailing null: region='north', device IS NULL. 
Shares the region prefix + # with the fully specified device below to exercise device isolation. + null_device = pd.DataFrame( + { + "time": [0, 1, 2], + "region": ["north", "north", "north"], + "device": [None, None, None], + "temperature": [20.0, 21.0, 22.0], + } + ) + full = pd.DataFrame( + { + "time": [0, 1, 2], + "region": ["north", "north", "north"], + "device": ["beta", "beta", "beta"], + "temperature": [30.0, 31.0, 32.0], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(null_region) + writer.write_dataframe(null_device) + writer.write_dataframe(full) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + series = tsdf.list_timeseries() + # Null tags are dropped from the compressed logical path. + assert set(series) == { + "sensors.alpha.temperature", + "sensors.north.temperature", + "sensors.north.beta.temperature", + } + + ordered = sorted(series) + aligned = tsdf.loc[:, ordered] + by_name = dict(zip(aligned.series_names, aligned.values.T)) + + # Non-trailing null device reads its own data (previously crashed / NaN). + np.testing.assert_array_equal( + by_name["sensors.alpha.temperature"], np.array([10.0, 11.0, 12.0]) + ) + # Trailing-null device must NOT merge with the fully specified north.beta. 
+ np.testing.assert_array_equal( + by_name["sensors.north.temperature"], np.array([20.0, 21.0, 22.0]) + ) + np.testing.assert_array_equal( + by_name["sensors.north.beta.temperature"], np.array([30.0, 31.0, 32.0]) + ) + + def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_path): path = tmp_path / "weather.tsfile" _write_weather_file(path, 0) @@ -826,24 +894,60 @@ class _Group: assert TsFileSeriesReader._metadata_tag_values(_Group(), 1) == ("device_a",) -def test_exact_tag_filter_rejects_none_tag_values(): - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - _build_exact_tag_filter({"device": None}) - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - _build_exact_tag_filter({"city": "beijing", "device": None}) +def test_exact_tag_filter_uses_is_null_for_none_tag_values(): + from tsfile.tag_filter import AndTagFilter, ComparisonTagFilter + + only_null = _build_exact_tag_filter({"device": None}) + assert isinstance(only_null, ComparisonTagFilter) + assert only_null.op == ComparisonTagFilter.IS_NULL + assert only_null.column_name == "device" + + mixed = _build_exact_tag_filter({"city": "beijing", "device": None}) + assert isinstance(mixed, AndTagFilter) + assert isinstance(mixed.left, ComparisonTagFilter) + assert mixed.left.op == ComparisonTagFilter.EQ + assert mixed.left.value == "beijing" + assert isinstance(mixed.right, ComparisonTagFilter) + assert mixed.right.op == ComparisonTagFilter.IS_NULL + assert mixed.right.column_name == "device" + + +def _tag_filter_has_is_null(tag_filter) -> bool: + from tsfile.tag_filter import ComparisonTagFilter + + if isinstance(tag_filter, ComparisonTagFilter): + return tag_filter.op == ComparisonTagFilter.IS_NULL + for attr in ("left", "right", "filter"): + child = getattr(tag_filter, attr, None) + if child is not None and _tag_filter_has_is_null(child): + return True + return False + + +def test_reader_exact_match_with_none_tag_values_issues_is_null_query(): 
+ captured = {} + class _EmptyResultSet: + def __enter__(self): + return self + + def __exit__(self, *args): + return False + + def read_arrow_batch(self): + return None + + def next(self): + return False -def test_reader_exact_match_with_none_tag_values_fails_fast(): class _FakeNativeReader: def query_table(self, *args, **kwargs): - raise AssertionError( - "query should not be issued when None-tag exact matching is unsupported" - ) + captured["table"] = kwargs.get("tag_filter") + return _EmptyResultSet() def query_table_by_row(self, *args, **kwargs): - raise AssertionError( - "row query should not be issued when None-tag exact matching is unsupported" - ) + captured["row"] = kwargs.get("tag_filter") + return _EmptyResultSet() reader = object.__new__(TsFileSeriesReader) reader._reader = _FakeNativeReader() @@ -864,10 +968,13 @@ def query_table_by_row(self, *args, **kwargs): "timeline_max_time": 1, } - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - reader.read_series_by_ref(device_id, 0, 0, 1) - with pytest.raises(NotImplementedError, match="IS NULL / IS NOT NULL"): - reader.read_series_by_row(device_id, 0, 0, 2) + # Both read paths now issue a native query that encodes the null tag as + # IS NULL instead of failing fast. 
+ reader.read_series_by_ref(device_id, 0, 0, 1) + reader.read_series_by_row(device_id, 0, 0, 2) + + assert _tag_filter_has_is_null(captured["table"]) + assert _tag_filter_has_is_null(captured["row"]) def test_dataframe_resolves_named_sparse_tag_series_path(): diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index ac8b6b853..783d9ef46 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -81,6 +81,8 @@ def _preload_dll(path): tag_gteq, tag_regexp, tag_not_regexp, + tag_is_null, + tag_is_not_null, tag_between, tag_not_between, ) diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py index 4899b2bf9..831926324 100644 --- a/python/tsfile/dataset/reader.py +++ b/python/tsfile/dataset/reader.py @@ -25,7 +25,7 @@ import numpy as np from ..constants import ColumnCategory, TSDataType -from ..tag_filter import tag_eq +from ..tag_filter import tag_eq, tag_is_null from ..tsfile_reader import TsFileReaderPy from .metadata import ( MetadataCatalog, @@ -48,23 +48,36 @@ def _to_python_scalar(value): return value.item() if hasattr(value, "item") else value -def _ensure_supported_exact_tag_values(tag_values: Dict[str, object]) -> None: - if any(tag_value is None for tag_value in tag_values.values()): - raise NotImplementedError( - "Exact tag matching with None tag values is not supported yet. " - "Native tag filter support for IS NULL / IS NOT NULL is required." - ) - - def _build_exact_tag_filter(tag_values: Dict[str, object]): - _ensure_supported_exact_tag_values(tag_values) + """Build a conjunctive filter that isolates exactly one device. + + A ``None`` tag value matches the device's null/missing tag via IS NULL so + that devices sharing the same non-null tags (for example a trailing-null + device versus a fully specified one) are not conflated. 
+ """ tag_filter = None for tag_column, tag_value in tag_values.items(): - expr = tag_eq(tag_column, str(tag_value)) + if tag_value is None: + expr = tag_is_null(tag_column) + else: + expr = tag_eq(tag_column, str(tag_value)) tag_filter = expr if tag_filter is None else tag_filter & expr return tag_filter +def _device_exact_tag_values(table_entry, device_entry) -> Dict[str, object]: + """Map every declared tag column to this device's value (None when null/missing). + + ``device_entry.tag_values`` drops trailing null tags, so columns beyond its + length are treated as null rather than omitted from the exact-match filter. + """ + device_tag_values = device_entry.tag_values + return { + column: device_tag_values[idx] if idx < len(device_tag_values) else None + for idx, column in enumerate(table_entry.tag_columns) + } + + class TsFileSeriesReader: """Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch reads.""" @@ -362,7 +375,7 @@ def read_series_by_row( table_entry, device_entry, field_name = self._resolve_series_ref( device_id, field_idx ) - tag_values = dict(zip(table_entry.tag_columns, device_entry.tag_values)) + tag_values = _device_exact_tag_values(table_entry, device_entry) tag_filter = _build_exact_tag_filter(tag_values) if tag_values else None # Some native row-query paths stop at an internal block boundary even @@ -416,7 +429,7 @@ def read_device_fields_by_time_range( table_entry.table_name, requested_field_columns, table_entry.tag_columns, - dict(zip(table_entry.tag_columns, device_entry.tag_values)), + _device_exact_tag_values(table_entry, device_entry), start_time, end_time, ) diff --git a/python/tsfile/tag_filter.py b/python/tsfile/tag_filter.py index a40c0c47c..ac6e46c5c 100644 --- a/python/tsfile/tag_filter.py +++ b/python/tsfile/tag_filter.py @@ -42,10 +42,17 @@ class ComparisonTagFilter(TagFilter): GTEQ = 5 REGEXP = 6 NOT_REGEXP = 7 + IS_NULL = 8 + IS_NOT_NULL = 9 + + # Operators that take no comparison value. 
+ _NO_VALUE_OPS = (IS_NULL, IS_NOT_NULL) def __init__(self, column_name: str, value: str, op: int): self.column_name = column_name - self.value = value + # IS NULL / IS NOT NULL carry no value; the native layer ignores it but + # still expects a (possibly empty) string. + self.value = "" if value is None else value self.op = op def __repr__(self): @@ -58,7 +65,11 @@ def __repr__(self): 5: ">=", 6: "=~", 7: "!~", + 8: "IS NULL", + 9: "IS NOT NULL", } + if self.op in self._NO_VALUE_OPS: + return f"TagFilter({self.column_name} {op_names.get(self.op, '?')})" return ( f"TagFilter({self.column_name} {op_names.get(self.op, '?')} {self.value!r})" ) @@ -151,6 +162,16 @@ def tag_not_regexp(column_name: str, pattern: str) -> TagFilter: return ComparisonTagFilter(column_name, pattern, ComparisonTagFilter.NOT_REGEXP) +def tag_is_null(column_name: str) -> TagFilter: + """Create a tag IS NULL filter: the device has no value for this tag column.""" + return ComparisonTagFilter(column_name, "", ComparisonTagFilter.IS_NULL) + + +def tag_is_not_null(column_name: str) -> TagFilter: + """Create a tag IS NOT NULL filter: the device has a value for this tag column.""" + return ComparisonTagFilter(column_name, "", ComparisonTagFilter.IS_NOT_NULL) + + def tag_between(column_name: str, lower: str, upper: str) -> TagFilter: """Create a tag BETWEEN filter: lower <= column <= upper.""" return BetweenTagFilter(column_name, lower, upper, is_not=False) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 0fa570df2..4e90dd483 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -324,6 +324,8 @@ cdef extern from "cwrapper/tsfile_cwrapper.h": TAG_FILTER_GTEQ = 5, TAG_FILTER_REGEXP = 6, TAG_FILTER_NOT_REGEXP = 7, + TAG_FILTER_IS_NULL = 8, + TAG_FILTER_IS_NOT_NULL = 9, TagFilterHandle tsfile_tag_filter_create(TsFileReader reader, const char* table_name, From c363dd1953e640d1265048937bd2f0bf78984211 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 
25 Jun 2026 09:07:52 +0800 Subject: [PATCH 2/5] Python: represent null tags with SeriesPath and position-preserving paths The dataset layer flattened a table device into a "table.tag.field" string and dropped null tags entirely, which lost the tag position and made two devices with nulls in different positions collide (e.g. a..b.c vs a.b..c), so they became unaddressable. Reading them by the auto-generated name also could not distinguish a real null tag from the literal string "null". Introduce a SeriesPath (str subclass): its string value is the escaped path form with a collision-proof \N marker for null tags, and it also exposes the structured table / tags / field components (a None entry in tags = null). A real value can never escape to \N because escaping doubles backslashes, so \N is unambiguous against the literal string "null". - metadata.py: add SeriesPath; build/split/resolve now keep every tag position and mark null with \N / None. Every device maps to a unique path, so the sparse/compressed lookup and the "ambiguous path" fallback are removed in favor of a single direct lookup. - dataframe.py: list_timeseries() returns SeriesPath; loc[...] and resolution accept a SeriesPath or a path string (with \N); drop the sparse-tag index. - tsfile_py_cpp.pyx: key the timeseries-metadata map by the device segment tuple instead of the device-name string. The name renders null as "null", so keying by it collided a real null tag with the literal "null" and silently dropped one device. - Export SeriesPath from the package; update/extend tests (null position preservation, real-null vs string "null", SeriesPath round-trip and escaping). 
--- python/tests/test_reader_metadata.py | 16 +-- python/tests/test_tsfile_dataset.py | 135 ++++++++++++++---- python/tsfile/__init__.py | 2 +- python/tsfile/dataset/__init__.py | 3 +- python/tsfile/dataset/dataframe.py | 92 ++++-------- python/tsfile/dataset/metadata.py | 203 ++++++++++++++++----------- python/tsfile/tsfile_py_cpp.pyx | 12 +- python/tsfile/tsfile_reader.pyx | 8 +- 8 files changed, 287 insertions(+), 184 deletions(-) diff --git a/python/tests/test_reader_metadata.py b/python/tests/test_reader_metadata.py index e4e7d0f24..fd00d7f74 100644 --- a/python/tests/test_reader_metadata.py +++ b/python/tests/test_reader_metadata.py @@ -63,7 +63,7 @@ def test_get_all_devices_segments(): assert d0.table_name == "root.sg" assert d0.segments == ("root.sg", "py_details") - grp = reader.get_timeseries_metadata(None)[device] + grp = reader.get_timeseries_metadata(None)[d0.segments] assert grp.table_name == "root.sg" assert grp.segments == ("root.sg", "py_details") assert len(grp.timeseries) == 1 @@ -103,8 +103,8 @@ def test_get_all_devices_and_timeseries_metadata_statistic(): assert devices[0].path == device meta_all = reader.get_timeseries_metadata(None) - assert list(meta_all.keys()) == [device] - grp = meta_all[device] + assert list(meta_all.keys()) == [devices[0].segments] + grp = meta_all[devices[0].segments] assert grp.table_name == "root.sg" assert grp.segments == ("root.sg", "py_meta") series = grp.timeseries @@ -127,11 +127,11 @@ def test_get_all_devices_and_timeseries_metadata_statistic(): assert reader.get_timeseries_metadata([]) == {} sub = reader.get_timeseries_metadata([DeviceID(device, None, ())]) - assert device in sub - assert len(sub[device].timeseries) == 1 + assert devices[0].segments in sub + assert len(sub[devices[0].segments].timeseries) == 1 sub_str = reader.get_timeseries_metadata([device]) - assert device in sub_str + assert devices[0].segments in sub_str finally: reader.close() try: @@ -163,7 +163,7 @@ def 
test_get_timeseries_metadata_boolean_statistic(): reader = TsFileReader(path) try: meta_all = reader.get_timeseries_metadata(None) - st = meta_all[device].timeseries[0].statistic + st = meta_all[("root.sg", "py_bool")].timeseries[0].statistic assert isinstance(st, BoolTimeseriesStatistic) assert st.has_statistic assert st.sum == pytest.approx(2.0) @@ -200,7 +200,7 @@ def test_get_timeseries_metadata_string_statistic(): reader = TsFileReader(path) try: meta_all = reader.get_timeseries_metadata(None) - m = meta_all[device].timeseries[0] + m = meta_all[("root.sg", "py_str")].timeseries[0] assert m.measurement_name == "m_str" assert m.data_type == TSDataType.STRING st = m.statistic diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index c25ed76f3..8b4e66d32 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -28,7 +28,7 @@ TableSchema, TsFileTableWriter, ) -from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame +from tsfile import AlignedTimeseries, SeriesPath, Timeseries, TsFileDataFrame from tsfile.dataset.formatting import format_timestamp from tsfile.dataset.metadata import ( MetadataCatalog, @@ -334,12 +334,17 @@ def test_dataset_reads_nullable_tag_devices_in_isolation(tmp_path): with TsFileDataFrame(str(path), show_progress=False) as tsdf: series = tsdf.list_timeseries() - # Null tags are dropped from the compressed logical path. + # Null tags keep their position via the \N marker; trailing nulls drop. assert set(series) == { - "sensors.alpha.temperature", + "sensors.\\N.alpha.temperature", "sensors.north.temperature", "sensors.north.beta.temperature", } + # list_timeseries returns SeriesPath objects carrying structured tags. 
+ by_tags = {sp.tags: sp for sp in series} + assert (None, "alpha") in by_tags + assert ("north",) in by_tags + assert ("north", "beta") in by_tags ordered = sorted(series) aligned = tsdf.loc[:, ordered] @@ -347,7 +352,7 @@ def test_dataset_reads_nullable_tag_devices_in_isolation(tmp_path): # Non-trailing null device reads its own data (previously crashed / NaN). np.testing.assert_array_equal( - by_name["sensors.alpha.temperature"], np.array([10.0, 11.0, 12.0]) + by_name["sensors.\\N.alpha.temperature"], np.array([10.0, 11.0, 12.0]) ) # Trailing-null device must NOT merge with the fully specified north.beta. np.testing.assert_array_equal( @@ -358,6 +363,79 @@ def test_dataset_reads_nullable_tag_devices_in_isolation(tmp_path): ) +def test_series_path_object_roundtrip_and_escaping(): + from tsfile.dataset.metadata import split_logical_series_path + + sp = SeriesPath("tbl", ("a.b", None, "x"), "f") + assert isinstance(sp, str) + assert sp.table == "tbl" + assert sp.tags == ("a.b", None, "x") + assert sp.field == "f" + # A dot in a value is escaped; a null tag uses the collision-proof \N marker. + assert str(sp) == "tbl.a\\.b.\\N.x.f" + # Splitting round-trips: the escaped dot stays in the value, \N decodes to None. + assert split_logical_series_path(str(sp)) == ["tbl", "a.b", None, "x", "f"] + # Trailing nulls are dropped (mirroring device-id normalization). 
+ assert SeriesPath("tbl", ("a", None), "f").tags == ("a",) + assert str(SeriesPath("tbl", ("a", None), "f")) == "tbl.a.f" + + +def test_dataset_null_tag_positions_and_string_null_are_distinct(tmp_path): + path = tmp_path / "null_positions.tsfile" + schema = TableSchema( + "a", + [ + ColumnSchema("t1", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("t2", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("t3", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("v", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + rows = { + (None, "b", "c"): 10.0, # null at position 1 + ("b", None, "c"): 20.0, # null at position 2 (distinct from the above) + ("null", "b", "c"): 30.0, # the literal string "null", not a real null + } + with TsFileTableWriter(str(path), schema) as writer: + for tags, base in rows.items(): + writer.write_dataframe( + pd.DataFrame( + { + "time": [0, 1], + "t1": [tags[0], tags[0]], + "t2": [tags[1], tags[1]], + "t3": [tags[2], tags[2]], + "v": [base, base + 1], + } + ) + ) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + series = tsdf.list_timeseries() + # Nothing collapses: three physically distinct devices stay distinct. + assert len(series) == 3 + by_tags = {sp.tags: sp for sp in series} + assert (None, "b", "c") in by_tags # null position 1 + assert ("b", None, "c") in by_tags # null position 2 + assert ("null", "b", "c") in by_tags # the string "null" + + # Each device reads its own data via SeriesPath and via the \N string form. + for tags, base in rows.items(): + sp = by_tags[tags] + np.testing.assert_array_equal( + tsdf.loc[:, sp].values.ravel(), np.array([base, base + 1]) + ) + np.testing.assert_array_equal( + tsdf.loc[:, str(sp)].values.ravel(), np.array([base, base + 1]) + ) + + # A hand-built SeriesPath resolves to the same null-tag device. 
+ hand = SeriesPath("a", (None, "b", "c"), "v") + np.testing.assert_array_equal( + tsdf.loc[:, hand].values.ravel(), np.array([10.0, 11.0]) + ) + + def test_dataset_loc_supports_single_timestamp_and_mixed_series_specifiers(tmp_path): path = tmp_path / "weather.tsfile" _write_weather_file(path, 0) @@ -882,8 +960,12 @@ def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values(): } series_path = build_series_path(catalog, device_id, 0) - assert series_path == "weather.device_a.temperature" + # The leading null tag is preserved at its position via the \N marker. + assert series_path == "weather.\\N.device_a.temperature" + assert series_path.tags == (None, "device_a") assert resolve_series_path(catalog, series_path) == (table_id, device_id, 0) + # The plain string form (with \N) round-trips to the same device. + assert resolve_series_path(catalog, str(series_path)) == (table_id, device_id, 0) def test_reader_metadata_tag_values_trim_trailing_none(): @@ -989,17 +1071,15 @@ def test_dataframe_resolves_named_sparse_tag_series_path(): device_key = ("weather", (None, "device_a")) tsdf._index.device_order = [device_key] tsdf._index.device_index_by_key = {device_key: 0} - tsdf._index.tables_with_sparse_tag_values = {"weather"} - tsdf._index.sparse_device_indices_by_compressed_path = { - ("weather", ("device_a",)): [0] - } tsdf._index.device_refs = [[]] tsdf._index.series_refs_ordered = [(0, 0)] tsdf._index.series_ref_set = {(0, 0)} tsdf._index.series_ref_map = {(0, 0): []} - assert tsdf.list_timeseries() == ["weather.device_a.temperature"] - assert tsdf._resolve_series_name("weather.device_a.temperature") == (0, 0) + assert tsdf.list_timeseries() == ["weather.\\N.device_a.temperature"] + # Resolvable by the \N string form and by the returned SeriesPath itself. 
+ assert tsdf._resolve_series_name("weather.\\N.device_a.temperature") == (0, 0) + assert tsdf._resolve_series_name(tsdf.list_timeseries()[0]) == (0, 0) def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): @@ -1019,17 +1099,17 @@ def test_dataframe_list_timeseries_filters_named_sparse_tag_prefix(): ("weather", (None, "device_a")): 0, ("weather", ("beijing", "device_b")): 1, } - tsdf._index.tables_with_sparse_tag_values = {"weather"} - tsdf._index.sparse_device_indices_by_compressed_path = { - ("weather", ("device_a",)): [0], - ("weather", ("beijing", "device_b")): [1], - } tsdf._index.device_refs = [[], []] tsdf._index.series_refs_ordered = [(0, 0), (1, 0)] tsdf._index.series_ref_set = {(0, 0), (1, 0)} tsdf._index.series_ref_map = {(0, 0): [], (1, 0): []} - assert tsdf.list_timeseries("weather.device_a") == ["weather.device_a.temperature"] + # Prefix matching is position-aware: "weather.\N" selects the null-city + # device, "weather.beijing" selects the fully specified one. 
+ assert tsdf.list_timeseries("weather.\\N") == ["weather.\\N.device_a.temperature"] + assert tsdf.list_timeseries("weather.beijing") == [ + "weather.beijing.device_b.temperature" + ] def test_dataframe_list_timeseries_prefix_can_skip_full_name_build( @@ -1050,7 +1130,7 @@ def fail_build_series_name(_series_ref): assert tsdf.list_timeseries("pvf") == [] -def test_series_path_resolution_reports_ambiguous_sparse_path(): +def test_series_path_resolution_distinguishes_null_position(): catalog = MetadataCatalog() table_id = catalog.add_table( "weather", @@ -1058,8 +1138,8 @@ def test_series_path_resolution_reports_ambiguous_sparse_path(): (TSDataType.STRING, TSDataType.STRING), ("temperature",), ) - first_id = catalog.add_device(table_id, ("beijing", None), 0, 1) - second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1) + first_id = catalog.add_device(table_id, ("beijing", None), 0, 1) # device IS NULL + second_id = catalog.add_device(table_id, (None, "beijing"), 0, 1) # city IS NULL for device_id in (first_id, second_id): catalog.series_stats_by_ref[(device_id, 0)] = { "length": 1, @@ -1070,10 +1150,17 @@ def test_series_path_resolution_reports_ambiguous_sparse_path(): "timeline_max_time": 0, } - assert build_series_path(catalog, first_id, 0) == "weather.beijing.temperature" - assert build_series_path(catalog, second_id, 0) == "weather.beijing.temperature" - with pytest.raises(ValueError, match="Ambiguous series path"): - resolve_series_path(catalog, "weather.beijing.temperature") + # Null position is preserved, so these two devices get distinct paths + # (previously both compressed to "weather.beijing.temperature" -> ambiguous). + first_path = build_series_path(catalog, first_id, 0) + second_path = build_series_path(catalog, second_id, 0) + assert first_path == "weather.beijing.temperature" + assert second_path == "weather.\\N.beijing.temperature" + assert first_path != second_path + + # Each resolves unambiguously back to its own device. 
+ assert resolve_series_path(catalog, first_path) == (table_id, first_id, 0) + assert resolve_series_path(catalog, second_path) == (table_id, second_id, 0) def test_reader_show_progress_reports_start_immediately(tmp_path, capsys): diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index 783d9ef46..a1c37fce1 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -90,4 +90,4 @@ def _preload_dll(path): from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config from .tsfile_table_writer import TsFileTableWriter from .utils import to_dataframe, dataframe_to_tsfile -from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries +from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries, SeriesPath diff --git a/python/tsfile/dataset/__init__.py b/python/tsfile/dataset/__init__.py index 4072bd4c1..15c20d540 100644 --- a/python/tsfile/dataset/__init__.py +++ b/python/tsfile/dataset/__init__.py @@ -19,6 +19,7 @@ """Dataset-style TsFile accessors.""" from .dataframe import TsFileDataFrame +from .metadata import SeriesPath from .timeseries import AlignedTimeseries, Timeseries -__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries"] +__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries", "SeriesPath"] diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py index 40149102a..77c278289 100644 --- a/python/tsfile/dataset/dataframe.py +++ b/python/tsfile/dataset/dataframe.py @@ -30,7 +30,9 @@ from .formatting import format_dataframe_table from .metadata import ( + SeriesPath, TableEntry, + _normalize_tag_values, build_logical_series_components, build_logical_series_path, split_logical_series_path, @@ -62,15 +64,10 @@ class _LogicalIndex: # Stable logical device order, each item is (table_name, tag_values). device_order: List[DeviceKey] = field(default_factory=list) - # Map one logical device key to its dataframe-local device index. 
+ # Map one logical device key to its dataframe-local device index. The key's + # tag tuple keeps interior nulls (None) and drops trailing ones, so every + # device -- including null-tagged ones -- resolves by a single direct lookup. device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict) - # Tables that need sparse compressed-path lookup because some devices - # contain non-trailing missing tag values. - tables_with_sparse_tag_values: Set[str] = field(default_factory=set) - # Map one compressed tree-style device path to sparse logical devices only. - sparse_device_indices_by_compressed_path: Dict[ - Tuple[str, Tuple[str, ...]], List[int] - ] = field(default_factory=dict) # For each logical device, keep the contributing reader-local device refs. device_refs: List[List[DeviceRef]] = field(default_factory=list) @@ -168,20 +165,6 @@ def _register_reader( index.device_index_by_key[device_key] = device_idx index.device_order.append(device_key) index.device_refs.append([]) - if any(value is None for value in device_entry.tag_values): - index.tables_with_sparse_tag_values.add(table_entry.table_name) - compressed_components = tuple( - build_logical_series_components( - table_entry.table_name, - device_entry.tag_values, - "", - table_entry.tag_columns, - )[1:-1] - ) - compressed_key = (table_entry.table_name, compressed_components) - index.sparse_device_indices_by_compressed_path.setdefault( - compressed_key, [] - ).append(device_idx) index.device_refs[device_idx].append((reader, device_id)) for field_idx in range(len(table_entry.field_columns)): @@ -707,7 +690,7 @@ def _get_series_components( device_key = self._index.device_order[device_idx] return device_key, self._index.table_entries[device_key[0]], field_idx - def _build_series_name(self, series_ref: SeriesRefKey) -> str: + def _build_series_name(self, series_ref: SeriesRefKey) -> SeriesPath: device_key, table_entry, field_idx = self._get_series_components(series_ref) table_name, tag_values = device_key 
field_name = table_entry.field_columns[field_idx] @@ -715,52 +698,39 @@ def _build_series_name(self, series_ref: SeriesRefKey) -> str: table_name, tag_values, field_name, table_entry.tag_columns ) - def _resolve_series_name(self, series_name: str) -> SeriesRefKey: - try: - parts = split_logical_series_path(series_name) - except ValueError as exc: - raise KeyError(_series_lookup_hint(series_name)) from exc - if len(parts) < 2: - raise KeyError(_series_lookup_hint(series_name)) + def _resolve_series_name(self, series_name) -> SeriesRefKey: + """Resolve a ``SeriesPath`` or path string (``\\N`` = null tag) to a ref. + + Every device has a unique position-preserving key, so this is a single + direct lookup -- no sparse/compressed fallback and no ambiguity. + """ + if isinstance(series_name, SeriesPath): + table_name, tag_parts, field_name = ( + series_name.table, + list(series_name.tags), + series_name.field, + ) + else: + try: + parts = split_logical_series_path(series_name) + except ValueError as exc: + raise KeyError(_series_lookup_hint(series_name)) from exc + if len(parts) < 2: + raise KeyError(_series_lookup_hint(series_name)) + table_name, field_name, tag_parts = parts[0], parts[-1], parts[1:-1] - table_name = parts[0] if table_name not in self._index.table_entries: raise KeyError(_series_lookup_hint(series_name)) - table_entry = self._index.table_entries[table_name] - field_name = parts[-1] try: field_idx = table_entry.get_field_index(field_name) except ValueError as exc: raise KeyError(_series_lookup_hint(series_name)) from exc - tag_parts = parts[1:-1] - direct_device_idx = self._index.device_index_by_key.get( - (table_name, tuple(tag_parts)) - ) - - if table_name not in self._index.tables_with_sparse_tag_values: - if direct_device_idx is None: - raise KeyError(_series_lookup_hint(series_name)) - device_idx = direct_device_idx - else: - compressed_key = (table_name, tuple(tag_parts)) - sparse_device_indices = ( - 
self._index.sparse_device_indices_by_compressed_path.get( - compressed_key, [] - ) - ) - candidate_indices = [] - if direct_device_idx is not None: - candidate_indices.append(direct_device_idx) - for device_idx in sparse_device_indices: - if device_idx not in candidate_indices: - candidate_indices.append(device_idx) - if not candidate_indices: - raise KeyError(_series_lookup_hint(series_name)) - if len(candidate_indices) > 1: - raise KeyError(f"Ambiguous series path: '{series_name}'.") - device_idx = candidate_indices[0] + device_key = (table_name, _normalize_tag_values(tag_parts)) + device_idx = self._index.device_index_by_key.get(device_key) + if device_idx is None: + raise KeyError(_series_lookup_hint(series_name)) series_ref = (device_idx, field_idx) if series_ref not in self._index.series_ref_set: @@ -784,7 +754,7 @@ def _build_series_info(self, series_ref: SeriesRefKey) -> dict: def __len__(self) -> int: return len(self._index.series_refs_ordered) - def list_timeseries(self, path_prefix: str = "") -> List[str]: + def list_timeseries(self, path_prefix: str = "") -> List[SeriesPath]: if not path_prefix: return [ self._build_series_name(series_ref) diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index 125bb00c2..be0aa9a3b 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -73,10 +73,6 @@ class MetadataCatalog: device_entries: List[DeviceEntry] = field(default_factory=list) table_id_by_name: Dict[str, int] = field(default_factory=dict) device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict) - tables_with_sparse_tag_values: set = field(default_factory=set) - sparse_device_ids_by_compressed_path: Dict[ - Tuple[int, Tuple[str, ...]], List[int] - ] = field(default_factory=dict) series_stats_by_ref: Dict[Tuple[int, int], Dict[str, int]] = field( default_factory=dict ) @@ -122,15 +118,6 @@ def add_device( ) ) self.device_id_by_key[key] = device_id - if 
_has_sparse_tag_holes(normalized_tag_values): - self.tables_with_sparse_tag_values.add(table_id) - compressed_key = ( - table_id, - _compressed_tag_path_components(normalized_tag_values), - ) - self.sparse_device_ids_by_compressed_path.setdefault( - compressed_key, [] - ).append(device_id) return device_id @property @@ -141,6 +128,48 @@ def series_count(self) -> int: ) +# Path marker for a null tag value. A real tag value can never escape to this +# sequence because escaping always doubles a backslash, so it unambiguously +# distinguishes a null tag from the literal string "null". +_NULL_TOKEN = _PATH_ESCAPE + "N" + + +class SeriesPath(str): + """Logical identifier of one time series: table + ordered tag values + field. + + ``SeriesPath`` subclasses ``str``; its string value is the escaped path form + (with ``\\N`` marking a null tag), so it can be used anywhere a path string + is accepted. It additionally exposes the structured ``table`` / ``tags`` / + ``field`` components, where a ``None`` entry in ``tags`` means the tag is + null -- unambiguously distinct from the literal string value ``"null"``. + + Trailing null tags are dropped (mirroring the device-id normalization), so + ``tags`` keeps every interior null but not absent trailing ones. 
+ """ + + __slots__ = ("_table", "_tags", "_field") + + def __new__(cls, table: str, tags: Iterable[Any], field: str) -> "SeriesPath": + normalized = _normalize_tag_values(tags) + obj = str.__new__(cls, _join_series_path(table, normalized, field)) + obj._table = table + obj._tags = normalized + obj._field = field + return obj + + @property + def table(self) -> str: + return self._table + + @property + def tags(self) -> Tuple[Any, ...]: + return self._tags + + @property + def field(self) -> str: + return self._field + + def _escape_path_component(value: Any) -> str: return ( str(value) @@ -149,6 +178,26 @@ def _escape_path_component(value: Any) -> str: ) +def _render_path_component(value: Any) -> str: + """Render one tag component: ``None`` -> the null marker, else escaped value.""" + return _NULL_TOKEN if value is None else _escape_path_component(value) + + +def _unescape_path_component(raw: str) -> str: + out: List[str] = [] + escaping = False + for char in raw: + if escaping: + out.append(char) + escaping = False + continue + if char == _PATH_ESCAPE: + escaping = True + continue + out.append(char) + return "".join(out) + + def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: values = list(tag_values) while values and values[-1] is None: @@ -156,17 +205,10 @@ def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: return tuple(values) -def _compressed_tag_path_components(tag_values: Iterable[Any]) -> Tuple[str, ...]: - return tuple(str(value) for value in tag_values if value is not None) - - -def _has_sparse_tag_holes(tag_values: Iterable[Any]) -> bool: - return any(value is None for value in tag_values) - - -def split_logical_series_path(series_path: str) -> List[str]: - parts = [] - current = [] +def split_logical_series_path(series_path: str) -> List[Any]: + """Split a path into components; a ``\\N`` component decodes to ``None``.""" + raw_parts: List[str] = [] + current: List[str] = [] escaping = False for char in 
series_path: @@ -176,9 +218,10 @@ def split_logical_series_path(series_path: str) -> List[str]: continue if char == _PATH_ESCAPE: escaping = True + current.append(char) # keep the escape char in the raw component continue if char == _PATH_SEPARATOR: - parts.append("".join(current)) + raw_parts.append("".join(current)) current = [] continue current.append(char) @@ -186,8 +229,20 @@ def split_logical_series_path(series_path: str) -> List[str]: if escaping: raise ValueError(f"Invalid series path: {series_path}") - parts.append("".join(current)) - return parts + raw_parts.append("".join(current)) + return [ + None if raw == _NULL_TOKEN else _unescape_path_component(raw) + for raw in raw_parts + ] + + +def _join_series_path( + table_name: str, tag_values: Iterable[Any], field_name: str +) -> str: + parts = [_escape_path_component(table_name)] + parts.extend(_render_path_component(value) for value in tag_values) + parts.append(_escape_path_component(field_name)) + return _PATH_SEPARATOR.join(parts) def build_logical_series_path( @@ -195,13 +250,8 @@ def build_logical_series_path( tag_values: Iterable[Any], field_name: str, tag_columns: Iterable[str] = (), -) -> str: - components = build_logical_series_components( - table_name, tag_values, field_name, tag_columns - ) - return _PATH_SEPARATOR.join( - _escape_path_component(component) for component in components - ) +) -> SeriesPath: + return SeriesPath(table_name, tag_values, field_name) def build_logical_series_components( @@ -209,9 +259,16 @@ def build_logical_series_components( tag_values: Iterable[Any], field_name: str, _tag_columns: Iterable[str] = (), -) -> List[str]: - components = [table_name, *_compressed_tag_path_components(tag_values), field_name] - return [str(component) for component in components] +) -> List[Any]: + """Position-preserving components for prefix matching; ``None`` marks a null tag.""" + return [ + str(table_name), + *( + None if value is None else str(value) + for value in 
_normalize_tag_values(tag_values) + ), + str(field_name), + ] def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str: @@ -242,60 +299,48 @@ def iter_series_paths(catalog: MetadataCatalog) -> Iterator[str]: def resolve_series_path( - catalog: MetadataCatalog, series_path: str + catalog: MetadataCatalog, series_path: Any ) -> Tuple[int, int, int]: - """Resolve an external path to ``(table_id, device_id, field_idx)``.""" - parts = split_logical_series_path(series_path) - if len(parts) < 2: - raise ValueError(f"Invalid series path: {series_path}") + """Resolve a path (``str`` with ``\\N``, or ``SeriesPath``) to refs. + + Returns ``(table_id, device_id, field_idx)``. Every device maps to a unique + position-preserving path, so resolution is a single direct lookup. + """ + if isinstance(series_path, SeriesPath): + table_name, tag_parts, field_name = ( + series_path.table, + list(series_path.tags), + series_path.field, + ) + coerce = False + else: + parts = split_logical_series_path(series_path) + if len(parts) < 2: + raise ValueError(f"Invalid series path: {series_path}") + table_name, field_name, tag_parts = parts[0], parts[-1], parts[1:-1] + coerce = True - table_name = parts[0] if table_name not in catalog.table_id_by_name: raise ValueError(f"Series not found: {series_path}") - table_id = catalog.table_id_by_name[table_name] table_entry = catalog.table_entries[table_id] - field_name = parts[-1] try: field_idx = table_entry.get_field_index(field_name) except ValueError as exc: raise ValueError(f"Series not found: {series_path}") from exc - tag_parts = parts[1:-1] - direct_device_id = None - direct_tag_values = _normalize_tag_values( - _coerce_path_component(raw_value, tag_type) - for raw_value, tag_type in zip(tag_parts, table_entry.tag_types) - ) - direct_key = (table_id, direct_tag_values) - if direct_key in catalog.device_id_by_key: - direct_device_id = catalog.device_id_by_key[direct_key] - - if table_id not in 
catalog.tables_with_sparse_tag_values: - if direct_device_id is None: - raise ValueError(f"Series not found: {series_path}") - return table_id, direct_device_id, field_idx - - compressed_key = (table_id, tuple(tag_parts)) - sparse_device_ids = catalog.sparse_device_ids_by_compressed_path.get( - compressed_key, [] - ) - candidate_ids = [] - seen_ids = set() - if direct_device_id is not None: - candidate_ids.append(direct_device_id) - seen_ids.add(direct_device_id) - for device_id in sparse_device_ids: - if device_id in seen_ids: - continue - candidate_ids.append(device_id) - seen_ids.add(device_id) - if not candidate_ids: - raise ValueError(f"Series not found: {series_path}") - if len(candidate_ids) > 1: - raise ValueError(f"Ambiguous series path: {series_path}") + if coerce: + tag_values = _normalize_tag_values( + None if raw_value is None else _coerce_path_component(raw_value, tag_type) + for raw_value, tag_type in zip(tag_parts, table_entry.tag_types) + ) + else: + tag_values = _normalize_tag_values(tag_parts) - return table_id, candidate_ids[0], field_idx + device_id = catalog.device_id_by_key.get((table_id, tag_values)) + if device_id is None: + raise ValueError(f"Series not found: {series_path}") + return table_id, device_id, field_idx def _coerce_path_component(value: str, data_type: TSDataType) -> Any: diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 1aea243ed..038336a42 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -1056,18 +1056,11 @@ cdef tuple c_device_segments_to_tuple(char** segs, uint32_t n): cdef dict device_timeseries_metadata_map_to_py(DeviceTimeseriesMetadataMap* mmap): cdef dict out = {} cdef uint32_t di, ti - cdef char* p cdef char* tnp - cdef object key cdef object table_py cdef tuple segs_py cdef list series for di in range(mmap.device_count): - p = mmap.entries[di].device.path - if p == NULL: - key = None - else: - key = p.decode('utf-8') tnp = 
mmap.entries[di].device.table_name if tnp == NULL: table_py = None @@ -1081,7 +1074,10 @@ cdef dict device_timeseries_metadata_map_to_py(DeviceTimeseriesMetadataMap* mmap series.append( timeseries_metadata_c_to_py( &mmap.entries[di].timeseries[ti])) - out[key] = DeviceTimeseriesMetadataGroupPy( + # Key by the full segments tuple, not the device path string: the path + # renders a null tag as "null", so keying by it would collide a real + # null tag with the literal string "null" and silently drop one device. + out[segs_py] = DeviceTimeseriesMetadataGroupPy( table_py, segs_py, series) return out diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 9193e2c61..341a7493d 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -497,11 +497,15 @@ cdef class TsFileReaderPy: def get_timeseries_metadata( self, device_ids: Optional[List] = None - ) -> Dict[str, DeviceTimeseriesMetadataGroup]: + ) -> Dict[tuple, DeviceTimeseriesMetadataGroup]: """ - Return map device path -> :class:`tsfile.schema.DeviceTimeseriesMetadataGroup` + Return map device-segments-tuple -> :class:`tsfile.schema.DeviceTimeseriesMetadataGroup` (table name, segments, and list of :class:`tsfile.schema.TimeseriesMetadata`). + The key is the device's full segment tuple (a null tag is ``None``), not + the dotted path string, so a real null tag and the literal string + ``"null"`` map to distinct entries instead of colliding. + ``device_ids is None``: all devices. ``device_ids == []``: empty map. Non-empty list restricts to those devices (only existing devices appear). """ From d3e0c025f57c28e22742caa78b452657dcaea9ab Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 25 Jun 2026 10:28:01 +0800 Subject: [PATCH 3/5] fix parse null. 
--- python/tsfile/dataset/metadata.py | 62 ++++++++++++++----------------- 1 file changed, 27 insertions(+), 35 deletions(-) diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index be0aa9a3b..37e95f55b 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -128,10 +128,12 @@ def series_count(self) -> int: ) -# Path marker for a null tag value. A real tag value can never escape to this -# sequence because escaping always doubles a backslash, so it unambiguously -# distinguishes a null tag from the literal string "null". -_NULL_TOKEN = _PATH_ESCAPE + "N" +# Path marker for a null tag value: a single backslash followed by N. A real +# tag value can never produce this because escaping always doubles a backslash +# (and never escapes "N"), so \N unambiguously distinguishes a null tag from the +# literal string "null". +_NULL_MARKER = "N" +_NULL_TOKEN = _PATH_ESCAPE + _NULL_MARKER class SeriesPath(str): @@ -183,21 +185,6 @@ def _render_path_component(value: Any) -> str: return _NULL_TOKEN if value is None else _escape_path_component(value) -def _unescape_path_component(raw: str) -> str: - out: List[str] = [] - escaping = False - for char in raw: - if escaping: - out.append(char) - escaping = False - continue - if char == _PATH_ESCAPE: - escaping = True - continue - out.append(char) - return "".join(out) - - def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: values = list(tag_values) while values and values[-1] is None: @@ -206,34 +193,39 @@ def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: def split_logical_series_path(series_path: str) -> List[Any]: - """Split a path into components; a ``\\N`` component decodes to ``None``.""" - raw_parts: List[str] = [] + """Split a path into components, decoding escapes in a single pass. + + ``\\.`` -> ``.``, ``\\\\`` -> ``\\``, and the null marker ``\\N`` -> ``None``. 
+ Null is detected in the escape branch: a lone backslash only ever precedes + ``N`` for the null marker, since a real value's backslash is always doubled + and ``N`` itself is never escaped. + """ + parts: List[Any] = [] current: List[str] = [] + is_null = False escaping = False for char in series_path: if escaping: - current.append(char) + if char == _NULL_MARKER: # \N -> the whole component is a null tag + is_null = True + else: # \\ -> \, \. -> ., any other \x -> x + current.append(char) escaping = False - continue - if char == _PATH_ESCAPE: + elif char == _PATH_ESCAPE: escaping = True - current.append(char) # keep the escape char in the raw component - continue - if char == _PATH_SEPARATOR: - raw_parts.append("".join(current)) + elif char == _PATH_SEPARATOR: + parts.append(None if is_null else "".join(current)) current = [] - continue - current.append(char) + is_null = False + else: + current.append(char) if escaping: raise ValueError(f"Invalid series path: {series_path}") - raw_parts.append("".join(current)) - return [ - None if raw == _NULL_TOKEN else _unescape_path_component(raw) - for raw in raw_parts - ] + parts.append(None if is_null else "".join(current)) + return parts def _join_series_path( From 13265f5bebf2b85690a449439b47149bcbec53f3 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 25 Jun 2026 10:44:13 +0800 Subject: [PATCH 4/5] fix tag parser. 
--- python/tests/test_tsfile_dataset.py | 48 +++++++++++++++++++++++++++++ python/tsfile/dataset/metadata.py | 43 ++++++++++++++++++++++++-- 2 files changed, 88 insertions(+), 3 deletions(-) diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index 8b4e66d32..16d007b9a 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -380,6 +380,23 @@ def test_series_path_object_roundtrip_and_escaping(): assert str(SeriesPath("tbl", ("a", None), "f")) == "tbl.a.f" +def test_series_path_construction_forms_are_equivalent(): + explicit = SeriesPath("tbl", (None, "sensorA"), "temperature") + flat = SeriesPath(["tbl", None, "sensorA", "temperature"]) # [table, *tags, field] + from_string = SeriesPath("tbl.\\N.sensorA.temperature") + + for sp in (explicit, flat, from_string): + assert sp == "tbl.\\N.sensorA.temperature" + assert sp.table == "tbl" + assert sp.tags == (None, "sensorA") + assert sp.field == "temperature" + + # A no-tag table is just [table, field]. 
+ assert SeriesPath(["tbl", "f"]).tags == () + with pytest.raises(ValueError): + SeriesPath(["tbl"]) + + def test_dataset_null_tag_positions_and_string_null_are_distinct(tmp_path): path = tmp_path / "null_positions.tsfile" schema = TableSchema( @@ -968,6 +985,37 @@ def test_series_path_resolution_uses_named_tags_for_sparse_non_prefix_values(): assert resolve_series_path(catalog, str(series_path)) == (table_id, device_id, 0) +def test_resolve_series_path_rejects_wrong_tag_count(): + catalog = MetadataCatalog() + table_id = catalog.add_table( + "weather", + ("city", "device"), + (TSDataType.STRING, TSDataType.STRING), + ("temperature",), + ) + device_id = catalog.add_device(table_id, ("beijing", "d1"), 0, 1) + catalog.series_stats_by_ref[(device_id, 0)] = { + "length": 1, + "min_time": 0, + "max_time": 0, + "timeline_length": 1, + "timeline_min_time": 0, + "timeline_max_time": 0, + } + + assert resolve_series_path(catalog, "weather.beijing.d1.temperature") == ( + table_id, + device_id, + 0, + ) + # An extra tag must NOT be silently truncated into a match. + with pytest.raises(ValueError, match="Series not found"): + resolve_series_path(catalog, "weather.beijing.d1.extra.temperature") + # Too few tags has no matching device either. + with pytest.raises(ValueError, match="Series not found"): + resolve_series_path(catalog, "weather.beijing.temperature") + + def test_reader_metadata_tag_values_trim_trailing_none(): class _Group: segments = ("weather", "device_a", None, None) diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index 37e95f55b..bf3ec9c2d 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -147,11 +147,36 @@ class SeriesPath(str): Trailing null tags are dropped (mirroring the device-id normalization), so ``tags`` keeps every interior null but not absent trailing ones. 
+ + Construct it from explicit parts or from a single flat component sequence + (the same shape :func:`split_logical_series_path` returns):: + + SeriesPath("table", (None, "sensorA"), "temperature") + SeriesPath(["table", None, "sensorA", "temperature"]) # [table, *tags, field] + SeriesPath("table.\\N.sensorA.temperature") # a path string """ __slots__ = ("_table", "_tags", "_field") - def __new__(cls, table: str, tags: Iterable[Any], field: str) -> "SeriesPath": + def __new__(cls, *args: Any) -> "SeriesPath": + if len(args) == 3: + table, tags, field = args + elif len(args) == 1: + components = args[0] + # A bare string is a path; otherwise it is a [table, *tags, field] + # sequence (None entries are null tags). + if isinstance(components, str): + components = split_logical_series_path(components) + components = list(components) + if len(components) < 2: + raise ValueError( + f"SeriesPath needs at least [table, field]; got {components!r}" + ) + table, tags, field = components[0], components[1:-1], components[-1] + else: + raise TypeError( + "SeriesPath(table, tags, field) or " "SeriesPath([table, *tags, field])" + ) normalized = _normalize_tag_values(tags) obj = str.__new__(cls, _join_series_path(table, normalized, field)) obj._table = table @@ -322,9 +347,21 @@ def resolve_series_path( raise ValueError(f"Series not found: {series_path}") from exc if coerce: + # Coerce each part by its column's type. Parts beyond the declared tag + # columns are kept as-is (rather than truncated) so an over-specified + # path fails the lookup instead of silently matching a shorter device. 
+ tag_types = table_entry.tag_types tag_values = _normalize_tag_values( - None if raw_value is None else _coerce_path_component(raw_value, tag_type) - for raw_value, tag_type in zip(tag_parts, table_entry.tag_types) + ( + None + if raw_value is None + else ( + _coerce_path_component(raw_value, tag_types[idx]) + if idx < len(tag_types) + else raw_value + ) + ) + for idx, raw_value in enumerate(tag_parts) ) else: tag_values = _normalize_tag_values(tag_parts) From cb4ab89774545224bc2fcdf35e5d123aa3d5b44a Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 25 Jun 2026 13:25:03 +0800 Subject: [PATCH 5/5] fix some comment. --- cpp/src/reader/filter/tag_filter.cc | 26 +++++++++++++------------- cpp/src/reader/filter/tag_filter.h | 2 +- python/tests/test_tsfile_dataset.py | 21 +++++++++++++++++++++ python/tsfile/dataset/metadata.py | 18 +++++++++++++----- 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/cpp/src/reader/filter/tag_filter.cc b/cpp/src/reader/filter/tag_filter.cc index 0115cc389..03b8ae785 100644 --- a/cpp/src/reader/filter/tag_filter.cc +++ b/cpp/src/reader/filter/tag_filter.cc @@ -228,68 +228,68 @@ TagFilterBuilder::TagFilterBuilder(TableSchema* schema) Filter* TagFilterBuilder::eq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagEq(idx, value); } Filter* TagFilterBuilder::neq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagNeq(idx, value); } Filter* TagFilterBuilder::lt(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagLt(idx, value); } Filter* TagFilterBuilder::lteq(const std::string& columnName, const 
std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagLteq(idx, value); } Filter* TagFilterBuilder::gt(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagGt(idx, value); } Filter* TagFilterBuilder::gteq(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagGteq(idx, value); } Filter* TagFilterBuilder::reg_exp(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagRegExp(idx, value); } Filter* TagFilterBuilder::not_reg_exp(const std::string& columnName, const std::string& value) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagNotRegExp(idx, value); } Filter* TagFilterBuilder::is_null(const std::string& columnName) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagIsNull(idx); } Filter* TagFilterBuilder::is_not_null(const std::string& columnName) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagIsNotNull(idx); } @@ -297,7 +297,7 @@ Filter* TagFilterBuilder::is_not_null(const std::string& columnName) { Filter* TagFilterBuilder::between_and(const std::string& columnName, const std::string& lower, const std::string& upper) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagBetween(idx, lower, upper); } 
@@ -305,7 +305,7 @@ Filter* TagFilterBuilder::between_and(const std::string& columnName, Filter* TagFilterBuilder::not_between_and(const std::string& columnName, const std::string& lower, const std::string& upper) { - auto idx = get_id_column_index(columnName); + auto idx = get_tag_column_index(columnName); if (idx < 0) return nullptr; return new TagNotBetween(idx, lower, upper); } @@ -324,7 +324,7 @@ Filter* TagFilterBuilder::not_filter(Filter* filter) { return new TagNot(dynamic_cast(filter)); } -int TagFilterBuilder::get_id_column_index(const std::string& columnName) { +int TagFilterBuilder::get_tag_column_index(const std::string& columnName) { int idColumnOrder = table_schema_->find_id_column_order(columnName); if (idColumnOrder == -1) { return -1; diff --git a/cpp/src/reader/filter/tag_filter.h b/cpp/src/reader/filter/tag_filter.h index 6e84dcf50..50c750d44 100644 --- a/cpp/src/reader/filter/tag_filter.h +++ b/cpp/src/reader/filter/tag_filter.h @@ -199,7 +199,7 @@ class TagFilterBuilder { static Filter* not_filter(Filter* filter); private: - int get_id_column_index(const std::string& columnName); + int get_tag_column_index(const std::string& columnName); }; } // namespace storage diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py index 16d007b9a..d95d247c1 100644 --- a/python/tests/test_tsfile_dataset.py +++ b/python/tests/test_tsfile_dataset.py @@ -397,6 +397,27 @@ def test_series_path_construction_forms_are_equivalent(): SeriesPath(["tbl"]) +def test_split_logical_series_path_null_marker_only_whole_component(): + from tsfile.dataset.metadata import split_logical_series_path + + # \N is a null tag only as a complete component. + assert split_logical_series_path("a.\\N.b.f") == ["a", None, "b", "f"] + assert split_logical_series_path("a.\\N.\\N.f") == ["a", None, None, "f"] + # A real value "\N" (doubled backslash) stays a string, never null. 
+ assert split_logical_series_path("a.\\\\N.b.f") == ["a", "\\N", "b", "f"] + + # \N mixed with other characters is invalid input and fails fast, instead of + # being silently parsed as a null tag (which could resolve the wrong device). + for bad in ( + "tbl.a\\N.b.f", # characters before the marker + "tbl.\\Nfoo.x.f", # characters after the marker + "a.\\N\\N.f", # two markers in one component + "a.\\N\\.b.f", # an escape after the marker + ): + with pytest.raises(ValueError, match="Invalid series path"): + split_logical_series_path(bad) + + def test_dataset_null_tag_positions_and_string_null_are_distinct(tmp_path): path = tmp_path / "null_positions.tsfile" schema = TableSchema( diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py index bf3ec9c2d..c6d7e5887 100644 --- a/python/tsfile/dataset/metadata.py +++ b/python/tsfile/dataset/metadata.py @@ -220,10 +220,12 @@ def _normalize_tag_values(tag_values: Iterable[Any]) -> Tuple[Any, ...]: def split_logical_series_path(series_path: str) -> List[Any]: """Split a path into components, decoding escapes in a single pass. - ``\\.`` -> ``.``, ``\\\\`` -> ``\\``, and the null marker ``\\N`` -> ``None``. - Null is detected in the escape branch: a lone backslash only ever precedes - ``N`` for the null marker, since a real value's backslash is always doubled - and ``N`` itself is never escaped. + ``\\.`` -> ``.``, ``\\\\`` -> ``\\``, and a component that is *exactly* the null + marker ``\\N`` -> ``None``. The marker is only valid as a whole component: a + real value's backslash is always doubled, so a lone ``\\N`` never occurs in a + real value. ``\\N`` combined with any other characters (e.g. ``a\\N`` or + ``\\Nfoo``) is therefore invalid input and raises, rather than being silently + parsed as a null tag (which could otherwise resolve the wrong device). 
""" parts: List[Any] = [] current: List[str] = [] @@ -232,8 +234,12 @@ def split_logical_series_path(series_path: str) -> List[Any]: for char in series_path: if escaping: - if char == _NULL_MARKER: # \N -> the whole component is a null tag + if char == _NULL_MARKER: # \N is a null tag only as a whole component + if is_null or current: + raise ValueError(f"Invalid series path: {series_path}") is_null = True + elif is_null: # nothing may follow the null marker in a component + raise ValueError(f"Invalid series path: {series_path}") else: # \\ -> \, \. -> ., any other \x -> x current.append(char) escaping = False @@ -243,6 +249,8 @@ def split_logical_series_path(series_path: str) -> List[Any]: parts.append(None if is_null else "".join(current)) current = [] is_null = False + elif is_null: # nothing may follow the null marker in a component + raise ValueError(f"Invalid series path: {series_path}") else: current.append(char)