Commit 5e99d63

Refactor ArchiveRecord classes; addresses archivesunleashed#101 and archivesunleashed#102 (archivesunleashed#107)

* remove ArcRecord and WarcRecord; rename GenericArchiveRecord to ArchiveRecord

* fix import statements

* change tests to use loadArchives instead of loadArc or loadWarc

* change ArcTest to use loadArchives instead of loadArc

* remove loadArc and loadWarc from RecordLoader.scala

* change tests to use loadArchives with keepValidPages=false so as not to break them

* make 'filter date' in ArcTest and 'discard mime' in RecordRDDTest avoid calling collect() on an RDD[ArchiveRecord]: the old ArcRecord and WarcRecord classes were serializable, but ArchiveRecord instances are not (a sketch of the pattern follows the changed-files summary below)

* remove commented line in RecordRDDTest.scala
MapleOx authored and ruebot committed Oct 29, 2017
1 parent 010fe24 commit 5e99d63
Showing 13 changed files with 117 additions and 288 deletions.
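
The serialization caveat in the last commit-message bullet reduces to one pattern: never ship whole ArchiveRecord objects to the driver; project them down to plain values first. An illustrative sketch only, not code from this commit — the path, date literal, and variable names are made up, and it assumes RecordLoader lives in `io.archivesunleashed.spark.matchbox` as elsewhere in this diff:

```scala
import io.archivesunleashed.spark.matchbox.RecordLoader

// `sc` is an existing SparkContext; the path is a placeholder.
val records = RecordLoader.loadArchives("/path/to/example.arc.gz", sc, keepValidPages = false)

// Risky: collect() serializes whole ArchiveRecord objects to ship them
// to the driver, which fails if ArchiveRecord is not serializable.
// val hits = records.filter(r => r.getCrawlDate == "20080430").collect()

// Safer: reduce each record to plain, serializable values on the
// executors first, then collect only those values.
val hits = records
  .filter(r => r.getCrawlDate == "20080430")
  .map(r => (r.getCrawlDate, r.getUrl))
  .collect()
```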

This file was deleted.

```diff
@@ -16,22 +16,81 @@
  */
 package io.archivesunleashed.spark.archive.io
 
-trait ArchiveRecord extends Serializable {
-  val getCrawlDate: String
-
-  val getCrawlMonth: String
-
-  val getCrawlYear: String
-
-  val getUrl: String
-
-  val getDomain: String
-
-  val getMimeType: String
-
-  val getContentString: String
-
-  val getContentBytes: Array[Byte]
-
-  val getImageBytes: Array[Byte]
-}
+import java.text.SimpleDateFormat
+
+import org.apache.spark.SerializableWritable
+import org.archive.io.arc.ARCRecord
+import org.archive.io.warc.WARCRecord
+import org.archive.util.ArchiveUtils
+import io.archivesunleashed.data.{ArcRecordUtils, WarcRecordUtils}
+import io.archivesunleashed.io.GenericArchiveRecordWritable
+import io.archivesunleashed.io.GenericArchiveRecordWritable.ArchiveFormat
+import io.archivesunleashed.spark.matchbox.ExtractDate.DateComponent
+import io.archivesunleashed.spark.matchbox.{RemoveHttpHeader, ExtractDate, ExtractDomain}
+
+class ArchiveRecord(r: SerializableWritable[GenericArchiveRecordWritable]) extends Serializable {
+  var arcRecord: ARCRecord = null
+  var warcRecord: WARCRecord = null
+
+  if (r.t.getFormat == ArchiveFormat.ARC)
+    arcRecord = r.t.getRecord.asInstanceOf[ARCRecord]
+  else if (r.t.getFormat == ArchiveFormat.WARC)
+    warcRecord = r.t.getRecord.asInstanceOf[WARCRecord]
+
+  val ISO8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")
+
+  val getCrawlDate: String = {
+    if (r.t.getFormat == ArchiveFormat.ARC) {
+      ExtractDate(arcRecord.getMetaData.getDate, DateComponent.YYYYMMDD)
+    } else {
+      ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(warcRecord.getHeader.getDate)), DateComponent.YYYYMMDD)
+    }
+  }
+
+  val getCrawlMonth: String = {
+    if (r.t.getFormat == ArchiveFormat.ARC) {
+      ExtractDate(arcRecord.getMetaData.getDate, DateComponent.YYYYMM)
+    } else {
+      ExtractDate(ArchiveUtils.get14DigitDate(ISO8601.parse(warcRecord.getHeader.getDate)), DateComponent.YYYYMM)
+    }
+  }
+
+  val getContentBytes: Array[Byte] = {
+    if (r.t.getFormat == ArchiveFormat.ARC) {
+      ArcRecordUtils.getBodyContent(arcRecord)
+    } else {
+      WarcRecordUtils.getContent(warcRecord)
+    }
+  }
+
+  val getContentString: String = new String(getContentBytes)
+
+  val getMimeType = {
+    if (r.t.getFormat == ArchiveFormat.ARC) {
+      arcRecord.getMetaData.getMimetype
+    } else {
+      WarcRecordUtils.getWarcResponseMimeType(getContentBytes)
+    }
+  }
+
+  val getUrl = {
+    if (r.t.getFormat == ArchiveFormat.ARC) {
+      arcRecord.getMetaData.getUrl
+    } else {
+      warcRecord.getHeader.getUrl
+    }
+  }
+
+  val getDomain: String = ExtractDomain(getUrl)
+
+  val getImageBytes: Array[Byte] = {
+    if (getContentString.startsWith("HTTP/"))
+      getContentBytes.slice(
+        getContentString.indexOf(RemoveHttpHeader.headerEnd)
+          + RemoveHttpHeader.headerEnd.length, getContentBytes.length)
+    else
+      getContentBytes
+  }
+}
```
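
After this change a record exposes the same accessor surface whether it came from an ARC or a WARC file; the class dispatches on ArchiveFormat internally. A minimal usage sketch (the path is a placeholder, `sc` is an existing SparkContext, and RecordLoader's package is assumed from the imports elsewhere in this diff):

```scala
import io.archivesunleashed.spark.matchbox.RecordLoader

val records = RecordLoader.loadArchives("/path/to/crawl.warc.gz", sc)

// Format-independent accessors: callers never branch on ARC vs. WARC.
records
  .map(r => (r.getCrawlDate, r.getDomain, r.getMimeType))
  .take(10)
  .foreach(println)
```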

This file was deleted.

This file was deleted.

```diff
@@ -16,9 +16,9 @@
  */
 package io.archivesunleashed.spark.matchbox
 
-import io.archivesunleashed.spark.archive.io.ArchiveRecord
 import org.apache.spark.graphx._
 import org.apache.spark.rdd.RDD
+import io.archivesunleashed.spark.archive.io.ArchiveRecord
 import io.archivesunleashed.spark.matchbox.StringUtils._
 import io.archivesunleashed.spark.rdd.RecordRDD._
 import io.archivesunleashed.spark.utils.JsonUtil
```
```diff
@@ -22,27 +22,21 @@ import org.apache.spark.rdd.RDD
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 import io.archivesunleashed.io.GenericArchiveRecordWritable.ArchiveFormat
-import io.archivesunleashed.io.{GenericArchiveRecordWritable, WarcRecordWritable, ArcRecordWritable}
-import io.archivesunleashed.mapreduce.{WacGenericInputFormat, WacWarcInputFormat, WacArcInputFormat}
-import io.archivesunleashed.spark.archive.io.{WarcRecord, ArcRecord, ArchiveRecord, GenericArchiveRecord}
+import io.archivesunleashed.io.GenericArchiveRecordWritable
+import io.archivesunleashed.mapreduce.WacGenericInputFormat
+import io.archivesunleashed.spark.archive.io.ArchiveRecord
 import io.archivesunleashed.spark.rdd.RecordRDD._
 
 object RecordLoader {
-  def loadArc(path: String, sc: SparkContext): RDD[ArchiveRecord] = {
-    sc.newAPIHadoopFile(path, classOf[WacArcInputFormat], classOf[LongWritable], classOf[ArcRecordWritable])
-      .map(r => new ArcRecord(new SerializableWritable(r._2)))
-  }
-
-  def loadWarc(path: String, sc: SparkContext): RDD[ArchiveRecord] = {
-    sc.newAPIHadoopFile(path, classOf[WacWarcInputFormat], classOf[LongWritable], classOf[WarcRecordWritable])
-      .filter(r => r._2.getRecord.getHeader.getHeaderValue("WARC-Type").equals("response"))
-      .map(r => new WarcRecord(new SerializableWritable(r._2)))
-  }
-
-  def loadArchives(path: String, sc: SparkContext): RDD[ArchiveRecord] = {
-    sc.newAPIHadoopFile(path, classOf[WacGenericInputFormat], classOf[LongWritable], classOf[GenericArchiveRecordWritable])
+  def loadArchives(path: String, sc: SparkContext, keepValidPages: Boolean = true): RDD[ArchiveRecord] = {
+    val rdd: RDD[ArchiveRecord] =
+      sc.newAPIHadoopFile(path, classOf[WacGenericInputFormat], classOf[LongWritable], classOf[GenericArchiveRecordWritable])
       .filter(r => (r._2.getFormat == ArchiveFormat.ARC) ||
         ((r._2.getFormat == ArchiveFormat.WARC) && r._2.getRecord.getHeader.getHeaderValue("WARC-Type").equals("response")))
-      .map(r => new GenericArchiveRecord(new SerializableWritable(r._2)))
+      .map(r => new ArchiveRecord(new SerializableWritable(r._2)))
+
+    if (keepValidPages) rdd.keepValidPages() else rdd
   }
 
   def loadTweets(path: String, sc: SparkContext): RDD[JValue] =
```
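
The new keepValidPages default argument changes call sites as follows; a short sketch with placeholder paths, using the signature shown in the diff above:

```scala
// Default: filter down to valid pages, matching the prior behaviour of
// most analyses.
val pages = RecordLoader.loadArchives("/path/to/crawl.warc.gz", sc)

// Opt out, e.g. in tests that need every ARC/WARC response record.
val all = RecordLoader.loadArchives("/path/to/crawl.warc.gz", sc, keepValidPages = false)
```

The keepValidPages() call itself is supplied by the enrichment imported from RecordRDD._, whose file is touched next.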
```diff
@@ -16,8 +16,8 @@
  */
 package io.archivesunleashed.spark.rdd
 
-import org.apache.spark.rdd.RDD
 import io.archivesunleashed.spark.archive.io.ArchiveRecord
+import org.apache.spark.rdd.RDD
 import io.archivesunleashed.spark.matchbox.{DetectLanguage, ExtractDate, ExtractDomain, RemoveHTML}
 import io.archivesunleashed.spark.matchbox.ExtractDate.DateComponent
 import io.archivesunleashed.spark.matchbox.ExtractDate.DateComponent.DateComponent
```
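
For context, the RecordRDD._ import is what makes rdd.keepValidPages() in loadArchives compile: it enriches RDD[ArchiveRecord] with extra filter methods via an implicit class. A hedged sketch of the pattern only — the member names and filtering rules here are illustrative, not RecordRDD.scala's actual contents:

```scala
import org.apache.spark.rdd.RDD
import io.archivesunleashed.spark.archive.io.ArchiveRecord

object RecordRDDSketch {
  // Illustrative enrichment: importing this implicit class adds
  // keepValidPages() to any RDD[ArchiveRecord].
  implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) {
    def keepValidPages(): RDD[ArchiveRecord] =
      rdd.filter(r => r.getCrawlDate != null && r.getMimeType == "text/html")
  }
}
```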