Skip to content

Commit

Permalink
[FAI-13686] Spin up docker in cli and check source connection (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeniii authored Dec 19, 2024
1 parent 026cbcb commit 685eb94
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 37 deletions.
1 change: 1 addition & 0 deletions airbyte-local-cli-nodejs/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ out/pkg
sample_command.sh
*airbyte-local
test/exec/resources
tmp*
2 changes: 1 addition & 1 deletion airbyte-local-cli-nodejs/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
20.17
20.18
32 changes: 16 additions & 16 deletions airbyte-local-cli-nodejs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions airbyte-local-cli-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"lint": "./scripts/lint",
"lint-fix": "npm run lint -- --fix",
"clean": "rm -rf lib node_modules package-lock.json",
"bundle": "npm run build && esbuild src/index.ts --bundle --platform=node --outfile=dist/index.js",
"bundle": "npm run build && esbuild src/index.ts --bundle --platform=node --outfile=dist/index.js --external:*.node",
"pkg": "npm run bundle && pkg --output ./out/pkg/airbyte-local dist/index.js",
"pkg-linuxstatic": "npm run bundle && pkg --output ./out/pkg/airbyte-local -t linuxstatic dist/index.js",
"test": "jest",
Expand All @@ -33,21 +33,21 @@
"devDependencies": {
"@tsconfig/node20": "^20.1.4",
"@tsconfig/strictest": "^2.0.5",
"@types/dockerode": "^3.3.31",
"@types/dockerode": "^3.3.32",
"@types/jest": "^29.5.14",
"@types/lodash": "^4.17.13",
"@types/node": "^20.17.6",
"@typescript-eslint/eslint-plugin": "^8.13.0",
"@typescript-eslint/parser": "^8.13.0",
"@yao-pkg/pkg": "^6.1.0",
"@yao-pkg/pkg": "^6.1.1",
"eslint": "^8.57.1",
"eslint-config-faros": "^0.1.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-simple-import-sort": "^12.1.1",
"prettier": "^3.3.3",
"prettier": "^3.4.1",
"ts-jest": "^29.2.5",
"tsx": "^4.19.2",
"typescript": "^5.6.3"
"typescript": "~5.6.3"
},
"dependencies": {
"commander": "^12.1.0",
Expand Down
4 changes: 3 additions & 1 deletion airbyte-local-cli-nodejs/src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ function command() {
// Options: Airbyte connector settings
.option('--no-src-pull', 'Skip pulling Airbyte source image')
.option('--no-dst-pull', 'Skip pulling Airbyte destination image')
.option('--src-check-connection', 'Validate the Airbyte source connection')
.addOption(
new Option('--src-check-connection', `Validate the Airbyte source connection`).conflicts('srcOutputFile'),
)
.addOption(
new Option(
'--src-only',
Expand Down
99 changes: 99 additions & 0 deletions airbyte-local-cli-nodejs/src/docker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import {Writable} from 'node:stream';

import Docker from 'dockerode';

import {AirbyteConnectionStatus, AirbyteConnectionStatusMessage, AirbyteMessageType} from './types';
import {logger, SRC_CONFIG_FILENAME} from './utils';

// Create a new Docker instance
let _docker = new Docker();

// For testing purposes
export function setDocker(docker: Docker): void {
_docker = docker;
}

export async function checkDockerInstalled(): Promise<void> {
try {
await _docker.version();
logger.debug('Docker is installed and running.');
} catch (error: any) {
logger.error('Docker is not installed or running.');
throw error;
}
}

export async function pullDockerImage(image: string): Promise<void> {
logger.info(`Pulling docker image: ${image}`);

try {
const stream = await _docker.pull(image);
await new Promise((resolve, reject) => {
_docker.modem.followProgress(stream, (err, res) => (err ? reject(err) : resolve(res)));
});
logger.info(`Docker image pulled: ${image}`);
} catch (error: any) {
logger.error(`Failed to pull docker image: ${image}`);
throw error;
}
}

/**
* Spinning up a docker container to check the source connection.
* `docker run --rm -v "$tempdir:/configs" $src_docker_options "$src_docker_image"
* check --config "/configs/$src_config_filename"`
*
* Sample output from the docker container:
* {"connectionStatus":{"status":"SUCCEEDED"},"type":"CONNECTION_STATUS"}
* {"connectionStatus":{"status":"FAILED","message":"Faros API key was not provided"},"type":"CONNECTION_STATUS"}
*/
export async function checkSrcConnection(tmpDir: string, image: string, srcConfigFile?: string): Promise<void> {
logger.info('Validating connection to source...');

if (!image) {
throw new Error('Source image is missing.');
}

try {
const cfgFile = srcConfigFile ?? SRC_CONFIG_FILENAME;
const command = ['check', '--config', `/configs/${cfgFile}`];
const createOptions: Docker.ContainerCreateOptions = {
HostConfig: {
Binds: [`${tmpDir}:/configs`],
AutoRemove: true,
},
platform: 'linux/amd64',
};

// create a writable stream to capture the output
let data = '';
const outputStream = new Writable({
write(chunk, _encoding, callback) {
data += chunk.toString();
callback();
},
});

// docker run
const res = await _docker.run(image, command, outputStream, createOptions);

// capture connection status from the output
let status: AirbyteConnectionStatusMessage | undefined;
data.split('\n').forEach((line) => {
if (line.includes(AirbyteMessageType.CONNECTION_STATUS)) {
status = JSON.parse(line) as AirbyteConnectionStatusMessage;
}
});
if (
status?.type === AirbyteMessageType.CONNECTION_STATUS &&
status?.connectionStatus.status === AirbyteConnectionStatus.SUCCEEDED &&
res[0].StatusCode === 0
) {
logger.info('Source connection is valid.');
} else {
throw new Error(status?.connectionStatus.message);
}
} catch (error: any) {
throw new Error(`Failed to validate source connection: ${error.message ?? JSON.stringify(error)}.`);
}
}
27 changes: 21 additions & 6 deletions airbyte-local-cli-nodejs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
import {parseAndValidateInputs} from './command';
import {checkDockerInstalled, checkSrcConnection, pullDockerImage} from './docker';
import {AirbyteCliContext} from './types';
import {checkDockerInstalled, cleanUp, createTmpDir, loadStateFile, logger, writeConfig} from './utils';
import {cleanUp, createTmpDir, loadStateFile, logger, writeConfig} from './utils';

function main(): void {
async function main(): Promise<void> {
const context: AirbyteCliContext = {};
try {
// Parse and validate cli arguments
const cfg = parseAndValidateInputs(process.argv);
checkDockerInstalled();
await checkDockerInstalled();

// Create temporary directory, load state file, write config to files
context.tmpDir = createTmpDir();
loadStateFile(context.tmpDir, cfg?.stateFile, cfg?.connectionName);
writeConfig(context.tmpDir, cfg);

// Pull source docker image
if (cfg.srcPull && cfg.src?.image) {
await pullDockerImage(cfg.src.image);
}
// Check source connection
if (cfg.srcCheckConnection && cfg.src?.image) {
await checkSrcConnection(context.tmpDir, cfg.src.image);
}
} catch (error: any) {
logger.error(error.message, 'Error');
cleanUp(context);
logger.error('Exit Airbyte CLI with errors.');
process.exit(1);
throw error;
}
}

main();
main().catch((_error) => {
logger.error('Exit Airbyte CLI with errors.');
process.exit(1);
});
29 changes: 29 additions & 0 deletions airbyte-local-cli-nodejs/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,32 @@ export interface FarosConfig {
logLevel: string;
debug: boolean;
}

/**
* Copy types from faros-ai/airtype-connectors
* https://github.com/faros-ai/airbyte-connectors/blob/main/faros-airbyte-cdk/src/protocol.ts
*/
export enum AirbyteConnectionStatus {
SUCCEEDED = 'SUCCEEDED',
FAILED = 'FAILED',
}
export enum AirbyteMessageType {
CATALOG = 'CATALOG',
CONNECTION_STATUS = 'CONNECTION_STATUS',
LOG = 'LOG',
RECORD = 'RECORD',
SPEC = 'SPEC',
STATE = 'STATE',
TRACE = 'TRACE',
}
export interface AirbyteMessage {
readonly type: AirbyteMessageType;
}
export declare class AirbyteConnectionStatusMessage implements AirbyteMessage {
readonly connectionStatus: {
status: AirbyteConnectionStatus;
message?: string;
};
readonly type: AirbyteMessageType;
constructor(connectionStatus: {status: AirbyteConnectionStatus; message?: string});
}
20 changes: 14 additions & 6 deletions airbyte-local-cli-nodejs/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import {AirbyteCliContext, AirbyteConfig, FarosConfig} from './types';

// constants
export const FILENAME_PREFIX = 'faros_airbyte_cli';
export const SRC_CONFIG_FILENAME = `${FILENAME_PREFIX}_src_config.json`;
export const DST_CONFIG_FILENAME = `${FILENAME_PREFIX}_dst_config.json`;
export const SRC_CATALOG_FILENAME = `${FILENAME_PREFIX}_src_catalog.json`;
export const DST_CATALOG_FILENAME = `${FILENAME_PREFIX}_dst_catalog.json`;

// Create a pino logger instance
export const logger = pino(pretty({colorize: true}));
Expand Down Expand Up @@ -58,13 +62,17 @@ export function checkDockerInstalled(command = 'docker', args = ['--version']):
execCommand(command, args, {errMsg: 'Docker is not installed'});
}

// Create a temporary directory
// The default temporary directory would be under system default temporaray dir e.g. `/tmp`
// with appending six random characters for uniqueness, like `/tmp/abc123`
export function createTmpDir(tmpDir?: string): string {
/**
* Create a temporary directory
* The default temporary directory would be under system default temporaray dir e.g. `/tmp`
* with appending six random characters for uniqueness, like `/tmp/abc123`
* @param absTmpDir Testing purpose. Customized absolute path to the temporary directory
* @returns The absolute path of the temporary directory
*/
export function createTmpDir(absTmpDir?: string): string {
try {
logger.debug(`Creating temporary directory for temporary Airbyte files...`);
const tmpDirPath = mkdtempSync(tmpDir ?? `${tmpdir()}${sep}`);
const tmpDirPath = mkdtempSync(absTmpDir ?? `${tmpdir()}${sep}`);
logger.debug(`Temporary directory created: ${tmpDirPath}.`);
return tmpDirPath;
} catch (error: any) {
Expand Down Expand Up @@ -135,7 +143,7 @@ export function writeConfig(tmpDir: string, config: FarosConfig): void {

// write config to temporary directory config files
logger.debug(`Writing Airbyte config to files...`);
const srcConfigFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_src_config.json`;
const srcConfigFilePath = `${tmpDir}${sep}${SRC_CONFIG_FILENAME}`;
const dstConfigFilePath = `${tmpDir}${sep}${FILENAME_PREFIX}_dst_config.json`;
writeFileSync(srcConfigFilePath, JSON.stringify(airbyteConfig.src.config ?? {}, null, 2));
writeFileSync(dstConfigFilePath, JSON.stringify(airbyteConfig.dst.config ?? {}, null, 2));
Expand Down
27 changes: 27 additions & 0 deletions airbyte-local-cli-nodejs/test/docker.it.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import {checkSrcConnection, pullDockerImage} from '../src/docker';

beforeAll(async () => {
await pullDockerImage('farosai/airbyte-example-source');
});

describe('checkSrcConnection', () => {
it('should success', async () => {
await expect(
checkSrcConnection(
`${process.cwd()}/test/resources`,
'farosai/airbyte-example-source',
'faros_airbyte_cli_src_config_chris.json',
),
).resolves.not.toThrow();
});

it('should fail with', async () => {
await expect(
checkSrcConnection(
`${process.cwd()}/test/resources`,
'farosai/airbyte-example-source',
'faros_airbyte_cli_src_config_jennie.json',
),
).rejects.toThrow('Failed to validate source connection: User is not chris.');
});
});
Loading

0 comments on commit 685eb94

Please sign in to comment.