@@ -2,6 +2,7 @@ import { getDatabases, getDefaultCompression, resetDatabases } from '../resource
22import { open } from 'lmdb' ;
33import { join } from 'path' ;
44import { move , remove } from 'fs-extra' ;
5+ import { existsSync , mkdirSync } from 'node:fs' ;
56import { get } from '../utility/environment/environmentManager.js' ;
67import OpenEnvironmentObject from '../utility/lmdb/OpenEnvironmentObject.js' ;
78import { OpenDBIObject } from '../utility/lmdb/OpenDBIObject.js' ;
@@ -11,6 +12,8 @@ import { AUDIT_STORE_OPTIONS } from '../resources/auditStore.ts';
1112import { describeSchema } from '../dataLayer/schemaDescribe.js' ;
1213import { updateConfigValue } from '../config/configUtils.js' ;
1314import * as hdbLogger from '../utility/logging/harper_logger.js' ;
15+ import { RocksDatabase , type RocksDatabaseOptions } from '@harperfast/rocksdb-js' ;
16+ import { RocksIndexStore } from '../resources/RocksIndexStore.ts' ;
1417
1518export async function compactOnStart ( ) {
1619 hdbLogger . notify ( 'Running compact on start' ) ;
@@ -278,3 +281,208 @@ export async function copyDb(sourceDatabase: string, targetDatabasePath: string)
278281 targetEnv . close ( ) ;
279282 }
280283}
284+
285+ function openRocksDb ( path : string , options : RocksDatabaseOptions & { dupSort ?: boolean } = { } ) {
286+ options . disableWAL ??= false ;
287+ if ( ! existsSync ( path ) ) {
288+ mkdirSync ( path , { recursive : true } ) ;
289+ }
290+ let db ;
291+ if ( options . dupSort ) {
292+ db = RocksDatabase . open ( new RocksIndexStore ( path , options ) ) ;
293+ } else {
294+ db = RocksDatabase . open ( path , options ) ;
295+ db . encoder . name = options . name ;
296+ }
297+ return db ;
298+ }
299+
300+ export async function migrateOnStart ( ) {
301+ hdbLogger . notify ( 'Running migrate on start (LMDB to RocksDB)' ) ;
302+ console . log ( 'Running migrate on start (LMDB to RocksDB)' ) ;
303+
304+ const rootPath = get ( CONFIG_PARAMS . ROOTPATH ) ;
305+ const databases = getDatabases ( ) ;
306+
307+ updateConfigValue ( CONFIG_PARAMS . STORAGE_MIGRATEONSTART , false ) ;
308+
309+ try {
310+ for ( const databaseName in databases ) {
311+ if ( databaseName === 'system' ) continue ;
312+ if ( databaseName . endsWith ( '-copy' ) ) continue ;
313+ let rootStore ;
314+ for ( const tableName in databases [ databaseName ] ) {
315+ const table = databases [ databaseName ] [ tableName ] ;
316+ table . primaryStore . put = noop ;
317+ table . primaryStore . remove = noop ;
318+ for ( const attributeName in table . indices ) {
319+ const index = table . indices [ attributeName ] ;
320+ index . put = noop ;
321+ index . remove = noop ;
322+ }
323+ if ( table . auditStore ) {
324+ table . auditStore . put = noop ;
325+ table . auditStore . remove = noop ;
326+ }
327+ rootStore = table . primaryStore . rootStore ;
328+ }
329+ if ( ! rootStore ) {
330+ console . log ( "Couldn't find any tables in database" , databaseName ) ;
331+ continue ;
332+ }
333+ if ( rootStore instanceof RocksDatabase ) {
334+ console . log ( 'Database' , databaseName , 'is already RocksDB, skipping' ) ;
335+ continue ;
336+ }
337+
338+ const targetPath = join ( rootPath , DATABASES_DIR_NAME , databaseName ) ;
339+ const lmdbPath = rootStore . path ;
340+ const backupDest = join ( rootPath , 'backup' , databaseName + '.mdb' ) ;
341+
342+ console . log ( 'Migrating' , databaseName , 'from LMDB to RocksDB at' , targetPath ) ;
343+
344+ await copyDbToRocks ( rootStore , databaseName , targetPath ) ;
345+
346+ // Back up the original LMDB file
347+ console . log ( 'Backing up LMDB' , databaseName , 'to' , backupDest ) ;
348+ try {
349+ await move ( lmdbPath , backupDest , { overwrite : true } ) ;
350+ } catch ( error ) {
351+ console . log ( 'Error moving database' , lmdbPath , 'to' , backupDest , error ) ;
352+ }
353+ // Remove the lock file
354+ try {
355+ await remove ( lmdbPath + '-lock' ) ;
356+ } catch {
357+ // lock file may not exist
358+ }
359+ }
360+
361+ try {
362+ resetDatabases ( ) ;
363+ } catch ( err ) {
364+ hdbLogger . error ( 'Error resetting databases after migration' , err ) ;
365+ console . error ( 'Error resetting databases after migration' , err ) ;
366+ }
367+ } catch ( err ) {
368+ hdbLogger . error ( 'Error migrating database' , err ) ;
369+ console . error ( 'Error migrating database' , err ) ;
370+ throw err ;
371+ }
372+ }
373+
374+ async function copyDbToRocks ( sourceRootStore , sourceDatabase : string , targetPath : string ) {
375+ console . log ( `Migrating database ${ sourceDatabase } to RocksDB at ${ targetPath } ` ) ;
376+ const sourceDbisDb = sourceRootStore . dbisDb ;
377+
378+ const targetRootStore = openRocksDb ( targetPath , { disableWAL : false } ) ;
379+ const targetDbisDb = openRocksDb ( targetPath , {
380+ disableWAL : false ,
381+ name : INTERNAL_DBIS_NAME ,
382+ } ) ;
383+
384+ let written ;
385+ let outstandingWrites = 0 ;
386+ const transaction = sourceDbisDb . useReadTransaction ( ) ;
387+ try {
388+ for ( const { key, value : attribute } of sourceDbisDb . getRange ( { transaction } ) ) {
389+ const isPrimary = attribute . isPrimaryKey ;
390+ targetDbisDb . put ( key , attribute ) ;
391+ if ( ! ( isPrimary || attribute . indexed ) ) continue ;
392+
393+ // Open source LMDB dbi with default encoding so values are decoded
394+ const dbiInit = new OpenDBIObject ( ! isPrimary , isPrimary ) ;
395+ const sourceDbi = sourceRootStore . openDB ( key , dbiInit ) ;
396+
397+ let targetDbi ;
398+ if ( ! isPrimary ) {
399+ targetDbi = openRocksDb ( targetPath , { dupSort : true , name : key } ) ;
400+ } else {
401+ targetDbi = openRocksDb ( targetPath , { name : key } ) ;
402+ }
403+
404+ console . log ( 'migrating' , key , 'from' , sourceDatabase , 'to RocksDB' ) ;
405+ await copyDbiToRocks ( sourceDbi , targetDbi , isPrimary , transaction ) ;
406+ }
407+
408+ // Note: audit store is not migrated because LMDB and RocksDB use fundamentally different
409+ // audit store formats (LMDB uses a custom binary encoding in a regular DB, RocksDB uses TransactionLog).
410+ // A new audit store will be created automatically when the RocksDB database is opened.
411+
412+ await written ;
413+ console . log ( 'migrated database ' + sourceDatabase + ' to RocksDB' ) ;
414+ } finally {
415+ transaction . done ( ) ;
416+ targetRootStore . close ( ) ;
417+ }
418+
419+ async function copyDbiToRocks ( sourceDbi , targetDbi , isPrimary , transaction ) {
420+ let recordsCopied = 0 ;
421+ let skippedRecord = 0 ;
422+ let retries = 1000000 ;
423+ let start = null ;
424+ while ( retries -- > 0 ) {
425+ try {
426+ if ( isPrimary ) {
427+ for ( const { key, value, version } of sourceDbi . getRange ( { start, transaction, versions : true } ) ) {
428+ try {
429+ start = key ;
430+ if ( value == null ) {
431+ skippedRecord ++ ;
432+ continue ;
433+ }
434+ written = targetDbi . put ( key , value , version ) ;
435+ recordsCopied ++ ;
436+ if ( transaction . openTimer ) transaction . openTimer = 0 ;
437+ if ( outstandingWrites ++ > 5000 ) {
438+ await written ;
439+ console . log ( 'migrated' , recordsCopied , 'entries, skipped' , skippedRecord , 'delete records' ) ;
440+ outstandingWrites = 0 ;
441+ }
442+ } catch ( error ) {
443+ console . error (
444+ 'Error migrating record' ,
445+ typeof key === 'symbol' ? 'symbol' : key ,
446+ 'from' ,
447+ sourceDatabase ,
448+ error
449+ ) ;
450+ }
451+ }
452+ } else {
453+ for ( const { key, value } of sourceDbi . getRange ( { start, transaction } ) ) {
454+ try {
455+ start = key ;
456+ written = targetDbi . put ( key , value ) ;
457+ recordsCopied ++ ;
458+ if ( transaction . openTimer ) transaction . openTimer = 0 ;
459+ if ( outstandingWrites ++ > 5000 ) {
460+ await written ;
461+ console . log ( 'migrated' , recordsCopied , 'index entries' ) ;
462+ outstandingWrites = 0 ;
463+ }
464+ } catch ( error ) {
465+ console . error (
466+ 'Error migrating index record' ,
467+ typeof key === 'symbol' ? 'symbol' : key ,
468+ 'from' ,
469+ sourceDatabase ,
470+ error
471+ ) ;
472+ }
473+ }
474+ }
475+ console . log ( 'finish migrating, copied' , recordsCopied , 'entries, skipped' , skippedRecord , 'delete records' ) ;
476+ return ;
477+ } catch {
478+ if ( typeof start === 'string' ) {
479+ if ( start === 'z' ) {
480+ return console . error ( 'Reached end of dbi' , start , 'for' , sourceDatabase ) ;
481+ }
482+ start = start . slice ( 0 , - 2 ) + 'z' ;
483+ } else if ( typeof start === 'number' ) start ++ ;
484+ else return console . error ( 'Unknown key type' , start , 'for' , sourceDatabase ) ;
485+ }
486+ }
487+ }
488+ }
0 commit comments