Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion resources/RecordEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
import * as harperLogger from '../utility/logging/harper_logger.js';
import './blob.ts';
import { blobsWereEncoded, decodeFromDatabase, deleteBlobsInObject, encodeBlobsWithFilePath } from './blob.ts';
import { getThisNodeId } from './nodeIdMapping.ts';
import { recordAction } from './analytics/write.ts';
import { RocksDatabase } from '@harperfast/rocksdb-js';
import { when } from '../utility/when.ts';
Expand Down Expand Up @@ -614,7 +615,7 @@ export function recordUpdater(store, tableId, auditStore) {
store.encoder.structureUpdate = null;
}
const structureVersion = store.encoder.structures.length + (store.encoder.typedStructs?.length ?? 0);
const nodeId = options?.nodeId ?? server.replication?.getThisNodeId(auditStore) ?? 0;
const nodeId = options?.nodeId ?? getThisNodeId(auditStore) ?? 0;
const viaNodeId = options?.viaNodeId ?? nodeId;
if (resolveRecord && existingEntry?.localTime) {
const replacingId = existingEntry?.localTime;
Expand Down
3 changes: 2 additions & 1 deletion resources/RocksTransactionLogStore.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { TransactionLog, RocksDatabase, shutdown, type TransactionEntry } from '@harperfast/rocksdb-js';
import { ExtendedIterable } from '@harperfast/extended-iterable';
import { getIdOfRemoteNode } from './nodeIdMapping.ts';
import { Decoder, readAuditEntry, ENTRY_DATAVIEW, AuditRecord, createAuditEntry } from './auditStore.ts';
import { isMainThread } from 'node:worker_threads';
import { EventEmitter } from 'node:events';
Expand Down Expand Up @@ -123,7 +124,7 @@ export class RocksTransactionLogStore extends EventEmitter {
throw new Error('Not implemented');
}
addLogToMaps(logName: string, log: TransactionLog) {
const nodeId = ((globalThis as any).server?.replication?.getIdOfRemoteNode?.(logName, this) ?? 0) as number;
const nodeId = (getIdOfRemoteNode(logName, this) ?? 0) as number;
if (this.nodeLogs) {
this.nodeLogs![nodeId] ??= log;
}
Expand Down
7 changes: 4 additions & 3 deletions resources/Table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { CONFIG_PARAMS, OPERATIONS_ENUM, SYSTEM_TABLE_NAMES, SYSTEM_SCHEMA_NAME } from '../utility/hdbTerms.ts';
import { type Database } from 'lmdb';
import { getIndexedValues } from '../utility/lmdb/commonUtility.js';
import { getThisNodeId, exportIdMapping } from './nodeIdMapping.ts';
import lodash from 'lodash';
import { ExtendedIterable, SKIP } from '@harperfast/extended-iterable';
import type {
Expand Down Expand Up @@ -3912,15 +3913,15 @@ export function makeTable(options) {

function precedesExistingVersion(txnTime: number, existingEntry: Entry, nodeId?: number): number {
if (nodeId === undefined) {
nodeId = server.replication?.getThisNodeId(auditStore);
nodeId = getThisNodeId(auditStore);
}

if (txnTime <= existingEntry?.version) {
if (existingEntry?.version === txnTime && nodeId !== undefined) {
// if we have a timestamp tie, we break the tie by comparing the node name of the
// existing entry to the node name of the update
const nodeNameToId = server.replication?.exportIdMapping(auditStore);
let existingNodeId = existingEntry.nodeId;
const nodeNameToId = exportIdMapping(auditStore);
let existingNodeId = existingEntry.nodeId ?? 0;
if (nodeId === existingNodeId) {
return 0; // early match for a tie
}
Expand Down
123 changes: 123 additions & 0 deletions resources/nodeIdMapping.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* This module is responsible for managing the mapping of node/host names to node ids.
*/
import { logger } from '../utility/logging/logger.ts';
import { getThisNodeName } from '../server/nodeName.ts';
import { pack, unpack } from 'msgpackr';
import type { Database } from 'lmdb';
import { server } from '../server/Server.ts';

const REMOTE_NODE_IDS = Symbol.for('remote-ids');
function getIdMappingRecord(auditStore) {
const idMappingRecordBuffer = auditStore.getBinary(REMOTE_NODE_IDS);
let idMappingRecord = idMappingRecordBuffer ? unpack(idMappingRecordBuffer) : null;
if (!idMappingRecord) {
idMappingRecord = { remoteNameToId: {} };
}
// this is the default mapping for the local node (id of 0 is used for local)
const node_name = getThisNodeName();
idMappingRecord.nodeName = getThisNodeName();
const nameToId = idMappingRecord.remoteNameToId;
if (nameToId[node_name] !== 0) {
// if we don't have the local node id, we want to assign it and take over that id, but if there was a previous host name
// there, we need to reassign it and update the record and we want to assign a starting sequence id for it
let lastId = 0;
let previousLocalHostName: string;
for (const name in nameToId) {
const id = nameToId[name];
if (id === 0) {
previousLocalHostName = name;
} else if (id > lastId) {
lastId = id;
}
}
if (previousLocalHostName) {
// we need to reassign the local node id to the previous host name
lastId++;
nameToId[previousLocalHostName] = lastId;
// we need to update the sequence id for the previous host name, and have it start from our last sequence id
const seqKey = [Symbol.for('seq'), lastId];
auditStore.rootStore.dbisDb.transactionSync(() => {
if (!auditStore.rootStore.dbisDb.get(seqKey))
auditStore.rootStore.dbisDb.putSync(seqKey, {
seqId: lastTimeInAuditStore(auditStore) ?? 1,
nodes: [],
});
});
}
// now we can take over the local node id
nameToId[node_name] = 0;
auditStore.putSync(REMOTE_NODE_IDS, pack(idMappingRecord));
}
return idMappingRecord;
}
export function exportIdMapping(auditStore) {
return getIdMappingRecord(auditStore).remoteNameToId;
}

/**
* Take the remote node's long id to short id mapping and create a map from the remote node's short id to the local node short id.
*/
export function remoteToLocalNodeId(remoteMapping: any, auditStore: any) {
const idMappingRecord = getIdMappingRecord(auditStore);
const nameToId = idMappingRecord.remoteNameToId;
const remoteToLocalId = new Map();
let hasChanges = false;
for (const remoteNodeName in remoteMapping) {
const remoteId = remoteMapping[remoteNodeName];
let localId = nameToId[remoteNodeName];
if (localId == undefined) {
let lastId = 0;
for (const name in nameToId) {
const id = nameToId[name];
if (id > lastId) {
lastId = id;
}
}
localId = lastId + 1;
nameToId[remoteNodeName] = localId;
hasChanges = true;
}
remoteToLocalId.set(remoteId, localId);
}
if (hasChanges) {
auditStore.putSync(REMOTE_NODE_IDS, pack(idMappingRecord));
}
return remoteToLocalId;
}

export function getIdOfRemoteNode(remoteNodeName, auditStore) {
const idMappingRecord = getIdMappingRecord(auditStore);
const nameToId = idMappingRecord.remoteNameToId;
let id = nameToId[remoteNodeName];
if (id == undefined) {
let lastId = 0;
for (const name in nameToId) {
const id = nameToId[name];
if (id > lastId) {
lastId = id;
}
}
id = lastId + 1;
nameToId[remoteNodeName] = id;
auditStore.putSync(REMOTE_NODE_IDS, pack(idMappingRecord));
}
logger.trace?.('The remote node name map', remoteNodeName, nameToId, id);
return id;
}

/**
* Get the last time that an audit record was added to the audit store
* @param auditStore
*/
export function lastTimeInAuditStore(auditStore: Database) {
for (const timestamp of auditStore.getKeys({
limit: 1,
reverse: true,
})) {
return timestamp;
}
}
export function getThisNodeId(auditStore: any) {
return exportIdMapping(auditStore)?.[server.hostname];
}
9 changes: 0 additions & 9 deletions server/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ export interface Server {
hostname: string;
resources: Resources;
replication: {
getThisNodeId(auditStore: any): number;
exportIdMapping(auditStore: any): any;
getIdOfRemoteNode(remoteNodeName: string, auditStore: any): number;
replicateOperation(operation: {
replicated: boolean;
[key: string]: any;
Expand Down Expand Up @@ -81,12 +78,6 @@ export interface ContentTypeHandler {

export const server: Server = {
replication: {
getThisNodeId() {
return 0;
},
exportIdMapping() {
return undefined;
},
replicateOperation(operation) {
return operation.replicated
? Promise.reject(new Error('Replication not implemented.'))
Expand Down
3 changes: 2 additions & 1 deletion utility/install/installer.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ async function install() {

// REPLICATION_HOSTNAME was renamed to NODE_HOSTNAME in v5, but we still support the replication value if provided
if (promptOverride[hdbTerms.INSTALL_PROMPTS.REPLICATION_HOSTNAME]) {
promptOverride[hdbTerms.INSTALL_PROMPTS.NODE_HOSTNAME] = promptOverride[hdbTerms.INSTALL_PROMPTS.REPLICATION_HOSTNAME];
promptOverride[hdbTerms.INSTALL_PROMPTS.NODE_HOSTNAME] =
promptOverride[hdbTerms.INSTALL_PROMPTS.REPLICATION_HOSTNAME];
}

// For backwards compatibility for a time before DEFAULTS_MODE (and host name) assume prod when these args used
Expand Down
Loading