diff --git a/.github/workflows/vorpal.yaml b/.github/workflows/vorpal.yaml
index 367feb1d..5699aae9 100644
--- a/.github/workflows/vorpal.yaml
+++ b/.github/workflows/vorpal.yaml
@@ -86,14 +86,14 @@ jobs:
       - run: ./dist/vorpal keys generate
 
-      - env:
-          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
-          AWS_DEFAULT_REGION: ${{ secrets.AWS_DEFAULT_REGION }}
-          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
-        run: |
-          ./dist/vorpal start \
-            --registry-backend "s3" \
-            --registry-backend-s3-bucket "altf4llc-vorpal-registry" \
+      - uses: actions/github-script@v6
+        with:
+          script: |
+            core.exportVariable('ACTIONS_CACHE_URL', process.env.ACTIONS_CACHE_URL || '');
+            core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || '');
+
+      - run: |
+          ./dist/vorpal start --registry-backend "gha" \
             > worker_output.log 2>&1 &
           WORKER_PID=$(echo $!)
           echo "WORKER_PID=$WORKER_PID" >> $GITHUB_ENV
 
diff --git a/Cargo.lock b/Cargo.lock
index 85565853..88a414d6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3197,7 +3197,10 @@ dependencies = [
  "anyhow",
  "aws-config",
  "aws-sdk-s3",
+ "reqwest",
  "rsa",
+ "serde",
+ "sha2",
  "tokio",
  "tokio-stream",
  "tonic",
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 7922aab9..e1a937bd 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -528,6 +528,7 @@ async fn main() -> Result<()> {
     if services.contains("registry") {
         let backend = match registry_backend.as_str() {
+            "gha" => RegistryServerBackend::GHA,
             "local" => RegistryServerBackend::Local,
             "s3" => RegistryServerBackend::S3,
             _ => RegistryServerBackend::Unknown,
         };
diff --git a/registry/Cargo.toml b/registry/Cargo.toml
index de706a29..54c7d34e 100644
--- a/registry/Cargo.toml
+++ b/registry/Cargo.toml
@@ -7,7 +7,10 @@ edition = "2021"
 anyhow = { default-features = false, version = "1" }
 aws-config = { default-features = false, features = ["behavior-version-latest", "rt-tokio", "rustls", "sso"], version = "1" }
 aws-sdk-s3 = { default-features = false, version = "1" }
+reqwest = { default-features = false, version = "0", features = ["json", "rustls-tls"] }
 rsa = { default-features = false, version = "0" }
+serde = { version = "1.0", features = ["derive"] }
+sha2 = "0.10"
 tokio = { default-features = false, features = ["process", "rt-multi-thread"], version = "1" }
 tokio-stream = { default-features = false, features = ["io-util"], version = "0" }
 tonic = { default-features = false, version = "0" }
diff --git a/registry/src/gha.rs b/registry/src/gha.rs
new file mode 100644
index 00000000..1a7c1504
--- /dev/null
+++ b/registry/src/gha.rs
@@ -0,0 +1,222 @@
+use anyhow::{anyhow, Context, Result};
+use reqwest::{
+    header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_RANGE, CONTENT_TYPE},
+    Client, StatusCode,
+};
+use serde::{Deserialize, Serialize};
+use sha2::{Digest, Sha256};
+use std::{
+    fs::File,
+    io::{Read, Seek, SeekFrom},
+    path::Path,
+    sync::Arc,
+};
+use tokio::sync::Semaphore;
+use tracing::info;
+
+const VERSION_SALT: &str = "1.0";
+const API_VERSION: &str = "6.0-preview.1";
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ArtifactCacheEntry {
+    pub archive_location: String,
+    pub cache_key: String,
+    pub cache_version: String,
+    pub scope: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ReserveCacheRequest {
+    pub key: String,
+    pub version: String,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub cache_size: Option<u64>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ReserveCacheResponse {
+    pub cache_id: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct CommitCacheRequest {
+    pub size: u64,
+}
+
+#[derive(Debug)]
+pub struct CacheClient {
+    client: Client,
+    base_url: String,
+}
+
+impl CacheClient {
+    pub fn new() -> Result<Self> {
+        let token = std::env::var("ACTIONS_RUNTIME_TOKEN")
+            .context("ACTIONS_RUNTIME_TOKEN environment variable not found")?;
+        let base_url = std::env::var("ACTIONS_CACHE_URL")
+            .context("ACTIONS_CACHE_URL environment variable not found")?;
+
+        let mut headers = HeaderMap::new();
+        headers.insert(
+            ACCEPT,
+            HeaderValue::from_str(&format!("application/json;api-version={API_VERSION}"))?,
+        );
+        headers.insert(
+            "Authorization",
+            HeaderValue::from_str(&format!("Bearer {token}"))?,
+        );
+
+        let client = Client::builder()
+            .user_agent("rust/github-actions-cache")
+            .default_headers(headers)
+            .build()?;
+
+        Ok(Self { client, base_url })
+    }
+
+    pub async fn get_cache_entry(
+        &self,
+        keys: &[String],
+        paths: &[String],
+        compression_method: Option<String>,
+        enable_cross_os_archive: bool,
+    ) -> Result<Option<ArtifactCacheEntry>> {
+        let version = get_cache_version(paths, compression_method, enable_cross_os_archive)?;
+        let keys_str = keys.join(",");
+        let url = format!(
+            "{}/_apis/artifactcache/cache?keys={}&version={}",
+            self.base_url, keys_str, version
+        );
+
+        let response = self.client.get(&url).send().await?;
+
+        match response.status() {
+            StatusCode::NO_CONTENT => Ok(None),
+            StatusCode::OK => {
+                let entry = response.json::<ArtifactCacheEntry>().await?;
+                Ok(Some(entry))
+            }
+            status => Err(anyhow!("Unexpected status code: {}", status)),
+        }
+    }
+
+    pub async fn reserve_cache(
+        &self,
+        key: &str,
+        paths: &[String],
+        compression_method: Option<String>,
+        enable_cross_os_archive: bool,
+        cache_size: Option<u64>,
+    ) -> Result<ReserveCacheResponse> {
+        let version = get_cache_version(paths, compression_method, enable_cross_os_archive)?;
+        let url = format!("{}/_apis/artifactcache/caches", self.base_url);
+
+        let request = ReserveCacheRequest {
+            cache_size,
+            key: key.to_string(),
+            version,
+        };
+
+        let response = self
+            .client
+            .post(&url)
+            .json(&request)
+            .send()
+            .await?
+            .error_for_status()?;
+
+        Ok(response.json().await?)
+    }
+
+    pub async fn save_cache(
+        &self,
+        cache_id: u64,
+        archive_path: &Path,
+        concurrency: usize,
+        chunk_size: usize,
+    ) -> Result<()> {
+        let file = File::open(archive_path)?;
+        let file_size = file.metadata()?.len();
+        let url = format!("{}/_apis/artifactcache/caches/{}", self.base_url, cache_id);
+
+        info!("Uploading cache file with size: {} bytes", file_size);
+
+        // Create a semaphore to limit concurrent uploads
+        let semaphore = Arc::new(Semaphore::new(concurrency));
+        let mut tasks = Vec::new();
+        let file = Arc::new(tokio::sync::Mutex::new(file));
+
+        for chunk_start in (0..file_size).step_by(chunk_size) {
+            let chunk_end = (chunk_start + chunk_size as u64 - 1).min(file_size - 1);
+            let permit = semaphore.clone().acquire_owned().await?;
+            let client = self.client.clone();
+            let url = url.clone();
+            let file = file.clone();
+
+            let task = tokio::spawn(async move {
+                let _permit = permit; // Keep permit alive for the duration of the upload
+                let mut file = file.lock().await;
+                file.seek(SeekFrom::Start(chunk_start))?;
+
+                let mut buffer = vec![0; (chunk_end - chunk_start + 1) as usize];
+                file.read_exact(&mut buffer)?;
+
+                let range = format!("bytes {}-{}/{}", chunk_start, chunk_end, file_size);
+                let response = client
+                    .patch(&url)
+                    .header(CONTENT_TYPE, "application/octet-stream")
+                    .header(CONTENT_RANGE, &range)
+                    .body(buffer)
+                    .send()
+                    .await?
+                    .error_for_status()?;
+
+                info!("Uploaded chunk response: {}", response.status());
+
+                Result::<()>::Ok(())
+            });
+
+            tasks.push(task);
+        }
+
+        // Wait for all upload tasks to complete
+        for task in tasks {
+            task.await??;
+        }
+
+        // Commit the cache
+        info!("Committing cache");
+        let commit_request = CommitCacheRequest { size: file_size };
+        self.client
+            .post(&url)
+            .json(&commit_request)
+            .send()
+            .await?
+            .error_for_status()?;
+
+        info!("Cache saved successfully");
+        Ok(())
+    }
+}
+
+fn get_cache_version(
+    paths: &[String],
+    compression_method: Option<String>,
+    enable_cross_os_archive: bool,
+) -> Result<String> {
+    let mut components = paths.to_vec();
+
+    if let Some(method) = compression_method {
+        components.push(method);
+    }
+
+    if cfg!(windows) && !enable_cross_os_archive {
+        components.push("windows-only".to_string());
+    }
+
+    components.push(VERSION_SALT.to_string());
+
+    let mut hasher = Sha256::new();
+    hasher.update(components.join("|"));
+    Ok(format!("{:x}", hasher.finalize()))
+}
diff --git a/registry/src/lib.rs b/registry/src/lib.rs
index c8bb7e84..d93801cb 100644
--- a/registry/src/lib.rs
+++ b/registry/src/lib.rs
@@ -5,13 +5,14 @@ use rsa::{
     sha2::Sha256,
     signature::Verifier,
 };
+use std::path::Path;
 use tokio::{
     fs::{read, write},
     sync::mpsc,
 };
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
 use tonic::{transport::Server, Request, Response, Status, Streaming};
-use tracing::error;
+use tracing::{error, info};
 use vorpal_notary::get_public_key;
 use vorpal_schema::vorpal::registry::v0::{
     registry_service_server::{RegistryService, RegistryServiceServer},
@@ -24,12 +25,15 @@ use vorpal_store::paths::{
     set_timestamps,
 };
 
+mod gha;
+
 const DEFAULT_CHUNK_SIZE: usize = 8192;
 
 #[derive(Clone, Debug, Default, PartialEq)]
 pub enum RegistryServerBackend {
     #[default]
     Unknown,
+    GHA,
     Local,
     S3,
 }
@@ -69,6 +73,39 @@ impl RegistryService for RegistryServer {
 
         let backend = self.backend.clone();
 
+        if backend == RegistryServerBackend::GHA {
+            let gha = gha::CacheClient::new().map_err(|err| {
+                Status::internal(format!("failed to create GHA cache client: {:?}", err))
+            })?;
+
+            let key = match request.kind() {
+                Artifact => format!("{}-{}-artifact", request.name, request.hash),
+                ArtifactSource => format!("{}-{}-source", request.name, request.hash),
+                _ => return Err(Status::invalid_argument("unsupported store kind")),
+            };
+
+            let path = match request.kind() {
+                Artifact => get_artifact_archive_path(&request.hash, &request.name),
+                ArtifactSource => get_source_archive_path(&request.hash, &request.name),
+                _ => return Err(Status::invalid_argument("unsupported store kind")),
+            };
+
+            let cache_entry = gha
+                .get_cache_entry(&[key], &[path.to_string_lossy().to_string()], None, false)
+                .await
+                .map_err(|e| {
+                    Status::internal(format!("failed to get cache entry: {:?}", e.to_string()))
+                })?;
+
+            if cache_entry.is_none() {
+                return Err(Status::not_found("store path not found"));
+            }
+
+            info!("cache entry: {:?}", cache_entry);
+
+            return Ok(Response::new(RegistryResponse { success: true }));
+        }
+
         if backend == RegistryServerBackend::Local {
             let path = match request.kind() {
                 Artifact => get_artifact_archive_path(&request.hash, &request.name),
@@ -145,6 +182,94 @@ impl RegistryService for RegistryServer {
                 return;
             }
 
+            if backend == RegistryServerBackend::GHA {
+                let gha = gha::CacheClient::new().expect("failed to create GHA cache client");
+
+                let key = match request.kind() {
+                    Artifact => format!("{}-{}-artifact", request.name, request.hash),
+                    ArtifactSource => format!("{}-{}-source", request.name, request.hash),
+                    _ => {
+                        if let Err(err) = tx
+                            .send(Err(Status::invalid_argument("unsupported store kind")))
+                            .await
+                        {
+                            error!("failed to send store error: {:?}", err);
+                        }
+
+                        return;
+                    }
+                };
+
+                let path = match request.kind() {
+                    Artifact => get_artifact_archive_path(&request.hash, &request.name),
+                    ArtifactSource => get_source_archive_path(&request.hash, &request.name),
+                    _ => {
+                        if let Err(err) = tx
+                            .send(Err(Status::invalid_argument("unsupported store kind")))
+                            .await
+                        {
+                            error!("failed to send store error: {:?}", err);
+                        }
+
+                        return;
+                    }
+                };
+
+                let cache_entry = gha
+                    .get_cache_entry(&[key], &[path.to_string_lossy().to_string()], None, false)
+                    .await
+                    .expect("failed to get cache entry");
+
+                if cache_entry.is_none() {
+                    if let Err(err) = tx
+                        .send(Err(Status::not_found("store path not found")))
+                        .await
+                    {
+                        error!("failed to send store error: {:?}", err);
+                    }
+
+                    return;
+                }
+
+                let cache_entry = cache_entry.unwrap();
+                let cache_entry_archive_path = Path::new(&cache_entry.archive_location);
+
+                if !cache_entry_archive_path.exists() {
+                    if let Err(err) = tx
+                        .send(Err(Status::not_found("store path not found")))
+                        .await
+                    {
+                        error!("failed to send store error: {:?}", err);
+                    }
+
+                    return;
+                }
+
+                let data = match read(&cache_entry_archive_path).await {
+                    Ok(data) => data,
+                    Err(err) => {
+                        if let Err(err) = tx.send(Err(Status::internal(err.to_string()))).await {
+                            error!("failed to send store error: {:?}", err);
+                        }
+
+                        return;
+                    }
+                };
+
+                for chunk in data.chunks(DEFAULT_CHUNK_SIZE) {
+                    if let Err(err) = tx
+                        .send(Ok(RegistryPullResponse {
+                            data: chunk.to_vec(),
+                        }))
+                        .await
+                    {
+                        error!("failed to send store chunk: {:?}", err);
+
+                        break;
+                    }
+                }
+            }
+
             if backend == RegistryServerBackend::Local {
                 let path = match request.kind() {
                     Artifact => get_artifact_archive_path(&request.hash, &request.name),
@@ -353,6 +478,63 @@ impl RegistryService for RegistryServer {
         let hash = data_hash.unwrap();
         let name = data_name.unwrap();
 
+        if backend == RegistryServerBackend::GHA {
+            let gha = gha::CacheClient::new().map_err(|err| {
+                Status::internal(format!("failed to create GHA cache client: {:?}", err))
+            })?;
+
+            let key = match data_kind {
+                Artifact => format!("{}-{}-artifact", name, hash),
+                ArtifactSource => format!("{}-{}-source", name, hash),
+                _ => return Err(Status::invalid_argument("unsupported store kind")),
+            };
+
+            let path = match data_kind {
+                Artifact => get_artifact_archive_path(&hash, &name),
+                ArtifactSource => get_source_archive_path(&hash, &name),
+                _ => return Err(Status::invalid_argument("unsupported store kind")),
+            };
+
+            let cache_size = data.len() as u64;
+
+            let reserve_response = gha
+                .reserve_cache(
+                    &key,
+                    &[path.to_string_lossy().to_string()],
+                    None,
+                    false,
+                    Some(cache_size),
+                )
+                .await
+                .map_err(|e| {
+                    Status::internal(format!("failed to reserve cache: {:?}", e.to_string()))
+                })?;
+
+            if reserve_response.cache_id == 0 {
+                return Err(Status::internal("failed to reserve cache"));
+            }
+
+            let archive_path = path.clone();
+
+            if let Err(err) = write(&archive_path, &data).await {
+                return Err(Status::internal(format!(
+                    "failed to write store path: {:?}",
+                    err
+                )));
+            }
+
+            set_timestamps(&archive_path)
+                .await
+                .map_err(|err| Status::internal(format!("failed to sanitize path: {:?}", err)))?;
+
+            let _ = gha
+                .save_cache(reserve_response.cache_id, &archive_path, 1, 2048)
+                .await
+                .map_err(|e| {
+                    Status::internal(format!("failed to save cache: {:?}", e.to_string()))
+                })?;
+        }
+
         if backend == RegistryServerBackend::Local {
             let path = match data_kind {
                 Artifact => get_artifact_archive_path(&hash, &name),
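
For reference, a minimal sketch of how the new client is meant to be driven end to end, mirroring the reserve/upload/commit/lookup flow the diff wires into the registry server. This is illustrative only: it assumes the module were exported as `pub mod gha;` (the diff keeps it private behind `mod gha;`), that `ACTIONS_CACHE_URL` and `ACTIONS_RUNTIME_TOKEN` are set in the environment (as the `actions/github-script` workflow step above arranges), and the key, archive path, and sizes below are hypothetical:

    use anyhow::Result;
    use std::path::Path;
    use vorpal_registry::gha::CacheClient; // assumes a hypothetical `pub mod gha;` export

    #[tokio::main]
    async fn main() -> Result<()> {
        // Reads ACTIONS_CACHE_URL / ACTIONS_RUNTIME_TOKEN from the environment.
        let client = CacheClient::new()?;

        let key = "example-name-example-hash-artifact".to_string(); // hypothetical key
        let paths = vec!["/tmp/example.tar.zst".to_string()]; // hypothetical archive

        // Reserve an entry, then upload the archive in chunks and commit it.
        let reserved = client
            .reserve_cache(&key, &paths, None, false, Some(1024))
            .await?;
        client
            .save_cache(reserved.cache_id, Path::new(&paths[0]), 1, 2048)
            .await?;

        // A later lookup on the same key/version yields the archive location.
        if let Some(entry) = client.get_cache_entry(&[key], &paths, None, false).await? {
            println!("cache hit at {}", entry.archive_location);
        }

        Ok(())
    }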