diff --git a/hadoop-pcap-lib/pom.xml b/hadoop-pcap-lib/pom.xml
index 551acf5..5765fb2 100644
--- a/hadoop-pcap-lib/pom.xml
+++ b/hadoop-pcap-lib/pom.xml
@@ -35,13 +35,13 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>2.3.0-mr1-cdh5.1.3</version>
+      <version>${hadoopVersion}-mr1-cdh${cdhVersion}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>2.3.0-cdh5.1.3</version>
+      <version>${hadoopVersion}-cdh${cdhVersion}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/CombinePcapInputFormat.java b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/CombinePcapInputFormat.java
new file mode 100644
index 0000000..203a8bd
--- /dev/null
+++ b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/CombinePcapInputFormat.java
@@ -0,0 +1,36 @@
+package net.ripe.hadoop.pcap.mr1.io;
+
+import java.io.IOException;
+
+import net.ripe.hadoop.pcap.mr1.io.reader.CombinePcapRecordReader;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
+import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+
+public class CombinePcapInputFormat extends CombineFileInputFormat<LongWritable, ObjectWritable> {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Override
+    public RecordReader<LongWritable, ObjectWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+        return new CombineFileRecordReader(job, (CombineFileSplit)split, reporter, CombinePcapRecordReader.class);
+    }
+
+    /**
+     * A PCAP can only be read as a whole. There is no way to know where to
+     * start reading in the middle of the file. It needs to be read from the
+     * beginning to the end.
+     * @see http://wiki.wireshark.org/Development/LibpcapFileFormat
+     */
+    @Override
+    protected boolean isSplitable(FileSystem fs, Path filename) {
+        return false;
+    }
+}
diff --git a/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/PcapInputFormat.java b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/PcapInputFormat.java
new file mode 100644
index 0000000..680d935
--- /dev/null
+++ b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/PcapInputFormat.java
@@ -0,0 +1,76 @@
+package net.ripe.hadoop.pcap.mr1.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import net.ripe.hadoop.pcap.mr1.io.reader.PcapRecordReader;
+import net.ripe.hadoop.pcap.PcapReader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+public class PcapInputFormat extends FileInputFormat<LongWritable, ObjectWritable> {
+    static final String READER_CLASS_PROPERTY = "net.ripe.hadoop.pcap.io.reader.class";
+
+    public static final Log LOG = LogFactory.getLog(PcapInputFormat.class);
+
+    @Override
+    public RecordReader<LongWritable, ObjectWritable> getRecordReader(InputSplit split, JobConf config, Reporter reporter) throws IOException {
+        FileSplit fileSplit = (FileSplit)split;
+        Path path = fileSplit.getPath();
+        LOG.info("Reading PCAP: " + path.toString());
+        long start = 0L;
+        long length = fileSplit.getLength();
+        return initPcapRecordReader(path, start, length, reporter, config);
+    }
+
+    public static PcapRecordReader initPcapRecordReader(Path path, long start, long length, Reporter reporter, Configuration conf) throws IOException {
+        FileSystem fs = path.getFileSystem(conf);
+        FSDataInputStream baseStream = fs.open(path);
+        DataInputStream stream = baseStream;
+        CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
+        final CompressionCodec codec = compressionCodecs.getCodec(path);
+        if (codec != null)
+            stream = new DataInputStream(codec.createInputStream(stream));
+
+        PcapReader reader = initPcapReader(stream, conf);
+        return new PcapRecordReader(reader, start, length, baseStream, stream, reporter);
+    }
+
+    public static PcapReader initPcapReader(DataInputStream stream, Configuration conf) {
+        try {
+            Class<? extends PcapReader> pcapReaderClass = conf.getClass(READER_CLASS_PROPERTY, PcapReader.class, PcapReader.class);
+            Constructor<? extends PcapReader> pcapReaderConstructor = pcapReaderClass.getConstructor(DataInputStream.class);
+            return pcapReaderConstructor.newInstance(stream);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * A PCAP can only be read as a whole. There is no way to know where to
+     * start reading in the middle of the file. It needs to be read from the
+     * beginning to the end.
+     * @see http://wiki.wireshark.org/Development/LibpcapFileFormat
+     */
+    @Override
+    protected boolean isSplitable(FileSystem fs, Path filename) {
+        return false;
+    }
+}
diff --git a/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/reader/CombinePcapRecordReader.java b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/reader/CombinePcapRecordReader.java
new file mode 100644
index 0000000..91cef63
--- /dev/null
+++ b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/reader/CombinePcapRecordReader.java
@@ -0,0 +1,57 @@
+package net.ripe.hadoop.pcap.mr1.io.reader;
+
+import net.ripe.hadoop.pcap.mr1.io.PcapInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.CombineFileSplit;
+
+import java.io.IOException;
+
+/**
+ * Wrapper for CombineFileSplit to RecordReader
+ * @author wnagele
+ */
+public class CombinePcapRecordReader implements RecordReader<LongWritable, ObjectWritable> {
+    private PcapRecordReader recordReader;
+
+    public CombinePcapRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
+        Path path = split.getPath(index);
+        long start = 0L;
+        long length = split.getLength(index);
+        recordReader = PcapInputFormat.initPcapRecordReader(path, start, length, reporter, conf);
+    }
+
+    @Override
+    public boolean next(LongWritable key, ObjectWritable value) throws IOException {
+        return recordReader.next(key, value);
+    }
+
+    @Override
+    public LongWritable createKey() {
+        return recordReader.createKey();
+    }
+
+    @Override
+    public ObjectWritable createValue() {
+        return recordReader.createValue();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return recordReader.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+        recordReader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        return recordReader.getProgress();
+    }
+}
\ No newline at end of file
diff --git a/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/reader/PcapRecordReader.java b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/reader/PcapRecordReader.java
new file mode 100644
index 0000000..d2804f7
--- /dev/null
+++ b/hadoop-pcap-lib/src/main/java/net/ripe/hadoop/pcap/mr1/io/reader/PcapRecordReader.java
@@ -0,0 +1,76 @@
+package net.ripe.hadoop.pcap.mr1.io.reader;
+
+import net.ripe.hadoop.pcap.PcapReader;
+import net.ripe.hadoop.pcap.packet.Packet;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+public class PcapRecordReader implements RecordReader<LongWritable, ObjectWritable> {
+    PcapReader pcapReader;
+    Iterator<Packet> pcapReaderIterator;
+    Seekable baseStream;
+    DataInputStream stream;
+    Reporter reporter;
+
+    long packetCount = 0;
+    long start, end;
+
+    public PcapRecordReader(PcapReader pcapReader, long start, long end, Seekable baseStream, DataInputStream stream, Reporter reporter) throws IOException {
+        this.pcapReader = pcapReader;
+        this.baseStream = baseStream;
+        this.stream = stream;
+        this.start = start;
+        this.end = end;
+        this.reporter = reporter;
+
+        pcapReaderIterator = pcapReader.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        stream.close();
+    }
+
+    @Override
+    public boolean next(LongWritable key, ObjectWritable value) throws IOException {
+        if (!pcapReaderIterator.hasNext())
+            return false;
+
+        key.set(++packetCount);
+        value.set(pcapReaderIterator.next());
+
+        reporter.setStatus("Read " + getPos() + " of " + end + " bytes");
+        reporter.progress();
+
+        return true;
+    }
+
+    @Override
+    public LongWritable createKey() {
+        return new LongWritable();
+    }
+
+    @Override
+    public ObjectWritable createValue() {
+        return new ObjectWritable();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return baseStream.getPos();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+        if (start == end)
+            return 0;
+        return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+    }
+}
\ No newline at end of file
diff --git a/hadoop-pcap-serde/README.md b/hadoop-pcap-serde/README.md
index ba69977..a3ff365 100644
--- a/hadoop-pcap-serde/README.md
+++ b/hadoop-pcap-serde/README.md
@@ -40,7 +40,7 @@ You can use the following parameters to combine multiple input files into splits
                              dns_authority array<string>,
                              dns_additional array<string>)
     ROW FORMAT SERDE 'net.ripe.hadoop.pcap.serde.PcapDeserializer'
-    STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.io.PcapInputFormat'
+    STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.mr1.io.PcapInputFormat'
     OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
     LOCATION 'hdfs:///pcaps/';
 
@@ -56,6 +56,6 @@ You can use the following parameters to combine multiple input files into splits
                              len int,
                              ttl int)
     ROW FORMAT SERDE 'net.ripe.hadoop.pcap.serde.PcapDeserializer'
-    STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.io.PcapInputFormat'
+    STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.mr1.io.PcapInputFormat'
     OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
     LOCATION 's3n://pcaps/';
diff --git a/hadoop-pcap-serde/pom.xml b/hadoop-pcap-serde/pom.xml
index d6fecff..ebe0d30 100644
--- a/hadoop-pcap-serde/pom.xml
+++ b/hadoop-pcap-serde/pom.xml
@@ -40,27 +40,27 @@
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-exec</artifactId>
-      <version>0.12.0-cdh5.1.3</version>
+      <version>${hiveVersion}-cdh${cdhVersion}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-serde</artifactId>
-      <version>0.12.0-cdh5.1.3</version>
+      <version>${hiveVersion}-cdh${cdhVersion}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
-      <version>2.3.0-mr1-cdh5.1.3</version>
+      <version>${hadoopVersion}-mr1-cdh${cdhVersion}</version>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>2.3.0-cdh5.1.3</version>
-      <scope>provided</scope>
-    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoopVersion}-cdh${cdhVersion}</version>
+      <scope>provided</scope>
+    </dependency>
 </dependencies>
diff --git a/pom.xml b/pom.xml
index 809dd71..040d873 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,6 +4,11 @@
   <artifactId>hadoop-pcap-root</artifactId>
   <version>1.2-SNAPSHOT</version>
   <packaging>pom</packaging>
+  <properties>
+    <hadoopVersion>2.6.0</hadoopVersion>
+    <cdhVersion>5.10.0</cdhVersion>
+    <hiveVersion>1.1.0</hiveVersion>
+  </properties>
   <modules>
     <module>hadoop-pcap-lib</module>
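
Usage sketch (illustrative only, not part of the patch): with the classes above on the classpath, a classic MR1 job can consume PCAPs roughly as below. PcapCountJob and PacketCountMapper are hypothetical names invented for this example; the input format and the reader-class property come straight from the diff, while DnsPcapReader and the Packet.PROTOCOL field key are assumed to exist in hadoop-pcap-lib as documented elsewhere in the repo.

package example;

import java.io.IOException;

import net.ripe.hadoop.pcap.mr1.io.CombinePcapInputFormat;
import net.ripe.hadoop.pcap.packet.Packet;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class PcapCountJob {
    // Emits one count per protocol seen in the captures.
    public static class PacketCountMapper extends MapReduceBase
            implements Mapper<LongWritable, ObjectWritable, Text, LongWritable> {
        private final LongWritable ONE = new LongWritable(1);

        @Override
        public void map(LongWritable packetNumber, ObjectWritable value,
                        OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
            // The value wraps a net.ripe.hadoop.pcap.packet.Packet;
            // PROTOCOL is assumed to be the protocol field key.
            Packet packet = (Packet) value.get();
            out.collect(new Text(String.valueOf(packet.get(Packet.PROTOCOL))), ONE);
        }
    }

    public static void main(String[] args) throws IOException {
        JobConf job = new JobConf(PcapCountJob.class);
        job.setJobName("pcap-protocol-count");

        // Pack many small PCAPs into fewer splits; individual files are
        // still read whole because isSplitable() returns false.
        job.setInputFormat(CombinePcapInputFormat.class);

        // Optional: swap in a specialised reader via the property defined
        // as READER_CLASS_PROPERTY in PcapInputFormat above.
        job.set("net.ripe.hadoop.pcap.io.reader.class",
                "net.ripe.hadoop.pcap.DnsPcapReader");

        job.setMapperClass(PacketCountMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs:///pcaps/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs:///pcaps-out/"));

        JobClient.runJob(job);
    }
}

The combining variant is the better default when the input is many small capture files: plain PcapInputFormat schedules one map task per file, whereas CombinePcapInputFormat packs several whole files into each split.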