Skip to content

Commit 350b5dd

Browse files
ser-vasilich authored and singaraiona committed
fix: lowest right row index wins
1 parent a6f3e6a commit 350b5dd

File tree

2 files changed

+82
-92
lines changed

2 files changed

+82
-92
lines changed

core/index.c

Lines changed: 52 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -3005,16 +3005,15 @@ obj_p index_group_list(obj_p obj, obj_p filter) {
30053005
#endif // Old chunk-based approach
30063006
}
30073007

3008-
// --- Parallel join: flat chained HT (no per-bucket alloc) ---
3009-
// heads[bucket] -> first row index (or JHT_EMPTY)
3010-
// next[row] -> next row in chain (or JHT_EMPTY)
3011-
// Parallel build uses atomic CAS on heads; next[] is per-row (no contention).
3008+
// --- Open addressing join HT ---
3009+
// table[slot] = row index or NULL_I64. Linear probing.
3010+
// Parallel build with CAS. Deterministic dedup: smallest row index wins.
3011+
// Parallel probe with prefetch.
30123012

30133013
typedef struct __jht_t {
3014-
obj_p heads;
3015-
obj_p next;
3016-
i64_t cap; // number of buckets (power of 2)
3017-
i64_t mask; // cap - 1
3014+
obj_p table;
3015+
i64_t cap;
3016+
i64_t mask;
30183017
} __jht_t;
30193018

