Skip to content

Commit f16f29e

Browse files
ser-vasilichsingaraiona
authored andcommitted
add parallel join
1 parent 692dc4b commit f16f29e

4 files changed

Lines changed: 347 additions & 72 deletions

File tree

core/index.c

Lines changed: 171 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -3005,78 +3005,173 @@ obj_p index_group_list(obj_p obj, obj_p filter) {
30053005
#endif // Old chunk-based approach
30063006
}
30073007

3008+
// --- Parallel join: flat chained HT (no per-bucket alloc) ---
3009+
// heads[bucket] -> first row index (or JHT_EMPTY)
3010+
// next[row] -> next row in chain (or JHT_EMPTY)
3011+
// Parallel build uses atomic CAS on heads; next[] is per-row (no contention).
3012+
3013+
typedef struct __jht_t {
3014+
obj_p heads;
3015+
obj_p next;
3016+
i64_t cap; // number of buckets (power of 2)
3017+
i64_t mask; // cap - 1
3018+
} __jht_t;
3019+
3020+
static __jht_t __jht_create(i64_t nrows) {
3021+
__jht_t jht;
3022+
i64_t cap = 1;
3023+
while (cap < nrows * 2) cap <<= 1;
3024+
jht.cap = cap;
3025+
jht.mask = cap - 1;
3026+
jht.heads = I64(cap);
3027+
jht.next = I64(nrows);
3028+
for (i64_t i = 0; i < cap; i++) AS_I64(jht.heads)[i] = NULL_I64;
3029+
return jht;
3030+
}
3031+
3032+
static nil_t __jht_destroy(__jht_t *jht) {
3033+
drop_obj(jht->heads);
3034+
drop_obj(jht->next);
3035+
}
3036+
3037+
typedef struct __join_build_ctx_t {
3038+
__jht_t *jht;
3039+
__index_list_ctx_t list_ctx;
3040+
} __join_build_ctx_t;
3041+
3042+
static obj_p __join_build_chunk(i64_t len, i64_t offset, void *raw_ctx) {
3043+
__join_build_ctx_t *ctx = (__join_build_ctx_t *)raw_ctx;
3044+
__jht_t *jht = ctx->jht;
3045+
i64_t i, row, cur;
3046+
i64_t *heads = AS_I64(jht->heads);
3047+
i64_t *next = AS_I64(jht->next);
3048+
3049+
for (i = offset; i < offset + len; i++) {
3050+
row = i;
3051+
u64_t h = __index_list_hash_get(i, &ctx->list_ctx);
3052+
i64_t bucket = h & jht->mask;
3053+
3054+
for (;;) {
3055+
cur = __atomic_load_n(&heads[bucket], __ATOMIC_ACQUIRE);
3056+
3057+
// Check if key already exists in chain (dedup — first-wins)
3058+
i64_t walk = cur;
3059+
b8_t found = B8_FALSE;
3060+
while (walk != NULL_I64) {
3061+
if (__index_list_cmp_row(walk, i, &ctx->list_ctx) == 0) {
3062+
found = B8_TRUE;
3063+
break;
3064+
}
3065+
walk = next[walk];
3066+
}
3067+
if (found) break;
3068+
3069+
next[row] = cur;
3070+
if (__atomic_compare_exchange_n(&heads[bucket], &cur, row,
3071+
1, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE))
3072+
break;
3073+
}
3074+
}
3075+
return NULL_OBJ;
3076+
}
3077+
3078+
static inline i64_t __jht_probe(__jht_t *jht, i64_t left_row, __index_list_ctx_t *ctx) {
3079+
u64_t h = __index_list_hash_get(left_row, ctx);
3080+
i64_t bucket = h & jht->mask;
3081+
i64_t *heads = AS_I64(jht->heads);
3082+
i64_t *next = AS_I64(jht->next);
3083+
i64_t walk = __atomic_load_n(&heads[bucket], __ATOMIC_RELAXED);
3084+
3085+
while (walk != NULL_I64) {
3086+
if (__index_list_cmp_row(walk, left_row, ctx) == 0)
3087+
return walk;
3088+
walk = next[walk];
3089+
}
3090+
return NULL_I64;
3091+
}
3092+
3093+
typedef struct __join_probe_ctx_t {
3094+
__jht_t *jht;
3095+
__index_list_ctx_t list_ctx;
3096+
i64_t *match_rids;
3097+
} __join_probe_ctx_t;
3098+
3099+
static obj_p __join_probe_chunk(i64_t len, i64_t offset, void *raw_ctx) {
3100+
__join_probe_ctx_t *ctx = (__join_probe_ctx_t *)raw_ctx;
3101+
__jht_t *jht = ctx->jht;
3102+
i64_t i, end = offset + len;
3103+
i64_t *match_rids = ctx->match_rids;
3104+
3105+
for (i = offset; i < end; i++) {
3106+
if (i + 8 < end) {
3107+
u64_t ph = __index_list_hash_get(i + 8, &ctx->list_ctx) & jht->mask;
3108+
__builtin_prefetch(&jht->heads[ph], 0, 0);
3109+
}
3110+
match_rids[i] = __jht_probe(jht, i, &ctx->list_ctx);
3111+
}
3112+
return NULL_OBJ;
3113+
}
3114+
30083115
obj_p index_left_join_obj(obj_p lcols, obj_p rcols, i64_t len) {
3009-
i64_t i, ll, rl;
3010-
obj_p ht, ids, hashes;
3011-
i64_t idx;
3012-
__index_list_ctx_t ctx;
3116+
i64_t ll, rl;
30133117

3014-
// one column join
30153118
if (len == 1)
30163119
return ray_find(rcols, lcols);
30173120

3018-
// multiple columns join
30193121
ll = ops_count(AS_LIST(lcols)[0]);
30203122
rl = ops_count(AS_LIST(rcols)[0]);
3021-
ht = ht_oa_create(rl, -1);
3022-
hashes = I64(MAXI64(ll, rl));
30233123

3024-
// Right hashes
3025-
__index_list_precalc_hash(rcols, (i64_t *)AS_I64(hashes), len, rl, NULL, B8_TRUE);
3026-
ctx = (__index_list_ctx_t){rcols, rcols, (i64_t *)AS_I64(hashes), NULL};
3027-
for (i = 0; i < rl; i++) {
3028-
idx = ht_oa_tab_next_with(&ht, i, &__index_list_hash_get, &__index_list_cmp_row, &ctx);
3029-
if (AS_I64(AS_LIST(ht)[0])[idx] == NULL_I64)
3030-
AS_I64(AS_LIST(ht)[0])[idx] = i;
3031-
}
3124+
obj_p rhashes = I64(rl);
3125+
__index_list_precalc_hash(rcols, AS_I64(rhashes), len, rl, NULL, B8_TRUE);
30323126

3033-
ids = I64(ll);
3127+
__jht_t jht = __jht_create(rl);
3128+
__join_build_ctx_t build_ctx = {
3129+
.jht = &jht,
3130+
.list_ctx = {rcols, rcols, AS_I64(rhashes), NULL}
3131+
};
3132+
pool_map(rl, __join_build_chunk, &build_ctx);
3133+
drop_obj(rhashes);
30343134

3035-
// Left hashes
3036-
__index_list_precalc_hash(lcols, (i64_t *)AS_I64(hashes), len, ll, NULL, B8_TRUE);
3037-
ctx = (__index_list_ctx_t){rcols, lcols, (i64_t *)AS_I64(hashes), NULL};
3038-
for (i = 0; i < ll; ++i) {
3039-
idx = ht_oa_tab_get_with(ht, i, &__index_list_hash_get, &__index_list_cmp_row, &ctx);
3040-
if (idx != NULL_I64)
3041-
AS_I64(ids)[i] = AS_I64(AS_LIST(ht)[0])[idx];
3042-
else
3043-
AS_I64(ids)[i] = NULL_I64;
3044-
}
3135+
obj_p lhashes = I64(ll);
3136+
__index_list_precalc_hash(lcols, AS_I64(lhashes), len, ll, NULL, B8_TRUE);
30453137

3046-
drop_obj(hashes);
3047-
drop_obj(ht);
3138+
obj_p ids = I64(ll);
3139+
__join_probe_ctx_t probe_ctx = {
3140+
.jht = &jht,
3141+
.list_ctx = {rcols, lcols, AS_I64(lhashes), NULL},
3142+
.match_rids = AS_I64(ids)
3143+
};
3144+
pool_map(ll, __join_probe_chunk, &probe_ctx);
3145+
3146+
drop_obj(lhashes);
3147+
__jht_destroy(&jht);
30483148

30493149
return ids;
30503150
}
30513151

