Graph generator with HDFS support #27

Open
wants to merge 8 commits into base: master
Changes from 6 commits
25 changes: 21 additions & 4 deletions .idea/libraries/buildScala.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions .idea/misc.xml


1 change: 0 additions & 1 deletion .idea/modules.xml


2 changes: 2 additions & 0 deletions build.sbt
@@ -24,6 +24,8 @@ libraryDependencies += "org.mockito" % "mockito-all" % "1.8.5" % "test" withSour

libraryDependencies += "org.scala-tools.testing" % "specs_2.8.1" % "1.6.6" % "test" withSources()

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "0.20.2"

publishMavenStyle := true

publishTo <<= version { (v: String) =>
2 changes: 2 additions & 0 deletions cassovary.iml
@@ -5,6 +5,8 @@
<configuration>
<option name="compilerLibraryLevel" value="Project" />
<option name="compilerLibraryName" value="buildScala" />
<option name="maximumHeapSize" value="4096" />
Contributor:

Is this equivalent to -Xmx4096m? If so, that is rather large to have.

<option name="vmOptions" value="-Xss256m -server" />
</configuration>
</facet>
</component>
63 changes: 63 additions & 0 deletions src/main/java/com/twitter/pers/graph_generator/EdgeListOutput.java
@@ -0,0 +1,63 @@
package com.twitter.pers.graph_generator;
Contributor:

Please add copyright/license header information, see other files as an example.


import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

/**
* Efficient parallel output of edges into a binary edge list format.
Contributor:

pls write a couple of lines about this "edge list format"

* @author Aapo Kyrola, [email protected], [email protected]
Contributor:

we generally have not listed author names in the past in the source code. So, pls leave it out.

Contributor:

Agreed. Please also ensure that you use your Twitter email as the author (e.g., git commit --amend --author="abc [email protected]").

*/
public class EdgeListOutput implements GraphOutput {

    private String fileNamePrefix;

    static int partSeq = 0;

    public EdgeListOutput(String fileNamePrefix) {
        this.fileNamePrefix = fileNamePrefix;
    }

    @Override
    public void addEdges(int[] from, int[] to) {
        try {
            DataOutputStream dos = partitionOut.get();
            int n = from.length;
            for(int i=0; i<n; i++) {
Contributor:

'n' is not really needed

                dos.writeInt(Integer.reverseBytes(from[i]));
Contributor:

any particular rationale to "reverseBytes" ?

                dos.writeInt(Integer.reverseBytes(to[i]));
            }
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    public void finishUp() {
        try {
            partitionOut.get().close();
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /* Each thread will have a local partition */
    private ThreadLocal<DataOutputStream> partitionOut = new ThreadLocal<DataOutputStream>() {
        @Override
        protected DataOutputStream initialValue() {
            try {
                int thisPartId;
                synchronized (this) {
                    thisPartId = partSeq++;
                }

                String fileName = fileNamePrefix + "-part" + thisPartId;
Contributor:

prefer "-part-" (i.e., extra '-' after "part")

                return new DataOutputStream(new BufferedOutputStream(new FileOutputStream(fileName)));
            } catch (Exception err) {
                err.printStackTrace();
                throw new RuntimeException(err);
            }
        }
    };

}
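
Note on the partition counter in the file above: `partSeq` is static, but `synchronized (this)` locks the anonymous `ThreadLocal` instance, and each `EdgeListOutput` object has its own. Two output instances could therefore race on the counter. A minimal sketch of one alternative, assuming an `AtomicInteger` is acceptable here; the class and method names are hypothetical and not part of the pull request, and the "-part-" separator follows the review suggestion:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; names are illustrative and not from the pull request.
class PartitionNamer {
    // Shared by all instances and threads; getAndIncrement is atomic without a lock.
    private static final AtomicInteger PART_SEQ = new AtomicInteger(0);

    private final String fileNamePrefix;

    PartitionNamer(String fileNamePrefix) {
        this.fileNamePrefix = fileNamePrefix;
    }

    /** Returns the next unused partition file name, built as prefix + "-part-" + sequence number. */
    String nextPartitionFileName() {
        return fileNamePrefix + "-part-" + PART_SEQ.getAndIncrement();
    }

    public static void main(String[] args) {
        PartitionNamer namer = new PartitionNamer("graph");
        System.out.println(namer.nextPartitionFileName());
        System.out.println(namer.nextPartitionFileName());
    }
}
```

Each thread's `initialValue()` could call such a helper once, so the numbering stays unique even when several output objects exist in the same JVM.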
12 changes: 12 additions & 0 deletions src/main/java/com/twitter/pers/graph_generator/GraphOutput.java
@@ -0,0 +1,12 @@
package com.twitter.pers.graph_generator;

/**
* @author Aapo Kyrola, [email protected], [email protected]
*/
public interface GraphOutput {
Contributor:

pls add comments on what this interface is, as well as some doc for each method.


    void addEdges(int[] from, int[] to);

    void finishUp();

}
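
A sketch of what the requested documentation might look like, with the contract inferred from how the diff uses the interface (batches arrive from several generator threads, and each thread calls `finishUp()` when it is done). The wording of the comments and the `CountingGraphOutput` class are assumptions added for illustration, not part of the pull request:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sink for generated edges; documentation inferred from how the diff uses it. */
interface GraphOutput {
    /**
     * Writes a batch of edges; edge i goes from from[i] to to[i].
     * May be called concurrently from several generator threads.
     */
    void addEdges(int[] from, int[] to);

    /** Called by each generator thread once it has produced its last batch. */
    void finishUp();
}

// Hypothetical implementation for tests: counts edges instead of writing them.
class CountingGraphOutput implements GraphOutput {
    private final AtomicLong edgeCount = new AtomicLong();
    private final AtomicLong finishedThreads = new AtomicLong();

    @Override
    public void addEdges(int[] from, int[] to) {
        if (from.length != to.length) {
            throw new IllegalArgumentException("from and to must have the same length");
        }
        edgeCount.addAndGet(from.length);
    }

    @Override
    public void finishUp() {
        finishedThreads.incrementAndGet();
    }

    long edgeCount() {
        return edgeCount.get();
    }

    long finishedThreads() {
        return finishedThreads.get();
    }
}
```

An in-memory implementation like this would also make it possible to unit-test the generator without touching the local file system or HDFS.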
@@ -0,0 +1,83 @@
package com.twitter.pers.graph_generator;
Contributor:

Please add copyright/license header information, see other files as an example.


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
* Outputs edges into HDFS in tab delimited edge list format
* @author Aapo Kyrola, [email protected], [email protected]
*/
public class HDFSEdgeListOutput implements GraphOutput {

    private String fileNamePrefix;

    static int partSeq = 0;

    public HDFSEdgeListOutput(String fileNamePrefix) {
        this.fileNamePrefix = fileNamePrefix;
        System.out.println("Using HDFS output: " + fileNamePrefix);
    }

    @Override
    public void addEdges(int[] from, int[] to) {
        try {
            FSDataOutputStream dos = partitionOut.get();
            int n = from.length;
            StringBuffer sb = new StringBuffer(from.length * 32);
            for(int i=0; i<n; i++) {
                sb.append(from[i]);
                sb.append("\t");
                sb.append(to[i]);
                sb.append("\n");
            }
            dos.write(sb.toString().getBytes());
Contributor:

seems like the binary format here would be different than the one above (in EdgeListOutput). Is that correct? If so, it would be nice to have the same binary format. If it would be the same, then it would be nice to use the same code to do the actual writing if possible.

Author:

On Tue, Sep 25, 2012 at 3:10 PM, Pankaj Gupta [email protected] wrote:

In src/main/java/com/twitter/pers/graph_generator/HDFSEdgeListOutput.java:

        System.out.println("Using HDFS output: " + fileNamePrefix);
    }
    @Override
    public void addEdges(int[] from, int[] to) {
        try {
            FSDataOutputStream dos = partitionOut.get();
            int n = from.length;
            StringBuffer sb = new StringBuffer(from.length * 32);
            for(int i=0; i<n; i++) {
                sb.append(from[i]);
                sb.append("\t");
                sb.append(to[i]);
                sb.append("\n");
            }
            dos.write(sb.toString().getBytes());

seems like the binary format here would be different than the one above
(in EdgeListOutput). Is that correct? If so, it would be nice to have the
same binary format. If it would be the same, then it would be nice to use
the same code to do the actual writing if possible.

Actually, this is not a binary format at all, but an ASCII format targeted at
Hadoop/Pig jobs.

The binary format was committed by mistake, as it is for generating graphs
for GraphChi to use (that is the reason for the reverseBytes() as well). It
does not belong to the cassovary project.

There were many mistakes in my pull request; for example, I don't
understand why the .idea files were committed. I will fix this week.

Aapo
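
To illustrate the reverseBytes() point from the reply above: `DataOutputStream.writeInt` always writes big-endian, so swapping the bytes first produces little-endian integers on disk. The sketch below assumes that a little-endian reader is the intended consumer of the binary part files, which the thread implies but does not spell out. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo class; not part of the pull request.
class LittleEndianEdgeDemo {

    /** Encodes one edge the way EdgeListOutput.addEdges does: two byte-swapped ints. */
    static byte[] encodeEdge(int from, int to) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            // writeInt is big-endian; swapping first yields little-endian bytes.
            dos.writeInt(Integer.reverseBytes(from));
            dos.writeInt(Integer.reverseBytes(to));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Decodes the 8-byte record as a little-endian reader would. */
    static int[] decodeEdge(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record).order(ByteOrder.LITTLE_ENDIAN);
        return new int[] { buf.getInt(), buf.getInt() };
    }

    public static void main(String[] args) {
        byte[] record = encodeEdge(1, 258);
        int[] edge = decodeEdge(record);
        System.out.println(record.length + " bytes: " + edge[0] + " -> " + edge[1]);
    }
}
```

Each edge therefore occupies 8 bytes (source id, then destination id), with no header or delimiter between records.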

        } catch (Exception ioe) {
            throw new RuntimeException(ioe);
        }
    }

    public void finishUp() {
        try {
            partitionOut.get().close();
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /* Each thread will have a local partition */
    private ThreadLocal<FSDataOutputStream> partitionOut = new ThreadLocal<FSDataOutputStream>() {
        @Override
        protected FSDataOutputStream initialValue() {
            try {
                int thisPartId;
                synchronized (this) {
                    thisPartId = partSeq++;
                }

                String hadoopHome = System.getProperty("HADOOP_HOME");
                if (hadoopHome == null) hadoopHome = System.getenv("HADOOP_HOME");

                if (hadoopHome == null) {
                    throw new IllegalArgumentException("You need to specify environment variable or JVM option HADOOP_HOME!");
                }

                Configuration conf = new Configuration();
                conf.addResource(new Path(hadoopHome + "/conf/core-site.xml"));
                conf.addResource(new Path(hadoopHome + "/conf/hdfs-site.xml"));


                String fileName = fileNamePrefix + "-part" + thisPartId;
                FileSystem fs = FileSystem.get(conf);
                return fs.create(new Path(fileName));
            } catch (Exception err) {
                err.printStackTrace();
                throw new RuntimeException(err);
            }
        }
    };

}
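
A small, Hadoop-free sketch of the tab-delimited text encoding used by `addEdges` above, so the format can be checked in isolation. It differs from the diff in two ways that are assumptions on my part rather than anything requested in the review: it uses `StringBuilder` instead of `StringBuffer`, and it names the charset explicitly instead of relying on the platform default. The class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of the pull request.
class TabDelimitedEdges {

    /** Formats each edge as "from<TAB>to<NEWLINE>" and returns the ASCII bytes. */
    static byte[] format(int[] from, int[] to) {
        if (from.length != to.length) {
            throw new IllegalArgumentException("from and to must have the same length");
        }
        // An int prints as at most 11 characters, so 24 characters per line is enough.
        StringBuilder sb = new StringBuilder(from.length * 24);
        for (int i = 0; i < from.length; i++) {
            sb.append(from[i]).append('\t').append(to[i]).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        byte[] out = format(new int[] {1, 2}, new int[] {3, 4});
        System.out.print(new String(out, StandardCharsets.US_ASCII));
    }
}
```

The resulting bytes could be passed to `FSDataOutputStream.write(byte[])` exactly as the diff does with `sb.toString().getBytes()`.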
166 changes: 166 additions & 0 deletions src/main/java/com/twitter/pers/graph_generator/RMATGraphGenerator.java
@@ -0,0 +1,166 @@
package com.twitter.pers.graph_generator;
Contributor:

Please add copyright/license header information, see other files as an example.

Author:

Hi, sorry about that. I have now added the license information. I am not sure if it is included in this pull request, but I cannot make a new pull request...? (First time making a pull request!)

Contributor:

Thanks, looks like you figured out how to update the PR :)


import java.util.ArrayList;
import java.util.Random;

/**
* Graph generator based on the R-MAT algorithm
* R-MAT: A Recursive Model for Graph Mining
* Chakrabarti, Zhan, Faloutsos: http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf
*
* Usage:
Contributor:

can you pls add a note on how to use this in /examples/ExamplesREADME.md? It would be even nicer to give an example of doing parallel pagerank on a huge graph generated using this (but that can wait until you have the parallel pagerank)

* [outputfile-prefix] [num-vertices] [num-edges] probA probB probC probD
*
* See the paper for the description of parameters probA, probB, probC, probD.
*
* If the outputfile-prefix starts with hdfs://, the graph is written into HDFS as tab-delimited
* partitions.
*
* Specify the number of threads used by passing a JVM option -Dnum_threads
Contributor:

prefer to specify this as a cmdline option rather than use JVM option

* Each thread will create its own partition.
*
* Note that the result may (will) contain duplicate edges.
Contributor:

s/may (will)/likely to, as explained in the paper./

*/
public class RMATGraphGenerator {

    private GraphOutput outputter;

    /* Parameters for top-left, top-right, bottom-left, bottom-right probabilities */
    private double pA, pB, pC, pD;
    private long numEdges;
    private int numVertices;
Contributor:

make this long?


/**
* From http://pywebgraph.sourceforge.net
Contributor:

pls write a couple of lines about why these probabilities are chosen (in addition to, if still needed, link to this reference)

## Probability of choosing quadrant A
self.probA = 0.45

## Probability of choosing quadrant B
self.probB = 0.15

## Probability of choosing quadrant C
self.probC = 0.15

## Probability of choosing quadrant D
self.probD = 0.25
*/

Contributor:

extra newline


    public RMATGraphGenerator(GraphOutput outputter, double pA, double pB, double pC, double pD, int nVertices,long nEdges) {
Contributor:

inconsistent spacing (e.g., need extra space after the final ',')

        this.outputter = outputter;
        this.pA = pA;
        this.pB = pB;
        this.pC = pC;
        this.pD = pD;

        if (Math.abs(pA + pB + pC + pD - 1.0) > 0.01)
            throw new IllegalArgumentException("Probabilities do not add up to one!");
        numVertices = nVertices;
        numEdges = nEdges;
    }

    public void execute() {
        int nThreads = Integer.parseInt(System.getProperty("num_threads", Runtime.getRuntime().availableProcessors() + ""));

        ArrayList<Thread> threads = new ArrayList<Thread>();
        for(int i=0; i < nThreads; i++) {
            Thread t = new Thread(new RMATGenerator(numEdges / nThreads));
            t.start();
            threads.add(t);
        }

        /* Wait */
        try {
            for(Thread t : threads) t.join();
        } catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
    }

    private class RMATGenerator implements Runnable {

        private long edgesToGenerate;

        private RMATGenerator(long genEdges) {
            this.edgesToGenerate = genEdges;
        }

        public void run() {
            int nEdgesATime = 1000000;
            long createdEdges = 0;

            Random r = new Random(System.currentTimeMillis() + this.hashCode());

            double cumA = pA;
            double cumB = cumA + pB;
            double cumC = cumB + pC;
            double cumD = 1.0;
            assert(cumD > cumC);
Contributor:

>= ?


            while(edgesToGenerate > createdEdges) {
                int ne = (int) Math.min(edgesToGenerate - createdEdges, nEdgesATime);
                int[] fromIds = new int[ne];
                int[] toIds = new int[ne];

                for(int j=0; j < ne; j++) {
                    int col_st = 0, col_en = numVertices - 1, row_st = 0, row_en = numVertices - 1;
                    while (col_st != col_en || row_st != row_en) {
                        double x = r.nextDouble();

                        if (x < cumA) {
                            // Top-left
                            col_en = col_st + (col_en - col_st) / 2;
Contributor:

consider calculating (col_en - col_st)/2 and (row_en-row_st)/2 outside the if statement for better readability.

                            row_en = row_st + (row_en - row_st) / 2;
                        } else if (x < cumB) {
                            // Top-right
                            col_st = col_en - (col_en - col_st) / 2;
                            row_en = row_st + (row_en - row_st) / 2;

Contributor:

kill extra newline

                        } else if (x < cumC) {
                            // Bottom-left
                            col_en = col_st + (col_en - col_st) / 2;
                            row_st = row_en - (row_en - row_st) / 2;
                        } else {
                            // Bottom-right
                            col_st = col_en - (col_en - col_st) / 2;
                            row_st = row_en - (row_en - row_st) / 2;
                        }
                    }
                    fromIds[j] = col_st;
                    toIds[j] = row_st;
                }

                outputter.addEdges(fromIds, toIds);
                createdEdges += ne;
                System.out.println(Thread.currentThread().getId() + " created " + createdEdges + " edges.");
            }
            outputter.finishUp();
        }
    }

    public static void main(String[] args) {
        int k = 0;
        String outputFile = args[k++];
        int numVertices = Integer.parseInt(args[k++]);
        long numEdges = Long.parseLong(args[k++]);

        double pA = Double.parseDouble(args[k++]);
        double pB = Double.parseDouble(args[k++]);
        double pC = Double.parseDouble(args[k++]);
        double pD = Double.parseDouble(args[k++]);

        System.out.println("Going to create graph with approx. " + numVertices + " vertices and " + numEdges + " edges");

        GraphOutput outputInstance = null;
        if (outputFile.startsWith("hdfs://")) outputInstance = new HDFSEdgeListOutput(outputFile);
        else outputInstance = new EdgeListOutput(outputFile);

        long t = System.currentTimeMillis();
        RMATGraphGenerator generator = new RMATGraphGenerator(outputInstance,
                pA, pB, pC, pD, numVertices, numEdges);
        generator.execute();
        System.out.println("Generating took " + (System.currentTimeMillis() - t) * 0.001 + " secs");

    }

}
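
A standalone sketch of the per-edge quadrant-halving loop from the file above, with the half-widths computed once per iteration as suggested in the review. It also includes a helper that distributes the remainder of `numEdges / nThreads`, since the integer division in `execute()` drops up to `nThreads - 1` edges. That may be acceptable given the "approx." in the log message, so treat the helper as an optional idea. The class name, method names, and the seeded `Random` are assumptions for illustration and are not part of the pull request.

```java
import java.util.Random;

// Hypothetical sketch class; not part of the pull request.
class RmatSketch {

    /**
     * Picks one (from, to) pair by repeatedly halving the column and row ranges.
     * pA, pB, pC are the top-left, top-right and bottom-left probabilities;
     * bottom-right takes the remaining mass. Requires numVertices >= 1.
     */
    static int[] sampleEdge(Random r, int numVertices, double pA, double pB, double pC) {
        double cumA = pA, cumB = cumA + pB, cumC = cumB + pC;
        int colSt = 0, colEn = numVertices - 1, rowSt = 0, rowEn = numVertices - 1;
        while (colSt != colEn || rowSt != rowEn) {
            double x = r.nextDouble();
            int colHalf = (colEn - colSt) / 2;
            int rowHalf = (rowEn - rowSt) / 2;
            boolean left = x < cumA || (x >= cumB && x < cumC); // quadrant A or C
            boolean top = x < cumB;                             // quadrant A or B
            if (left) colEn = colSt + colHalf; else colSt = colEn - colHalf;
            if (top) rowEn = rowSt + rowHalf; else rowSt = rowEn - rowHalf;
        }
        return new int[] { colSt, rowSt };
    }

    /** Splits numEdges over nThreads so that the per-thread counts sum to numEdges. */
    static long[] splitEdges(long numEdges, int nThreads) {
        long[] counts = new long[nThreads];
        long base = numEdges / nThreads;
        long extra = numEdges % nThreads;
        for (int i = 0; i < nThreads; i++) {
            counts[i] = base + (i < extra ? 1 : 0); // first 'extra' threads take one more
        }
        return counts;
    }

    public static void main(String[] args) {
        Random r = new Random(42L); // fixed seed only to make runs repeatable
        int[] edge = sampleEdge(r, 1024, 0.45, 0.15, 0.15);
        System.out.println(edge[0] + "\t" + edge[1]);
    }
}
```

Both ranges start with the same width and shrink by integer halving on every draw, so the loop ends after roughly log2(numVertices) iterations with a single column and a single row selected.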