30203019
static __jht_t __jht_create(i64_t nrows) {
@@ -3023,71 +3022,67 @@ static __jht_t __jht_create(i64_t nrows) {
30233022
while (cap < nrows * 2) cap <<= 1;
30243023
jht.cap = cap;
30253024
jht.mask = cap - 1;
3026-
jht.heads = I64(cap);
3027-
jht.next = I64(nrows);
3028-
for (i64_t i = 0; i < cap; i++) AS_I64(jht.heads)[i] = NULL_I64;
3025+
jht.table = I64(cap);
3026+
for (i64_t i = 0; i < cap; i++) AS_I64(jht.table)[i] = NULL_I64;
30293027
return jht;
30303028
}
30313029

30323030
static nil_t __jht_destroy(__jht_t *jht) {
3033-
drop_obj(jht->heads);
3034-
drop_obj(jht->next);
3031+
drop_obj(jht->table);
30353032
}
30363033

3037-
typedef struct __join_build_ctx_t {
3034+
typedef struct __jht_build_ctx_t {
30383035
__jht_t *jht;
30393036
__index_list_ctx_t list_ctx;
3040-
} __join_build_ctx_t;
3037+
} __jht_build_ctx_t;
30413038

3042-
static obj_p __join_build_chunk(i64_t len, i64_t offset, void *raw_ctx) {
3043-
__join_build_ctx_t *ctx = (__join_build_ctx_t *)raw_ctx;
3044-
__jht_t *jht = ctx->jht;
3045-
i64_t i, row, cur;
3046-
i64_t *heads = AS_I64(jht->heads);
3047-
i64_t *next = AS_I64(jht->next);
3039+
static obj_p __jht_build_chunk(i64_t len, i64_t offset, void *raw_ctx) {
3040+
__jht_build_ctx_t *ctx = (__jht_build_ctx_t *)raw_ctx;
3041+
i64_t *table = AS_I64(ctx->jht->table);
3042+
i64_t mask = ctx->jht->mask;
3043+
i64_t i, bucket, slot;
30483044

30493045
for (i = offset; i < offset + len; i++) {
3050-
row = i;
3051-
u64_t h = __index_list_hash_get(i, &ctx->list_ctx);
3052-
i64_t bucket = h & jht->mask;
3046+
bucket = __index_list_hash_get(i, &ctx->list_ctx) & mask;
3047+
slot = __atomic_load_n(&table[bucket], __ATOMIC_ACQUIRE);
30533048

30543049
for (;;) {
3055-
cur = __atomic_load_n(&heads[bucket], __ATOMIC_ACQUIRE);
3056-
3057-
// Check if key already exists in chain (dedup — first-wins)
3058-
i64_t walk = cur;
3059-
b8_t found = B8_FALSE;
3060-
while (walk != NULL_I64) {
3061-
if (__index_list_cmp_row(walk, i, &ctx->list_ctx) == 0) {
3062-
found = B8_TRUE;
3050+
if (slot == NULL_I64) {
3051+
if (__atomic_compare_exchange_n(&table[bucket], &slot, i,
3052+
1, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
30633053
break;
3064-
}
3065-
walk = next[walk];
30663054
}
3067-
if (found) break;
30683055

3069-
next[row] = cur;
3070-
if (__atomic_compare_exchange_n(&heads[bucket], &cur, row,
3071-
1, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
3056+
if (__index_list_cmp_row(slot, i, &ctx->list_ctx) == 0) {
3057+
while (slot > i) {
3058+
if (__atomic_compare_exchange_n(&table[bucket], &slot, i,
3059+
0, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
3060+
break;
3061+
}
30723062
break;
3063+
}
3064+
3065+
bucket = (bucket + 1) & mask;
3066+
slot = __atomic_load_n(&table[bucket], __ATOMIC_ACQUIRE);
30733067
}
30743068
}
30753069
return NULL_OBJ;
30763070
}
30773071

30783072
static inline i64_t __jht_probe(__jht_t *jht, i64_t left_row, __index_list_ctx_t *ctx) {
3079-
u64_t h = __index_list_hash_get(left_row, ctx);
3080-
i64_t bucket = h & jht->mask;
3081-
i64_t *heads = AS_I64(jht->heads);
3082-
i64_t *next = AS_I64(jht->next);
3083-
i64_t walk = __atomic_load_n(&heads[bucket], __ATOMIC_RELAXED);
3073+
i64_t *table = AS_I64(jht->table);
3074+
i64_t mask = jht->mask;
3075+
i64_t bucket = __index_list_hash_get(left_row, ctx) & mask;
3076+
i64_t slot;
30843077

3085-
while (walk != NULL_I64) {
3086-
if (__index_list_cmp_row(walk, left_row, ctx) == 0)
3087-
return walk;
3088-
walk = next[walk];
3078+
for (;;) {
3079+
slot = table[bucket];
3080+
if (slot == NULL_I64)
3081+
return NULL_I64;
3082+
if (__index_list_cmp_row(slot, left_row, ctx) == 0)
3083+
return slot;
3084+
bucket = (bucket + 1) & mask;
30893085
}
3090-
return NULL_I64;
30913086
}
30923087

30933088
typedef struct __join_probe_ctx_t {
@@ -3098,16 +3093,17 @@ typedef struct __join_probe_ctx_t {
30983093

30993094
static obj_p __join_probe_chunk(i64_t len, i64_t offset, void *raw_ctx) {
31003095
__join_probe_ctx_t *ctx = (__join_probe_ctx_t *)raw_ctx;
3101-
__jht_t *jht = ctx->jht;
3096+
i64_t *table = AS_I64(ctx->jht->table);
3097+
i64_t mask = ctx->jht->mask;
31023098
i64_t i, end = offset + len;
31033099
i64_t *match_rids = ctx->match_rids;
31043100

31053101
for (i = offset; i < end; i++) {
31063102
if (i + 8 < end) {
3107-
u64_t ph = __index_list_hash_get(i + 8, &ctx->list_ctx) & jht->mask;
3108-
__builtin_prefetch(&AS_I64(jht->heads)[ph], 0, 0);
3103+
i64_t ph = __index_list_hash_get(i + 8, &ctx->list_ctx) & mask;
3104+
__builtin_prefetch(&table[ph], 0, 0);
31093105
}
3110-
match_rids[i] = __jht_probe(jht, i, &ctx->list_ctx);
3106+
match_rids[i] = __jht_probe(ctx->jht, i, &ctx->list_ctx);
31113107
}
31123108
return NULL_OBJ;
31133109
}
@@ -3125,11 +3121,11 @@ obj_p index_left_join_obj(obj_p lcols, obj_p rcols, i64_t len) {
31253121
__index_list_precalc_hash(rcols, AS_I64(rhashes), len, rl, NULL, B8_TRUE);
31263122

31273123
__jht_t jht = __jht_create(rl);
3128-
__join_build_ctx_t build_ctx = {
3124+
__jht_build_ctx_t build_ctx = {
31293125
.jht = &jht,
31303126
.list_ctx = {rcols, rcols, AS_I64(rhashes), NULL}
31313127
};
3132-
pool_map(rl, __join_build_chunk, &build_ctx);
3128+
pool_map(rl, __jht_build_chunk, &build_ctx);
31333129
drop_obj(rhashes);
31343130

31353131
obj_p lhashes = I64(ll);
@@ -3187,11 +3183,11 @@ obj_p index_inner_join_obj(obj_p lcols, obj_p rcols, i64_t len) {
31873183
__index_list_precalc_hash(rcols, AS_I64(rhashes), len, rl, NULL, B8_TRUE);
31883184

31893185
__jht_t jht = __jht_create(rl);
3190-
__join_build_ctx_t build_ctx = {
3186+
__jht_build_ctx_t build_ctx = {
31913187
.jht = &jht,
31923188
.list_ctx = {rcols, rcols, AS_I64(rhashes), NULL}
31933189
};
3194-
pool_map(rl, __join_build_chunk, &build_ctx);
3190+
pool_map(rl, __jht_build_chunk, &build_ctx);
31953191
drop_obj(rhashes);
31963192

31973193
obj_p lhashes = I64(ll);

tests/join_tests.c

Lines changed: 30 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -595,82 +595,58 @@ test_result_t test_join_larger_tables() {
595595

596596
// ==================== JOIN PARALLEL (>16K rows) ====================
597597
test_result_t test_join_parallel() {
598-
// Inner join with 20K rows — exceeds POOL_SPLIT_THRESHOLD, triggers pool_map
598+
// Single-key, 20K rows — exceeds POOL_SPLIT_THRESHOLD
599599
TEST_ASSERT_EQ(
600600
"(set t1 (table [id val1] (list (til 20000) (til 20000))))"
601601
"(set t2 (table [id val2] (list (til 10000) (* 2 (til 10000)))))"
602602
"(count (inner-join [id] t1 t2))",
603603
"10000");
604604

605-
// Left join parallel — all left rows preserved
606605
TEST_ASSERT_EQ(
607606
"(set t1 (table [id val1] (list (til 20000) (til 20000))))"
608607
"(set t2 (table [id val2] (list (til 10000) (* 2 (til 10000)))))"
609608
"(count (left-join [id] t1 t2))",
610609
"20000");
611610

612-
// Verify correctness of parallel inner join values
613611
TEST_ASSERT_EQ(
614612
"(set t1 (table [id val1] (list (til 20000) (til 20000))))"
615613
"(set t2 (table [id val2] (list (til 10000) (* 2 (til 10000)))))"
616614
"(sum (at (inner-join [id] t1 t2) 'val1))",
617615
"49995000");
618616

619-
// Multi-key parallel inner join (2 keys, >16K rows)
620-
TEST_ASSERT_EQ(
621-
"(set t1 (table [k1 k2 val1] (list (take (til 100) 20000) (take (til 200) 20000) (til 20000))))"
622-
"(set t2 (table [k1 k2 val2] (list (take (til 100) 20000) (take (til 200) 20000) (* 3 (til 20000)))))"
623-
"(count (inner-join [k1 k2] t1 t2))",
624-
"20000");
625-
626-
// Multi-key parallel left join
627-
TEST_ASSERT_EQ(
628-
"(set t1 (table [k1 k2 val1] (list (take (til 100) 20000) (take (til 200) 20000) (til 20000))))"
629-
"(set t2 (table [k1 k2 val2] (list (take (til 50) 10000) (take (til 200) 10000) (* 3 (til 10000)))))"
630-
"(count (left-join [k1 k2] t1 t2))",
631-
"20000");
632-
633-
// Verify inner join right values (sum of val2 from right table)
634617
TEST_ASSERT_EQ(
635618
"(set t1 (table [id val1] (list (til 20000) (til 20000))))"
636619
"(set t2 (table [id val2] (list (til 10000) (* 2 (til 10000)))))"
637620
"(sum (at (inner-join [id] t1 t2) 'val2))",
638621
"99990000");
639622

640-
// Verify left join — left values always preserved (sum of val1)
641623
TEST_ASSERT_EQ(
642624
"(set t1 (table [id val1] (list (til 20000) (til 20000))))"
643625
"(set t2 (table [id val2] (list (til 10000) (* 2 (til 10000)))))"
644626
"(sum (at (left-join [id] t1 t2) 'val1))",
645627
"199990000");
646628

647-
// Left join — right-only column sum
648629
TEST_ASSERT_EQ(
649630
"(set t1 (table [id val1] (list (til 20000) (til 20000))))"
650631
"(set t2 (table [id val2] (list (til 10000) (* 2 (til 10000)))))"
651632
"(sum (at (left-join [id] t1 t2) 'val2))",
652633
"99990000");
653634

654-
// Multi-key parallel inner join — verify values
655-
// 2 keys (id, k) both unique per row, so inner join = exact match
656-
// sum(val2) for matched rows = sum(2 * 0..9999) = 2 * 49995000 = 99990000
635+
// Multi-key, 20K rows
657636
TEST_ASSERT_EQ(
658637
"(set t1 (table [id k val1] (list (til 20000) (* 10 (til 20000)) (til 20000))))"
659638
"(set t2 (table [id k val2] (list (til 10000) (* 10 (til 10000)) (* 2 (til 10000)))))"
660-
"(sum (at (inner-join [id k] t1 t2) 'val2))",
661-
"99990000");
639+
"(count (inner-join [id k] t1 t2))",
640+
"10000");
662641

663-
// Multi-key parallel left join — verify left values preserved
664-
// sum(val1) = sum(0..19999) = 199990000
642+
// Multi-key left join, partial match
665643
TEST_ASSERT_EQ(
666-
"(set t1 (table [id k val1] (list (til 20000) (* 10 (til 20000)) (til 20000))))"
667-
"(set t2 (table [id k val2] (list (til 10000) (* 10 (til 10000)) (* 2 (til 10000)))))"
668-
"(sum (at (left-join [id k] t1 t2) 'val1))",
669-
"199990000");
644+
"(set t1 (table [k1 k2 val1] (list (take (til 100) 20000) (take (til 200) 20000) (til 20000))))"
645+
"(set t2 (table [k1 k2 val2] (list (take (til 50) 10000) (take (til 200) 10000) (* 3 (til 10000)))))"
646+
"(count (left-join [k1 k2] t1 t2))",
647+
"20000");
670648

671-
// Left join, right-only column with partial match (multi-key)
672-
// t1 has no val2 column. t2 has val2. Rows (3,a) and (4,a) are unmatched.
673-
// val2 for unmatched rows must be null-filled, not crash.
649+
// Right-only column, null-fill for unmatched
674650
TEST_ASSERT_EQ(
675651
"(set t1 (table [k1 k2] (list [1 2 3 4] [a a a a])))"
676652
"(set t2 (table [k1 k2 val2] (list [1 2] [a a] [100 200])))"
@@ -683,25 +659,43 @@ test_result_t test_join_parallel() {
683659
"(at (left-join [k1 k2] t1 t2) 'val2)",
684660
"[100 200 0Nl 0Nl]");
685661

686-
// Left join, right-only column, >16K rows, half unmatched
662+
// Right-only column, >16K rows, half unmatched
687663
TEST_ASSERT_EQ(
688664
"(set t1 (table [id k] (list (til 20000) (* 10 (til 20000)))))"
689665
"(set t2 (table [id k val2] (list (til 10000) (* 10 (til 10000)) (* 2 (til 10000)))))"
690666
"(count (left-join [id k] t1 t2))",
691667
"20000");
692668

693-
// Multi-key join with duplicate composite keys in right table
669+
// Duplicate composite keys — smallest index wins
694670
TEST_ASSERT_EQ(
695671
"(set t1 (table [k1 k2 val1] (list [1 1 2 2] [a b a b] [10 20 30 40])))"
696672
"(set t2 (table [k1 k2 val2] (list [1 1 2] [a a b] [100 101 200])))"
697673
"(count (inner-join [k1 k2] t1 t2))",
698674
"2");
699675

676+
TEST_ASSERT_EQ(
677+
"(set t1 (table [k1 k2 val1] (list [1 1 2 2] [a b a b] [10 20 30 40])))"
678+
"(set t2 (table [k1 k2 val2] (list [1 1 2] [a a b] [100 101 200])))"
679+
"(at (inner-join [k1 k2] t1 t2) 'val1)",
680+
"[10 40]");
681+
682+
TEST_ASSERT_EQ(
683+
"(set t1 (table [k1 k2 val1] (list [1 1 2 2] [a b a b] [10 20 30 40])))"
684+
"(set t2 (table [k1 k2 val2] (list [1 1 2] [a a b] [100 101 200])))"
685+
"(at (inner-join [k1 k2] t1 t2) 'val2)",
686+
"[100 200]");
687+
700688
TEST_ASSERT_EQ(
701689
"(set t1 (table [k1 k2 val1] (list [1 1 2 2] [a b a b] [10 20 30 40])))"
702690
"(set t2 (table [k1 k2 val2] (list [1 1 2] [a a b] [100 101 200])))"
703691
"(count (left-join [k1 k2] t1 t2))",
704692
"4");
705693

694+
TEST_ASSERT_EQ(
695+
"(set t1 (table [k1 k2 val1] (list [1 1 2 2] [a b a b] [10 20 30 40])))"
696+
"(set t2 (table [k1 k2 val2] (list [1 1 2] [a a b] [100 101 200])))"
697+
"(at (left-join [k1 k2] t1 t2) 'val1)",
698+
"[10 20 30 40]");
699+
706700
PASS();
707701
}

0 commit comments

Comments
 (0)