Skip to content

Commit

Permalink
feat: added support for duckdb in exportToAres
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikhail-iontsev committed Nov 16, 2023
1 parent 734803f commit 9830795
Showing 1 changed file with 42 additions and 179 deletions.
221 changes: 42 additions & 179 deletions R/exportToAres.R
Original file line number Diff line number Diff line change
Expand Up @@ -1945,23 +1945,15 @@ exportToAres <- function(
dir.create(sourceOutputPath, showWarnings = F, recursive = T)
duckdbCon <- NULL
conceptsSchema <- "concepts"
domainSchema <- "domain"
densitySchema <- "data_density"
metadataSchema <- "metadata"
personSchema <- "person"
observationPeriodSchema <- "observation_period"
deathSchema <- "death"
conceptsFolder <- file.path(sourceOutputPath, "concepts")
dir.create(conceptsFolder, showWarnings = F)
if (outputFormat == "duckdb") {
duckdbCon <- dbConnect(duckdb::duckdb(), dbdir = file.path(outputPath, sourceKey, releaseDateKey, 'ares.duckdb'), read_only = FALSE)
conceptsDatabasePath <- file.path(conceptsFolder, 'data.duckdb')
if (file.exists(conceptsDatabasePath)) {
unlink(conceptsDatabasePath)
}
duckdbCon <- dbConnect(duckdb::duckdb(), dbdir = conceptsDatabasePath, read_only = FALSE)
dbExecute(duckdbCon, paste("CREATE SCHEMA", conceptsSchema))
dbExecute(duckdbCon, paste("CREATE SCHEMA", domainSchema))
dbExecute(duckdbCon, paste("CREATE SCHEMA", densitySchema))
dbExecute(duckdbCon, paste("CREATE SCHEMA", metadataSchema))
dbExecute(duckdbCon, paste("CREATE SCHEMA", observationPeriodSchema))
dbExecute(duckdbCon, paste("CREATE SCHEMA", personSchema))
dbExecute(duckdbCon, paste("CREATE SCHEMA", deathSchema))


on.exit(dbDisconnect(duckdbCon, shutdown = TRUE))
}
print(paste0("processing AO export to ", sourceOutputPath))
Expand All @@ -1971,260 +1963,140 @@ exportToAres <- function(
currentTable <- { }
# data density - totals
currentTable <- generateDataDensityTotal(conn, resultsDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(currentTable$totalRecordsData, file = paste0(sourceOutputPath, "/datadensity-total.csv"))
data.table::fwrite(currentTable$domainAggregates, file = paste0(sourceOutputPath, "/records-by-domain.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable$totalRecordsData, "datadensity_total", densitySchema)
writeReportToTable(duckdbCon, currentTable$totalRecordsData, "records_by_domain", densitySchema)
}
data.table::fwrite(currentTable$totalRecordsData, file = paste0(sourceOutputPath, "/datadensity-total.csv"))
data.table::fwrite(currentTable$domainAggregates, file = paste0(sourceOutputPath, "/records-by-domain.csv"))


# data density - records per person
currentTable <- generateDataDensityRecordsPerPerson(conn, resultsDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/datadensity-records-per-person.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "datadensity_records_per_person", densitySchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/datadensity-records-per-person.csv"))


# data density - concepts per person
currentTable <- generateDataDensityConceptsPerPerson(conn, resultsDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/datadensity-concepts-per-person.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "datadensity_concepts_per_person", densitySchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/datadensity-concepts-per-person.csv"))


# data density - domains per person
currentTable <- generateDataDensityDomainsPerPerson(conn, resultsDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/datadensity-domains-per-person.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "datadensity_domains_per_person", densitySchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/datadensity-domains-per-person.csv"))

}

if (length(reports) == 0 || (length(reports) > 0 && ("domain" %in% reports || "concept" %in% reports))) {
# metadata
writeLines("Generating metadata report")
currentTable <- generateAOMetadataReport(conn, cdmDatabaseSchema, sourceOutputPath)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/metadata.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "metadata", metadataSchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/metadata.csv"))


# cdm source
writeLines("Generating cdm source report")
currentTable <- generateAOCdmSourceReport(conn, cdmDatabaseSchema, sourceOutputPath)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/cdmsource.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "cdm_source", metadataSchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/cdmsource.csv"))


# domain summary - observation period
writeLines("Generating observation period reports")
currentTable <- generateAOObservationPeriodReport(conn, cdmDatabaseSchema, resultsDatabaseSchema, vocabDatabaseSchema, sourceOutputPath)
if (outputFormat == "json") {
filename <- file.path(sourceOutputPath, "observationperiod.json")
write(jsonlite::toJSON(currentTable), filename)
}
filename <- file.path(sourceOutputPath, "observationperiod.json")
write(jsonlite::toJSON(currentTable), filename)

if (outputFormat == "duckdb") {
for (key in names(currentTable)) {
print(is.data.frame(currentTable[key]))
writeReportToTable(duckdbCon, as.data.frame(currentTable[[key]]), key, observationPeriodSchema)
}
}

# death report
writeLines("Generating death report")
currentTable <- generateAODeathReport(conn, cdmDatabaseSchema, resultsDatabaseSchema, vocabDatabaseSchema, sourceOutputPath)
if (outputFormat == "json") {
filename <- file.path(sourceOutputPath, "death.json")
write(jsonlite::toJSON(currentTable), filename)
}

if (outputFormat == "duckdb") {
for (key in names(currentTable)) {
print(is.data.frame(currentTable[key]))
writeReportToTable(duckdbCon, as.data.frame(currentTable[[key]]), key, deathSchema)
}
}
filename <- file.path(sourceOutputPath, "death.json")
write(jsonlite::toJSON(currentTable), filename)


writeLines("Generating domain summary reports")

# domain summary - conditions
dataConditions <- generateDomainSummaryConditions(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataConditions, file = paste0(sourceOutputPath, "/domain-summary-condition_occurrence.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataConditions, "condition_occurrence", domainSchema)
}
data.table::fwrite(dataConditions, file = paste0(sourceOutputPath, "/domain-summary-condition_occurrence.csv"))


# domain summary - condition eras
dataConditionEra <- generateDomainSummaryConditionEras(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataConditionEra, file = paste0(sourceOutputPath, "/domain-summary-condition_era.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataConditionEra, "condition_era", domainSchema)
}
data.table::fwrite(dataConditionEra, file = paste0(sourceOutputPath, "/domain-summary-condition_era.csv"))


# domain summary - drugs
dataDrugs <- generateDomainSummaryDrugs(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataDrugs, file = paste0(sourceOutputPath, "/domain-summary-drug_exposure.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataDrugs, "drug_exposure", domainSchema)
}
data.table::fwrite(dataDrugs, file = paste0(sourceOutputPath, "/domain-summary-drug_exposure.csv"))


# domain stratification by drug type concept
dataDrugType <- generateDomainDrugStratification(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataDrugType, file = paste0(sourceOutputPath, "/domain-drug-stratification.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataDrugType, "drug_stratification", domainSchema)
}
data.table::fwrite(dataDrugType, file = paste0(sourceOutputPath, "/domain-drug-stratification.csv"))


# domain summary - drug era
dataDrugEra <- generateDomainSummaryDrugEra(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataDrugEra, file = paste0(sourceOutputPath, "/domain-summary-drug_era.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataDrugType, "drug_stratification", domainSchema)
}
data.table::fwrite(dataDrugEra, file = paste0(sourceOutputPath, "/domain-summary-drug_era.csv"))


# domain summary - measurements
dataMeasurements <- generateDomainSummaryMeasurements(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataMeasurements, file = paste0(sourceOutputPath, "/domain-summary-measurement.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataMeasurements, "measurement", domainSchema)
}
data.table::fwrite(dataMeasurements, file = paste0(sourceOutputPath, "/domain-summary-measurement.csv"))


# domain summary - observations
dataObservations <- generateDomainSummaryObservations(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataObservations, file = paste0(sourceOutputPath, "/domain-summary-observation.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataObservations, "observation", domainSchema)
}
data.table::fwrite(dataObservations, file = paste0(sourceOutputPath, "/domain-summary-observation.csv"))


# domain summary - visit details
dataVisitDetails <- generateDomainSummaryVisitDetails(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataVisitDetails, file = paste0(sourceOutputPath, "/domain-summary-visit_detail.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataVisitDetails, "visit_detail", domainSchema)
}
data.table::fwrite(dataVisitDetails, file = paste0(sourceOutputPath, "/domain-summary-visit_detail.csv"))


# domain summary - visits
dataVisits <- generateDomainSummaryVisits(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataVisits, file = paste0(sourceOutputPath, "/domain-summary-visit_occurrence.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataVisits, "visit_occurrence", domainSchema)
}
data.table::fwrite(dataVisits, file = paste0(sourceOutputPath, "/domain-summary-visit_occurrence.csv"))


# domain stratification by visit concept
currentTable <- generateDomainVisitStratification(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/domain-visit-stratification.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "visit_stratification", domainSchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/domain-visit-stratification.csv"))


# domain summary - procedures
dataProcedures <- generateDomainSummaryProcedures(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataProcedures, file = paste0(sourceOutputPath, "/domain-summary-procedure_occurrence.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataProcedures, "procedure_occurrence", domainSchema)
}
data.table::fwrite(dataProcedures, file = paste0(sourceOutputPath, "/domain-summary-procedure_occurrence.csv"))


# domain summary - devices
dataDevices <- generateDomainSummaryDevices(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataDevices, file = paste0(sourceOutputPath, "/domain-summary-device_exposure.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataDevices, "device_exposure", domainSchema)
}
data.table::fwrite(dataDevices, file = paste0(sourceOutputPath, "/domain-summary-device_exposure.csv"))
}

# domain summary - provider
dataProviders <- generateDomainSummaryProvider(conn, resultsDatabaseSchema, vocabDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(dataProviders, file = paste0(sourceOutputPath, "/domain-summary-provider.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, dataProviders, "provider", domainSchema)
}
data.table::fwrite(dataProviders, file = paste0(sourceOutputPath, "/domain-summary-provider.csv"))


if (length(reports) == 0 || (length(reports) > 0 && "quality" %in% reports)) {
writeLines("Generating quality completeness report")

# quality - completeness
currentTable <- generateQualityCompleteness(conn, resultsDatabaseSchema)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/quality-completeness.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "quality_completeness", metadataSchema)
}
data.table::fwrite(currentTable, file = paste0(sourceOutputPath, "/quality-completeness.csv"))

}

if (length(reports) == 0 || (length(reports) > 0 && "performance" %in% reports)) {
writeLines("Generating achilles performance report")
currentTable <- generateAOAchillesPerformanceReport(conn, cdmDatabaseSchema, resultsDatabaseSchema, vocabDatabaseSchema, sourceOutputPath)
if (outputFormat == "json") {
data.table::fwrite(currentTable, file.path(sourceOutputPath, "achilles-performance.csv"))
}
if (outputFormat == "duckdb") {
writeReportToTable(duckdbCon, currentTable, "achilles_performance", metadataSchema)
}
data.table::fwrite(currentTable, file.path(sourceOutputPath, "achilles-performance.csv"))


}

if (length(reports) == 0 || (length(reports) > 0 && "concept" %in% reports)) {
# concept level reporting
conceptsFolder <- file.path(sourceOutputPath, "concepts")
if (outputFormat == "json") {
dir.create(conceptsFolder, showWarnings = F)

}
columnsToNormalize <- c("CONCEPT_NAME", "NUM_PERSONS", "PERCENT_PERSONS", "RECORDS_PER_PERSON")

writeLines("Generating visit reports")
Expand Down Expand Up @@ -2311,17 +2183,8 @@ exportToAres <- function(
if (length(reports) == 0 || (length(reports) > 0 && "person" %in% reports)) {
writeLines("Generating person report")
currentTable <- generateAOPersonReport(connectionDetails, cdmDatabaseSchema, resultsDatabaseSchema, vocabDatabaseSchema, sourceOutputPath)
if (outputFormat == "json") {
jsonOutput = jsonlite::toJSON(currentTable)
write(jsonOutput, file = paste0(sourceOutputPath, "/person.json"))
}

if (outputFormat == "duckdb") {
for (key in names(currentTable)) {
print(is.data.frame(currentTable[key]))
writeReportToTable(duckdbCon, as.data.frame(currentTable[[key]]), key, personSchema)
}
}
jsonOutput = jsonlite::toJSON(currentTable)
write(jsonOutput, file = paste0(sourceOutputPath, "/person.json"))
}
}

0 comments on commit 9830795

Please sign in to comment.