Skip to content

Commit

Permalink
Merge branch 'CU-86c11rv16_automated-tests' into CU-86c13wh47_Utilize…
Browse files Browse the repository at this point in the history
…-OpenHIMs-Mediators-configuration-to-set-buckets
  • Loading branch information
brett-onions committed Nov 25, 2024
2 parents 1475242 + c4e7c17 commit 3fb3fba
Showing 1 changed file with 34 additions and 39 deletions.
73 changes: 34 additions & 39 deletions src/utils/minioClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,45 +182,40 @@ export async function createMinioBucketListeners() {
logger.info(`File received: ${file} from bucket ${tableName}`);

try {
//@ts-ignore
minioClient.fGetObject(bucket, file, `tmp/${file}`, async (err) => {
if (err) {
logger.error(err);
} else {
const fileBuffer = await readFile(`tmp/${file}`);

//get the file extension
const extension = file.split('.').pop();
logger.info(`File Downloaded - Type: ${extension}`);

if (extension === 'json' && validateJsonFile(fileBuffer)) {
// flatten the json and pass it to clickhouse
//const fields = flattenJson(JSON.parse(fileBuffer.toString()));
//await createTable(fields, tableName);
logger.warn(`File type not currently supported- ${extension}`);
} else if (extension === 'csv' && getCsvHeaders(fileBuffer)) {
//get the first line of the csv file
const fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');

await createTable(fields, tableName);

// If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
const host = getConfig().runningMode === 'testing' ? 'minio' : endPoint;
// Construct the S3-style URL for the file
const minioUrl = `http://${host}:${port}/${bucket}/${file}`;

// Insert data into clickhouse
await insertFromS3(tableName, minioUrl, {
accessKey,
secretKey,
});
} else {
logger.warn(`Unknown file type - ${extension}`);
}
await rm(`tmp/${file}`);
logger.debug(`File ${file} deleted from tmp directory`);
}
});
await minioClient.fGetObject(bucket, file, `tmp/${file}`);

const fileBuffer = await readFile(`tmp/${file}`);

//get the file extension
const extension = file.split('.').pop();
logger.info(`File Downloaded - Type: ${extension}`);

if (extension === 'json' && validateJsonFile(fileBuffer)) {
// flatten the json and pass it to clickhouse
//const fields = flattenJson(JSON.parse(fileBuffer.toString()));
//await createTable(fields, tableName);
logger.warn(`File type not currently supported- ${extension}`);
} else if (extension === 'csv' && getCsvHeaders(fileBuffer)) {
//get the first line of the csv file
const fields = (await readFile(`tmp/${file}`, 'utf8')).split('\n')[0].split(',');

await createTable(fields, tableName);

// If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server
const host = getConfig().runningMode === 'testing' ? 'minio' : endPoint;
// Construct the S3-style URL for the file
const minioUrl = `http://${host}:${port}/${bucket}/${file}`;

// Insert data into clickhouse
await insertFromS3(tableName, minioUrl, {
accessKey,
secretKey,
});
} else {
logger.warn(`Unknown file type - ${extension}`);
}
await rm(`tmp/${file}`);
logger.debug(`File ${file} deleted from tmp directory`);
} catch (error) {
logger.error(`Error processing file ${file}: ${error}`);
}
Expand Down

0 comments on commit 3fb3fba

Please sign in to comment.