Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

创建1.0.5分支,修改时间字段不支持问题 #233

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ libraryDependencies ++= Seq(
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test",
"org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client"),
"org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client")
"org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client"),
"org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "provided"
)

// This is necessary because of how we explicitly specify Spark dependencies
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
sbt.version=0.13.6
sbt.version=0.13.13
24 changes: 12 additions & 12 deletions src/main/scala/magellan/io/ShapeReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class NullShapeReader extends ShapeReader {
private[magellan] class PointReader extends ShapeReader {

override def readFields(dataInput: DataInput): Shape = {
val x = EndianUtils.swapDouble(dataInput.readDouble())
val y = EndianUtils.swapDouble(dataInput.readDouble())
val x = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
val y = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
Point(x, y)
}

Expand All @@ -52,10 +52,10 @@ private[magellan] class PolygonReader extends ShapeReader {

override def readFields(dataInput: DataInput): Polygon = {
// extract bounding box.
val xmin = EndianUtils.swapDouble(dataInput.readDouble())
val ymin = EndianUtils.swapDouble(dataInput.readDouble())
val xmax = EndianUtils.swapDouble(dataInput.readDouble())
val ymax = EndianUtils.swapDouble(dataInput.readDouble())
val xmin = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
val ymin = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
val xmax = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
val ymax = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))

val box = BoundingBox(xmin, ymin, xmax, ymax)

Expand All @@ -81,8 +81,8 @@ private[magellan] class PolygonReader extends ShapeReader {
val ycoordinates = Array.fill(numPoints)(0.0)

for (index <- 0 until numPoints) {
val x = Double.longBitsToDouble(Long.reverseBytes(dataInput.readLong()))
val y = Double.longBitsToDouble(Long.reverseBytes(dataInput.readLong()))
val x = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
val y = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
xcoordinates.update(index, x)
ycoordinates.update(index, y)
}
Expand All @@ -96,7 +96,7 @@ private[magellan] class PolyLineReader extends ShapeReader {

def extract(dataInput: DataInput): (Array[Int], Array[Point]) = {
// extract bounding box.
(0 until 4).foreach { _ => EndianUtils.swapDouble(dataInput.readDouble())}
(0 until 4).foreach { _ => Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))}

// numRings
val numRings = EndianUtils.swapInteger(dataInput.readInt())
Expand All @@ -119,8 +119,8 @@ private[magellan] class PolyLineReader extends ShapeReader {
val points = ArrayBuffer[Point]()
for (_ <- 0 until numPoints) {
points.+= {
val x = EndianUtils.swapDouble(dataInput.readDouble())
val y = EndianUtils.swapDouble(dataInput.readDouble())
val x = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
val y = Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong()))
Point(x, y)
}
}
Expand All @@ -139,7 +139,7 @@ private[magellan] class PolyLineZReader extends PolyLineReader {
val (indices, points) = extract(dataInput)
// throw away the Z and M values
val size = points.length
(0 until (4 + 2 * size)).foreach(_ => dataInput.readDouble())
(0 until (4 + 2 * size)).foreach(_ => Double.longBitsToDouble(EndianUtils.swapLong(dataInput.readLong())))
if(indices.size != points.size)
PolyLine( new Array[Int](points.size), points)
else
Expand Down
36 changes: 30 additions & 6 deletions src/main/scala/magellan/mapreduce/DBReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
package magellan.mapreduce

import java.io.DataInputStream
import java.text.SimpleDateFormat

import scala.collection.mutable.ListBuffer

import magellan.io.ShapeKey
import org.apache.commons.io.EndianUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}

import magellan.io.ShapeKey
import scala.collection.mutable.ListBuffer

private[magellan] class DBReader extends RecordReader[ShapeKey, MapWritable] {

Expand Down Expand Up @@ -73,16 +72,38 @@ private[magellan] class DBReader extends RecordReader[ShapeKey, MapWritable] {
case 'C' => {
val b = Array.fill[Byte](length)(0)
dis.readFully(b)
val value = new String(b).trim
val fld = new Text()
fld.append(b, 0, length)
fld.clear()
fld.set(value)
fld
}
case 'N' | 'F' => {
val b = Array.fill[Byte](length)(0)
dis.readFully(b)
var value = new String(b).trim
val index = value.indexOf(0)
if (index != -1) {
value = value.substring(0, index)
}
val fld = new Text()
fld.clear()
fld.set(new String(b))
fld.set(value)
fld
}
case 'D' => {
val b = Array.fill[Byte](length)(0)
dis.readFully(b, 0, length)
val fld = new Text()
fld.clear()
if (b(0) != '0') {
val value = new String(b).trim
if (!value.isEmpty) {
val format = new SimpleDateFormat(if (length == 8 ) "yyyyMMdd" else "yyyyMMddHHmmss")
val date = format.parse(value)
fld.set(date.toString)
}
}
fld
}

Expand Down Expand Up @@ -115,6 +136,9 @@ private[magellan] class DBReader extends RecordReader[ShapeKey, MapWritable] {
val date = Array.fill[Byte](3)(0)
dis.read(date)
numRecords = EndianUtils.swapInteger(dis.readInt())
if (numRecords == 0) {
return
}
val numBytesInHeader = EndianUtils.swapShort(dis.readShort())
numBytesInRecord = EndianUtils.swapShort(dis.readShort())
// skip the next 20 bytes
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/magellan/mapreduce/ShapeInputFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ private[magellan] class ShapeInputFormat
splits.+= (makeSplit(path, 0, length, blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
} else {
val s = splitInfos(key).toSeq
val blkIndex = getBlockIndex(blkLocations, s(0))
splits.+=(makeSplit(path, s(0), length - s(0), blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
/*
val s = splitInfos(key).toSeq
val start = s
val end = s.drop(1) ++ Seq(length)
Expand All @@ -73,6 +78,7 @@ private[magellan] class ShapeInputFormat
splits.+=(makeSplit(path, startOffset, endOffset - startOffset, blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
}
*/
}
}
sw.stop
Expand Down