Raw Atlas Flow Integration #25
Conversation
A few requests on the spark calls arrangement, otherwise that looks great!
codeVersion, dataVersion);

// Slice the raw Atlas and save result
final JavaPairRDD<String, Atlas> countrySlicedRawAtlasShardsRDD = sliceAndSaveRawAtlases(
To make it easier to read the Spark flow, it would be nicer to see the "mapToPair", "filter", and "reduce" calls here directly. What I would do instead here is make the generateAndSaveRawAtlas method return a Function that does the transformation between the Tuples for you. That way all you have to do here is something that will look like this:
final JavaPairRDD<String, Atlas> countrySlicedRawAtlasShardsRDD =
countryRawAtlasShardsRDD.mapToPair(sliceAndSaveRawAtlases(.....));
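The suggested shape can be sketched without a Spark dependency so it stays self-contained: `java.util.function.Function` and `Map.Entry` stand in for Spark's `PairFunction` and `Tuple2`, and all names below are illustrative, not the PR's actual code. The point is that the helper returns the function object instead of performing the transformation itself, so the driver-level flow reads as one chain.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SliceFlowSketch
{
    // Stand-in for a sliceRawAtlas(boundaries) helper: rather than doing the
    // mapToPair call itself, it returns the per-tuple function, so the
    // top-level flow shows the mapToPair/filter chain directly.
    static Function<Entry<String, String>, Entry<String, String>> sliceRawAtlas(
            final String boundaries)
    {
        return entry -> new SimpleEntry<>(entry.getKey(),
                entry.getValue() == null ? null
                        : entry.getValue() + "|sliced:" + boundaries);
    }

    // Mirrors the shape of:
    //   countryRawAtlasShardsRDD.mapToPair(sliceRawAtlas(...))
    //                           .filter(tuple -> tuple._2() != null)
    static List<Entry<String, String>> run(final Stream<Entry<String, String>> shards)
    {
        return shards.map(sliceRawAtlas("boundaries"))
                .filter(entry -> entry.getValue() != null)
                .collect(Collectors.toList());
    }
}
```

The same chain applies unchanged to a `JavaPairRDD`, since `mapToPair` and `filter` accept exactly such function objects.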
Fair point, I'll update this!
// Output the Name/Atlas couple
final Tuple2<String, Atlas> result = new Tuple2<>(name, atlas);
return result;
});
}).filter(tuple -> tuple._2() != null);
You could also bubble up that filter call at the top level, for the same reasons.
final JavaPairRDD<String, Atlas> countryNonNullAtlasShardsRDD = countryAtlasShardsRDD
        .filter(tuple -> tuple._2() != null);

// Persist the RDD to avoid re-computing and save each raw atlas
countryRawAtlasShardsRDD.cache().saveAsHadoopFile(
Same, bubble this up to the top level. cache() calls are integral to understanding the Spark flow, and I would love to see them in the top method.
try
{
    statistics = counter.processAtlas(tuple._2());

final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(
I am not sure I remember how the legacy code passes the AtlasLoadingOption around, but I believe it did not have to do this. We can review this at some point, there might be another way :)
I don't think it passes it around; each slave builds its own AtlasLoadingOption! Building it once and passing it around didn't work, since it would try to serialize the entire class and its contents, some of which aren't Serializable.
I'll try to think of something a little more elegant, if you have ideas, please let me know!
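One common pattern for this, sketched in plain Java without any Spark dependency: ship only a serializable factory to the workers and mark the heavyweight, non-Serializable object `transient`, so each worker JVM rebuilds its own copy on first use. `PerWorkerHolder` is a hypothetical name for illustration, not project code.

```java
import java.io.Serializable;
import java.util.function.Supplier;

// Sketch of the "each slave builds its own" pattern: only the factory
// crosses the wire; the expensive object is transient and rebuilt lazily
// in whichever JVM ends up calling get().
final class PerWorkerHolder<T> implements Serializable
{
    // A Supplier that is also Serializable, so lambdas targeting it can be
    // shipped to workers.
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable
    {
    }

    private final SerializableSupplier<T> factory;
    private transient T instance; // dropped during serialization, rebuilt on use

    public PerWorkerHolder(final SerializableSupplier<T> factory)
    {
        this.factory = factory;
    }

    public synchronized T get()
    {
        if (this.instance == null)
        {
            this.instance = this.factory.get();
        }
        return this.instance;
    }
}
```

With this shape, a holder for something like an AtlasLoadingOption could be captured by a Spark function and serialized safely, because only the factory lambda travels with it.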
This now relies on this Atlas PR.
This PR is feature complete. It needs a new release of the Atlas project before it can be merged. I've tested and visualized Atlas generation with and without the new flag; it works as expected. I've also run the integration test with the new flag, and it passed!
The flow is much better and easier to read now. A few questions to be addressed. Thanks!!
return propertyMap;
}

private static Set<Shard> getAllShardsForCountry(final CountryBoundaryMap boundaries,
Isn't that going to be slow? It would probably benefit from using the grid index, as done here:
https://github.com/osmlab/atlas-generator/blob/4.0.4/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java#L138-L204
Yes - that's a good point, let me update to use this!
// Slice the raw Atlas and filter any null atlases
final JavaPairRDD<String, Atlas> countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD
        .mapToPair(this.sliceRawAtlas(boundaries)).filter(tuple -> tuple._2() != null);
Here passing the boundaries means the driver will have to serialize it and pass it around to all the slaves. I guess the old flow does the same thing, not sure if there is a better way to do that... Would it be possible to only pass the boundary of the country that is being sliced? Not sure.
That's a good point, let me think of a better way to handle this or at least create an issue!
Created: #28 - let's come back to this. I think it's important, but something we can live with in the meantime.
countrySlicedRawAtlasShardsRDD.cache();
countrySlicedRawAtlasShardsRDD.saveAsHadoopFile(slicedRawAtlasPath, Text.class,
        Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration()));
logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n");
Here I believe you can now release the countryRawAtlasShardsRDD, because the next step, countrySlicedRawAtlasShardsRDD, has already been cached.
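The cache-and-release lifecycle being suggested could look roughly like this in the driver. This is a Java-flavored pseudocode sketch, not runnable code: the RDD names come from the diff, the elided arguments stay elided, and the unpersist() placement is the reviewer's suggestion rather than code from this PR.

```java
// 1. Build and cache the raw shards, then save them
countryRawAtlasShardsRDD.cache();
countryRawAtlasShardsRDD.saveAsHadoopFile(...);

// 2. Slice, cache the result, and save it
final JavaPairRDD<String, Atlas> countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD
        .mapToPair(sliceRawAtlas(boundaries)).filter(tuple -> tuple._2() != null);
countrySlicedRawAtlasShardsRDD.cache();
countrySlicedRawAtlasShardsRDD.saveAsHadoopFile(...);

// 3. Once the sliced RDD is cached, the raw one can be released
countryRawAtlasShardsRDD.unpersist();
```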
countryAtlasShardsRDD.saveAsHadoopFile(
        getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class,
        MultipleAtlasOutputFormat.class, new JobConf(configuration()));
logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n");
Same here, countrySlicedRawAtlasShardsRDD should be releasable here.
Knew I forgot something, good catch!!
getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), Text.class,
        AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class,
        new JobConf(configuration()));
logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n");
And again, same here. Although at that point in the job, releasing RDD memory might not make much sense anymore.
Actually not here, as it is still used by the Deltas below!
 * @return A Spark {@link PairFlatMapFunction} that takes a tuple of a country shard name and
 *         atlas file and returns all the {@link AtlasDelta} for the country
 */
private PairFlatMapFunction<Tuple2<String, Atlas>, String, AtlasDelta> computeAtlasDelta(
Make all those functions static so the whole SparkJob does not have to be serialized and passed around for the slaves to be able to call those methods!
And even better: push them to the helper!
Good call, let me do this!
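The point about static functions can be reproduced in plain Java, without Spark. `CaptureSketch` below is a hypothetical stand-in for the job class: a function built in an instance method that touches instance state captures `this`, so serializing the function drags the whole enclosing object along, whereas a static factory captures only its arguments.

```java
import java.io.Serializable;

// NOT Serializable, like a heavyweight SparkJob subclass would be.
final class CaptureSketch
{
    private final String field = "state";

    // References this.field, so the lambda captures the non-serializable
    // CaptureSketch instance and fails to serialize.
    Runnable instanceFunction()
    {
        return (Runnable & Serializable) () -> System.out.println(this.field);
    }

    // Captures only its String argument, so it serializes cleanly no matter
    // how heavy the enclosing class is.
    static Runnable staticFunction(final String argument)
    {
        return (Runnable & Serializable) () -> System.out.println(argument);
    }
}
```

Moving such factories into a static helper class, as suggested, makes it impossible to capture the job instance by accident.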
/**
 * Utility class for {@link AtlasGenerator}.
 *
 * @author matthieun
Did I write that class??
You inspired it :)
Perfect!
This build is expected to fail, since it depends on this Atlas PR.

The main change here is to integrate the Raw Atlas generation flow and make it backward compatible. This is done by adding a "useRawAtlas" flag, which when present will generate an Atlas using the Raw Atlas flow. I've tested this both with and without the flag successfully.

Next steps include:
- Adding a Raw Atlas Fetcher for the sectioning step
- Adding Metrics/Statistics for the final Atlas
- General cleanup