Skip to content

Commit

Permalink
Refactor downloading of interop files
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Jan 18, 2024
1 parent 5a2ba7b commit d342142
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 221 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.parquet.hadoop;

import java.io.IOException;
import okhttp3.OkHttpClient;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
Expand All @@ -37,10 +36,8 @@ public class ITTestEncryptionOptions {

TestEncryptionOptions test = new TestEncryptionOptions();

OkHttpClient httpClient = new OkHttpClient();

@Test
public void testInteropReadEncryptedParquetFiles() throws IOException {
test.testInteropReadEncryptedParquetFiles(errorCollector, httpClient);
test.testInteropReadEncryptedParquetFiles(errorCollector);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.hadoop;

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper for obtaining interoperability test files from the apache/parquet-testing
 * repository. Files are downloaded once and cached under {@code target/parquet-testing/},
 * keyed by the repository changeset requested by the caller.
 */
public class InterOpTester {
  private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/";
  private static final String PARQUET_TESTING_PATH = "target/parquet-testing/";
  // Fixed: logger was copy-pasted as TestInterOpReadByteStreamSplit.class, which
  // misattributed every log line emitted by this helper.
  private static final Logger LOG = LoggerFactory.getLogger(InterOpTester.class);
  private final OkHttpClient httpClient = new OkHttpClient();

  /**
   * Get an interOp file from the "data" subdirectory of the parquet-testing repo.
   *
   * @param fileName The name of the file to get.
   * @param changeset The changeset ID in the parquet-testing repo.
   * @return Path The local path to the interOp file.
   * @throws IOException if the cache directory cannot be created or the download fails.
   */
  public Path GetInterOpFile(String fileName, String changeset) throws IOException {
    return GetInterOpFile(fileName, changeset, "data");
  }

  /**
   * Get an interOp file from the parquet-testing repo, possibly downloading it.
   *
   * @param fileName The name of the file to get.
   * @param changeset The changeset ID in the parquet-testing repo.
   * @param subdir The subdirectory the file lives in inside the repo (for example "data").
   * @return Path The local path to the interOp file.
   * @throws IOException if the cache directory cannot be created or the download fails.
   */
  public Path GetInterOpFile(String fileName, String changeset, String subdir) throws IOException {
    Path rootPath = new Path(PARQUET_TESTING_PATH, subdir);
    Configuration conf = new Configuration();
    FileSystem fs = rootPath.getFileSystem(conf);
    if (!fs.exists(rootPath)) {
      LOG.info("Create folder for interOp files: {}", rootPath);
      if (!fs.mkdirs(rootPath)) {
        throw new IOException("Cannot create path " + rootPath);
      }
    }

    Path file = new Path(rootPath, fileName);
    if (!fs.exists(file)) {
      // PARQUET_TESTING_REPO already ends with '/': use "%s%s" for the first join
      // so the URL does not contain a double slash.
      String downloadUrl = String.format("%s%s/%s/%s", PARQUET_TESTING_REPO, changeset, subdir, fileName);
      LOG.info("Download interOp file: {}", downloadUrl);
      Request request = new Request.Builder().url(downloadUrl).build();
      // try-with-resources closes the Response (and its body) even on failure,
      // avoiding a leaked connection.
      try (Response response = httpClient.newCall(request).execute()) {
        if (!response.isSuccessful()) {
          throw new IOException("Failed to download file: " + response);
        }
        try (FSDataOutputStream fdos = fs.create(file)) {
          fdos.write(response.body().bytes());
        }
      }
    }
    return file;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,10 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.ColumnDecryptionProperties;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.ParquetCipher;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.crypto.SingleRow;
import org.apache.parquet.crypto.*;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
Expand Down Expand Up @@ -124,8 +108,6 @@
*/
public class TestEncryptionOptions {
private static final Logger LOG = LoggerFactory.getLogger(TestEncryptionOptions.class);
// The link includes a reference to a specific commit. To take a newer version - update this link.
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/40379b3/data/";

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
Expand All @@ -134,8 +116,8 @@ public class TestEncryptionOptions {
public ErrorCollector localErrorCollector = new ErrorCollector();

private ErrorCollector errorCollector;

private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
private InterOpTester interop = new InterOpTester();
private static final String CHANGESET = "40379b3";

private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
private static final byte[][] COLUMN_ENCRYPTION_KEYS = {
Expand Down Expand Up @@ -348,19 +330,14 @@ public void testWriteReadEncryptedParquetFiles() throws IOException {
* It's not moved into a separate file since it shares many utilities with the unit tests in this file.
*
* @param errorCollector - the error collector of the integration tests suite
* @param httpClient - HTTP client to be used for fetching parquet files for interop tests
* @throws IOException
*/
public void testInteropReadEncryptedParquetFiles(ErrorCollector errorCollector, OkHttpClient httpClient)
throws IOException {
public void testInteropReadEncryptedParquetFiles(ErrorCollector errorCollector) throws IOException {
this.errorCollector = errorCollector;
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testInteropReadEncryptedParquetFiles {} ========", rootPath.toString());
boolean readOnlyEncrypted = true;
downloadInteropFiles(rootPath, readOnlyEncrypted, httpClient);
byte[] AADPrefix = AAD_PREFIX_STRING.getBytes(StandardCharsets.UTF_8);
// Read using various decryption configurations.
testInteropReadEncryptedParquetFiles(rootPath, readOnlyEncrypted, LINEAR_DATA);
testInteropReadEncryptedParquetFiles(readOnlyEncrypted, LINEAR_DATA);
}

private void testWriteEncryptedParquetFiles(Path root, List<SingleRow> data) throws IOException {
Expand Down Expand Up @@ -505,48 +482,7 @@ private void testReadEncryptedParquetFiles(Path root, List<SingleRow> data) {
}
}

private void downloadInteropFiles(Path rootPath, boolean readOnlyEncrypted, OkHttpClient httpClient)
throws IOException {
LOG.info("Download interop files if needed");
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interop files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

EncryptionConfiguration[] encryptionConfigurations = EncryptionConfiguration.values();
for (EncryptionConfiguration encryptionConfiguration : encryptionConfigurations) {
if (readOnlyEncrypted && (EncryptionConfiguration.NO_ENCRYPTION == encryptionConfiguration)) {
continue;
}
if (EncryptionConfiguration.UNIFORM_ENCRYPTION_PLAINTEXT_FOOTER == encryptionConfiguration) {
continue;
}
if (EncryptionConfiguration.ENCRYPT_COLUMNS_PLAIN_FOOTER_COMPLETE == encryptionConfiguration) {
continue;
}
String fileName = getFileName(encryptionConfiguration);
Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = PARQUET_TESTING_REPO + fileName;
LOG.info("Download interop file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
Response response = httpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
}
}

private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEncrypted, List<SingleRow> data)
private void testInteropReadEncryptedParquetFiles(boolean readOnlyEncrypted, List<SingleRow> data)
throws IOException {
Configuration conf = new Configuration();
DecryptionConfiguration[] decryptionConfigurations = DecryptionConfiguration.values();
Expand All @@ -562,7 +498,7 @@ private void testInteropReadEncryptedParquetFiles(Path root, boolean readOnlyEnc
if (EncryptionConfiguration.ENCRYPT_COLUMNS_PLAIN_FOOTER_COMPLETE == encryptionConfiguration) {
continue;
}
Path file = new Path(root, getFileName(encryptionConfiguration));
Path file = interop.GetInterOpFile(getFileName(encryptionConfiguration), CHANGESET);
LOG.info("==> Decryption configuration {}", decryptionConfiguration);
FileDecryptionProperties fileDecryptionProperties = decryptionConfiguration.getDecryptionProperties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,34 +23,19 @@
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestInterOpReadByteStreamSplit {
// The link includes a reference to a specific commit. To take a newer version - update this link.
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/4cb3cff/data/";
private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
private static String FLOATS_FILE = "byte_stream_split.zstd.parquet";

private static final Logger LOG = LoggerFactory.getLogger(TestInterOpReadByteStreamSplit.class);
private OkHttpClient httpClient = new OkHttpClient();
private InterOpTester interop = new InterOpTester();
private static final String FLOATS_FILE = "byte_stream_split.zstd.parquet";
private static final String CHANGESET = "4cb3cff";

@Test
public void testReadFloats() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);

// Test simple parquet file with lz4 raw compressed
Path floatsFile = downloadInterOpFiles(rootPath, FLOATS_FILE, httpClient);
Path floatsFile = interop.GetInterOpFile(FLOATS_FILE, CHANGESET);
final int expectRows = 300;

try (ParquetReader<Group> reader =
Expand Down Expand Up @@ -85,30 +70,4 @@ public void testReadFloats() throws IOException {
assertTrue(reader.read() == null);
}
}

private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interOp files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = PARQUET_TESTING_REPO + fileName;
LOG.info("Download interOp file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
Response response = httpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
return file;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,29 @@

package org.apache.parquet.hadoop;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;

import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestInterOpReadFloat16 {

// The link includes a reference to a specific commit. To take a newer version - update this link.
private static final String PARQUET_TESTING_REPO = "https://github.com/apache/parquet-testing/raw/da467da/data/";
private static String PARQUET_TESTING_PATH = "target/parquet-testing/data";
private static String FLOAT16_NONZEROS_NANS_FILE = "float16_nonzeros_and_nans.parquet";
private static String FLOAT16_ZEROS_NANS_FILE = "float16_zeros_and_nans.parquet";
private static final String CHANGESET = "da467da";

private static final Logger LOG = LoggerFactory.getLogger(TestInterOpReadFloat16.class);
private OkHttpClient httpClient = new OkHttpClient();
private InterOpTester interop = new InterOpTester();

@Test
public void testInterOpReadFloat16NonZerosAndNansParquetFiles() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testInterOpReadFloat16NonZerosAndNansParquetFiles {} ========", rootPath);
Path filePath = interop.GetInterOpFile(FLOAT16_NONZEROS_NANS_FILE, CHANGESET);

Path filePath = downloadInterOpFiles(rootPath, FLOAT16_NONZEROS_NANS_FILE, httpClient);
final int expectRows = 8;
Binary[] c0ExpectValues = {
null,
Expand Down Expand Up @@ -100,10 +85,8 @@ public void testInterOpReadFloat16NonZerosAndNansParquetFiles() throws IOExcepti

@Test
public void testInterOpReadFloat16ZerosAndNansParquetFiles() throws IOException {
Path rootPath = new Path(PARQUET_TESTING_PATH);
LOG.info("======== testInterOpReadFloat16ZerosAndNansParquetFiles {} ========", rootPath);
Path filePath = interop.GetInterOpFile(FLOAT16_ZEROS_NANS_FILE, "da467da");

Path filePath = downloadInterOpFiles(rootPath, FLOAT16_ZEROS_NANS_FILE, httpClient);
final int expectRows = 3;
Binary[] c0ExpectValues = {
null,
Expand Down Expand Up @@ -138,32 +121,4 @@ public void testInterOpReadFloat16ZerosAndNansParquetFiles() throws IOException
}
}
}

private Path downloadInterOpFiles(Path rootPath, String fileName, OkHttpClient httpClient) throws IOException {
LOG.info("Download interOp files if needed");
Configuration conf = new Configuration();
FileSystem fs = rootPath.getFileSystem(conf);
LOG.info(rootPath + " exists?: " + fs.exists(rootPath));
if (!fs.exists(rootPath)) {
LOG.info("Create folder for interOp files: " + rootPath);
if (!fs.mkdirs(rootPath)) {
throw new IOException("Cannot create path " + rootPath);
}
}

Path file = new Path(rootPath, fileName);
if (!fs.exists(file)) {
String downloadUrl = PARQUET_TESTING_REPO + fileName;
LOG.info("Download interOp file: " + downloadUrl);
Request request = new Request.Builder().url(downloadUrl).build();
Response response = httpClient.newCall(request).execute();
if (!response.isSuccessful()) {
throw new IOException("Failed to download file: " + response);
}
try (FSDataOutputStream fdos = fs.create(file)) {
fdos.write(response.body().bytes());
}
}
return file;
}
}
Loading

0 comments on commit d342142

Please sign in to comment.