Skip to content

Commit

Permalink
[FOLLOW-UP] Format Spark Integration code by javafmtAll (unitycatalog…
Browse files Browse the repository at this point in the history
…#291)

**Description of changes**

<!-- Please state what you've changed and how it might affect the users.
-->
[FOLLOW-UP] Format Spark Integration code by javafmtAll.
  • Loading branch information
amaliujia authored Jul 29, 2024
1 parent 78b7e13 commit 0de24a0
Show file tree
Hide file tree
Showing 3 changed files with 355 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import io.unitycatalog.client.model.{AwsCredentials, GenerateTemporaryTableCrede

import java.net.URI
import java.util

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,88 +1,87 @@
package io.unitycatalog.connectors.spark;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;

// A wrapper over the local file system to test UC table credentials.
public class CredentialTestFileSystem extends RawLocalFileSystem {

@Override
protected void checkPath(Path path) {
// Do nothing.
}
@Override
protected void checkPath(Path path) {
// Do nothing.
}

@Override
public FSDataOutputStream create(
Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
return super.create(
toLocalPath(f), overwrite, bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream create(
Path f,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress)
throws IOException {
return super.create(toLocalPath(f), overwrite, bufferSize, replication, blockSize, progress);
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
if (f.toString().startsWith("s3:")) {
String s3Prefix = "s3://" + f.toUri().getHost();
return restorePathInFileStatus(s3Prefix, super.getFileStatus(toLocalPath(f)));
} else {
assert f.toString().startsWith("file:");
return super.getFileStatus(f);
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
if (f.toString().startsWith("s3:")) {
String s3Prefix = "s3://" + f.toUri().getHost();
return restorePathInFileStatus(s3Prefix, super.getFileStatus(toLocalPath(f)));
} else {
assert f.toString().startsWith("file:");
return super.getFileStatus(f);
}
}

@Override
public FSDataInputStream open(Path f) throws IOException {
return super.open(toLocalPath(f));
}
@Override
public FSDataInputStream open(Path f) throws IOException {
return super.open(toLocalPath(f));
}

@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
throw new RuntimeException("implement it when testing s3a");
}
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
throw new RuntimeException("implement it when testing s3a");
}

@Override
public FileStatus[] listStatus(Path f) throws IOException {
String s3Prefix = "s3://" + f.toUri().getHost();
FileStatus[] files = super.listStatus(toLocalPath(f));
FileStatus[] res = new FileStatus[files.length];
for (int i = 0; i < files.length; i++) {
res[i] = restorePathInFileStatus(s3Prefix, files[i]);
}
return res;
@Override
public FileStatus[] listStatus(Path f) throws IOException {
String s3Prefix = "s3://" + f.toUri().getHost();
FileStatus[] files = super.listStatus(toLocalPath(f));
FileStatus[] res = new FileStatus[files.length];
for (int i = 0; i < files.length; i++) {
res[i] = restorePathInFileStatus(s3Prefix, files[i]);
}
return res;
}

private FileStatus restorePathInFileStatus(String s3Prefix, FileStatus f) {
String path = f.getPath().toString().replace("file:", s3Prefix);
return new FileStatus(
f.getLen(),
f.isDirectory(),
f.getReplication(),
f.getBlockSize(),
f.getModificationTime(),
new Path(path));
}
private FileStatus restorePathInFileStatus(String s3Prefix, FileStatus f) {
String path = f.getPath().toString().replace("file:", s3Prefix);
return new FileStatus(
f.getLen(),
f.isDirectory(),
f.getReplication(),
f.getBlockSize(),
f.getModificationTime(),
new Path(path));
}

private Path toLocalPath(Path f) {
Configuration conf = getConf();
String host = f.toUri().getHost();
if ("test-bucket0".equals(host)) {
assert "accessKey0".equals(conf.get("fs.s3a.access.key"));
assert "secretKey0".equals(conf.get("fs.s3a.secret.key"));
assert "sessionToken0".equals(conf.get("fs.s3a.session.token"));
} else if ("test-bucket1".equals(host)) {
assert "accessKey1".equals(conf.get("fs.s3a.access.key"));
assert "secretKey1".equals(conf.get("fs.s3a.secret.key"));
assert "sessionToken1".equals(conf.get("fs.s3a.session.token"));
} else {
throw new RuntimeException("invalid path: " + f);
}
return new Path(f.toString().replaceAll("s3://.*?/", "file:///"));
private Path toLocalPath(Path f) {
Configuration conf = getConf();
String host = f.toUri().getHost();
if ("test-bucket0".equals(host)) {
assert "accessKey0".equals(conf.get("fs.s3a.access.key"));
assert "secretKey0".equals(conf.get("fs.s3a.secret.key"));
assert "sessionToken0".equals(conf.get("fs.s3a.session.token"));
} else if ("test-bucket1".equals(host)) {
assert "accessKey1".equals(conf.get("fs.s3a.access.key"));
assert "secretKey1".equals(conf.get("fs.s3a.secret.key"));
assert "sessionToken1".equals(conf.get("fs.s3a.session.token"));
} else {
throw new RuntimeException("invalid path: " + f);
}
return new Path(f.toString().replaceAll("s3://.*?/", "file:///"));
}
}
Loading

0 comments on commit 0de24a0

Please sign in to comment.