diff --git a/DESCRIPTION b/DESCRIPTION index 05c308f52..b053e0930 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: CohortDiagnostics Type: Package Title: Diagnostics for OHDSI Cohorts -Version: 3.4.0 +Version: 4.0.0 Date: 2024-22-10 Authors@R: c( person("Jamie", "Gilbert", email = "gilbert@ohdsi.org", role = c("aut", "cre")), @@ -55,6 +55,7 @@ Suggests: yaml Remotes: ohdsi/OhdsiShinyModules + ohdsi/CohortGenerator@db_cohort_checksums License: Apache License VignetteBuilder: knitr URL: https://ohdsi.github.io/CohortDiagnostics, https://github.com/OHDSI/CohortDiagnostics diff --git a/NEWS.md b/NEWS.md index 3587ba526..001b82821 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,11 @@ +CohortDiagnostics 4.0.0 +======================= + +Change: + +1. Created new settings R6 object to allow simplified API and ability to support calling package independently + + CohortDiagnostics 3.4.0 ======================= diff --git a/R/ConceptSets.R b/R/ConceptSets.R index c7e7e6362..4a18682e4 100644 --- a/R/ConceptSets.R +++ b/R/ConceptSets.R @@ -667,7 +667,7 @@ runConceptSetDiagnostics <- function(connection, } cohortDefinition <- - jsonlite::fromJSON(jsonDef) + jsonlite::fromJSON(jsonDef, simplifyDataFrame = FALSE) primaryCodesetIds <- lapply( diff --git a/R/Private.R b/R/Private.R index 080ad39fa..de8f18f98 100644 --- a/R/Private.R +++ b/R/Private.R @@ -14,34 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -createIfNotExist <- - function(type, - name, - recursive = TRUE, - errorMessage = NULL) { - if (is.null(errorMessage) | - !is(errorMessage, "AssertColection")) { - errorMessage <- checkmate::makeAssertCollection() - } - if (!is.null(type)) { - if (length(name) == 0) { - stop(ParallelLogger::logError("Must specify ", name)) - } - if (type %in% c("folder")) { - if (!file.exists(gsub("/$", "", name))) { - dir.create(name, recursive = recursive) - ParallelLogger::logInfo("Created ", type, " at ", name) - } - } - checkmate::assertDirectory( - x = name, - access = "x", - add = errorMessage - ) - } - invisible(errorMessage) - } - swapColumnContents <- function(df, column1 = "targetId", diff --git a/R/RunDiagnostics.R b/R/RunDiagnostics.R index 8f60ed798..8f097d64a 100644 --- a/R/RunDiagnostics.R +++ b/R/RunDiagnostics.R @@ -119,15 +119,7 @@ getDefaultCovariateSettings <- function() { #' #' #' @template CohortSetReference -#' @param exportFolder The folder where the output will be exported to. If this folder -#' does not exist it will be created. -#' @param cohortIds Optionally, provide a subset of cohort IDs to restrict the -#' diagnostics to. -#' @param cohortDefinitionSet Data.frame of cohorts must include columns cohortId, cohortName, json, sql -#' @param cohortTableNames Cohort Table names used by CohortGenerator package -#' @param databaseId A short string for identifying the database (e.g. 'Synpuf'). -#' @param databaseName The full name of the database. If NULL, defaults to value in cdm_source table -#' @param databaseDescription A short description (several sentences) of the database. If NULL, defaults to value in cdm_source table + #' @template cdmVersion #' @param runInclusionStatistics Generate and export statistic on the cohort inclusion rules? #' @param runIncludedSourceConcepts Generate and export the source concepts included in the cohorts? @@ -136,35 +128,7 @@ getDefaultCovariateSettings <- function() { #' @param runVisitContext Generate and export index-date visit context? #' @param runBreakdownIndexEvents Generate and export the breakdown of index events? #' @param runIncidenceRate Generate and export the cohort incidence rates? -#' @param runCohortRelationship Compute cohort relationships. Overlap is now computed with FeaturExtraction, time paramters are derived from temporalCovariateSettings -#' relationship between two or more cohorts. -#' @param runTemporalCohortCharacterization Generate and export the temporal cohort characterization? -#' Only records with values greater than 0.001 are returned. -#' @param temporalCovariateSettings Either an object of type \code{covariateSettings} as created using one of -#' the createTemporalCovariateSettings function in the FeatureExtraction package, or a list -#' of such objects. This can be anythin accepted by FeatureExtraction (including -#' custom covariates). However, it should be noted that certain time windows will be -#' included by default. @seealso[getDefaultCovariateSettings] -#' @param minCellCount The minimum cell count for fields contains person counts or fractions. -#' @param minCharacterizationMean The minimum mean value for characterization output. Values below this will be cut off from output. This -#' will help reduce the file size of the characterization output, but will remove information -#' on covariates that have very low values. The default is 0.001 (i.e. 0.1 percent) -#' @param irWashoutPeriod Number of days washout to include in calculation of incidence rates - default is 0 -#' @param incremental Create only cohort diagnostics that haven't been created before? -#' @param incrementalFolder If \code{incremental = TRUE}, specify a folder where records are kept -#' of which cohort diagnostics has been executed. -#' @param runFeatureExtractionOnSample Logical. If TRUE, the function will operate on a sample of the data. -#' Default is FALSE, meaning the function will operate on the full data set. -#' -#' @param sampleN Integer. The number of records to include in the sample if runFeatureExtractionOnSample is TRUE. -#' Default is 1000. Ignored if runFeatureExtractionOnSample is FALSE. -#' -#' @param seed Integer. The seed for the random number generator used to create the sample. -#' This ensures that the same sample can be drawn again in future runs. Default is 64374. -#' -#' @param seedArgs List. Additional arguments to pass to the sampling function. -#' This can be used to control aspects of the sampling process beyond the seed and sample size. -#' +#' @inheritParams createCohortDiagnosticsSettings #' @examples #' \dontrun{ #' # Load cohorts (assumes that they have already been instantiated) @@ -210,21 +174,7 @@ getDefaultCovariateSettings <- function() { #' @importFrom CohortGenerator getCohortTableNames #' @importFrom tidyr any_of #' @export -executeDiagnostics <- function(cohortDefinitionSet, - exportFolder, - databaseId, - cohortDatabaseSchema, - databaseName = NULL, - databaseDescription = NULL, - connectionDetails = NULL, - connection = NULL, - cdmDatabaseSchema, - tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), - cohortTable = "cohort", - cohortTableNames = CohortGenerator::getCohortTableNames(cohortTable = cohortTable), - vocabularyDatabaseSchema = cdmDatabaseSchema, - cohortIds = NULL, - cdmVersion = 5, +executeDiagnostics <- function(..., runInclusionStatistics = TRUE, runIncludedSourceConcepts = TRUE, runOrphanConcepts = TRUE, @@ -233,89 +183,12 @@ executeDiagnostics <- function(cohortDefinitionSet, runBreakdownIndexEvents = TRUE, runIncidenceRate = TRUE, runCohortRelationship = TRUE, - runTemporalCohortCharacterization = TRUE, - temporalCovariateSettings = getDefaultCovariateSettings(), - minCellCount = 5, - minCharacterizationMean = 0.01, - irWashoutPeriod = 0, - incremental = FALSE, - incrementalFolder = file.path(exportFolder, "incremental"), - runFeatureExtractionOnSample = FALSE, - sampleN = 1000, - seed = 64374, - seedArgs = NULL) { - # collect arguments that were passed to cohort diagnostics at initiation - callingArgsJson <- - list( - runInclusionStatistics = runInclusionStatistics, - runIncludedSourceConcepts = runIncludedSourceConcepts, - runOrphanConcepts = runOrphanConcepts, - runTimeSeries = runTimeSeries, - runVisitContext = runVisitContext, - runBreakdownIndexEvents = runBreakdownIndexEvents, - runIncidenceRate = runIncidenceRate, - runTemporalCohortCharacterization = runTemporalCohortCharacterization, - minCellCount = minCellCount, - minCharacterizationMean = minCharacterizationMean, - incremental = incremental, - temporalCovariateSettings = temporalCovariateSettings - ) %>% - ParallelLogger::convertSettingsToJson() - - exportFolder <- normalizePath(exportFolder, mustWork = FALSE) - incrementalFolder <- normalizePath(incrementalFolder, mustWork = FALSE) - executionTimePath <- file.path(exportFolder, "taskExecutionTimes.csv") - ParallelLogger::addDefaultFileLogger(file.path(exportFolder, "log.txt"), name = "CD_LOGGER") - ParallelLogger::addDefaultErrorReportLogger(file.path(exportFolder, "errorReportR.txt"), name = "CD_ERROR_LOGGER") - on.exit( - { - ParallelLogger::unregisterLogger("CD_LOGGER", silent = TRUE) - ParallelLogger::unregisterLogger("CD_ERROR_LOGGER", silent = TRUE) - }, - add = TRUE - ) + runTemporalCohortCharacterization = TRUE) { + cdSettings <- createCohortDiagnosticsSettings(...) start <- Sys.time() ParallelLogger::logInfo("Run Cohort Diagnostics started at ", start) - - databaseId <- as.character(databaseId) - - if (any(is.null(databaseName), is.na(databaseName))) { - ParallelLogger::logTrace(" - Databasename was not provided. Using CDM source table") - } - if (any(is.null(databaseDescription), is.na(databaseDescription))) { - ParallelLogger::logTrace(" - Databasedescription was not provided. Using CDM source table") - } - errorMessage <- checkmate::makeAssertCollection() - checkmate::assertList(cohortTableNames, null.ok = FALSE, types = "character", add = errorMessage, names = "named") - checkmate::assertNames(names(cohortTableNames), - must.include = c( - "cohortTable", - "cohortInclusionTable", - "cohortInclusionResultTable", - "cohortInclusionStatsTable", - "cohortSummaryStatsTable", - "cohortCensorStatsTable" - ), - add = errorMessage - ) - checkmate::assertDataFrame(cohortDefinitionSet, add = errorMessage) - checkmate::assertNames(names(cohortDefinitionSet), - must.include = c( - "json", - "cohortId", - "cohortName", - "sql" - ), - add = errorMessage - ) - - if (!"isSubset" %in% colnames(cohortDefinitionSet)) { - cohortDefinitionSet$isSubset <- FALSE - } - - cohortTable <- cohortTableNames$cohortTable checkmate::assertLogical(runInclusionStatistics, add = errorMessage) checkmate::assertLogical(runIncludedSourceConcepts, add = errorMessage) checkmate::assertLogical(runOrphanConcepts, add = errorMessage) @@ -323,222 +196,23 @@ executeDiagnostics <- function(cohortDefinitionSet, checkmate::assertLogical(runBreakdownIndexEvents, add = errorMessage) checkmate::assertLogical(runIncidenceRate, add = errorMessage) checkmate::assertLogical(runTemporalCohortCharacterization, add = errorMessage) - checkmate::assertInt( - x = cdmVersion, - na.ok = FALSE, - lower = 5, - upper = 5, - null.ok = FALSE, - add = errorMessage - ) - minCellCount <- utils::type.convert(minCellCount, as.is = TRUE) - checkmate::assertInteger(x = minCellCount, len = 1, lower = 0, add = errorMessage) - minCharacterizationMean <- utils::type.convert(minCharacterizationMean, as.is = TRUE) - checkmate::assertNumeric(x = minCharacterizationMean, lower = 0, add = errorMessage) - checkmate::assertLogical(incremental, add = errorMessage) - - if (any( - runInclusionStatistics, - runIncludedSourceConcepts, - runOrphanConcepts, - runBreakdownIndexEvents, - runIncidenceRate - )) { - checkmate::assertCharacter( - x = cdmDatabaseSchema, - min.len = 1, - add = errorMessage - ) - checkmate::assertCharacter( - x = vocabularyDatabaseSchema, - min.len = 1, - add = errorMessage - ) - checkmate::assertCharacter( - x = cohortDatabaseSchema, - min.len = 1, - add = errorMessage - ) - checkmate::assertCharacter( - x = cohortTable, - min.len = 1, - add = errorMessage - ) - checkmate::assertCharacter( - x = databaseId, - min.len = 1, - add = errorMessage - ) - } checkmate::reportAssertions(collection = errorMessage) - errorMessage <- - createIfNotExist( - type = "folder", - name = exportFolder, - errorMessage = errorMessage - ) - - if (incremental) { - errorMessage <- - createIfNotExist( - type = "folder", - name = incrementalFolder, - errorMessage = errorMessage - ) - } - - if (is(temporalCovariateSettings, "covariateSettings")) { - temporalCovariateSettings <- list(temporalCovariateSettings) - } - # All temporal covariate settings objects must be covariateSettings - checkmate::assert_true(all(lapply(temporalCovariateSettings, class) == c("covariateSettings")), add = errorMessage) - - if (runTemporalCohortCharacterization) { - requiredCharacterisationSettings <- c( - "DemographicsGender", "DemographicsAgeGroup", "DemographicsRace", - "DemographicsEthnicity", "DemographicsIndexYear", "DemographicsIndexMonth", - "ConditionEraGroupOverlap", "DrugEraGroupOverlap", "CharlsonIndex", - "Chads2", "Chads2Vasc" - ) - presentSettings <- temporalCovariateSettings[[1]][requiredCharacterisationSettings] - if (!all(unlist(presentSettings))) { - warning( - "For cohort charcterization to display standardized results the following covariates must be present in your temporalCovariateSettings: \n\n", - paste(requiredCharacterisationSettings, collapse = ", ") - ) - } - - requiredTimeDistributionSettings <- c( - "DemographicsPriorObservationTime", - "DemographicsPostObservationTime", - "DemographicsTimeInCohort" - ) - - presentSettings <- temporalCovariateSettings[[1]][requiredTimeDistributionSettings] - if (!all(unlist(presentSettings))) { - warning( - "For time distributions diagnostics to display standardized results the following covariates must be present in your temporalCovariateSettings: \n\n", - paste(requiredTimeDistributionSettings, collapse = ", ") - ) - } - - # forcefully set ConditionEraGroupStart and drugEraGroupStart to NULL - # because of known bug in FeatureExtraction. https://github.com/OHDSI/FeatureExtraction/issues/144 - temporalCovariateSettings[[1]]$ConditionEraGroupStart <- NULL - temporalCovariateSettings[[1]]$DrugEraGroupStart <- NULL - - checkmate::assert_integerish( - x = temporalCovariateSettings[[1]]$temporalStartDays, - any.missing = FALSE, - min.len = 1, - add = errorMessage - ) - checkmate::assert_integerish( - x = temporalCovariateSettings[[1]]$temporalEndDays, - any.missing = FALSE, - min.len = 1, - add = errorMessage - ) - checkmate::reportAssertions(collection = errorMessage) - - # Adding required temporal windows required in results viewer - requiredTemporalPairs <- - list( - c(-365, 0), - c(-30, 0), - c(-365, -31), - c(-30, -1), - c(0, 0), - c(1, 30), - c(31, 365), - c(-9999, 9999) - ) - for (p1 in requiredTemporalPairs) { - found <- FALSE - for (i in 1:length(temporalCovariateSettings[[1]]$temporalStartDays)) { - p2 <- c( - temporalCovariateSettings[[1]]$temporalStartDays[i], - temporalCovariateSettings[[1]]$temporalEndDays[i] - ) - - if (p2[1] == p1[1] & p2[2] == p1[2]) { - found <- TRUE - break - } - } - - if (!found) { - temporalCovariateSettings[[1]]$temporalStartDays <- - c(temporalCovariateSettings[[1]]$temporalStartDays, p1[1]) - temporalCovariateSettings[[1]]$temporalEndDays <- - c(temporalCovariateSettings[[1]]$temporalEndDays, p1[2]) - } - } - } - - checkmate::reportAssertions(collection = errorMessage) - if (!is.null(cohortIds)) { - cohortDefinitionSet <- cohortDefinitionSet %>% dplyr::filter(.data$cohortId %in% cohortIds) - } - - if (nrow(cohortDefinitionSet) == 0) { - stop("No cohorts specified") - } - cohortTableColumnNamesObserved <- colnames(cohortDefinitionSet) %>% - sort() - cohortTableColumnNamesExpected <- - getResultsDataModelSpecifications() %>% - dplyr::filter(.data$tableName == "cohort") %>% - dplyr::pull(.data$columnName) %>% - SqlRender::snakeCaseToCamelCase() %>% - sort() - cohortTableColumnNamesRequired <- - getResultsDataModelSpecifications() %>% - dplyr::filter(.data$tableName == "cohort") %>% - dplyr::filter(.data$isRequired == "Yes") %>% - dplyr::pull(.data$columnName) %>% - SqlRender::snakeCaseToCamelCase() %>% - sort() - - expectedButNotObsevered <- - setdiff(x = cohortTableColumnNamesExpected, y = cohortTableColumnNamesObserved) - if (length(expectedButNotObsevered) > 0) { - requiredButNotObsevered <- - setdiff(x = cohortTableColumnNamesRequired, y = cohortTableColumnNamesObserved) - } - obseveredButNotExpected <- - setdiff(x = cohortTableColumnNamesObserved, y = cohortTableColumnNamesExpected) - - if (length(requiredButNotObsevered) > 0) { - stop(paste( - "The following required fields not found in cohort table:", - paste0(requiredButNotObsevered, collapse = ", ") - )) - } - - if (length(obseveredButNotExpected) > 0) { - ParallelLogger::logInfo( - paste0( - "The following fields found in the cohortDefinitionSet will be exported in JSON format as part of metadata field of cohort table:\n ", - paste0(obseveredButNotExpected, collapse = ",\n ") - ) - ) - } + cdSettings$temporalCovariateSettings <- cdSettings$checkDefaultTemporalCovariateSettings() cohortDefinitionSet <- makeDataExportable( - x = cohortDefinitionSet, + x = cdSettings$cohortDefinitionSet, tableName = "cohort", - minCellCount = minCellCount, + minCellCount = cdSettings$minCellCount, databaseId = NULL ) writeToCsv( - data = cohortDefinitionSet, - fileName = file.path(exportFolder, "cohort.csv") + data = cdSettings$cohortDefinitionSet, + fileName = file.path(cdSettings$exportFolder, "cohort.csv") ) - subsets <- CohortGenerator::getSubsetDefinitions(cohortDefinitionSet) + subsets <- CohortGenerator::getSubsetDefinitions(cdSettings$cohortDefinitionSet) if (length(subsets)) { dfs <- lapply(subsets, function(x) { data.frame(subsetDefinitionId = x$definitionId, json = as.character(x$toJSON())) @@ -550,23 +224,14 @@ executeDiagnostics <- function(cohortDefinitionSet, writeToCsv( data = subsetDefinitions, - fileName = file.path(exportFolder, "subset_definition.csv") + fileName = file.path(cdSettings$exportFolder, "subset_definition.csv") ) } - # Set up connection to server ---------------------------------------------------- - if (is.null(connection)) { - if (!is.null(connectionDetails)) { - connection <- DatabaseConnector::connect(connectionDetails) - on.exit(DatabaseConnector::disconnect(connection)) - } else { - stop("No connection or connectionDetails provided.") - } - } - + connection <- cdSettings$getConnection() ## CDM source information---- timeExecution( - exportFolder, + cdSettings$exportFolder, taskName = "getCdmDataSourceInformation", cohortIds = NULL, parent = "executeDiagnostics", @@ -574,23 +239,23 @@ executeDiagnostics <- function(cohortDefinitionSet, cdmSourceInformation <- getCdmDataSourceInformation( connection = connection, - cdmDatabaseSchema = cdmDatabaseSchema + cdmDatabaseSchema = cdSettings$cdmDatabaseSchema ) ## Use CDM source table as default name/description if (!is.null(cdmSourceInformation)) { - if (any(is.null(databaseName), is.na(databaseName))) { + if (any(is.null(cdSettings$databaseName), is.na(cdSettings$databaseName))) { databaseName <- cdmSourceInformation$cdmSourceName } - if (any(is.null(databaseDescription), is.na(databaseDescription))) { + if (any(is.null(cdSettings$databaseDescription), is.na(cdSettings$databaseDescription))) { databaseDescription <- cdmSourceInformation$sourceDescription } } else { - if (any(is.null(databaseName), is.na(databaseName))) { + if (any(is.null(cdSettings$databaseName), is.na(cdSettings$databaseName))) { databaseName <- databaseId } - if (any(is.null(databaseDescription), is.na(databaseDescription))) { + if (any(is.null(cdSettings$databaseDescription), is.na(cdSettings$databaseDescription))) { databaseDescription <- databaseName } } @@ -598,12 +263,12 @@ executeDiagnostics <- function(cohortDefinitionSet, } ) - cohortDefinitionSet$checksum <- computeChecksum(cohortDefinitionSet$sql) + cdSettings$cohortDefinitionSet$checksum <- computeChecksum(cdSettings$cohortDefinitionSet$sql) - if (incremental) { + if (cdSettings$incremental) { ParallelLogger::logDebug("Working in incremental mode.") recordKeepingFile <- - file.path(incrementalFolder, "CreatedDiagnostics.csv") + file.path(cdSettings$incrementalFolder, "CreatedDiagnostics.csv") if (file.exists(path = recordKeepingFile)) { ParallelLogger::logInfo( "Found existing record keeping file in incremental folder - CreatedDiagnostics.csv" @@ -614,12 +279,12 @@ executeDiagnostics <- function(cohortDefinitionSet, ## Observation period---- ParallelLogger::logTrace(" - Collecting date range from Observational period table.") timeExecution( - exportFolder, + cdSettings$exportFolder, taskName = "observationPeriodDateRange", cohortIds = NULL, parent = "executeDiagnostics", expr = { - observationPeriodDateRange <- renderTranslateQuerySql( + observationPeriodDateRange <- DatabaseConnector::renderTranslateQuerySql( connection = connection, sql = "SELECT MIN(observation_period_start_date) observation_period_min_date, MAX(observation_period_end_date) observation_period_max_date, @@ -627,54 +292,54 @@ executeDiagnostics <- function(cohortDefinitionSet, COUNT(person_id) records, SUM(CAST(DATEDIFF(dd, observation_period_start_date, observation_period_end_date) AS BIGINT)) person_days FROM @cdm_database_schema.observation_period;", - cdm_database_schema = cdmDatabaseSchema, + cdm_database_schema = cdSettings$cdmDatabaseSchema, snakeCaseToCamelCase = TRUE, - tempEmulationSchema = tempEmulationSchema + tempEmulationSchema = cdSettings$tempEmulationSchema ) } ) # Database metadata --------------------------------------------- saveDatabaseMetaData( - databaseId = databaseId, - databaseName = databaseName, - databaseDescription = databaseDescription, - exportFolder = exportFolder, - minCellCount = minCellCount, + databaseId = cdSettings$databaseId, + databaseName = cdSettings$databaseName, + databaseDescription = cdSettings$databaseDescription, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, vocabularyVersionCdm = cdmSourceInformation$vocabularyVersion, - vocabularyVersion = vocabularyVersion + vocabularyVersion = cdSettings$vocabularyVersion ) # Create concept table ------------------------------------------ - createConceptTable(connection, tempEmulationSchema) + createConceptTable(connection, cdSettings$tempEmulationSchema) # Counting cohorts ----------------------------------------------------------------------- timeExecution( - exportFolder, + cdSettings$exportFolder, taskName = "getInclusionStats", - cohortIds = cohortIds, + cohortIds = cdSettings$cohortIds, parent = "executeDiagnostics", expr = { cohortCounts <- computeCohortCounts( connection = connection, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTable = cohortTable, - cohorts = cohortDefinitionSet, - exportFolder = exportFolder, - minCellCount = minCellCount, - databaseId = databaseId + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTable = cdSettings$cohortTable, + cohorts = cdSettings$cohortDefinitionSet, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, + databaseId = cdSettings$databaseId ) } ) if (nrow(cohortCounts) > 0) { - instantiatedCohorts <- cohortCounts %>% - dplyr::filter(.data$cohortEntries > 0) %>% + instantiatedCohorts <- cohortCounts |> + dplyr::filter(.data$cohortEntries > 0) |> dplyr::pull(.data$cohortId) ParallelLogger::logInfo( sprintf( "Found %s of %s (%1.2f%%) submitted cohorts instantiated. ", length(instantiatedCohorts), - nrow(cohortDefinitionSet), - 100 * (length(instantiatedCohorts) / nrow(cohortDefinitionSet)) + nrow(cdSettings$cohortDefinitionSet), + 100 * (length(instantiatedCohorts) / nrow(cdSettings$cohortDefinitionSet)) ), "Beginning cohort diagnostics for instantiated cohorts. " ) @@ -682,27 +347,27 @@ executeDiagnostics <- function(cohortDefinitionSet, stop("All cohorts were either not instantiated or all have 0 records.") } - cohortDefinitionSet <- cohortDefinitionSet %>% + cdSettings$cohortDefinitionSet <- cdSettings$cohortDefinitionSet |> dplyr::filter(.data$cohortId %in% instantiatedCohorts) # Inclusion statistics ----------------------------------------------------------------------- if (runInclusionStatistics) { timeExecution( - exportFolder, + cdSettings$exportFolder, "getInclusionStats", - cohortIds, + cdSettings$cohortIds, parent = "executeDiagnostics", expr = { getInclusionStats( connection = connection, - exportFolder = exportFolder, - databaseId = databaseId, - cohortDefinitionSet = cohortDefinitionSet, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTableNames = cohortTableNames, - incremental = incremental, - instantiatedCohorts = instantiatedCohorts, - minCellCount = minCellCount, + exportFolder = cdSettings$exportFolder, + databaseId = cdSettings$databaseId, + cohortDefinitionSet = cdSettings$cohortDefinitionSet, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTableNames = cdSettings$cohortTableNames, + incremental = cdSettings$incremental, + instantiatedCohorts = cdSettings$instantiatedCohorts, + minCellCount = cdSettings$minCellCount, recordKeepingFile = recordKeepingFile ) } @@ -711,10 +376,10 @@ executeDiagnostics <- function(cohortDefinitionSet, # Always export concept sets to csv exportConceptSets( - cohortDefinitionSet = cohortDefinitionSet, - exportFolder = exportFolder, - minCellCount = minCellCount, - databaseId = databaseId + cohortDefinitionSet = cdSettings$cohortDefinitionSet, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, + databaseId = cdSettings$databaseId ) # Concept set diagnostics ----------------------------------------------- @@ -722,30 +387,30 @@ executeDiagnostics <- function(cohortDefinitionSet, runOrphanConcepts || runBreakdownIndexEvents) { timeExecution( - exportFolder, + cdSettings$exportFolder, taskName = "runConceptSetDiagnostics", - cohortIds, + cdSettings$cohortIds, parent = "executeDiagnostics", expr = { runConceptSetDiagnostics( connection = connection, - tempEmulationSchema = tempEmulationSchema, - cdmDatabaseSchema = cdmDatabaseSchema, - vocabularyDatabaseSchema = vocabularyDatabaseSchema, - databaseId = databaseId, - cohorts = cohortDefinitionSet, + tempEmulationSchema = cdSettings$tempEmulationSchema, + cdmDatabaseSchema = cdSettings$cdmDatabaseSchema, + vocabularyDatabaseSchema = cdSettings$vocabularyDatabaseSchema, + databaseId = cdSettings$databaseId, + cohorts = cdSettings$cohortDefinitionSet, runIncludedSourceConcepts = runIncludedSourceConcepts, runOrphanConcepts = runOrphanConcepts, runBreakdownIndexEvents = runBreakdownIndexEvents, - exportFolder = exportFolder, - minCellCount = minCellCount, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, conceptCountsDatabaseSchema = NULL, conceptCountsTable = "#concept_counts", conceptCountsTableIsTemp = TRUE, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTable = cohortTable, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTable = cdSettings$cohortTable, useExternalConceptCountsTable = FALSE, - incremental = incremental, + incremental = cdSettings$incremental, conceptIdTable = "#concept_ids", recordKeepingFile = recordKeepingFile ) @@ -756,23 +421,23 @@ executeDiagnostics <- function(cohortDefinitionSet, # Time series ---------------------------------------------------------------------- if (runTimeSeries) { timeExecution( - exportFolder, + cdSettings$exportFolder, "executeTimeSeriesDiagnostics", - cohortIds, + cdSettings$cohortIds, parent = "executeDiagnostics", expr = { executeTimeSeriesDiagnostics( connection = connection, - tempEmulationSchema = tempEmulationSchema, - cdmDatabaseSchema = cdmDatabaseSchema, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTable = cohortTable, - cohortDefinitionSet = cohortDefinitionSet, - databaseId = databaseId, - exportFolder = exportFolder, - minCellCount = minCellCount, - instantiatedCohorts = instantiatedCohorts, - incremental = incremental, + tempEmulationSchema = cdSettings$tempEmulationSchema, + cdmDatabaseSchema = cdSettings$cdmDatabaseSchema, + cohortDatabaseSchema = ccdSettings$ohortDatabaseSchema, + cohortTable = cdSettings$cohortTable, + cohortDefinitionSet = cdSettings$cohortDefinitionSet, + databaseId = cdSettings$databaseId, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, + instantiatedCohorts = cdSettings$instantiatedCohorts, + incremental = cdSettings$incremental, recordKeepingFile = recordKeepingFile, observationPeriodDateRange = observationPeriodDateRange ) @@ -784,25 +449,25 @@ executeDiagnostics <- function(cohortDefinitionSet, # Visit context ---------------------------------------------------------------------------- if (runVisitContext) { timeExecution( - exportFolder, + cdSettings$exportFolder, "executeVisitContextDiagnostics", - cohortIds, + cdSettings$cohortIds, parent = "executeDiagnostics", expr = { executeVisitContextDiagnostics( connection = connection, - tempEmulationSchema = tempEmulationSchema, - cdmDatabaseSchema = cdmDatabaseSchema, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTable = cohortTable, - cdmVersion = cdmVersion, - databaseId = databaseId, - exportFolder = exportFolder, - minCellCount = minCellCount, - cohorts = cohortDefinitionSet, + tempEmulationSchema = cdSettings$tempEmulationSchema, + cdmDatabaseSchema = cdSettings$cdmDatabaseSchema, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTable = cdSettings$cohortTable, + cdmVersion = cdSettings$cdmVersion, + databaseId = cdSettings$databaseId, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, + cohorts = cdSettings$cohortDefinitionSet, instantiatedCohorts = instantiatedCohorts, recordKeepingFile = recordKeepingFile, - incremental = incremental + incremental = cdSettings$incremental ) } ) @@ -811,126 +476,126 @@ executeDiagnostics <- function(cohortDefinitionSet, # Incidence rates -------------------------------------------------------------------------------------- if (runIncidenceRate) { timeExecution( - exportFolder, + cdSettings$exportFolder, "computeIncidenceRates", - cohortIds, + cdSettings$cohortIds, parent = "executeDiagnostics", expr = { computeIncidenceRates( connection = connection, - tempEmulationSchema = tempEmulationSchema, - cdmDatabaseSchema = cdmDatabaseSchema, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTable = cohortTable, - databaseId = databaseId, - exportFolder = exportFolder, - minCellCount = minCellCount, - cohorts = cohortDefinitionSet, - washoutPeriod = irWashoutPeriod, - instantiatedCohorts = instantiatedCohorts, + tempEmulationSchema = cdSettings$tempEmulationSchema, + cdmDatabaseSchema = cdSettings$cdmDatabaseSchema, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTable = cdSettings$cohortTable, + databaseId = cdSettings$databaseId, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, + cohorts = cdSettings$cohortDefinitionSet, + washoutPeriod = cdSettings$irWashoutPeriod, + instantiatedCohorts = cdSettings$instantiatedCohorts, recordKeepingFile = recordKeepingFile, - incremental = incremental + incremental = cdSettings$incremental ) } ) } # Cohort relationship --------------------------------------------------------------------------------- - if (runCohortRelationship && nrow(cohortDefinitionSet) > 1) { - covariateCohorts <- cohortDefinitionSet |> dplyr::select(cohortId, cohortName) + if (runCohortRelationship && nrow(cdSettings$cohortDefinitionSet) > 1) { + covariateCohorts <- cdSettings$cohortDefinitionSet |> dplyr::select("cohortId", "cohortName") analysisId <- as.integer(Sys.getenv("OHDSI_CD_CF_ANALYSIS_ID", unset = 173)) cohortFeSettings <- FeatureExtraction::createCohortBasedTemporalCovariateSettings( analysisId = analysisId, # problem - how to assign this uniquely? - covariateCohortDatabaseSchema = cohortDatabaseSchema, - covariateCohortTable = cohortTableNames$cohortTable, + covariateCohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + covariateCohortTable = cdSettings$cohortTableNames$cohortTable, covariateCohorts = covariateCohorts, valueType = "binary", - temporalStartDays = temporalCovariateSettings[[1]]$temporalStartDays, - temporalEndDays = temporalCovariateSettings[[1]]$temporalEndDays + temporalStartDays = cdSettings$temporalCovariateSettings[[1]]$temporalStartDays, + temporalEndDays = cdSettings$temporalCovariateSettings[[1]]$temporalEndDays ) # Add feature set - temporalCovariateSettings[[length(temporalCovariateSettings) + 1]] <- cohortFeSettings + cdSettings$temporalCovariateSettings[[length(cdSettings$temporalCovariateSettings$temporalCovariateSettings) + 1]] <- cohortFeSettings } - feCohortDefinitionSet <- cohortDefinitionSet - feCohortTable <- cohortTable - feCohortCounts <- cohortCounts + feCohortDefinitionSet <- cdSettings$cohortDefinitionSet + feCohortTable <- cdSettings$cohortTable + feCohortCounts <- cdSettings$cohortCounts # Temporal Cohort characterization --------------------------------------------------------------- if (runTemporalCohortCharacterization) { - if (runFeatureExtractionOnSample & !isTRUE(attr(cohortDefinitionSet, "isSampledCohortDefinition"))) { - cohortTableNames$cohortSampleTable <- paste0(cohortTableNames$cohortTable, "_cd_sample") + if (cdSettings$runFeatureExtractionOnSample & !isTRUE(attr(cdSettings$cohortDefinitionSet, "isSampledCohortDefinition"))) { + cdSettings$cohortTableNames$cohortSampleTable <- paste0(cdSettings$cohortTableNames$cohortTable, "_cd_sample") CohortGenerator::createCohortTables( connection = connection, - cohortTableNames = cohortTableNames, - cohortDatabaseSchema = cohortDatabaseSchema, + cohortTableNames = cdSettings$cohortTableNames, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, incremental = TRUE ) - feCohortTable <- cohortTableNames$cohortSampleTable + feCohortTable <- cdSettings$cohortTableNames$cohortSampleTable feCohortDefinitionSet <- CohortGenerator::sampleCohortDefinitionSet( connection = connection, - cohortDefinitionSet = cohortDefinitionSet, - tempEmulationSchema = tempEmulationSchema, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTableNames = cohortTableNames, - n = sampleN, - seed = seed, - seedArgs = seedArgs, + cohortDefinitionSet = cdSettings$cohortDefinitionSet, + tempEmulationSchema = cdSettings$tempEmulationSchema, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTableNames = cdSettings$cohortTableNames, + n = cdSettings$sampleN, + seed = cdSettings$seed, + seedArgs = cdSettings$seedArgs, identifierExpression = "cohortId", - incremental = incremental, - incrementalFolder = incrementalFolder + incremental = cdSettings$incremental, + incrementalFolder = cdSettings$incrementalFolder ) feCohortCounts <- computeCohortCounts( connection = connection, - cohortDatabaseSchema = cohortDatabaseSchema, - cohortTable = cohortTableNames$cohortSampleTable, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, + cohortTable = cdSettings$cohortTableNames$cohortSampleTable, cohorts = feCohortDefinitionSet, - exportFolder = exportFolder, - minCellCount = minCellCount, - databaseId = databaseId, + exportFolder = cdSettings$exportFolder, + minCellCount = cdSettings$minCellCount, + databaseId = cdSettings$databaseId, writeResult = FALSE ) } } else { - temporalCovariateSettings <- temporalCovariateSettings[-1] + cdSettings$temporalCovariateSettings <- cdSettings$temporalCovariateSettings[-1] } - if (length(temporalCovariateSettings)) { + if (length(cdSettings$temporalCovariateSettings)) { timeExecution( - exportFolder, + cdSettings$exportFolder, "executeCohortCharacterization", - cohortIds, + cdSettings$cohortIds, parent = "executeDiagnostics", expr = { executeCohortCharacterization( connection = connection, - databaseId = databaseId, - exportFolder = exportFolder, - cdmDatabaseSchema = cdmDatabaseSchema, - cohortDatabaseSchema = cohortDatabaseSchema, + databaseId = cdSettings$databaseId, + exportFolder = cdSettings$exportFolder, + cdmDatabaseSchema = cdSettings$cdmDatabaseSchema, + cohortDatabaseSchema = cdSettings$cohortDatabaseSchema, cohortTable = feCohortTable, - covariateSettings = temporalCovariateSettings, - tempEmulationSchema = tempEmulationSchema, - cdmVersion = cdmVersion, + covariateSettings = cdSettings$temporalCovariateSettings, + tempEmulationSchema = cdSettings$tempEmulationSchema, + cdmVersion = cdSettings$cdmVersion, cohorts = feCohortDefinitionSet, cohortCounts = feCohortCounts, - minCellCount = minCellCount, + minCellCount = cdSettings$minCellCount, instantiatedCohorts = instantiatedCohorts, - incremental = incremental, + incremental = cdSettings$incremental, recordKeepingFile = recordKeepingFile, task = "runTemporalCohortCharacterization", jobName = "Temporal Cohort characterization", - covariateValueFileName = file.path(exportFolder, "temporal_covariate_value.csv"), - covariateValueContFileName = file.path(exportFolder, "temporal_covariate_value_dist.csv"), - covariateRefFileName = file.path(exportFolder, "temporal_covariate_ref.csv"), - analysisRefFileName = file.path(exportFolder, "temporal_analysis_ref.csv"), - timeRefFileName = file.path(exportFolder, "temporal_time_ref.csv"), - minCharacterizationMean = minCharacterizationMean + covariateValueFileName = file.path(cdSettings$exportFolder, "temporal_covariate_value.csv"), + covariateValueContFileName = file.path(cdSettings$exportFolder, "temporal_covariate_value_dist.csv"), + covariateRefFileName = file.path(cdSettings$exportFolder, "temporal_covariate_ref.csv"), + analysisRefFileName = file.path(cdSettings$exportFolder, "temporal_analysis_ref.csv"), + timeRefFileName = file.path(cdSettings$exportFolder, "temporal_time_ref.csv"), + minCharacterizationMean = cdSettings$minCharacterizationMean ) } ) @@ -938,24 +603,24 @@ executeDiagnostics <- function(cohortDefinitionSet, # Store information from the vocabulary on the concepts used ------------------------- timeExecution( - exportFolder, + cdSettings$exportFolder, "exportConceptInformation", parent = "executeDiagnostics", expr = { exportConceptInformation( connection = connection, - vocabularyDatabaseSchema = vocabularyDatabaseSchema, - tempEmulationSchema = tempEmulationSchema, + vocabularyDatabaseSchema = cdSettings$vocabularyDatabaseSchema, + tempEmulationSchema = cdSettings$tempEmulationSchema, conceptIdTable = "#concept_ids", - incremental = incremental, - exportFolder = exportFolder + incremental = cdSettings$incremental, + exportFolder = cdSettings$exportFolder ) } ) # Delete unique concept ID table --------------------------------- ParallelLogger::logTrace("Deleting concept ID table") timeExecution( - exportFolder, + cdSettings$exportFolder, "DeleteConceptIdTable", parent = "executeDiagnostics", expr = { @@ -963,7 +628,7 @@ executeDiagnostics <- function(cohortDefinitionSet, DatabaseConnector::renderTranslateExecuteSql( connection = connection, sql = sql, - tempEmulationSchema = tempEmulationSchema, + tempEmulationSchema = cdSettings$tempEmulationSchema, table = "#concept_ids", progressBar = FALSE, reportOverallTime = FALSE @@ -983,7 +648,7 @@ executeDiagnostics <- function(cohortDefinitionSet, delta <- Sys.time() - start timeExecution( - exportFolder = exportFolder, + exportFolder = cdSettings$exportFolder, taskName = "executeDiagnostics", parent = NULL, cohortIds = NULL, @@ -1047,7 +712,7 @@ executeDiagnostics <- function(cohortDefinitionSet, # 3 "{}", # 4 - callingArgsJson, + cdSettings$toJson(), # 5 as.character(R.Version()$version.string), # 6 @@ -1090,7 +755,7 @@ executeDiagnostics <- function(cohortDefinitionSet, as.character(observationPeriodDateRange$personDays) # 22 ) metadata <- dplyr::tibble( - databaseId = as.character(!!databaseId), + databaseId = as.character(!!cdSettings$databaseId), startTime = paste0("TM_", as.character(start)), variableField = variableField, valueField = valueField @@ -1098,24 +763,24 @@ executeDiagnostics <- function(cohortDefinitionSet, metadata <- makeDataExportable( x = metadata, tableName = "metadata", - minCellCount = minCellCount, - databaseId = databaseId + minCellCount = cdSettings$minCellCount, + databaseId = cdSettings$databaseId ) writeToCsv( data = metadata, - fileName = file.path(exportFolder, "metadata.csv"), + fileName = file.path(cdSettings$exportFolder, "metadata.csv"), incremental = TRUE, start_time = as.character(start) ) # Add all to zip file ------------------------------------------------------------------------------- timeExecution( - exportFolder, + cdSettings$exportFolder, "writeResultsZip", NULL, parent = "executeDiagnostics", expr = { - writeResultsZip(exportFolder, databaseId) + writeResultsZip(cdSettings$exportFolder, cdSettings$databaseId) } ) diff --git a/R/Settings.R b/R/Settings.R new file mode 100644 index 000000000..305ace0bf --- /dev/null +++ b/R/Settings.R @@ -0,0 +1,504 @@ +#' Cohort Diagnostics Settings +#' @description R6Class Generator for working within the CohortDiagnostics package. +#' Exposes results model and icremental exectution API. +#' Should be used to execute all diagnostics functions within cohort diagnostics +#' +#' @export +#' +CohortDiagnosticsSettings <- R6::R6Class( + classname = "CohortDiagnosticsSettings", + public = list( + errorMessage = NULL, + cohortTable = NULL, + # Initalize + initialize = function(settings, connection = NULL, connectionDetails = NULL) { + #ParallelLogger::addDefaultFileLogger(file.path(exportFolder, "log.txt"), name = "CD_LOGGER") + #ParallelLogger::addDefaultErrorReportLogger(file.path(exportFolder, "errorReportR.txt"), name = "CD_ERROR_LOGGER") + if (is.null(connection) && is.null(connectionDetails)) + stop("Connection or ConnectionDetails for a CDM must be provided") + + self$errorMessage <- checkmate::makeAssertCollection() + + for (name in names(settings)) { + self[[name]] <- settings[[name]] + } + + checkmate::reportAssertions(collection = self$errorMessage) + private$.executionTimePath <- file.path(exportFolder, "taskExecutionTimes.csv") + private$setConnection(connection, connectionDetails) + }, + + # to list + toList = function() { + idList <- list() + for (name in names(CohortDiagnosticsSettings)) { + idList[[name]] <- self[[name]] + } + return(idList) + }, + + # to json + toJson = function() { + self$toList() |> ParallelLogger::convertSettingsToJson() + }, + + # get database connection. + # return an active connection object + getConnection = function() { + if (!is.null(private$.connection) && DatabaseConnector::dbIsValid(private$.connection)) { + return(private$.connection) + } + stop("Connection is no longer valid") + }, + + # Check a set of default temporal covariate settings + # This is required for default behaviour in shiny apps and if all reporting is required + checkDefaultTemporalCovariateSettings = function(temporalCovariateSettings = self$temporalCovariateSettings, errorMessage = self$errorMessage) { + if (is(temporalCovariateSettings, "covariateSettings")) { + temporalCovariateSettings <- list(temporalCovariateSettings) + } + # All temporal covariate settings objects must be covariateSettings + checkmate::assert_true(all(lapply(temporalCovariateSettings, class) == "covariateSettings"), add = errorMessage) + + + requiredCharacterisationSettings <- c( + "DemographicsGender", "DemographicsAgeGroup", "DemographicsRace", + "DemographicsEthnicity", "DemographicsIndexYear", "DemographicsIndexMonth", + "ConditionEraGroupOverlap", "DrugEraGroupOverlap", "CharlsonIndex", + "Chads2", "Chads2Vasc" + ) + presentSettings <- temporalCovariateSettings[[1]][requiredCharacterisationSettings] + if (!all(unlist(presentSettings))) { + warning( + "For cohort charcterization to display standardized results the following covariates must be present in your temporalCovariateSettings: \n\n", + paste(requiredCharacterisationSettings, collapse = ", ") + ) + } + + requiredTimeDistributionSettings <- c( + "DemographicsPriorObservationTime", + "DemographicsPostObservationTime", + "DemographicsTimeInCohort" + ) + + presentSettings <- temporalCovariateSettings[[1]][requiredTimeDistributionSettings] + if (!all(unlist(presentSettings))) { + warning( + "For time distributions diagnostics to display standardized results the following covariates must be present in your temporalCovariateSettings: \n\n", + paste(requiredTimeDistributionSettings, collapse = ", ") + ) + } + + # forcefully set ConditionEraGroupStart and drugEraGroupStart to NULL + # because of known bug in FeatureExtraction. https://github.com/OHDSI/FeatureExtraction/issues/144 + temporalCovariateSettings[[1]]$ConditionEraGroupStart <- NULL + temporalCovariateSettings[[1]]$DrugEraGroupStart <- NULL + + checkmate::assert_integerish( + x = temporalCovariateSettings[[1]]$temporalStartDays, + any.missing = FALSE, + min.len = 1, + add = errorMessage + ) + checkmate::assert_integerish( + x = temporalCovariateSettings[[1]]$temporalEndDays, + any.missing = FALSE, + min.len = 1, + add = errorMessage + ) + checkmate::reportAssertions(collection = errorMessage) + + # Adding required temporal windows required in results viewer + requiredTemporalPairs <- + list( + c(-365, 0), + c(-30, 0), + c(-365, -31), + c(-30, -1), + c(0, 0), + c(1, 30), + c(31, 365), + c(-9999, 9999) + ) + for (p1 in requiredTemporalPairs) { + found <- FALSE + for (i in seq_len(length(temporalCovariateSettings[[1]]$temporalStartDays))) { + p2 <- c( + temporalCovariateSettings[[1]]$temporalStartDays[i], + temporalCovariateSettings[[1]]$temporalEndDays[i] + ) + + if (p2[1] == p1[1] & p2[2] == p1[2]) { + found <- TRUE + break + } + } + + if (!found) { + temporalCovariateSettings[[1]]$temporalStartDays <- + c(temporalCovariateSettings[[1]]$temporalStartDays, p1[1]) + temporalCovariateSettings[[1]]$temporalEndDays <- + c(temporalCovariateSettings[[1]]$temporalEndDays, p1[2]) + } + } + + return(temporalCovariateSettings) + } + + ), + active = list( + # @field cohortDefinitionSet + cohortDefinitionSet = function(cohortDefinitionSet) { + if (missing(cohortDefinitionSet)) return(private$.cohortDefinitionSet) + # Assert data frame + checkmate::assertDataFrame(cohortDefinitionSet, add = self$errorMessage) + checkmate::assertNames(names(cohortDefinitionSet), + must.include = c( + "json", + "cohortId", + "cohortName", + "sql" + ), + add = self$errorMessage) + + if (!"isSubset" %in% colnames(cohortDefinitionSet)) { + cohortDefinitionSet$isSubset <- FALSE + } + private$.cohortDefinitionSet <- cohortDefinitionSet + }, + + # @field exportFolder folder to export results to + exportFolder = function(exportFolder) { + if (missing(exportFolder)) return(private$.exportFolder) + # Assert string + checkmate::assertCharacter(exportFolder, add = self$errorMessage) + dir.create(exportFolder, showWarnings = FALSE, recursive = TRUE) + private$.exportFolder <- exportFolder + }, + + # @field databaseId database identifier to use in export of results and storage of incremental work + databaseId = function(databaseId) { + if (missing(databaseId)) return(private$.databaseId) + # Assert string + databaseId <- as.character(databaseId) + checkmate::assertCharacter(databaseId, min.len = 1, add = self$errorMessage) + private$.databaseId <- databaseId + }, + + # @field databaseId database identifier to use in export of results and storage of incremental work + databaseName = function(databaseName) { + if (missing(databaseName)) return(private$.databaseName) + databaseName <- as.character(databaseName) + if (length(databaseName) == 0) { + ParallelLogger::logTrace(" - Database description was not provided. Using CDM source table") + databaseName <- NULL + } + checkmate::assertCharacter(databaseName, null.ok = TRUE, add = self$errorMessage) + private$.databaseName <- databaseName + }, + + # @field databaseId database identifier to use in export of results and storage of incremental work + databaseDescription = function(databaseDescription) { + if (missing(databaseDescription)) return(private$.databaseDescription) + databaseDescription <- as.character(databaseDescription) + if (length(databaseDescription) == 0) { + ParallelLogger::logTrace(" - Database description was not provided. Using CDM source table") + databaseDescription <- NULL + } + checkmate::assertCharacter(databaseDescription, null.ok = TRUE, add = self$errorMessage) + private$.databaseDescription <- databaseDescription + }, + + # @field connectionDetails DatabaseConnector connectionDetails instance for a a CDM + connectionDetails = function(connectionDetails) { + if (missing(connectionDetails)) { + return(private$.connectionDetails) + } + + checkmate::assertClass(connectionDetails, classes = "ConnectionDetails", null.ok = TRUE) + private$.connectionDetails <- connectionDetails + }, + + # @field cdmDatabaseSchema database schema that cdm is on + cdmDatabaseSchema = function(cdmDatabaseSchema) { + if (missing(cdmDatabaseSchema)) return(private$.cdmDatabaseSchema) + checkmate::assertCharacter(cdmDatabaseSchema) + private$.cdmDatabaseSchema <- cdmDatabaseSchema + }, + + # @field cdmDatabaseSchema database schema that cdm is on + cohortDatabaseSchema = function(cohortDatabaseSchema) { + if (missing(cohortDatabaseSchema)) return(private$.cohortDatabaseSchema) + checkmate::assertCharacter(cohortDatabaseSchema) + private$.cohortDatabaseSchema <- cohortDatabaseSchema + }, + + # @field tempEmulationSchema temp schema on platforms that do not use temp schemas + tempEmulationSchema = function(tempEmulationSchema) { + if (missing(tempEmulationSchema)) return(private$.tempEmulationSchema) + checkmate::assertCharacter(tempEmulationSchema, null.ok = TRUE) + private$.tempEmulationSchema <- tempEmulationSchema + }, + + # @field cohortTableNames list of table names created with CohortGenerator::getTableNames + cohortTableNames = function(cohortTableNames) { + if (missing(cohortTableNames)) return(private$.cohortTableNames) + checkmate::assertList(cohortTableNames, null.ok = FALSE, types = "character", add = self$errorMessage, names = "named") + checkmate::assertNames(names(cohortTableNames), + must.include = c( + "cohortTable", + "cohortInclusionTable", + "cohortInclusionResultTable", + "cohortInclusionStatsTable", + "cohortSummaryStatsTable", + "cohortCensorStatsTable" + ), + add = self$errorMessage + ) + private$.cohortTableNames <- cohortTableNames + self$cohortTable <- cohortTableNames$cohortTable + }, + + # @field vocabularyDatabaseSchema generally the same as the cdm schema but where the vocabulary is set + vocabularyDatabaseSchema = function(vocabularyDatabaseSchema) { + if (missing(vocabularyDatabaseSchema)) return(private$.vocabularyDatabaseSchema) + checkmate::assertCharacter(vocabularyDatabaseSchema) + private$.vocabularyDatabaseSchema <- vocabularyDatabaseSchema + }, + # @field cohortIds, if specified execution will only happen on this subset of cohort ids + cohortIds = function(cohortIds) { + if (missing(cohortIds)) return(private$.cohortIds) + checkmate::assertNumeric(cohortIds, add = self$errorMessage, null.ok = TRUE) + private$.cohortIds <- cohortIds + }, + # @field cdmVersion The version of the OMOP CDM. Default 5. (Note: only 5.x lineage is supported.) + cdmVersion = function(cdmVersion) { + if (missing(cdmVersion)) return(private$.cdmVersion) + checkmate::assertNumeric( + x = cdmVersion, + lower = 5, + upper = 5.4, + null.ok = FALSE, + add = self$errorMessage + ) + private$.cdmVersion <- cdmVersion + }, + # @field temporalCovariateSettings Either an object of type \code{covariateSettings} as created using one of + # the createTemporalCovariateSettings function in the FeatureExtraction package, or a list + # of such objects. This can be anythin accepted by FeatureExtraction (including + # custom covariates). However, it should be noted that certain time windows will be + # included by default. @seealso[getDefaultCovariateSettings] + temporalCovariateSettings = function(temporalCovariateSettings) { + if (missing(temporalCovariateSettings)) return(private$.temporalCovariateSettings) + checkmate::assertList(temporalCovariateSettings, add = self$errorMessage) + private$.temporalCovariateSettings <- temporalCovariateSettings + }, + # @field minCellCount The minimum cell count for fields contains person counts or fractions. + minCellCount = function(minCellCount) { + if (missing(minCellCount)) return(private$.minCellCount) + minCellCount <- utils::type.convert(minCellCount, as.is = TRUE) + checkmate::assertNumeric(minCellCount, , len = 1, add = self$errorMessage) + private$.minCellCount <- minCellCount + }, + # @field minCharacterizationMean The minimum mean value for characterization output. Values below this will be cut off from output. This + # will help reduce the file size of the characterization output, but will remove information + # on covariates that have very low values. The default is 0.001 (i.e. 0.1 percent) + minCharacterizationMean = function(minCharacterizationMean) { + if (missing(minCharacterizationMean)) return(private$.minCharacterizationMean) + val <- utils::type.convert(minCharacterizationMean, as.is = TRUE) + checkmate::assertNumeric(x = val, len = 1, lower = 0, upper = 1, add = self$errorMessage) + private$.minCharacterizationMean <- minCharacterizationMean + }, + + # @field irWashoutPeriod Number of days washout to include in calculation of incidence rates - default is 0 + irWashoutPeriod = function(irWashoutPeriod) { + if (missing(irWashoutPeriod)) return(private$.irWashoutPeriod) + checkmate::assertNumeric(irWashoutPeriod, add = self$errorMessage) + private$.irWashoutPeriod <- irWashoutPeriod + }, + # @field incremental Create only cohort diagnostics that haven't been created before? + incremental = function(incremental) { + if (missing(incremental)) return(private$.incremental) + checkmate::assertLogical(incremental, add = self$errorMessage) + private$.incremental <- incremental + }, + + # @field incrementalFolder If \code{incremental = TRUE}, specify a folder where records are kept + # of which cohort diagnostics has been executed. + incrementalFolder = function(incrementalFolder) { + if (missing(incrementalFolder)) return(private$.incrementalFolder) + checkmate::assertCharacter(incrementalFolder, add = self$errorMessage) + dir.create(incrementalFolder, showWarnings = FALSE, recursive = TRUE) + private$.incrementalFolder <- incrementalFolder + }, + # @field runFeatureExtractionOnSample Logical. If TRUE, the function will operate on a sample of the data. + # Default is FALSE, meaning the function will operate on the full data set. + runFeatureExtractionOnSample = function(runFeatureExtractionOnSample) { + if (missing(runFeatureExtractionOnSample)) return(private$.runFeatureExtractionOnSample) + checkmate::assertLogical(runFeatureExtractionOnSample, add = self$errorMessage) + private$.runFeatureExtractionOnSample <- runFeatureExtractionOnSample + }, + + # @field sampleN Integer. The number of records to include in the sample if runFeatureExtractionOnSample is TRUE. + # Default is 1000. Ignored if runFeatureExtractionOnSample is FALSE. + sampleN = function(sampleN) { + if (missing(sampleN)) return(private$.sampleN) + checkmate::assertNumeric(sampleN, add = self$errorMessage) + private$.sampleN <- sampleN + }, + # @field seed Integer. The seed for the random number generator used to create the sample. + # This ensures that the same sample can be drawn again in future runs. Default is 64374. + seed = function(seed) { + if (missing(seed)) return(private$.seed) + checkmate::assertNumeric(seed, add = self$errorMessage) + private$.seed <- seed + }, + # @field seedArgs List. Additional arguments to pass to the sampling function. + # This can be used to control aspects of the sampling process beyond the seed and sample size. + seedArgs = function(seedArgs) { + if (missing(seedArgs)) return(private$.seedArgs) + checkmate::assertList(seedArgs, null.ok = TRUE, add = self$errorMessage) + private$.seedArgs <- seedArgs + } + ), + private = list( + .cohortDefinitionSet = NULL, + .exportFolder = NULL, + .databaseId = NULL, + .databaseName = NULL, + .databaseDescription = NULL, + .connectionDetails = NULL, + .connection = NULL, + .cdmDatabaseSchema = NULL, + .tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + .cohortDatabaseSchema = NULL, + .cohortTable = "cohort", + .cohortTableNames = CohortGenerator::getCohortTableNames(cohortTable = cohortTable), + .vocabularyDatabaseSchema = cdmDatabaseSchema, + .cohortIds = NULL, + .cdmVersion = 5, + .temporalCovariateSettings = NULL, + .minCellCount = 5, + .minCharacterizationMean = 0.01, + .irWashoutPeriod = 0, + .incremental = FALSE, + .incrementalFolder = file.path(exportFolder, "incremental"), + .runFeatureExtractionOnSample = FALSE, + .sampleN = 1000, + .seed = 64374, + .seedArgs = NULL, + .executionTimePath = NULL, + setConnection = function(connection, connectionDetails) { + if (is.null(connection)) + private$.connection <- DatabaseConnector::connect(connectionDetails) + else if (DatabaseConnector::dbIsValid(connection)) + private$.connection <- connection + else + stop("Connection is not valid") + } + ) +) + +#' Create CohortDiagnostics Settings +#' @description +#' +#' Create a global object that contains settings for execution with cohort diagnostics +#' This returns an R6 Class instance that can be used throughout all cohort diagnostics +#' +#' @export +#' +#' @template CdmDatabaseSchema +#' @template VocabularyDatabaseSchema +#' @template CohortDatabaseSchema +#' @template TempEmulationSchema +#' +#' @template CohortTable +#' +#' @template cdmVersion +#' @param exportFolder The folder where the output will be exported to. If this folder +#' does not exist it will be created. +#' @param cohortIds Optionally, provide a subset of cohort IDs to restrict the +#' diagnostics to. +#' @param cohortDefinitionSet Data.frame of cohorts must include columns cohortId, cohortName, json, sql +#' @param cohortTableNames Cohort Table names used by CohortGenerator package +#' @param databaseId A short string for identifying the database (e.g. 'Synpuf'). +#' @param databaseName The full name of the database. If NULL, defaults to value in cdm_source table +#' @param databaseDescription A short description (several sentences) of the database. If NULL, defaults to value in cdm_source table +#' @param temporalCovariateSettings Either an object of type \code{covariateSettings} as created using one of +#' the createTemporalCovariateSettings function in the FeatureExtraction package, or a list +#' of such objects. This can be anythin accepted by FeatureExtraction (including +#' custom covariates). However, it should be noted that certain time windows will be +#' included by default. @seealso[getDefaultCovariateSettings] +#' @param minCellCount The minimum cell count for fields contains person counts or fractions. +#' @param minCharacterizationMean The minimum mean value for characterization output. Values below this will be cut off from output. This +#' will help reduce the file size of the characterization output, but will remove information +#' on covariates that have very low values. The default is 0.001 (i.e. 0.1 percent) +#' @param irWashoutPeriod Number of days washout to include in calculation of incidence rates - default is 0 +#' @param incremental Create only cohort diagnostics that haven't been created before? +#' @param incrementalFolder If \code{incremental = TRUE}, specify a folder where records are kept +#' of which cohort diagnostics has been executed. +#' @param runFeatureExtractionOnSample Logical. If TRUE, the function will operate on a sample of the data. +#' Default is FALSE, meaning the function will operate on the full data set. +#' +#' @param sampleN Integer. The number of records to include in the sample if runFeatureExtractionOnSample is TRUE. +#' Default is 1000. Ignored if runFeatureExtractionOnSample is FALSE. +#' +#' @param seed Integer. The seed for the random number generator used to create the sample. +#' This ensures that the same sample can be drawn again in future runs. Default is 64374. +#' +#' @param seedArgs List. Additional arguments to pass to the sampling function. +#' This can be used to control aspects of the sampling process beyond the seed and sample size. +createCohortDiagnosticsSettings <- function(cohortDefinitionSet, + exportFolder, + databaseId, + cohortDatabaseSchema, + databaseName = NULL, + databaseDescription = NULL, + connectionDetails = NULL, + connection = NULL, + cdmDatabaseSchema, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + cohortTable = "cohort", + cohortTableNames = CohortGenerator::getCohortTableNames(cohortTable = cohortTable), + vocabularyDatabaseSchema = cdmDatabaseSchema, + cohortIds = NULL, + cdmVersion = 5, + temporalCovariateSettings = getDefaultCovariateSettings(), + minCellCount = 5, + minCharacterizationMean = 0.01, + irWashoutPeriod = 0, + incremental = FALSE, + incrementalFolder = file.path(exportFolder, "incremental"), + runFeatureExtractionOnSample = FALSE, + sampleN = 1000, + seed = 64374, + seedArgs = NULL) { + # Validate and encapsulate parameters in a list + settings <- list( + cohortDefinitionSet = cohortDefinitionSet, + exportFolder = exportFolder, + databaseId = databaseId, + cohortDatabaseSchema = cohortDatabaseSchema, + databaseName = databaseName, + databaseDescription = databaseDescription, + cdmDatabaseSchema = cdmDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohortTableNames = cohortTableNames, + vocabularyDatabaseSchema = vocabularyDatabaseSchema, + cohortIds = cohortIds, + cdmVersion = cdmVersion, + temporalCovariateSettings = temporalCovariateSettings, + minCellCount = minCellCount, + minCharacterizationMean = minCharacterizationMean, + irWashoutPeriod = irWashoutPeriod, + incremental = incremental, + incrementalFolder = incrementalFolder, + runFeatureExtractionOnSample = runFeatureExtractionOnSample, + sampleN = sampleN, + seed = seed, + seedArgs = seedArgs + ) + + # Create an instance of the R6 class + return(CohortDiagnosticsSettings$new(settings, connection = connection, connectionDetails = connectionDetails)) +} \ No newline at end of file diff --git a/tests/testthat/test-0-CohortDiagnosticsSettings.R b/tests/testthat/test-0-CohortDiagnosticsSettings.R new file mode 100644 index 000000000..eadce5e33 --- /dev/null +++ b/tests/testthat/test-0-CohortDiagnosticsSettings.R @@ -0,0 +1,75 @@ +test_that("createCohortDiagnosticsSettings creates an object with valid settings", { + cohortDefinitionSet <- data.frame( + cohortId = 1:2, + cohortName = c("Cohort A", "Cohort B"), + json = c("{}", "{}"), + sql = c("SELECT * FROM cdm.cohort WHERE cohort_id = 1", "SELECT * FROM cdm.cohort WHERE cohort_id = 2") + ) + + cohortTableNames <- list( + cohortTable = "cohort", + cohortInclusionTable = "cohort_inclusion", + cohortInclusionResultTable = "cohort_inclusion_result", + cohortInclusionStatsTable = "cohort_inclusion_stats", + cohortSummaryStatsTable = "cohort_summary_stats", + cohortCensorStatsTable = "cohort_censor_stats" + ) + + settings <- createCohortDiagnosticsSettings( + cohortDefinitionSet = cohortDefinitionSet, + exportFolder = "output_folder", + databaseId = "test_db", + cohortDatabaseSchema = "cdm", + cdmDatabaseSchema = "cdm", + cohortTableNames = cohortTableNames, + minCellCount = 3 + ) + + checkmate::expect_class(settings, "CohortDiagnosticsSettings") + checkmate::expect_r6(settings, "R6") + + # Check that various public attributes are correctly assigned. + expect_equal(settings$exportFolder, "output_folder") + expect_equal(settings$databaseId, "test_db") + expect_equal(settings$minCellCount, 3) +}) + +test_that("createCohortDiagnosticsSettings throws errors for invalid inputs", { + cohortDefinitionSet <- data.frame( + cohortId = 1:2, + cohortName = c("Cohort A", "Cohort B"), + json = c("{}", "{}"), + sql = c("SELECT * FROM cdm.cohort WHERE cohort_id = 1", "SELECT * FROM cdm.cohort WHERE cohort_id = 2") + ) + + expect_error( + createCohortDiagnosticsSettings( + cohortDefinitionSet = cohortDefinitionSet, + exportFolder = NULL, # Invalid input + databaseId = "test_db", + cohortDatabaseSchema = "cdm", + cdmDatabaseSchema = "cdm" + ) + ) + + expect_error( + createCohortDiagnosticsSettings( + cohortDefinitionSet = NULL, # Invalid input + exportFolder = "output_folder", + databaseId = "test_db", + cohortDatabaseSchema = "cdm", + cdmDatabaseSchema = "cdm" + ) + ) + + expect_error( + createCohortDiagnosticsSettings( + cohortDefinitionSet = cohortDefinitionSet, + exportFolder = "output_folder", + databaseId = NULL, # Invalid input + cohortDatabaseSchema = "cdm", + cdmDatabaseSchema = "cdm" + ), + "databaseId" + ) +}) \ No newline at end of file