Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added CDH dependency versions to pom.xml #41

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hadoop-pcap-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.3.0-mr1-cdh5.1.3</version>
<version>${hadoopVersion}-mr1-cdh${cdhVersion}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0-cdh5.1.3</version>
<version>${hadoopVersion}-cdh${cdhVersion}</version>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package net.ripe.hadoop.pcap.mr1.io;

import java.io.IOException;

import net.ripe.hadoop.pcap.io.reader.CombinePcapRecordReader;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

public class CombinePcapInputFormat extends CombineFileInputFormat<LongWritable, ObjectWritable> {
    /**
     * Hands every file contained in the combined split to its own
     * {@link CombinePcapRecordReader} via Hadoop's generic
     * {@link CombineFileRecordReader} multiplexer.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<LongWritable, ObjectWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        CombineFileSplit combineSplit = (CombineFileSplit) split;
        return new CombineFileRecordReader(job, combineSplit, reporter, CombinePcapRecordReader.class);
    }

    /**
     * A PCAP can only be read as a whole. There is no way to know where to
     * start reading in the middle of the file. It needs to be read from the
     * beginning to the end.
     *
     * @see <a href="http://wiki.wireshark.org/Development/LibpcapFileFormat">Libpcap file format</a>
     */
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package net.ripe.hadoop.pcap.mr1.io;

import java.io.DataInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;

import net.ripe.hadoop.pcap.mr1.io.reader.PcapRecordReader;
import net.ripe.hadoop.pcap.PcapReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class PcapInputFormat extends FileInputFormat<LongWritable, ObjectWritable> {
    /** Job configuration key naming the {@link PcapReader} subclass to instantiate. */
    static final String READER_CLASS_PROPERTY = "net.ripe.hadoop.pcap.io.reader.class";

    public static final Log LOG = LogFactory.getLog(PcapInputFormat.class);

    /**
     * Builds a record reader that consumes the single PCAP file backing the
     * given (non-splittable) split from its first byte to its last.
     */
    @Override
    public RecordReader<LongWritable, ObjectWritable> getRecordReader(InputSplit split, JobConf config, Reporter reporter) throws IOException {
        FileSplit fileSplit = (FileSplit)split;
        Path path = fileSplit.getPath();
        LOG.info("Reading PCAP: " + path.toString());
        long start = 0L;
        long length = fileSplit.getLength();
        return initPcapRecordReader(path, start, length, reporter, config);
    }

    /**
     * Opens {@code path}, transparently decompressing it when a compression
     * codec matches the file name, and wraps it in a {@link PcapRecordReader}.
     *
     * @param path    PCAP file to read
     * @param start   logical start offset reported for progress (callers pass 0)
     * @param length  logical length reported for progress
     * @throws IOException if the file cannot be opened
     */
    public static PcapRecordReader initPcapRecordReader(Path path, long start, long length, Reporter reporter, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream baseStream = fs.open(path);
        DataInputStream stream = baseStream;
        CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
        final CompressionCodec codec = compressionCodecs.getCodec(path);
        if (codec != null)
            stream = new DataInputStream(codec.createInputStream(stream));

        PcapReader reader = initPcapReader(stream, conf);
        return new PcapRecordReader(reader, start, length, baseStream, stream, reporter);
    }

    /**
     * Instantiates the configured {@link PcapReader} (or the default
     * {@code PcapReader}) reflectively via its {@code (DataInputStream)}
     * constructor.
     *
     * @throws IllegalStateException if the configured reader class cannot be
     *         instantiated; fails fast instead of returning {@code null},
     *         which would only surface later as an opaque
     *         {@code NullPointerException} inside the record reader
     */
    public static PcapReader initPcapReader(DataInputStream stream, Configuration conf) {
        Class<? extends PcapReader> pcapReaderClass = conf.getClass(READER_CLASS_PROPERTY, PcapReader.class, PcapReader.class);
        try {
            Constructor<? extends PcapReader> pcapReaderConstructor = pcapReaderClass.getConstructor(DataInputStream.class);
            return pcapReaderConstructor.newInstance(stream);
        } catch (ReflectiveOperationException e) {
            // Preserve the root cause; do not swallow it with printStackTrace().
            throw new IllegalStateException("Could not instantiate PcapReader class " + pcapReaderClass.getName()
                    + " configured via " + READER_CLASS_PROPERTY, e);
        }
    }

    /**
     * A PCAP can only be read as a whole. There is no way to know where to
     * start reading in the middle of the file. It needs to be read from the
     * beginning to the end.
     *
     * @see <a href="http://wiki.wireshark.org/Development/LibpcapFileFormat">Libpcap file format</a>
     */
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package net.ripe.hadoop.pcap.mr1.io.reader;

import net.ripe.hadoop.pcap.mr1.io.PcapInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

import java.io.IOException;

/**
* Wrapper for CombineFileSplit to RecordReader
* @author wnagele
*/
/**
 * Adapts one file of a {@link CombineFileSplit} to the {@link RecordReader}
 * contract by delegating every call to a per-file {@link PcapRecordReader}.
 * @author wnagele
 */
public class CombinePcapRecordReader implements RecordReader<LongWritable, ObjectWritable> {
    private final PcapRecordReader delegate;

    /**
     * @param split    the combined split containing several PCAP files
     * @param conf     job configuration
     * @param reporter progress reporter passed through to the delegate
     * @param index    which file of the split this reader is responsible for
     * @throws IOException if the underlying file cannot be opened
     */
    public CombinePcapRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer index) throws IOException {
        delegate = PcapInputFormat.initPcapRecordReader(
                split.getPath(index), 0L, split.getLength(index), reporter, conf);
    }

    @Override
    public boolean next(LongWritable key, ObjectWritable value) throws IOException {
        return delegate.next(key, value);
    }

    @Override
    public LongWritable createKey() {
        return delegate.createKey();
    }

    @Override
    public ObjectWritable createValue() {
        return delegate.createValue();
    }

    @Override
    public long getPos() throws IOException {
        return delegate.getPos();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }

    @Override
    public float getProgress() throws IOException {
        return delegate.getProgress();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package net.ripe.hadoop.pcap.mr1.io.reader;

import net.ripe.hadoop.pcap.PcapReader;
import net.ripe.hadoop.pcap.packet.Packet;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.Iterator;

/**
 * Iterates over the packets of a single PCAP stream, emitting a running
 * packet count as the key and the parsed packet as the value. Progress is
 * estimated from the position of the underlying (pre-decompression) stream.
 */
public class PcapRecordReader implements RecordReader<LongWritable, ObjectWritable> {
    PcapReader pcapReader;
    Iterator<Packet> pcapReaderIterator;
    Seekable baseStream;
    DataInputStream stream;
    Reporter reporter;

    long packetCount = 0;
    long start, end;

    /**
     * @param pcapReader parsed-packet source
     * @param start      logical start offset (used for progress only)
     * @param end        logical end offset (used for progress only)
     * @param baseStream seekable raw file stream, consulted for the position
     * @param stream     possibly-decompressing stream that is closed on close()
     * @param reporter   receives status updates after every packet
     */
    public PcapRecordReader(PcapReader pcapReader, long start, long end, Seekable baseStream, DataInputStream stream, Reporter reporter) throws IOException {
        this.pcapReader = pcapReader;
        this.baseStream = baseStream;
        this.stream = stream;
        this.start = start;
        this.end = end;
        this.reporter = reporter;
        this.pcapReaderIterator = pcapReader.iterator();
    }

    @Override
    public void close() throws IOException {
        // Closing the outermost stream also releases the underlying file stream.
        stream.close();
    }

    @Override
    public boolean next(LongWritable key, ObjectWritable value) throws IOException {
        if (pcapReaderIterator.hasNext()) {
            // Keys are 1-based: the first packet is emitted with key 1.
            key.set(++packetCount);
            value.set(pcapReaderIterator.next());

            reporter.setStatus("Read " + getPos() + " of " + end + " bytes");
            reporter.progress();
            return true;
        }
        return false;
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public ObjectWritable createValue() {
        return new ObjectWritable();
    }

    @Override
    public long getPos() throws IOException {
        return baseStream.getPos();
    }

    @Override
    public float getProgress() throws IOException {
        // Guard against division by zero for zero-length inputs.
        if (end == start)
            return 0;
        float consumed = getPos() - start;
        return Math.min(1.0f, consumed / (end - start));
    }
}
4 changes: 2 additions & 2 deletions hadoop-pcap-serde/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ You can use the following parameters to combine multiple input files into splits
dns_authority array<string>,
dns_additional array<string>)
ROW FORMAT SERDE 'net.ripe.hadoop.pcap.serde.PcapDeserializer'
STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.io.PcapInputFormat'
STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.mr1.io.PcapInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs:///pcaps/';

Expand All @@ -56,6 +56,6 @@ You can use the following parameters to combine multiple input files into splits
len int,
ttl int)
ROW FORMAT SERDE 'net.ripe.hadoop.pcap.serde.PcapDeserializer'
STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.io.PcapInputFormat'
STORED AS INPUTFORMAT 'net.ripe.hadoop.pcap.mr1.io.PcapInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3n://pcaps/';
18 changes: 9 additions & 9 deletions hadoop-pcap-serde/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,27 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>0.12.0-cdh5.1.3</version>
<version>${hiveVersion}-cdh${cdhVersion}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>0.12.0-cdh5.1.3</version>
<version>${hiveVersion}-cdh${cdhVersion}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.3.0-mr1-cdh5.1.3</version>
<version>${hadoopVersion}-mr1-cdh${cdhVersion}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0-cdh5.1.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoopVersion}-cdh${cdhVersion}</version>
<scope>provided</scope>
</dependency>

</dependencies>

Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
<artifactId>hadoop-pcap-root</artifactId>
<version>1.2-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>
<hadoopVersion>2.6.0</hadoopVersion>
<cdhVersion>5.10.0</cdhVersion>
<hiveVersion>1.1.0</hiveVersion>
</properties>

<modules>
<module>hadoop-pcap-lib</module>
Expand Down