diff --git a/gitlab-runner/Cargo.toml b/gitlab-runner/Cargo.toml index 9eca5dd..5f68651 100644 --- a/gitlab-runner/Cargo.toml +++ b/gitlab-runner/Cargo.toml @@ -29,6 +29,7 @@ tracing-subscriber = "0.3.8" tracing = "0.1.30" doc-comment = "0.3.3" tokio-util = { version = "0.7", features = [ "io" ] } +masker = "0.0.2" [dev-dependencies] tokio = { version = "1.5.0", features = [ "full", "test-util" ] } diff --git a/gitlab-runner/src/run.rs b/gitlab-runner/src/run.rs index 3c6d240..3da559d 100644 --- a/gitlab-runner/src/run.rs +++ b/gitlab-runner/src/run.rs @@ -1,4 +1,5 @@ use bytes::Bytes; +use masker::Masker; use std::future::Future; use std::path::PathBuf; use std::sync::Arc; @@ -15,10 +16,13 @@ use crate::uploader::Uploader; use crate::CancellableJobHandler; use crate::{JobResult, Phase}; +const GITLAB_MASK: &str = "[MASKED]"; + async fn run( job: Job, client: Client, response: Arc, + masker: Masker, process: F, build_dir: PathBuf, cancel_token: CancellationToken, @@ -62,7 +66,7 @@ where }); let r = if upload { - if let Ok(mut uploader) = Uploader::new(client, &build_dir, response) { + if let Ok(mut uploader) = Uploader::new(client, &build_dir, response, masker) { let r = handler.upload_artifacts(&mut uploader).await; if r.is_ok() { uploader.upload().await.and(script_result) @@ -140,7 +144,13 @@ impl Run { buf: Bytes, cancel_token: &CancellationToken, ) -> Option { - assert!(!buf.is_empty()); + if buf.is_empty() { + // It's convenient to permit this because if we are + // masking, the masker may not have produced any output, + // so we'd just end up doing the same test in every + // caller, rather than once here. + return None; + } let len = buf.len(); match self @@ -178,6 +188,15 @@ impl Run { { let cancel_token = CancellationToken::new(); + let masked_variables = self + .response + .variables + .iter() + .filter(|(_, v)| v.masked) + .map(|(_, v)| v.value.as_str()) + .collect::>(); + let masker = Masker::new(&masked_variables, GITLAB_MASK); + let job = Job::new( self.client.clone(), self.response.clone(), @@ -189,6 +208,7 @@ impl Run { job, self.client.clone(), self.response.clone(), + masker.clone(), process, build_dir, cancel_token.clone(), @@ -198,6 +218,8 @@ impl Run { ); tokio::pin!(join); + let mut cm = masker.mask_chunks(); + let result = loop { tokio::select! { _ = self.interval.tick() => { @@ -206,6 +228,7 @@ impl Run { let now = Instant::now(); if let Some(buf) = self.joblog.split_trace() { // TODO be resiliant against send errors + let buf = cm.mask_chunk(buf).into(); if let Some(interval) = self.send_trace(buf, &cancel_token).await { if interval != self.interval.period() { self.interval = Self::create_interval(now, interval); @@ -224,8 +247,12 @@ impl Run { // Send the remaining trace buffer back to gitlab. if let Some(buf) = self.joblog.split_trace() { + let buf = cm.mask_chunk(buf).into(); self.send_trace(buf, &cancel_token).await; } + // Flush anything the masker was holding back + let buf = cm.finish().into(); + self.send_trace(buf, &cancel_token).await; // Don't bother updating the status if cancelled, since it will just fail. if !cancel_token.is_cancelled() { diff --git a/gitlab-runner/src/uploader.rs b/gitlab-runner/src/uploader.rs index f17aaf7..e5a2364 100644 --- a/gitlab-runner/src/uploader.rs +++ b/gitlab-runner/src/uploader.rs @@ -8,6 +8,7 @@ use std::thread; use std::{sync::Arc, task::Poll}; use futures::{future::BoxFuture, AsyncWrite, FutureExt}; +use masker::{ChunkMasker, Masker}; use reqwest::Body; use tokio::fs::File as AsyncFile; use tokio::sync::mpsc::{self, error::SendError}; @@ -71,6 +72,7 @@ fn zip_thread(mut temp: File, mut rx: mpsc::Receiver) { pub struct UploadFile<'a> { tx: &'a mpsc::Sender, state: UploadFileState<'a>, + masker: Option>, } impl<'a> AsyncWrite for UploadFile<'a> { @@ -84,10 +86,12 @@ impl<'a> AsyncWrite for UploadFile<'a> { match this.state { UploadFileState::Idle => { let (tx, rx) = oneshot::channel(); - let send = this - .tx - .send(UploadRequest::WriteData(Vec::from(buf), tx)) - .boxed(); + let buf = if let Some(masker) = &mut this.masker { + masker.mask_chunk(buf) + } else { + Vec::from(buf) + }; + let send = this.tx.send(UploadRequest::WriteData(buf, tx)).boxed(); this.state = UploadFileState::Writing(Some(send), rx) } UploadFileState::Writing(ref mut send, ref mut rx) => { @@ -114,9 +118,33 @@ impl<'a> AsyncWrite for UploadFile<'a> { fn poll_close( self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, + cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - Poll::Ready(Ok(())) + let this = self.get_mut(); + if let Some(masker) = this.masker.take() { + let (tx, mut rx) = oneshot::channel(); + let buf = masker.finish(); + let mut send = Some(this.tx.send(UploadRequest::WriteData(buf, tx)).boxed()); + + loop { + if let Some(mut f) = send { + // Phase 1: Waiting for the send to the writer + // thread to complete. + + // TODO error handling + let _r = futures::ready!(f.as_mut().poll(cx)); + send = None; + } else { + // Phase 2: Waiting for the writer thread to + // signal write completion. + + let _r = futures::ready!(Pin::new(&mut rx).poll(cx)); + return Poll::Ready(Ok(())); + } + } + } else { + Poll::Ready(Ok(())) + } } } @@ -125,6 +153,7 @@ pub struct Uploader { client: Client, data: Arc, tx: mpsc::Sender, + masker: Masker, } impl Uploader { @@ -132,13 +161,19 @@ impl Uploader { client: Client, build_dir: &Path, data: Arc, + masker: Masker, ) -> Result { let temp = tempfile::tempfile_in(build_dir) .map_err(|e| warn!("Failed to create artifacts temp file: {:?}", e))?; let (tx, rx) = mpsc::channel(2); thread::spawn(move || zip_thread(temp, rx)); - Ok(Self { client, data, tx }) + Ok(Self { + client, + data, + tx, + masker, + }) } /// Create a new file to be uploaded @@ -149,6 +184,20 @@ impl Uploader { .expect("Failed to create file"); UploadFile { tx: &self.tx, + masker: None, + state: UploadFileState::Idle, + } + } + + /// Create a new file to be uploaded, which will be masked + pub async fn masked_file(&mut self, name: String) -> UploadFile<'_> { + self.tx + .send(UploadRequest::NewFile(name)) + .await + .expect("Failed to create file"); + UploadFile { + tx: &self.tx, + masker: Some(self.masker.mask_chunks()), state: UploadFileState::Idle, } } diff --git a/gitlab-runner/tests/integration.rs b/gitlab-runner/tests/integration.rs index 5ffd1a6..657e9d4 100644 --- a/gitlab-runner/tests/integration.rs +++ b/gitlab-runner/tests/integration.rs @@ -412,6 +412,53 @@ async fn job_log() { .await; } +#[tokio::test] +async fn job_mask_log() { + let mock = GitlabRunnerMock::start().await; + let job = { + let mut b = mock.job_builder("log".to_string()); + b.add_variable("SECRET".to_string(), "$ecret".to_string(), false, true); + b.add_step( + MockJobStepName::Script, + vec!["dummy".to_string()], + 3600, + MockJobStepWhen::OnSuccess, + false, + ); + b.build() + }; + mock.enqueue_job(job.clone()); + + let dir = tempfile::tempdir().unwrap(); + let (mut runner, layer) = Runner::new_with_layer( + mock.uri(), + mock.runner_token().to_string(), + dir.path().to_path_buf(), + ); + + let subscriber = Registry::default().with(layer); + async { + tracing::info!("TEST"); + let got_job = runner + .request_job(|job| async move { + tracing::info!("TEST1234"); + outputln!("aa-$ecret"); + job.trace("bb$ec"); + outputln!("retxyz"); + SimpleRun::dummy(Ok(())).await + }) + .with_current_subscriber() + .await + .unwrap(); + assert!(got_job); + runner.wait_for_space(1).await; + assert_eq!(MockJobState::Success, job.state()); + assert_eq!(b"aa-[MASKED]\nbb[MASKED]xyz\n", job.log().as_slice()); + } + .with_subscriber(subscriber) + .await; +} + #[tokio::test] async fn job_steps() { let mock = GitlabRunnerMock::start().await;