From c8aba49d7dad432940d352adedfe0afd3563679b Mon Sep 17 00:00:00 2001 From: Hiroyuki Sato Date: Wed, 1 May 2024 13:31:49 +0900 Subject: [PATCH] Use embulk-spi v0.11 --- build.gradle | 3 + .../embulk/input/CommandFileInputPlugin.java | 55 ++++++++++--------- .../input/TestCommandFileInputPlugin.java | 10 ++-- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/build.gradle b/build.gradle index 61ba013..05d7b68 100644 --- a/build.gradle +++ b/build.gradle @@ -32,11 +32,14 @@ java { } dependencies { + compileOnly "org.embulk:embulk-spi:0.11" implementation "org.embulk:embulk-util-config:0.5.0" implementation "org.embulk:embulk-util-file:0.2.0" testImplementation "junit:junit:4.13.2" + testImplementation "org.embulk:embulk-deps:0.11.3" + testImplementation "org.embulk:embulk-junit4:0.11.3" } embulkPlugin { diff --git a/src/main/java/org/embulk/input/CommandFileInputPlugin.java b/src/main/java/org/embulk/input/CommandFileInputPlugin.java index 2f74a95..4022d4d 100644 --- a/src/main/java/org/embulk/input/CommandFileInputPlugin.java +++ b/src/main/java/org/embulk/input/CommandFileInputPlugin.java @@ -1,29 +1,29 @@ package org.embulk.input; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.ArrayList; import java.io.InputStream; import java.io.IOException; import java.io.FilterInputStream; -import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.base.Throwables; import org.embulk.config.TaskReport; -import org.embulk.config.Config; -import org.embulk.config.ConfigDefault; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.config.ConfigMapper; +import org.embulk.util.config.ConfigMapperFactory; +import org.embulk.util.config.Task; +import org.embulk.util.config.TaskMapper; import org.embulk.config.ConfigDiff; -import org.embulk.config.ConfigInject; import org.embulk.config.ConfigSource; import org.embulk.config.ConfigException; -import org.embulk.config.Task; import org.embulk.config.TaskSource; -import org.embulk.spi.BufferAllocator; import org.embulk.spi.Exec; import org.embulk.spi.FileInputPlugin; import org.embulk.spi.TransactionalFileInput; -import org.embulk.spi.util.InputStreamFileInput; +import org.embulk.util.file.InputStreamFileInput; +import org.slf4j.LoggerFactory; public class CommandFileInputPlugin implements FileInputPlugin @@ -38,16 +38,13 @@ public interface PluginTask @ConfigDefault("\"stdout\"") public String getPipe(); - @ConfigInject - public BufferAllocator getBufferAllocator(); } - private final Logger logger = Exec.getLogger(getClass()); - @Override public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) { - PluginTask task = config.loadConfig(PluginTask.class); + final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper(); + final PluginTask task = configMapper.map(config, PluginTask.class); switch (task.getPipe()) { case "stdout": @@ -59,7 +56,7 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr "Unknown 'pipe' option '%s'. It must be either 'stdout' or 'stderr'", task.getPipe())); } - return resume(task.dump(), 1, control); + return resume(task.toTaskSource(), 1, control); } @Override @@ -68,7 +65,8 @@ public ConfigDiff resume(TaskSource taskSource, FileInputPlugin.Control control) { control.run(taskSource, taskCount); - return Exec.newConfigDiff(); + + return CONFIG_MAPPER_FACTORY.newConfigDiff(); } @Override @@ -81,7 +79,8 @@ public void cleanup(TaskSource taskSource, @Override public TransactionalFileInput open(TaskSource taskSource, int taskIndex) { - PluginTask task = taskSource.loadTask(PluginTask.class); + final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper(); + final PluginTask task = taskMapper.map(taskSource, PluginTask.class); List cmdline = new ArrayList(); cmdline.addAll(buildShell()); @@ -123,18 +122,17 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex) } } } catch (IOException ex) { - throw Throwables.propagate(ex); + throw new RuntimeException(ex); } } - @VisibleForTesting - static List buildShell() + protected static List buildShell() { String osName = System.getProperty("os.name"); if(osName.indexOf("Windows") >= 0) { - return ImmutableList.of("PowerShell.exe", "-Command"); + return Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")); } else { - return ImmutableList.of("sh", "-c"); + return Collections.unmodifiableList(Arrays.asList("sh", "-c")); } } @@ -193,7 +191,7 @@ private synchronized void waitFor() throws IOException try { code = process.waitFor(); } catch (InterruptedException ex) { - throw Throwables.propagate(ex); + throw new RuntimeException(ex); } process = null; if (code != 0) { @@ -212,7 +210,7 @@ public static class PluginFileInput private static class SingleFileProvider implements InputStreamFileInput.Provider { - private InputStream stream; + private final InputStream stream; private boolean opened = false; public SingleFileProvider(InputStream stream) @@ -241,17 +239,20 @@ public void close() throws IOException public PluginFileInput(PluginTask task, InputStream stream) { - super(task.getBufferAllocator(), new SingleFileProvider(stream)); + super(Exec.getBufferAllocator(), new SingleFileProvider(stream)); } public void abort() { } public TaskReport commit() { - return Exec.newTaskReport(); + return CONFIG_MAPPER_FACTORY.newTaskReport(); } @Override public void close() { } } + private static final Logger logger = LoggerFactory.getLogger(CommandFileInputPlugin.class); + + private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); } diff --git a/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java b/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java index b54af75..9795bd3 100644 --- a/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java +++ b/src/test/java/org/embulk/input/TestCommandFileInputPlugin.java @@ -1,10 +1,10 @@ package org.embulk.input; -import com.google.common.collect.ImmutableList; -import org.junit.Before; +import java.util.Arrays; +import java.util.Collections; import org.junit.Rule; import org.junit.Test; -import org.embulk.EmbulkTestRuntime; +import org.embulk.test.EmbulkTestRuntime; import static org.embulk.input.CommandFileInputPlugin.buildShell; import java.util.List; @@ -19,10 +19,10 @@ public class TestCommandFileInputPlugin @Test public void testShell() { if (System.getProperty("os.name").indexOf("Windows") >= 0) { - assertEquals(ImmutableList.of("PowerShell.exe", "-Command"), buildShell()); + assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")), buildShell()); } else { - assertEquals(ImmutableList.of("sh", "-c"), buildShell()); + assertEquals(Collections.unmodifiableList(Arrays.asList("sh", "-c")), buildShell()); } } }