diff --git a/src/main/java/org/embulk/input/CommandFileInputPlugin.java b/src/main/java/org/embulk/input/CommandFileInputPlugin.java index 4022d4d..c1b6964 100644 --- a/src/main/java/org/embulk/input/CommandFileInputPlugin.java +++ b/src/main/java/org/embulk/input/CommandFileInputPlugin.java @@ -1,36 +1,34 @@ package org.embulk.input; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.ArrayList; -import java.io.InputStream; -import java.io.IOException; -import java.io.FilterInputStream; -import org.slf4j.Logger; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigException; +import org.embulk.config.ConfigSource; import org.embulk.config.TaskReport; +import org.embulk.config.TaskSource; +import org.embulk.spi.Exec; +import org.embulk.spi.FileInputPlugin; +import org.embulk.spi.TransactionalFileInput; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; import org.embulk.util.config.ConfigMapper; import org.embulk.util.config.ConfigMapperFactory; import org.embulk.util.config.Task; import org.embulk.util.config.TaskMapper; -import org.embulk.config.ConfigDiff; -import org.embulk.config.ConfigSource; -import org.embulk.config.ConfigException; -import org.embulk.config.TaskSource; -import org.embulk.spi.Exec; -import org.embulk.spi.FileInputPlugin; -import org.embulk.spi.TransactionalFileInput; import org.embulk.util.file.InputStreamFileInput; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class CommandFileInputPlugin - implements FileInputPlugin -{ + implements FileInputPlugin { public interface PluginTask - extends Task - { + extends Task { @Config("command") public String getCommand(); @@ -41,18 +39,17 @@ public interface PluginTask } @Override - public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) - { + public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); final PluginTask task = configMapper.map(config, PluginTask.class); switch (task.getPipe()) { - case "stdout": - break; - case "stderr": - break; - default: - throw new ConfigException(String.format( + case "stdout": + break; + case "stderr": + break; + default: + throw new ConfigException(String.format( "Unknown 'pipe' option '%s'. It must be either 'stdout' or 'stderr'", task.getPipe())); } @@ -61,9 +58,8 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr @Override public ConfigDiff resume(TaskSource taskSource, - int taskCount, - FileInputPlugin.Control control) - { + int taskCount, + FileInputPlugin.Control control) { control.run(taskSource, taskCount); return CONFIG_MAPPER_FACTORY.newConfigDiff(); @@ -71,14 +67,13 @@ public ConfigDiff resume(TaskSource taskSource, @Override public void cleanup(TaskSource taskSource, - int taskCount, - List successTaskReports) - { + int taskCount, + List successTaskReports) { } + @SuppressWarnings("MissingSwitchDefault") @Override - public TransactionalFileInput open(TaskSource taskSource, int taskIndex) - { + public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper(); final PluginTask task = taskMapper.map(taskSource, PluginTask.class); @@ -90,12 +85,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex) ProcessBuilder builder = new ProcessBuilder(cmdline.toArray(new String[cmdline.size()])); switch (task.getPipe()) { - case "stdout": - builder.redirectError(ProcessBuilder.Redirect.INHERIT); - break; - case "stderr": - builder.redirectOutput(ProcessBuilder.Redirect.INHERIT); - break; + case "stdout": + builder.redirectError(ProcessBuilder.Redirect.INHERIT); + break; + case "stderr": + builder.redirectOutput(ProcessBuilder.Redirect.INHERIT); + break; } try { @@ -104,12 +99,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex) InputStream stream = null; try { switch (task.getPipe()) { - case "stdout": - stream = process.getInputStream(); - break; - case "stderr": - stream = process.getErrorStream(); - break; + case "stdout": + stream = process.getInputStream(); + break; + case "stderr": + stream = process.getErrorStream(); + break; } PluginFileInput input = new PluginFileInput(task, new ProcessWaitInputStream(stream, process)); @@ -126,10 +121,9 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex) } } - protected static List buildShell() - { + protected static List buildShell() { String osName = System.getProperty("os.name"); - if(osName.indexOf("Windows") >= 0) { + if (osName.indexOf("Windows") >= 0) { return Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")); } else { return Collections.unmodifiableList(Arrays.asList("sh", "-c")); @@ -137,19 +131,16 @@ protected static List buildShell() } private static class ProcessWaitInputStream - extends FilterInputStream - { + extends FilterInputStream { private Process process; - public ProcessWaitInputStream(InputStream in, Process process) - { + public ProcessWaitInputStream(InputStream in, Process process) { super(in); this.process = process; } @Override - public int read() throws IOException - { + public int read() throws IOException { int c = super.read(); if (c < 0) { waitFor(); @@ -158,8 +149,7 @@ public int read() throws IOException } @Override - public int read(byte[] b) throws IOException - { + public int read(byte[] b) throws IOException { int c = super.read(b); if (c < 0) { waitFor(); @@ -168,8 +158,7 @@ public int read(byte[] b) throws IOException } @Override - public int read(byte[] b, int off, int len) throws IOException - { + public int read(byte[] b, int off, int len) throws IOException { int c = super.read(b, off, len); if (c < 0) { waitFor(); @@ -178,14 +167,12 @@ public int read(byte[] b, int off, int len) throws IOException } @Override - public void close() throws IOException - { + public void close() throws IOException { super.close(); waitFor(); } - private synchronized void waitFor() throws IOException - { + private synchronized void waitFor() throws IOException { if (process != null) { int code; try { @@ -196,7 +183,7 @@ private synchronized void waitFor() throws IOException process = null; if (code != 0) { throw new IOException(String.format( - "Command finished with non-zero exit code. Exit code is %d.", code)); + "Command finished with non-zero exit code. Exit code is %d.", code)); } } } @@ -205,22 +192,18 @@ private synchronized void waitFor() throws IOException // TODO almost copied from S3FileInputPlugin. include an InputStreamFileInput utility to embulk-core. public static class PluginFileInput extends InputStreamFileInput - implements TransactionalFileInput - { + implements TransactionalFileInput { private static class SingleFileProvider - implements InputStreamFileInput.Provider - { + implements InputStreamFileInput.Provider { private final InputStream stream; private boolean opened = false; - public SingleFileProvider(InputStream stream) - { + public SingleFileProvider(InputStream stream) { this.stream = stream; } @Override - public InputStream openNext() throws IOException - { + public InputStream openNext() throws IOException { if (opened) { return null; } @@ -229,29 +212,29 @@ public InputStream openNext() throws IOException } @Override - public void close() throws IOException - { + public void close() throws IOException { if (!opened) { stream.close(); } } } - public PluginFileInput(PluginTask task, InputStream stream) - { + public PluginFileInput(PluginTask task, InputStream stream) { super(Exec.getBufferAllocator(), new SingleFileProvider(stream)); } - public void abort() { } + public void abort() { + } - public TaskReport commit() - { + public TaskReport commit() { return CONFIG_MAPPER_FACTORY.newTaskReport(); } @Override - public void close() { } + public void close() { + } } + private static final Logger logger = LoggerFactory.getLogger(CommandFileInputPlugin.class); private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); diff --git a/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java b/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java index 9795bd3..bc9ef33 100644 --- a/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java +++ b/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java @@ -1,18 +1,15 @@ package org.embulk.input; +import static org.embulk.input.CommandFileInputPlugin.buildShell; +import static org.junit.Assert.assertEquals; + import java.util.Arrays; import java.util.Collections; +import org.embulk.test.EmbulkTestRuntime; import org.junit.Rule; import org.junit.Test; -import org.embulk.test.EmbulkTestRuntime; -import static org.embulk.input.CommandFileInputPlugin.buildShell; - -import java.util.List; -import static org.junit.Assert.assertEquals; - -public class TestCommandFileInputPlugin -{ +public class TestCommandFileInputPlugin { @Rule public EmbulkTestRuntime runtime = new EmbulkTestRuntime(); @@ -20,8 +17,7 @@ public class TestCommandFileInputPlugin public void testShell() { if (System.getProperty("os.name").indexOf("Windows") >= 0) { assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")), buildShell()); - } - else { + } else { assertEquals(Collections.unmodifiableList(Arrays.asList("sh", "-c")), buildShell()); } }