Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Add Unit tests for HdfsUploader (#603)
Browse files Browse the repository at this point in the history
1. Refactor HdfsUploader
2. Add Unit tests for HdfsUploader
  • Loading branch information
maosongfu committed May 8, 2016
1 parent b28007b commit ba97081
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@

import com.twitter.heron.spi.common.ShellUtils;

final class HdfsUtils {
public class HdfsController {
private final String configDir;
private final boolean isVerbose;

private HdfsUtils() {
public HdfsController(String configDir, boolean isVerbose) {
this.configDir = configDir;
this.isVerbose = isVerbose;
}

public static boolean isFileExists(String configDir, String fileURI, boolean isVerbose) {
String command = String.format("hadoop --config %s fs -test -e %s", configDir, fileURI);
public boolean exists(String filePath) {
String command = String.format("hadoop --config %s fs -test -e %s", configDir, filePath);
return 0 == ShellUtils.runProcess(isVerbose, command, null, null);
}

public static boolean createDir(String configDir, String dir, boolean isVerbose) {
public boolean mkdirs(String dir) {
String command = String.format("hadoop --config %s fs -mkdir -p %s", configDir, dir);
return 0 == ShellUtils.runProcess(isVerbose, command, null, null);
}

public static boolean copyFromLocal(
String configDir, String source, String target, boolean isVerbose) {
public boolean copyFromLocalFile(String source, String destination) {
String command = String.format("hadoop --config %s fs -copyFromLocal -f %s %s",
configDir, source, target);
configDir, source, destination);
return 0 == ShellUtils.runProcess(isVerbose, command, null, null);
}

public static boolean remove(String configDir, String fileURI, boolean isVerbose) {
String command = String.format("hadoop --config %s fs -rm %s", configDir, fileURI);
public boolean delete(String filePath) {
String command = String.format("hadoop --config %s fs -rm %s", configDir, filePath);
return 0 == ShellUtils.runProcess(isVerbose, command, null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,19 @@ public class HdfsUploader implements IUploader {
// get the directory containing the file
private String destTopologyDirectoryURI;
private Config config;
private String hadoopConfdir;
private String topologyPackageLocation;
private URI packageURI;

// The controller on hdfs
private HdfsController controller;

@Override
public void initialize(Config ipconfig) {
this.config = ipconfig;

this.hadoopConfdir = HdfsContext.hadoopConfigDirectory(config);
// Instantiate the hdfs controller
this.controller = getHdfsController();

this.destTopologyDirectoryURI = HdfsContext.hdfsTopologiesDirectoryURI(config);
// get the original topology package location
this.topologyPackageLocation = Context.topologyPackageFile(config);
Expand All @@ -49,36 +53,43 @@ public void initialize(Config ipconfig) {
packageURI = TypeUtils.getURI(String.format("%s/%s", destTopologyDirectoryURI, fileName));
}

// Utils method
protected HdfsController getHdfsController() {
return new HdfsController(
HdfsContext.hadoopConfigDirectory(config), Context.verbose(config));
}

// Utils method
protected boolean isLocalFileExists(String file) {
return new File(file).isFile();
}

@Override
public URI uploadPackage() {
// first, check if the topology package exists

boolean fileExists = new File(topologyPackageLocation).isFile();
if (!fileExists) {
if (!isLocalFileExists(topologyPackageLocation)) {
LOG.info("Topology file " + topologyPackageLocation + " does not exist.");
return null;
}

// if the dest directory does not exist, create it.
if (!HdfsUtils.isFileExists(hadoopConfdir, destTopologyDirectoryURI, true)) {
if (!controller.exists(destTopologyDirectoryURI)) {
LOG.info("The destination directory does not exist; creating it.");
if (!HdfsUtils.createDir(hadoopConfdir, destTopologyDirectoryURI, true)) {
if (!controller.mkdirs(destTopologyDirectoryURI)) {
LOG.severe("Failed to create directory: " + destTopologyDirectoryURI);
return null;
}
}

// if the destination file exists, write a log message
if (HdfsUtils.isFileExists(hadoopConfdir, packageURI.toString(), true)) {
} else {
// if the destination file exists, write a log message
LOG.info("Target topology file " + packageURI.toString() + " exists, overwriting...");

}

// copy the topology package to target working directory
LOG.info("Uploading topology " + topologyPackageLocation
+ " package to target hdfs " + packageURI.toString());

if (!HdfsUtils.copyFromLocal(
hadoopConfdir, topologyPackageLocation, packageURI.toString(), true)) {
if (!controller.copyFromLocalFile(topologyPackageLocation, packageURI.toString())) {
LOG.severe("Failed to upload the package to:" + packageURI.toString());
return null;
}
Expand All @@ -88,7 +99,7 @@ public URI uploadPackage() {

@Override
public boolean undo() {
return HdfsUtils.remove(hadoopConfdir, packageURI.toString(), true);
return controller.delete(packageURI.toString());
}

@Override
Expand Down
13 changes: 13 additions & 0 deletions heron/uploaders/tests/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ localfs_deps_files = \
"//heron/uploaders/src/java:localfs-uploader-java",
]

hdfs_deps_files = \
common_deps_files + \
spi_deps_files + [
"//heron/uploaders/src/java:hdfs-uploader-java",
]

java_library(
name = "local-filesystem-constants-java",
srcs = glob(["**/localfs/LocalFileSystemConstantsTest.java"]),
Expand Down Expand Up @@ -49,3 +55,10 @@ java_test(
"//3rdparty/java:aws-java-sdk"
],
)

java_test(
name = "hadoop-uploader_unittest",
srcs = glob(["**/hdfs/HdfsUploaderTest.java"]),
deps = hdfs_deps_files,
size = "small",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.heron.uploader.hdfs;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import com.twitter.heron.spi.common.Config;

public class HdfsUploaderTest {
private HdfsUploader uploader;
private HdfsController controller;

@Before
public void setUp() throws Exception {
Config config = Mockito.mock(Config.class);

// Insert mock HdfsController
uploader = Mockito.spy(new HdfsUploader());
controller = Mockito.mock(HdfsController.class);
Mockito.doReturn(controller).when(uploader).getHdfsController();

uploader.initialize(config);
}

@After
public void after() throws Exception {
}

@Test
public void testUploadPackage() throws Exception {
// Local file not exist
Mockito.doReturn(false).when(uploader).isLocalFileExists(Mockito.anyString());
Assert.assertNull(uploader.uploadPackage());
Mockito.verify(controller, Mockito.never()).copyFromLocalFile(
Mockito.anyString(), Mockito.anyString());

// Failed to create folder on hdfs
Mockito.doReturn(true).when(uploader).isLocalFileExists(Mockito.anyString());
Mockito.doReturn(false).when(controller).exists(Mockito.anyString());
Mockito.doReturn(false).when(controller).mkdirs(Mockito.anyString());
Assert.assertNull(uploader.uploadPackage());
Mockito.verify(controller, Mockito.never()).copyFromLocalFile(
Mockito.anyString(), Mockito.anyString());

// Failed to copy file from local to hdfs
Mockito.doReturn(true).when(controller).mkdirs(Mockito.anyString());
Mockito.doReturn(false).when(controller).copyFromLocalFile(
Mockito.anyString(), Mockito.anyString());
Assert.assertNull(uploader.uploadPackage());
Mockito.verify(controller).copyFromLocalFile(Mockito.anyString(), Mockito.anyString());

// Happy path
Mockito.doReturn(true).when(controller).copyFromLocalFile(
Mockito.anyString(), Mockito.anyString());
Assert.assertNotNull(uploader.uploadPackage());
Mockito.verify(controller, Mockito.atLeastOnce()).copyFromLocalFile(
Mockito.anyString(), Mockito.anyString());
}

@Test
public void testUndo() throws Exception {
Mockito.doReturn(false).when(controller).delete(Mockito.anyString());
Assert.assertFalse(uploader.undo());
Mockito.verify(controller).delete(Mockito.anyString());

Mockito.doReturn(true).when(controller).delete(Mockito.anyString());
Assert.assertTrue(uploader.undo());
}
}

0 comments on commit ba97081

Please sign in to comment.