Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix checkstyle error. Reformat codes and Optimize imports #16

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 61 additions & 78 deletions src/main/java/org/embulk/input/CommandFileInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
package org.embulk.input;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.io.InputStream;
import java.io.IOException;
import java.io.FilterInputStream;
import org.slf4j.Logger;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.Task;
import org.embulk.util.config.TaskMapper;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.util.file.InputStreamFileInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandFileInputPlugin
implements FileInputPlugin
{
implements FileInputPlugin {
public interface PluginTask
extends Task
{
extends Task {
@Config("command")
public String getCommand();

Expand All @@ -41,18 +39,17 @@ public interface PluginTask
}

@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) {
final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper();
final PluginTask task = configMapper.map(config, PluginTask.class);

switch (task.getPipe()) {
case "stdout":
break;
case "stderr":
break;
default:
throw new ConfigException(String.format(
case "stdout":
break;
case "stderr":
break;
default:
throw new ConfigException(String.format(
"Unknown 'pipe' option '%s'. It must be either 'stdout' or 'stderr'", task.getPipe()));
}

Expand All @@ -61,24 +58,22 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr

@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
int taskCount,
FileInputPlugin.Control control) {
control.run(taskSource, taskCount);

return CONFIG_MAPPER_FACTORY.newConfigDiff();
}

@Override
public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{
int taskCount,
List<TaskReport> successTaskReports) {
}

@SuppressWarnings("MissingSwitchDefault")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of this annotation?

@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
public TransactionalFileInput open(TaskSource taskSource, int taskIndex) {
final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper();
final PluginTask task = taskMapper.map(taskSource, PluginTask.class);

Expand All @@ -90,12 +85,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)

ProcessBuilder builder = new ProcessBuilder(cmdline.toArray(new String[cmdline.size()]));
switch (task.getPipe()) {
case "stdout":
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
break;
case "stderr":
builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
break;
case "stdout":
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
break;
case "stderr":
builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
break;
}

try {
Expand All @@ -104,12 +99,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
InputStream stream = null;
try {
switch (task.getPipe()) {
case "stdout":
stream = process.getInputStream();
break;
case "stderr":
stream = process.getErrorStream();
break;
case "stdout":
stream = process.getInputStream();
break;
case "stderr":
stream = process.getErrorStream();
break;
}

PluginFileInput input = new PluginFileInput(task, new ProcessWaitInputStream(stream, process));
Expand All @@ -126,30 +121,26 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
}
}

protected static List<String> buildShell()
{
protected static List<String> buildShell() {
String osName = System.getProperty("os.name");
if(osName.indexOf("Windows") >= 0) {
if (osName.indexOf("Windows") >= 0) {
return Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command"));
} else {
return Collections.unmodifiableList(Arrays.asList("sh", "-c"));
}
}

private static class ProcessWaitInputStream
extends FilterInputStream
{
extends FilterInputStream {
private Process process;

public ProcessWaitInputStream(InputStream in, Process process)
{
public ProcessWaitInputStream(InputStream in, Process process) {
super(in);
this.process = process;
}

@Override
public int read() throws IOException
{
public int read() throws IOException {
int c = super.read();
if (c < 0) {
waitFor();
Expand All @@ -158,8 +149,7 @@ public int read() throws IOException
}

@Override
public int read(byte[] b) throws IOException
{
public int read(byte[] b) throws IOException {
int c = super.read(b);
if (c < 0) {
waitFor();
Expand All @@ -168,8 +158,7 @@ public int read(byte[] b) throws IOException
}

@Override
public int read(byte[] b, int off, int len) throws IOException
{
public int read(byte[] b, int off, int len) throws IOException {
int c = super.read(b, off, len);
if (c < 0) {
waitFor();
Expand All @@ -178,14 +167,12 @@ public int read(byte[] b, int off, int len) throws IOException
}

@Override
public void close() throws IOException
{
public void close() throws IOException {
super.close();
waitFor();
}

private synchronized void waitFor() throws IOException
{
private synchronized void waitFor() throws IOException {
if (process != null) {
int code;
try {
Expand All @@ -196,7 +183,7 @@ private synchronized void waitFor() throws IOException
process = null;
if (code != 0) {
throw new IOException(String.format(
"Command finished with non-zero exit code. Exit code is %d.", code));
"Command finished with non-zero exit code. Exit code is %d.", code));
}
}
}
Expand All @@ -205,22 +192,18 @@ private synchronized void waitFor() throws IOException
// TODO almost copied from S3FileInputPlugin. include an InputStreamFileInput utility to embulk-core.
public static class PluginFileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
implements TransactionalFileInput {
private static class SingleFileProvider
implements InputStreamFileInput.Provider
{
implements InputStreamFileInput.Provider {
private final InputStream stream;
private boolean opened = false;

public SingleFileProvider(InputStream stream)
{
public SingleFileProvider(InputStream stream) {
this.stream = stream;
}

@Override
public InputStream openNext() throws IOException
{
public InputStream openNext() throws IOException {
if (opened) {
return null;
}
Expand All @@ -229,29 +212,29 @@ public InputStream openNext() throws IOException
}

@Override
public void close() throws IOException
{
public void close() throws IOException {
if (!opened) {
stream.close();
}
}
}

public PluginFileInput(PluginTask task, InputStream stream)
{
public PluginFileInput(PluginTask task, InputStream stream) {
super(Exec.getBufferAllocator(), new SingleFileProvider(stream));
}

public void abort() { }
public void abort() {
}

public TaskReport commit()
{
public TaskReport commit() {
return CONFIG_MAPPER_FACTORY.newTaskReport();
}

@Override
public void close() { }
public void close() {
}
}

private static final Logger logger = LoggerFactory.getLogger(CommandFileInputPlugin.class);

private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build();
Expand Down
16 changes: 6 additions & 10 deletions src/test/java/org/embulk/input/TestCommandFileInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
package org.embulk.input;

import static org.embulk.input.CommandFileInputPlugin.buildShell;
import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collections;
import org.embulk.test.EmbulkTestRuntime;
import org.junit.Rule;
import org.junit.Test;
import org.embulk.test.EmbulkTestRuntime;
import static org.embulk.input.CommandFileInputPlugin.buildShell;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class TestCommandFileInputPlugin
{
public class TestCommandFileInputPlugin {
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();

@Test
public void testShell() {
if (System.getProperty("os.name").indexOf("Windows") >= 0) {
assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")), buildShell());
}
else {
} else {
assertEquals(Collections.unmodifiableList(Arrays.asList("sh", "-c")), buildShell());
}
}
Expand Down