Skip to content

Commit

Permalink
Add script for benchmarking Pioneer decryption (#1275)
Browse files Browse the repository at this point in the history
* Add class for generating encoded pings with beam

* Avoid using beam due to large overhead

* Add scaffolding for keys and encrypting messages

* Implement method to encrypt data

* Add metadata and keys

* Export MAVEN_OPTS

* wip: running local beam job

* Update benchmarking script for running locally

Running locally takes a long time, I don't recommend it.

* Include private key inside of generator

* Add final benchmarking script

* Move script to bin

* Fix checkstyle issues

* Address review
  • Loading branch information
acmiyaguchi authored May 1, 2020
1 parent 83be734 commit 8945ede
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 0 deletions.
1 change: 1 addition & 0 deletions bin/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ docker run $INTERACTIVE_FLAGS --rm \
-v "$GIT_TOPLEVEL":/var/maven/project \
-w /var/maven/project/"$GIT_PREFIX" \
-v ~/.m2:/var/maven/.m2 \
-e MAVEN_OPTS \
-e MAVEN_CONFIG=/var/maven/.m2 \
-e GOOGLE_APPLICATION_CREDENTIALS \
$MOUNT_CREDENTIALS_FLAGS \
Expand Down
103 changes: 103 additions & 0 deletions ingestion-beam/bin/run-pioneer-benchmark
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#!/bin/bash
# Runs a benchmark that can be used to quantify the overhead of the
# DecryptPioneerPayloads step. A document sample of ingested data is encrypted
# using JOSE and is run through the Pioneer-enabled decoder. This is compared
# against the plaintext data.
#
# Initial findings show that for every 1 minute spent on ParsePayload, there are
# 3 minutes spent on DecryptPioneerPayloads. Peak throughput in ParsePayload
# goes from 3360 elements/sec to 2466 elements/sec when adding encryption.
#
# This script requires the use of GNU Coreutils. This may be installed on MacOS
# via homebrew: `brew install coreutils`.

set -ex

# Give Maven a large, fixed heap; MAVEN_OPTS is forwarded into the bin/mvn
# docker container via `-e MAVEN_OPTS`.
export MAVEN_OPTS="-Xms8g -Xmx8g"
reset=${RESET:-false}
project=$(gcloud config get-value project)
bucket="gs://${BUCKET?bucket value must be specified}"
prefix="ingestion-beam-benchmark"
staging="benchmark_staging"

cd "$(dirname "$0")/.."

if [[ ! -f document_sample.ndjson ]]; then
    echo "missing document_sample.ndjson, run download-document-sample"
    exit 1
fi

if [[ ! -f pioneer_benchmark_data.ndjson ]] || [[ ${reset} == "true" ]]; then
    # generate the data using the document sample, this may take a while depending on your machine
    ./bin/mvn clean compile exec:java -Dexec.mainClass=com.mozilla.telemetry.PioneerBenchmarkGenerator
fi

# assert bucket can be read
gsutil ls "$bucket" &> /dev/null

# generate a new folder and sync it to gcs, assumes a bucket value
if [[ ! -d "$staging" ]]; then
    plaintext="$staging"/input/plaintext
    ciphertext="$staging"/input/ciphertext
    metadata="$staging"/metadata
    mkdir -p "$plaintext" "$ciphertext" "$metadata"
    # shuffle to avoid data skew, and to prepare for file splitting if necessary
    shuf document_sample.ndjson > "$plaintext"/part-0.ndjson
    shuf pioneer_benchmark_data.ndjson > "$ciphertext"/part-0.ndjson
    cp pioneer_benchmark_key.json "$metadata"/key.json
    # compute the location of the remote key and insert it into the metadata
    remote_key="$bucket/$prefix/metadata/key.json"
    jq "(.. | .private_key_uri?) |= \"$remote_key\"" pioneer_benchmark_metadata.json > "$metadata"/metadata.json
    cp schemas.tar.gz "$metadata"
    cp cities15000.txt "$metadata"
    cp GeoLite2-City.mmdb "$metadata"
fi

# this can take a while, depending on your upload speed (~1 GB of data)
gsutil -m rsync -R -d "$staging"/ "$bucket/$prefix/"

# plaintext run: baseline Decoder job without Pioneer decryption
./bin/mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dexec.args="\
--runner=Dataflow \
--profilingAgentConfiguration='{\"APICurated\": true}' \
--project=$project \
--autoscalingAlgorithm=NONE \
--workerMachineType=n1-standard-1 \
--gcpTempLocation=$bucket/tmp \
--numWorkers=2 \
--geoCityDatabase=$bucket/$prefix/metadata/GeoLite2-City.mmdb \
--geoCityFilter=$bucket/$prefix/metadata/cities15000.txt \
--schemasLocation=$bucket/$prefix/metadata/schemas.tar.gz \
--inputType=file \
--input=$bucket/$prefix/input/plaintext/'part-*' \
--outputType=file \
--output=$bucket/$prefix/output/plaintext/ \
--errorOutputType=file \
--errorOutput=$bucket/$prefix/error/plaintext/ \
"

# ciphertext run: identical job with DecryptPioneerPayloads enabled
./bin/mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dexec.args="\
--runner=Dataflow \
--profilingAgentConfiguration='{\"APICurated\": true}' \
--project=$project \
--autoscalingAlgorithm=NONE \
--workerMachineType=n1-standard-1 \
--gcpTempLocation=$bucket/tmp \
--numWorkers=2 \
--pioneerEnabled=true \
--pioneerMetadataLocation=$bucket/$prefix/metadata/metadata.json \
--pioneerKmsEnabled=false \
--pioneerDecompressPayload=false \
--geoCityDatabase=$bucket/$prefix/metadata/GeoLite2-City.mmdb \
--geoCityFilter=$bucket/$prefix/metadata/cities15000.txt \
--schemasLocation=$bucket/$prefix/metadata/schemas.tar.gz \
--inputType=file \
--input=$bucket/$prefix/input/ciphertext/'part-*' \
--outputType=file \
--output=$bucket/$prefix/output/ciphertext/ \
--errorOutputType=file \
--errorOutput=$bucket/$prefix/error/ciphertext/ \
"
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.mozilla.telemetry;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Charsets;
import com.mozilla.telemetry.util.Json;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.PublicKey;
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.jose4j.jwe.ContentEncryptionAlgorithmIdentifiers;
import org.jose4j.jwe.JsonWebEncryption;
import org.jose4j.jwe.KeyManagementAlgorithmIdentifiers;
import org.jose4j.jwk.EcJwkGenerator;
import org.jose4j.jwk.EllipticCurveJsonWebKey;
import org.jose4j.jwk.JsonWebKey.OutputControlLevel;
import org.jose4j.keys.EllipticCurves;
import org.jose4j.lang.JoseException;

/** Generate a dataset for benchmarking the DecryptPioneerPayloads transform. */
public class PioneerBenchmarkGenerator {

  // Shared, thread-safe after configuration; cached to avoid per-call construction cost.
  static final ObjectMapper mapper = new ObjectMapper();

  /**
   * Encrypt a payload using a public key and insert it into an envelope.
   *
   * <p>The payload is JOSE compact-serialized using ECDH-ES key agreement and
   * AES-256-GCM content encryption, then wrapped as {@code {"payload": "<jwe>"}}.
   *
   * @param data raw payload bytes, interpreted as UTF-8 text
   * @param key the recipient's public key used for key agreement
   * @return UTF-8 bytes of the JSON envelope containing the serialized JWE
   * @throws IOException if the envelope cannot be serialized to JSON
   * @throws JoseException if encryption or compact serialization fails
   */
  public static byte[] encrypt(byte[] data, PublicKey key) throws IOException, JoseException {
    JsonWebEncryption jwe = new JsonWebEncryption();
    jwe.setPayload(new String(data, Charsets.UTF_8));
    jwe.setAlgorithmHeaderValue(KeyManagementAlgorithmIdentifiers.ECDH_ES);
    jwe.setEncryptionMethodHeaderParameter(ContentEncryptionAlgorithmIdentifiers.AES_256_GCM);
    jwe.setKey(key);
    String serializedJwe = jwe.getCompactSerialization();
    ObjectNode node = mapper.createObjectNode();
    node.put("payload", serializedJwe);
    return Json.asString(node).getBytes(Charsets.UTF_8);
  }

  /**
   * Read a Pubsub ndjson message, wrapping failures in an {@link Optional}.
   *
   * <p>Parse failures are logged to stderr and reported as {@code Optional.empty()}
   * so a single malformed line does not abort the whole generation run.
   *
   * @param data one ndjson line in the Pubsub message wire format
   * @return the parsed message, or empty if parsing failed
   */
  public static Optional<PubsubMessage> parsePubsub(String data) {
    try {
      return Optional.of(Json.readPubsubMessage(data));
    } catch (Exception e) {
      // best-effort: skip malformed records rather than failing the benchmark data build
      e.printStackTrace();
      return Optional.empty();
    }
  }

  /**
   * Encrypt the payload in a Pubsub message and place it into an envelope,
   * preserving the original message attributes.
   *
   * @param message the plaintext message to transform
   * @param key the public key used for encryption
   * @return the serialized encrypted message, or empty if encryption/serialization failed
   */
  public static Optional<String> transform(PubsubMessage message, PublicKey key) {
    try {
      PubsubMessage encryptedMessage = new PubsubMessage(encrypt(message.getPayload(), key),
          message.getAttributeMap());
      return Optional.of(Json.asString(encryptedMessage));
    } catch (IOException | JoseException e) {
      // best-effort: drop records that cannot be encrypted rather than aborting
      e.printStackTrace();
      return Optional.empty();
    }
  }

  /** Read in documents in the shape of Pubsub ndjson files and encrypt using
   * Pioneer parameters and envelope. Write out the relevant metadata files for
   * the single key in use.
   *
   * <p>Inputs/outputs are fixed relative paths in the working directory:
   * reads {@code document_sample.ndjson}; writes the encrypted dataset, the
   * (private) JWK, and per-namespace metadata entries pointing at that key.
   */
  public static void main(final String[] args) throws JoseException, IOException {
    final Path inputPath = Paths.get("document_sample.ndjson");
    final Path outputPath = Paths.get("pioneer_benchmark_data.ndjson");
    final Path keyPath = Paths.get("pioneer_benchmark_key.json");
    final Path metadataPath = Paths.get("pioneer_benchmark_metadata.json");

    // A single P-256 key pair is shared by every namespace in the benchmark.
    EllipticCurveJsonWebKey key = EcJwkGenerator.generateJwk(EllipticCurves.P256);
    HashSet<String> namespaces = new HashSet<>();

    // runs in ~2:30 min
    try (Stream<String> stream = Files.lines(inputPath)) {
      // side-effects to get the set of attributes while streaming lines to disk
      Files.write(outputPath, (Iterable<String>) stream.map(PioneerBenchmarkGenerator::parsePubsub)
          .filter(Optional::isPresent).map(Optional::get).map(message -> {
            // side-effects and side-input; guard against messages missing the
            // attribute so we don't emit a metadata entry with a null key id
            String namespace = message.getAttribute("document_namespace");
            if (namespace != null) {
              namespaces.add(namespace);
            }
            return transform(message, key.getPublicKey());
          }).filter(Optional::isPresent).map(Optional::get)::iterator);
    } catch (IOException e) {
      e.printStackTrace();
    }

    // write out the key, including the private portion for local decryption tests
    Files.write(keyPath, key.toJson(OutputControlLevel.INCLUDE_PRIVATE).getBytes(Charsets.UTF_8));

    // write out the metadata: one entry per observed namespace, all sharing one key
    ArrayNode metadata = mapper.createArrayNode();
    for (String namespace : namespaces) {
      ObjectNode node = mapper.createObjectNode();
      node.put("private_key_id", namespace);
      // TODO: write this to the appropriate location like gcs, if relevant
      node.put("private_key_uri", keyPath.toString());
      // empty value: KMS is disabled for the benchmark (--pioneerKmsEnabled=false)
      node.put("kms_resource_id", "");
      metadata.add(node);
    }
    Files.write(metadataPath, metadata.toPrettyString().getBytes(Charsets.UTF_8));

  }
}

0 comments on commit 8945ede

Please sign in to comment.