30523152
obj_p index_inner_join_obj(obj_p lcols, obj_p rcols, i64_t len) {
30533153
i64_t i, j, ll, rl;
3054-
obj_p ht, lids, rids, find_res;
3055-
i64_t idx;
3056-
__index_list_ctx_t ctx;
3154+
obj_p lids, rids, find_res;
30573155

30583156
if (len == 1) {
3059-
// For single key: find matching indices from left in right
30603157
find_res = ray_find(rcols, lcols);
30613158
if (IS_ERR(find_res))
30623159
return find_res;
30633160

3064-
// Count matches (non-NULL results)
30653161
ll = find_res->len;
30663162
j = 0;
30673163
for (i = 0; i < ll; i++) {
30683164
if (AS_I64(find_res)[i] != NULL_I64)
30693165
j++;
30703166
}
30713167

3072-
// Build result arrays with only matching rows
30733168
lids = I64(j);
30743169
rids = I64(j);
30753170
j = 0;
30763171
for (i = 0; i < ll; i++) {
30773172
if (AS_I64(find_res)[i] != NULL_I64) {
3078-
AS_I64(lids)[j] = i; // index in left table
3079-
AS_I64(rids)[j] = AS_I64(find_res)[i]; // index in right table
3173+
AS_I64(lids)[j] = i;
3174+
AS_I64(rids)[j] = AS_I64(find_res)[i];
30803175
j++;
30813176
}
30823177
}
@@ -3087,35 +3182,51 @@ obj_p index_inner_join_obj(obj_p lcols, obj_p rcols, i64_t len) {
30873182

30883183
ll = ops_count(AS_LIST(lcols)[0]);
30893184
rl = ops_count(AS_LIST(rcols)[0]);
3090-
ht = ht_oa_create(rl, -1);
3091-
rids = I64(MAXI64(ll, rl));
30923185

3093-
// Right hashes
3094-
__index_list_precalc_hash(rcols, (i64_t *)AS_I64(rids), len, rl, NULL, B8_TRUE);
3095-
ctx = (__index_list_ctx_t){rcols, rcols, (i64_t *)AS_I64(rids), NULL};
3096-
for (i = 0; i < rl; i++) {
3097-
idx = ht_oa_tab_next_with(&ht, i, &__index_list_hash_get, &__index_list_cmp_row, &ctx);
3098-
if (AS_I64(AS_LIST(ht)[0])[idx] == NULL_I64)
3099-
AS_I64(AS_LIST(ht)[0])[idx] = i;
3100-
}
3186+
obj_p rhashes = I64(rl);
3187+
__index_list_precalc_hash(rcols, AS_I64(rhashes), len, rl, NULL, B8_TRUE);
31013188

3102-
lids = I64(ll);
3189+
__jht_t jht = __jht_create(rl);
3190+
__join_build_ctx_t build_ctx = {
3191+
.jht = &jht,
3192+
.list_ctx = {rcols, rcols, AS_I64(rhashes), NULL}
3193+
};
3194+
pool_map(rl, __join_build_chunk, &build_ctx);
3195+
drop_obj(rhashes);
31033196

3104-
// Left hashes
3105-
__index_list_precalc_hash(lcols, (i64_t *)AS_I64(rids), len, ll, NULL, B8_TRUE);
3106-
ctx = (__index_list_ctx_t){rcols, lcols, (i64_t *)AS_I64(rids), NULL};
3107-
for (i = 0, j = 0; i < ll; i++) {
3108-
idx = ht_oa_tab_get_with(ht, i, &__index_list_hash_get, &__index_list_cmp_row, &ctx);
3109-
if (idx != NULL_I64) {
3110-
AS_I64(rids)[j] = AS_I64(AS_LIST(ht)[0])[idx];
3111-
AS_I64(lids)[j++] = i;
3112-
}
3197+
obj_p lhashes = I64(ll);
3198+
__index_list_precalc_hash(lcols, AS_I64(lhashes), len, ll, NULL, B8_TRUE);
3199+
3200+
obj_p match = I64(ll);
3201+
__join_probe_ctx_t probe_ctx = {
3202+
.jht = &jht,
3203+
.list_ctx = {rcols, lcols, AS_I64(lhashes), NULL},
3204+
.match_rids = AS_I64(match)
3205+
};
3206+
pool_map(ll, __join_probe_chunk, &probe_ctx);
3207+
3208+
drop_obj(lhashes);
3209+
__jht_destroy(&jht);
3210+
3211+
i64_t *match_rids = AS_I64(match);
3212+
j = 0;
3213+
for (i = 0; i < ll; i++) {
3214+
if (match_rids[i] != NULL_I64)
3215+
j++;
31133216
}
31143217

3115-
drop_obj(ht);
3218+
lids = I64(j);
3219+
rids = I64(j);
3220+
j = 0;
3221+
for (i = 0; i < ll; i++) {
3222+
if (match_rids[i] != NULL_I64) {
3223+
AS_I64(lids)[j] = i;
3224+
AS_I64(rids)[j] = match_rids[i];
3225+
j++;
3226+
}
3227+
}
31163228

3117-
resize_obj(&lids, j);
3118-
resize_obj(&rids, j);
3229+
drop_obj(match);
31193230

31203231
return vn_list(2, lids, rids);
31213232
}

0 commit comments

Comments
 (0)