From 0602def322bfeb7f55347adaa4862abf4a6a1663 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Fri, 6 Apr 2018 15:12:06 -0700 Subject: [PATCH 1/9] Working end-to-end raw atlas flow --- .../atlas/generator/AtlasGenerator.java | 647 ++++++++++++------ .../atlas/generator/PbfLoader.java | 117 +++- 2 files changed, 562 insertions(+), 202 deletions(-) diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index dd7c251e..8c6d95ac 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -27,6 +27,8 @@ import org.openstreetmap.atlas.geography.atlas.Atlas; import org.openstreetmap.atlas.geography.atlas.delta.AtlasDelta; import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; +import org.openstreetmap.atlas.geography.atlas.raw.sectioning.WaySectionProcessor; +import org.openstreetmap.atlas.geography.atlas.raw.slicing.RawAtlasCountrySlicer; import org.openstreetmap.atlas.geography.atlas.statistics.AtlasStatistics; import org.openstreetmap.atlas.geography.atlas.statistics.Counter; import org.openstreetmap.atlas.geography.boundary.CountryBoundary; @@ -35,6 +37,7 @@ import org.openstreetmap.atlas.geography.sharding.CountryShard; import org.openstreetmap.atlas.geography.sharding.Shard; import org.openstreetmap.atlas.geography.sharding.Sharding; +import org.openstreetmap.atlas.locale.IsoCountry; import org.openstreetmap.atlas.streaming.resource.Resource; import org.openstreetmap.atlas.tags.filters.ConfiguredTaggableFilter; import org.openstreetmap.atlas.utilities.collections.StringList; @@ -48,16 +51,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Maps; + import scala.Tuple2; /** - * Generate {@link Atlas} Shards for a specific version and a specific set of Countries + * Generate {@link Atlas} Shards for a specific 
version and a specific set of countries * * @author matthieun */ public class AtlasGenerator extends SparkJob { - /** * @author matthieun */ @@ -77,6 +81,8 @@ public String apply(final String key) private static final Logger logger = LoggerFactory.getLogger(AtlasGenerator.class); public static final String ATLAS_FOLDER = "atlas"; + public static final String RAW_ATLAS_FOLDER = "rawAtlas"; + public static final String SLICED_RAW_ATLAS_FOLDER = "slicedRawAtlas"; public static final String SHARD_STATISTICS_FOLDER = "shardStats"; public static final String COUNTRY_STATISTICS_FOLDER = "countryStats"; public static final String SHARD_DELTAS_FOLDER = "deltas"; @@ -129,6 +135,9 @@ public String apply(final String key) "osmPbfRelationConfiguration", "The path to the configuration file that defines which PBF Relations becomes an Atlas Entity", StringConverter.IDENTITY, Optionality.OPTIONAL); + // TODO - once this is fully baked and tested, remove flag and old flow + private static final Flag USE_RAW_ATLAS = new Flag("useRawAtlas", + "Allow PBF to Atlas process to use Raw Atlas flow"); public static void main(final String[] args) { @@ -167,7 +176,7 @@ protected static List generateTasks(final StringList countr } // Queue boundary for processing - countryBoundaries.forEach(countryBoundary -> queue.add(countryBoundary)); + countryBoundaries.forEach(queue::add); }); // Use all available processors except one (used by main thread) @@ -209,6 +218,71 @@ protected static StandardConfiguration getStandardConfigurationFrom(final String return new StandardConfiguration(FileSystemHelper.resource(path, configuration)); } + private static AtlasLoadingOption buildAtlasLoadingOption(final CountryBoundaryMap boundaries, + final Map sparkContext, final Map properties) + { + final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption + .createOptionWithAllEnabled(boundaries); + + // Apply all configurations + final String edgeConfiguration = properties.get(EDGE_CONFIGURATION.getName()); + 
if (edgeConfiguration != null) + { + atlasLoadingOption + .setEdgeFilter(getTaggableFilterFrom(edgeConfiguration, sparkContext)); + } + + final String waySectioningConfiguration = properties + .get(WAY_SECTIONING_CONFIGURATION.getName()); + if (waySectioningConfiguration != null) + { + atlasLoadingOption.setWaySectionFilter( + getTaggableFilterFrom(waySectioningConfiguration, sparkContext)); + } + + final String pbfNodeConfiguration = properties.get(PBF_NODE_CONFIGURATION.getName()); + if (pbfNodeConfiguration != null) + { + atlasLoadingOption + .setOsmPbfNodeFilter(getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); + } + + final String pbfWayConfiguration = properties.get(PBF_WAY_CONFIGURATION.getName()); + if (pbfWayConfiguration != null) + { + atlasLoadingOption + .setOsmPbfWayFilter(getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); + } + + final String pbfRelationConfiguration = properties + .get(PBF_RELATION_CONFIGURATION.getName()); + if (pbfRelationConfiguration != null) + { + atlasLoadingOption.setOsmPbfRelationFilter( + getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); + } + + return atlasLoadingOption; + } + + private static Map extractAtlasLoadingProperties(final CommandMap command) + { + final Map propertyMap = Maps.newHashMap(); + propertyMap.put(CODE_VERSION.getName(), (String) command.get(CODE_VERSION)); + propertyMap.put(DATA_VERSION.getName(), (String) command.get(DATA_VERSION)); + propertyMap.put(EDGE_CONFIGURATION.getName(), (String) command.get(EDGE_CONFIGURATION)); + propertyMap.put(WAY_SECTIONING_CONFIGURATION.getName(), + (String) command.get(WAY_SECTIONING_CONFIGURATION)); + propertyMap.put(PBF_NODE_CONFIGURATION.getName(), + (String) command.get(PBF_NODE_CONFIGURATION)); + propertyMap.put(PBF_WAY_CONFIGURATION.getName(), + (String) command.get(PBF_WAY_CONFIGURATION)); + propertyMap.put(PBF_RELATION_CONFIGURATION.getName(), + (String) command.get(PBF_RELATION_CONFIGURATION)); + + return propertyMap; + } + 
private static ConfiguredTaggableFilter getTaggableFilterFrom(final String path, final Map configuration) { @@ -261,6 +335,8 @@ public void start(final CommandMap command) final String pbfNodeConfiguration = (String) command.get(PBF_NODE_CONFIGURATION); final String pbfWayConfiguration = (String) command.get(PBF_WAY_CONFIGURATION); final String pbfRelationConfiguration = (String) command.get(PBF_RELATION_CONFIGURATION); + final boolean useRawAtlas = (boolean) command.get(USE_RAW_ATLAS); + final String output = output(command); final Map sparkContext = configurationMap(); @@ -284,49 +360,295 @@ public void start(final CommandMap command) final List tasks = generateTasks(countries, boundaries, sharding); logger.debug("Generated {} tasks in {}.", tasks.size(), timer.elapsedSince()); - // Transform the map country name to shard to country name to Atlas - // This is not enforced, but it has to be a 1-1 mapping here. - final Map configurationMap = configurationMap(); - final JavaPairRDD countryAtlasShardsRDD = getContext() + if (useRawAtlas) + { + // AtlasLoadingOption isn't serializable, neither is command map. 
To avoid duplicating + // boiler-plate code for creating the AtlasLoadingOption, extract the properties we need + // from the command map and pass those around to create the AtlasLoadingOption + final Map atlasLoadingOptions = extractAtlasLoadingProperties(command); + + // Generate the raw Atlas and save it + final JavaPairRDD countryRawAtlasShardsRDD = generateAndSaveRawAtlas( + tasks, boundaries, sparkContext, atlasLoadingOptions, pbfContext, output, + codeVersion, dataVersion); + + // Slice the raw Atlas and save result + final JavaPairRDD countrySlicedRawAtlasShardsRDD = sliceAndSaveRawAtlases( + countryRawAtlasShardsRDD, boundaries, output); + + // Section the sliced raw Atlas and save result + final JavaPairRDD countryNonNullAtlasShardsRDD = sectionAndSaveRawAtlas( + countrySlicedRawAtlasShardsRDD, boundaries, sparkContext, atlasLoadingOptions, + output); + + // TODO Metrics, Deltas and Statistics for final Atlas + } + else + { + // Transform the map country name to shard to country name to Atlas + // This is not enforced, but it has to be a 1-1 mapping here. 
+ final Map configurationMap = configurationMap(); + final JavaPairRDD countryAtlasShardsRDD = getContext() + .parallelize(tasks, tasks.size()).mapToPair(task -> + { + // Get the country name + final String countryName = task.getCountry(); + // Get the country shard + final Shard shard = task.getShard(); + // Build the AtlasLoadingOption + final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption + .createOptionWithAllEnabled(boundaries) + .setAdditionalCountryCodes(countryName); + + // Apply all configurations + if (edgeConfiguration != null) + { + atlasLoadingOption.setEdgeFilter( + getTaggableFilterFrom(edgeConfiguration, configurationMap)); + } + if (waySectioningConfiguration != null) + { + atlasLoadingOption.setWaySectionFilter(getTaggableFilterFrom( + waySectioningConfiguration, configurationMap)); + } + if (pbfNodeConfiguration != null) + { + atlasLoadingOption.setOsmPbfNodeFilter( + getTaggableFilterFrom(pbfNodeConfiguration, configurationMap)); + } + if (pbfWayConfiguration != null) + { + atlasLoadingOption.setOsmPbfWayFilter( + getTaggableFilterFrom(pbfWayConfiguration, configurationMap)); + } + if (pbfRelationConfiguration != null) + { + atlasLoadingOption.setOsmPbfRelationFilter(getTaggableFilterFrom( + pbfRelationConfiguration, configurationMap)); + } + + // Build the appropriate PbfLoader + final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, + atlasLoadingOption, codeVersion, dataVersion, task.getAllShards()); + final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR + + shard.getName(); + final Atlas atlas; + try + { + // Generate the Atlas for this shard + atlas = loader.load(countryName, shard); + } + catch (final Throwable e) + { + throw new CoreException("Building Atlas {} failed!", name, e); + } + + // Report on memory usage + logger.info("Printing memory after loading Atlas {}", name); + Memory.printCurrentMemory(); + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(name, 
atlas); + return result; + }); + + // Filter out null Atlas. + final JavaPairRDD countryNonNullAtlasShardsRDD = countryAtlasShardsRDD + .filter(tuple -> tuple._2() != null); + + // Cache the Atlas + countryNonNullAtlasShardsRDD.cache(); + logger.info("\n\n********** CACHED THE ATLAS **********\n"); + + // Run the metrics + final JavaPairRDD statisticsRDD = countryNonNullAtlasShardsRDD + .mapToPair(tuple -> + { + final Counter counter = new Counter().withSharding(sharding); + counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); + final AtlasStatistics statistics; + try + { + statistics = counter.processAtlas(tuple._2()); + } + catch (final Exception e) + { + throw new CoreException("Building Atlas Statistics for {} failed!", + tuple._1(), e); + } + final Tuple2 result = new Tuple2<>(tuple._1(), + statistics); + return result; + }); + + // Cache the statistics + statisticsRDD.cache(); + logger.info("\n\n********** CACHED THE SHARD STATISTICS **********\n"); + + // Save the metrics + // splitAndSaveAsHadoopFile(statisticsRDD, + // getAlternateParallelFolderOutput(output, SHARD_STATISTICS_FOLDER), + // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, + // new CountrySplitter()); + statisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n"); + + // Aggregate the metrics + final JavaPairRDD reducedStatisticsRDD = statisticsRDD + .mapToPair(tuple -> + { + final String countryShardName = tuple._1(); + final String countryName = StringList.split(countryShardName, "_").get(0); + return new Tuple2<>(countryName, tuple._2()); + }).reduceByKey(AtlasStatistics::merge); + + // Save the aggregated metrics + // reducedStatisticsRDD.saveAsHadoopFile( + // getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, 
+ // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, + // new JobConf(configuration())); + reducedStatisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasCountryStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE COUNTRY STATISTICS **********\n"); + + // Compute the deltas if needed + if (!previousOutputForDelta.isEmpty()) + { + // Compute the deltas + final JavaPairRDD deltasRDD = countryNonNullAtlasShardsRDD + .flatMapToPair(tuple -> + { + final String countryShardName = tuple._1(); + final Atlas current = tuple._2(); + final List> result = new ArrayList<>(); + try + { + final Optional alter = new AtlasLocator(sparkContext) + .atlasForShard(previousOutputForDelta + "/" + + StringList + .split(countryShardName, + CountryShard.COUNTRY_SHARD_SEPARATOR) + .get(0), + countryShardName); + if (alter.isPresent()) + { + logger.info( + "Printing memory after other Atlas loaded for Delta {}", + countryShardName); + Memory.printCurrentMemory(); + final AtlasDelta delta = new AtlasDelta(current, alter.get()) + .generate(); + result.add(new Tuple2<>(countryShardName, delta)); + } + } + catch (final Exception e) + { + logger.error("Skipping! 
Could not generate deltas for {}", + countryShardName, e); + } + return result; + }); + + // deltasRDD.cache(); + // logger.info("\n\n********** CACHED THE DELTAS **********\n"); + + // Save the deltas + // splitAndSaveAsHadoopFile(deltasRDD, getAlternateOutput(output, + // SHARD_DELTAS_FOLDER), + // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, + // COUNTRY_NAME_FROM_COUNTRY_SHARD); + // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_FOLDER), + // Text.class, + // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, + // new JobConf(configuration())); + // logger.info("\n\n********** SAVED THE DELTAS **********\n"); + // + // splitAndSaveAsHadoopFile(deltasRDD, + // getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), AtlasDelta.class, + // AddedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); + // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), + // Text.class, AtlasDelta.class, AddedMultipleAtlasDeltaOutputFormat.class, + // new JobConf(configuration())); + // logger.info("\n\n********** SAVED THE DELTAS ADDED **********\n"); + // + // splitAndSaveAsHadoopFile(deltasRDD, + // getAlternateOutput(output, SHARD_DELTAS_CHANGED_FOLDER), AtlasDelta.class, + // ChangedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); + // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, + // SHARD_DELTAS_CHANGED_FOLDER), + // Text.class, AtlasDelta.class, ChangedMultipleAtlasDeltaOutputFormat.class, + // new JobConf(configuration())); + // logger.info("\n\n********** SAVED THE DELTAS CHANGED **********\n"); + + // splitAndSaveAsHadoopFile(deltasRDD, + // getAlternateParallelFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), + // AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + // new CountrySplitter()); + deltasRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), + Text.class, AtlasDelta.class, 
RemovedMultipleAtlasDeltaOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE DELTAS REMOVED **********\n"); + } + + // Save the result as Atlas files, one for each key. + // splitAndSaveAsHadoopFile(countryNonNullAtlasShardsRDD, + // getAlternateParallelFolderOutput(output, ATLAS_FOLDER), Atlas.class, + // MultipleAtlasOutputFormat.class, new CountrySplitter()); + countryNonNullAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE ATLAS **********\n"); + } + } + + @Override + protected List outputToClean(final CommandMap command) + { + final String output = output(command); + final List staticPaths = super.outputToClean(command); + staticPaths.add(getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER)); + staticPaths.add(getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER)); + staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_FOLDER)); + staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_ADDED_FOLDER)); + staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_CHANGED_FOLDER)); + staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER)); + staticPaths.add(getAlternateSubFolderOutput(output, ATLAS_FOLDER)); + return staticPaths; + } + + @Override + protected SwitchList switches() + { + return super.switches().with(COUNTRIES, COUNTRY_SHAPES, SHARDING_TYPE, PBF_PATH, PBF_SCHEME, + PBF_SHARDING, PREVIOUS_OUTPUT_FOR_DELTA, CODE_VERSION, DATA_VERSION, + EDGE_CONFIGURATION, WAY_SECTIONING_CONFIGURATION, PBF_NODE_CONFIGURATION, + PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION, USE_RAW_ATLAS); + } + + private JavaPairRDD generateAndSaveRawAtlas( + final List tasks, final CountryBoundaryMap boundaries, + final Map sparkContext, final Map loadingOptions, + final PbfContext pbfContext, final 
String outputDirectory, final String codeVersion, + final String dataVersion) + { + // Construct the RDD, filter out null atlases + final JavaPairRDD countryRawAtlasShardsRDD = getContext() .parallelize(tasks, tasks.size()).mapToPair(task -> { - // Get the country name final String countryName = task.getCountry(); - // Get the country shard final Shard shard = task.getShard(); - // Build the AtlasLoadingOption - final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption - .createOptionWithAllEnabled(boundaries) - .setAdditionalCountryCodes(countryName); - // Apply all configurations - if (edgeConfiguration != null) - { - atlasLoadingOption.setEdgeFilter( - getTaggableFilterFrom(edgeConfiguration, configurationMap)); - } - if (waySectioningConfiguration != null) - { - atlasLoadingOption.setWaySectionFilter(getTaggableFilterFrom( - waySectioningConfiguration, configurationMap)); - } - if (pbfNodeConfiguration != null) - { - atlasLoadingOption.setOsmPbfNodeFilter( - getTaggableFilterFrom(pbfNodeConfiguration, configurationMap)); - } - if (pbfWayConfiguration != null) - { - atlasLoadingOption.setOsmPbfWayFilter( - getTaggableFilterFrom(pbfWayConfiguration, configurationMap)); - } - if (pbfRelationConfiguration != null) - { - atlasLoadingOption.setOsmPbfRelationFilter( - getTaggableFilterFrom(pbfRelationConfiguration, configurationMap)); - } + // Set the country code that is being processed! 
+ final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption( + boundaries, sparkContext, loadingOptions); + atlasLoadingOption.setAdditionalCountryCodes(countryName); - // Build the appropriate PbfLoader + // Build the PbfLoader final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, atlasLoadingOption, codeVersion, dataVersion, task.getAllShards()); final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR @@ -334,193 +656,132 @@ public void start(final CommandMap command) final Atlas atlas; try { - // Generate the Atlas for this shard - atlas = loader.load(countryName, shard); + // Generate the raw Atlas for this shard + atlas = loader.generateRawAtlas(countryName, shard); } catch (final Throwable e) { - throw new CoreException("Building Atlas {} failed!", name, e); + throw new CoreException("Building raw Atlas {} failed!", name, e); } // Report on memory usage - logger.info("Printing memory after loading Atlas {}", name); + logger.info("Printing memory after loading raw Atlas {}", name); Memory.printCurrentMemory(); + // Output the Name/Atlas couple final Tuple2 result = new Tuple2<>(name, atlas); return result; - }); + }).filter(tuple -> tuple._2() != null); - // Filter out null Atlas. 
- final JavaPairRDD countryNonNullAtlasShardsRDD = countryAtlasShardsRDD - .filter(tuple -> tuple._2() != null); + // Persist the RDD to avoid re-computing and save each raw atlas + countryRawAtlasShardsRDD.cache().saveAsHadoopFile( + getAlternateSubFolderOutput(outputDirectory, RAW_ATLAS_FOLDER), Text.class, + Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE RAW ATLAS **********\n"); - // Cache the Atlas - countryNonNullAtlasShardsRDD.cache(); - logger.info("\n\n********** CACHED THE ATLAS **********\n"); + return countryRawAtlasShardsRDD; + } - // Run the metrics - final JavaPairRDD statisticsRDD = countryNonNullAtlasShardsRDD + private JavaPairRDD sectionAndSaveRawAtlas( + final JavaPairRDD countrySlicedRawAtlasShardsRDD, + final CountryBoundaryMap boundaries, final Map sparkContext, + final Map loadingOptions, final String outputDirectory) + { + final JavaPairRDD countryAtlasShardsRDD = countrySlicedRawAtlasShardsRDD .mapToPair(tuple -> { - final Counter counter = new Counter().withSharding(sharding); - counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); - final AtlasStatistics statistics; + final Atlas atlas; try { - statistics = counter.processAtlas(tuple._2()); + final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption( + boundaries, sparkContext, loadingOptions); + + // TODO - Leverage sharding and Atlas fetcher here! 
+ + // Section the Atlas + atlas = new WaySectionProcessor(tuple._2(), atlasLoadingOption).run(); } - catch (final Exception e) + catch (final Throwable e) { - throw new CoreException("Building Atlas Statistics for {} failed!", - tuple._1(), e); + throw new CoreException("Slicing Atlas {} failed!", tuple._2().getName(), + e); } - final Tuple2 result = new Tuple2<>(tuple._1(), - statistics); + + // Report on memory usage + logger.info("Printing memory after loading final Atlas {}", + tuple._2().getName()); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), atlas); return result; }); - // Cache the statistics - statisticsRDD.cache(); - logger.info("\n\n********** CACHED THE SHARD STATISTICS **********\n"); - - // Save the metrics - // splitAndSaveAsHadoopFile(statisticsRDD, - // getAlternateParallelFolderOutput(output, SHARD_STATISTICS_FOLDER), - // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, - // new CountrySplitter()); - statisticsRDD.saveAsHadoopFile(getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), - Text.class, AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, - new JobConf(configuration())); - logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n"); - - // Aggregate the metrics - final JavaPairRDD reducedStatisticsRDD = statisticsRDD + // Save the final Atlas files, one for each key + countryAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(outputDirectory, ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n"); + + return countryAtlasShardsRDD; + } + + private JavaPairRDD sliceAndSaveRawAtlases( + final JavaPairRDD countryRawAtlasShardsRDD, + final CountryBoundaryMap boundaries, final String outputDirectory) + { + // Construct the RDD, filter out null atlases + final JavaPairRDD 
countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD .mapToPair(tuple -> { - final String countryShardName = tuple._1(); - final String countryName = StringList.split(countryShardName, "_").get(0); - return new Tuple2<>(countryName, tuple._2()); - }).reduceByKey(AtlasStatistics::merge); - - // Save the aggregated metrics - // reducedStatisticsRDD.saveAsHadoopFile( - // getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, - // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, - // new JobConf(configuration())); - reducedStatisticsRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, - AtlasStatistics.class, MultipleAtlasCountryStatisticsOutputFormat.class, - new JobConf(configuration())); - logger.info("\n\n********** SAVED THE COUNTRY STATISTICS **********\n"); - - // Compute the deltas if needed - if (!previousOutputForDelta.isEmpty()) - { - // Compute the deltas - final JavaPairRDD deltasRDD = countryNonNullAtlasShardsRDD - .flatMapToPair(tuple -> + final Atlas slicedAtlas; + + // Grab the tuple contents + final String shardName = tuple._1(); + final Atlas rawAtlas = tuple._2(); + + try { - final String countryShardName = tuple._1(); - final Atlas current = tuple._2(); - final List> result = new ArrayList<>(); - try + // Extract the country code + final Set isoCountries = new HashSet<>(); + final Optional countryName = IsoCountry.forCountryCode( + shardName.split(CountryShard.COUNTRY_SHARD_SEPARATOR)[0]); + if (countryName.isPresent()) { - final Optional alter = new AtlasLocator(sparkContext) - .atlasForShard(previousOutputForDelta + "/" - + StringList - .split(countryShardName, - CountryShard.COUNTRY_SHARD_SEPARATOR) - .get(0), - countryShardName); - if (alter.isPresent()) - { - logger.info("Printing memory after other Atlas loaded for Delta {}", - countryShardName); - Memory.printCurrentMemory(); - final AtlasDelta delta = new AtlasDelta(current, alter.get()) - 
.generate(); - result.add(new Tuple2<>(countryShardName, delta)); - } + isoCountries.add(countryName.get()); } - catch (final Exception e) + else { - logger.error("Skipping! Could not generate deltas for {}", - countryShardName, e); + logger.error("Unable to extract valid IsoCountry code from {}", + shardName); } - return result; - }); - - // deltasRDD.cache(); - // logger.info("\n\n********** CACHED THE DELTAS **********\n"); - // Save the deltas - // splitAndSaveAsHadoopFile(deltasRDD, getAlternateOutput(output, SHARD_DELTAS_FOLDER), - // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, - // COUNTRY_NAME_FROM_COUNTRY_SHARD); - // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_FOLDER), - // Text.class, - // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, - // new JobConf(configuration())); - // logger.info("\n\n********** SAVED THE DELTAS **********\n"); - // - // splitAndSaveAsHadoopFile(deltasRDD, - // getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), AtlasDelta.class, - // AddedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); - // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), - // Text.class, AtlasDelta.class, AddedMultipleAtlasDeltaOutputFormat.class, - // new JobConf(configuration())); - // logger.info("\n\n********** SAVED THE DELTAS ADDED **********\n"); - // - // splitAndSaveAsHadoopFile(deltasRDD, - // getAlternateOutput(output, SHARD_DELTAS_CHANGED_FOLDER), AtlasDelta.class, - // ChangedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); - // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_CHANGED_FOLDER), - // Text.class, AtlasDelta.class, ChangedMultipleAtlasDeltaOutputFormat.class, - // new JobConf(configuration())); - // logger.info("\n\n********** SAVED THE DELTAS CHANGED **********\n"); + // Slice the Atlas + slicedAtlas = new RawAtlasCountrySlicer(isoCountries, boundaries) + .slice(rawAtlas); + } + catch (final 
Throwable e) + { + throw new CoreException("Slicing raw Atlas {} failed!", rawAtlas.getName(), + e); + } - // splitAndSaveAsHadoopFile(deltasRDD, - // getAlternateParallelFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), - // AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, - // new CountrySplitter()); - deltasRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), Text.class, - AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, - new JobConf(configuration())); - logger.info("\n\n********** SAVED THE DELTAS REMOVED **********\n"); - } + // Report on memory usage + logger.info("Printing memory after loading sliced raw Atlas {}", + rawAtlas.getName()); + Memory.printCurrentMemory(); - // Save the result as Atlas files, one for each key. - // splitAndSaveAsHadoopFile(countryNonNullAtlasShardsRDD, - // getAlternateParallelFolderOutput(output, ATLAS_FOLDER), Atlas.class, - // MultipleAtlasOutputFormat.class, new CountrySplitter()); - countryNonNullAtlasShardsRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, - MultipleAtlasOutputFormat.class, new JobConf(configuration())); - logger.info("\n\n********** SAVED THE ATLAS **********\n"); - } + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), slicedAtlas); + return result; + }).filter(tuple -> tuple._2() != null); - @Override - protected List outputToClean(final CommandMap command) - { - final String output = output(command); - final List staticPaths = super.outputToClean(command); - staticPaths.add(getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER)); - staticPaths.add(getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER)); - staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_FOLDER)); - staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_ADDED_FOLDER)); - staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_CHANGED_FOLDER)); - 
staticPaths.add(getAlternateSubFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER)); - staticPaths.add(getAlternateSubFolderOutput(output, ATLAS_FOLDER)); - return staticPaths; - } + // Persist the RDD to avoid re-computing and save each sliced raw atlas + countrySlicedRawAtlasShardsRDD.cache().saveAsHadoopFile( + getAlternateSubFolderOutput(outputDirectory, SLICED_RAW_ATLAS_FOLDER), Text.class, + Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n"); - @Override - protected SwitchList switches() - { - return super.switches().with(COUNTRIES, COUNTRY_SHAPES, SHARDING_TYPE, PBF_PATH, PBF_SCHEME, - PBF_SHARDING, PREVIOUS_OUTPUT_FOR_DELTA, CODE_VERSION, DATA_VERSION, - EDGE_CONFIGURATION, WAY_SECTIONING_CONFIGURATION, PBF_NODE_CONFIGURATION, - PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION); + return countrySlicedRawAtlasShardsRDD; } } diff --git a/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java b/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java index e78d9799..c5079a50 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java +++ b/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -15,6 +16,7 @@ import org.openstreetmap.atlas.geography.atlas.packed.PackedAtlas; import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; import org.openstreetmap.atlas.geography.atlas.pbf.OsmPbfLoader; +import org.openstreetmap.atlas.geography.atlas.raw.creation.RawAtlasGenerator; import org.openstreetmap.atlas.geography.boundary.CountryBoundary; import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMap; import org.openstreetmap.atlas.geography.clipping.Clip.ClipType; @@ -61,7 +63,7 @@ public static void setAtlasSaveFolder(final File 
atlasSaveFolder) * @param sparkContext * The context from Spark * @param boundaries - * The country boundary map + * The {@link CountryBoundaryMap} * @param atlasLoadingOption * The loading options for the {@link OsmPbfLoader} * @param codeVersion @@ -84,6 +86,83 @@ public PbfLoader(final PbfContext pbfContext, final Map sparkCon this.countryShards = countryShards; } + public Atlas generateRawAtlas(final String countryName, final Shard shard) + { + final Optional loadingArea = calculateLoadingArea(countryName, shard); + if (loadingArea.isPresent()) + { + final Iterable pbfPool = this.locator.pbfsCovering(loadingArea.get()); + + final List atlases = new ArrayList<>(); + final Map metaDataTags = Maps.hashMap(); + + // Add shard information to the meta data + metaDataTags.put("countryShards", + this.countryShards + .stream().map(countryShard -> countryName + + CountryShard.COUNTRY_SHARD_SEPARATOR + countryShard.getName()) + .collect(Collectors.joining(","))); + metaDataTags.put(shard.getName() + "_boundary", loadingArea.toString()); + + // For each PBF, create an atlas + pbfPool.forEach(locatedPbf -> + { + final MultiPolygon pbfLoadingArea = locatedPbf.bounds() + .clip(loadingArea.get(), ClipType.AND).getClipMultiPolygon(); + final AtlasMetaData metaData = new AtlasMetaData(null, true, this.codeVersion, + this.dataVersion, countryName, shard.getName(), metaDataTags); + Atlas shardPbfSlice = null; + try + { + shardPbfSlice = new RawAtlasGenerator(locatedPbf.getResource(), pbfLoadingArea) + .withMetaData(metaData).build(); + } + catch (final Exception e) + { + logger.error("Dropping PBF {} for Atlas shard {}", + locatedPbf.getResource().getName(), shard, e); + } + if (shardPbfSlice != null) + { + atlases.add(shardPbfSlice); + } + }); + + // Save, if needed + if (ATLAS_SAVE_FOLDER != null) + { + int index = 0; + for (final Atlas atlas : atlases) + { + atlas.save( + ATLAS_SAVE_FOLDER.child(shard.getName() + "_" + index++ + ".atlas.gz")); + } + } + + if (atlases.size() > 
1) + { + // Concatenate many PBFs in one single Atlas + logger.info("Concatenating {} PBF-made Atlas into one Atlas Shard {}", + atlases.size(), shard); + return PackedAtlas.cloneFrom(new MultiAtlas(atlases)); + } + else if (atlases.size() == 1) + { + // Only one PBF was used + return atlases.get(0); + } + else + { + // There are no PBF resources. + return null; + } + } + else + { + return null; + } + } + /** * Generate the {@link Atlas} for a {@link Shard}. * @@ -91,10 +170,33 @@ public PbfLoader(final PbfContext pbfContext, final Map sparkCon * The Country to process * @param shard * The shard to output - * @return The build {@link Atlas} for the specified {@link Shard}. null if there is no Atlas to - * be build (because no PBF or empty PBFs or no overlap) + * @return The built {@link Atlas} for the specified {@link Shard}. {@code null} if there is no + * Atlas to be built (because no PBF or empty PBFs or no overlap) */ public Atlas load(final String countryName, final Shard shard) + { + final Optional loadingArea = calculateLoadingArea(countryName, shard); + if (loadingArea.isPresent()) + { + final Iterable pbfPool = this.locator.pbfsCovering(loadingArea.get()); + return loadFromPool(pbfPool, loadingArea.get(), countryName, shard); + } + else + { + return null; + } + } + + /** + * Calculates the loading area given a country name and working {@link Shard}. 
+ * + * @param countryName + * The country being built + * @param shard + * The shard being processed + * @return the intersection of given country's boundary clipped with the given shard's bounds + */ + private Optional calculateLoadingArea(final String countryName, final Shard shard) { final List countryBoundaries = this.boundaries .countryBoundary(countryName); @@ -109,15 +211,12 @@ public Atlas load(final String countryName, final Shard shard) } if (boundary != null) { - final MultiPolygon loadingArea = shard.bounds().clip(boundary, ClipType.AND) - .getClipMultiPolygon(); - final Iterable pbfPool = this.locator.pbfsCovering(loadingArea); - return loadFromPool(pbfPool, loadingArea, countryName, shard); + return Optional.of(shard.bounds().clip(boundary, ClipType.AND).getClipMultiPolygon()); } else { - logger.error("Can't found shard {} for country {}", shard, countryName); - return null; + logger.error("Can't find shard {} for country {}", shard, countryName); + return Optional.empty(); } } From 78c155b3213f91d09c10020a4e4b559851aeeed1 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Mon, 9 Apr 2018 12:10:11 -0700 Subject: [PATCH 2/9] refactor to make Spark flow more apparent --- .../atlas/generator/AtlasGenerator.java | 313 +++++++++--------- 1 file changed, 165 insertions(+), 148 deletions(-) diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index 8c6d95ac..22adffef 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -16,6 +16,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.PairFunction; import org.openstreetmap.atlas.exception.CoreException; import 
org.openstreetmap.atlas.generator.persistence.MultipleAtlasCountryStatisticsOutputFormat; import org.openstreetmap.atlas.generator.persistence.MultipleAtlasOutputFormat; @@ -59,6 +60,7 @@ * Generate {@link Atlas} Shards for a specific version and a specific set of countries * * @author matthieun + * @author mgostintsev */ public class AtlasGenerator extends SparkJob { @@ -279,6 +281,8 @@ private static Map extractAtlasLoadingProperties(final CommandMa (String) command.get(PBF_WAY_CONFIGURATION)); propertyMap.put(PBF_RELATION_CONFIGURATION.getName(), (String) command.get(PBF_RELATION_CONFIGURATION)); + propertyMap.put(CODE_VERSION.getName(), (String) command.get(CODE_VERSION)); + propertyMap.put(DATA_VERSION.getName(), (String) command.get(DATA_VERSION)); return propertyMap; } @@ -367,19 +371,39 @@ public void start(final CommandMap command) // from the command map and pass those around to create the AtlasLoadingOption final Map atlasLoadingOptions = extractAtlasLoadingProperties(command); - // Generate the raw Atlas and save it - final JavaPairRDD countryRawAtlasShardsRDD = generateAndSaveRawAtlas( - tasks, boundaries, sparkContext, atlasLoadingOptions, pbfContext, output, - codeVersion, dataVersion); + // Generate the raw Atlas and filter any null atlases + final JavaPairRDD countryRawAtlasShardsRDD = getContext() + .parallelize(tasks, tasks.size()).mapToPair(this.generateRawAtlas(boundaries, + sparkContext, atlasLoadingOptions, pbfContext)) + .filter(tuple -> tuple._2() != null); - // Slice the raw Atlas and save result - final JavaPairRDD countrySlicedRawAtlasShardsRDD = sliceAndSaveRawAtlases( - countryRawAtlasShardsRDD, boundaries, output); + // Persist the RDD and save the intermediary state + countryRawAtlasShardsRDD.cache(); + countryRawAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, RAW_ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE RAW 
ATLAS **********\n"); - // Section the sliced raw Atlas and save result - final JavaPairRDD countryNonNullAtlasShardsRDD = sectionAndSaveRawAtlas( - countrySlicedRawAtlasShardsRDD, boundaries, sparkContext, atlasLoadingOptions, - output); + // Slice the raw Atlas and filter any null atlases + final JavaPairRDD countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD + .mapToPair(this.sliceRawAtlas(boundaries)).filter(tuple -> tuple._2() != null); + + // Persist the RDD and save the intermediary state + countrySlicedRawAtlasShardsRDD.cache(); + countrySlicedRawAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SLICED_RAW_ATLAS_FOLDER), Text.class, + Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n"); + + // Section the sliced raw Atlas + final JavaPairRDD countryAtlasShardsRDD = countrySlicedRawAtlasShardsRDD + .mapToPair(this.sectionRawAtlas(boundaries, sparkContext, atlasLoadingOptions)); + + // Save the result, one for each key + countryAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n"); // TODO Metrics, Deltas and Statistics for final Atlas } @@ -630,158 +654,151 @@ protected SwitchList switches() PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION, USE_RAW_ATLAS); } - private JavaPairRDD generateAndSaveRawAtlas( - final List tasks, final CountryBoundaryMap boundaries, - final Map sparkContext, final Map loadingOptions, - final PbfContext pbfContext, final String outputDirectory, final String codeVersion, - final String dataVersion) + /** + * @param boundaries + * The {@link CountryBoundaryMap} to use for pbf to atlas generation + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param loadingOptions + * The basic required 
properties to create an {@link AtlasLoadingOption} + * @param pbfContext + * The context explaining where to find the PBFs + * @return a Spark {@link PairFunction} that processes an {@link AtlasGenerationTask}, loads the + * PBF for the task's shard, generates the raw atlas for the shard and outputs a shard + * name to raw atlas tuple. + */ + private PairFunction generateRawAtlas( + final CountryBoundaryMap boundaries, final Map sparkContext, + final Map loadingOptions, final PbfContext pbfContext) { - // Construct the RDD, filter out null atlases - final JavaPairRDD countryRawAtlasShardsRDD = getContext() - .parallelize(tasks, tasks.size()).mapToPair(task -> - { - final String countryName = task.getCountry(); - final Shard shard = task.getShard(); - - // Set the country code that is being processed! - final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption( - boundaries, sparkContext, loadingOptions); - atlasLoadingOption.setAdditionalCountryCodes(countryName); - - // Build the PbfLoader - final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, - atlasLoadingOption, codeVersion, dataVersion, task.getAllShards()); - final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR - + shard.getName(); - final Atlas atlas; - try - { - // Generate the raw Atlas for this shard - atlas = loader.generateRawAtlas(countryName, shard); - } - catch (final Throwable e) - { - throw new CoreException("Building raw Atlas {} failed!", name, e); - } - - // Report on memory usage - logger.info("Printing memory after loading raw Atlas {}", name); - Memory.printCurrentMemory(); - - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>(name, atlas); - return result; - }).filter(tuple -> tuple._2() != null); + return task -> + { + final String countryName = task.getCountry(); + final Shard shard = task.getShard(); + + // Set the country code that is being processed! 
+ final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, + sparkContext, loadingOptions); + atlasLoadingOption.setAdditionalCountryCodes(countryName); + + // Build the PbfLoader + final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, + atlasLoadingOption, loadingOptions.get(CODE_VERSION.getName()), + loadingOptions.get(DATA_VERSION.getName()), task.getAllShards()); + final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR + + shard.getName(); + + // Generate the raw Atlas for this shard + final Atlas atlas; + try + { + atlas = loader.generateRawAtlas(countryName, shard); + } + catch (final Throwable e) + { + throw new CoreException("Building raw Atlas {} failed!", name, e); + } - // Persist the RDD to avoid re-computing and save each raw atlas - countryRawAtlasShardsRDD.cache().saveAsHadoopFile( - getAlternateSubFolderOutput(outputDirectory, RAW_ATLAS_FOLDER), Text.class, - Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); - logger.info("\n\n********** SAVED THE RAW ATLAS **********\n"); + // Report on memory usage + logger.info("Printing memory after loading raw Atlas {}", name); + Memory.printCurrentMemory(); - return countryRawAtlasShardsRDD; + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(name, atlas); + return result; + }; } - private JavaPairRDD sectionAndSaveRawAtlas( - final JavaPairRDD countrySlicedRawAtlasShardsRDD, + /** + * @param boundaries + * The {@link CountryBoundaryMap} required to create an {@link AtlasLoadingOption} + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param loadingOptions + * The basic required properties to create an {@link AtlasLoadingOption} + * @return a Spark {@link PairFunction} that processes a tuple of shard-name and sliced raw + * atlas, sections the sliced raw atlas and returns the final sectioned (and sliced) raw + * atlas for that shard name. 
+ */ + private PairFunction, String, Atlas> sectionRawAtlas( final CountryBoundaryMap boundaries, final Map sparkContext, - final Map loadingOptions, final String outputDirectory) + final Map loadingOptions) { - final JavaPairRDD countryAtlasShardsRDD = countrySlicedRawAtlasShardsRDD - .mapToPair(tuple -> - { - final Atlas atlas; - try - { - final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption( - boundaries, sparkContext, loadingOptions); + return tuple -> + { + final Atlas atlas; + try + { + final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, + sparkContext, loadingOptions); - // TODO - Leverage sharding and Atlas fetcher here! + // TODO - Leverage sharding and Atlas fetcher here! - // Section the Atlas - atlas = new WaySectionProcessor(tuple._2(), atlasLoadingOption).run(); - } - catch (final Throwable e) - { - throw new CoreException("Slicing Atlas {} failed!", tuple._2().getName(), - e); - } - - // Report on memory usage - logger.info("Printing memory after loading final Atlas {}", - tuple._2().getName()); - Memory.printCurrentMemory(); - - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>(tuple._1(), atlas); - return result; - }); - - // Save the final Atlas files, one for each key - countryAtlasShardsRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(outputDirectory, ATLAS_FOLDER), Text.class, Atlas.class, - MultipleAtlasOutputFormat.class, new JobConf(configuration())); - logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n"); - - return countryAtlasShardsRDD; + // Section the Atlas + atlas = new WaySectionProcessor(tuple._2(), atlasLoadingOption).run(); + } + catch (final Throwable e) + { + throw new CoreException("Slicing Atlas {} failed!", tuple._2().getName(), e); + } + + // Report on memory usage + logger.info("Printing memory after loading final Atlas {}", tuple._2().getName()); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new 
Tuple2<>(tuple._1(), atlas); + return result; + }; } - private JavaPairRDD sliceAndSaveRawAtlases( - final JavaPairRDD countryRawAtlasShardsRDD, - final CountryBoundaryMap boundaries, final String outputDirectory) + /** + * @param boundaries + * The {@link CountryBoundaryMap} to use for slicing + * @return a Spark {@link PairFunction} that processes a tuple of shard-name and raw atlas, + * slices the raw atlas and returns the sliced raw atlas for that shard name. + */ + private PairFunction, String, Atlas> sliceRawAtlas( + final CountryBoundaryMap boundaries) { - // Construct the RDD, filter out null atlases - final JavaPairRDD countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD - .mapToPair(tuple -> + return tuple -> + { + final Atlas slicedAtlas; + + // Grab the tuple contents + final String shardName = tuple._1(); + final Atlas rawAtlas = tuple._2(); + + try + { + // Extract the country code + final Set isoCountries = new HashSet<>(); + final Optional countryName = IsoCountry + .forCountryCode(shardName.split(CountryShard.COUNTRY_SHARD_SEPARATOR)[0]); + if (countryName.isPresent()) + { + isoCountries.add(countryName.get()); + } + else { - final Atlas slicedAtlas; + logger.error("Unable to extract valid IsoCountry code from {}", shardName); + } - // Grab the tuple contents - final String shardName = tuple._1(); - final Atlas rawAtlas = tuple._2(); + // Slice the Atlas + slicedAtlas = new RawAtlasCountrySlicer(isoCountries, boundaries).slice(rawAtlas); + } + catch (final Throwable e) + { + throw new CoreException("Slicing raw Atlas {} failed!", rawAtlas.getName(), e); + } - try - { - // Extract the country code - final Set isoCountries = new HashSet<>(); - final Optional countryName = IsoCountry.forCountryCode( - shardName.split(CountryShard.COUNTRY_SHARD_SEPARATOR)[0]); - if (countryName.isPresent()) - { - isoCountries.add(countryName.get()); - } - else - { - logger.error("Unable to extract valid IsoCountry code from {}", - shardName); - } + // Report on 
memory usage + logger.info("Printing memory after loading sliced raw Atlas {}", rawAtlas.getName()); + Memory.printCurrentMemory(); - // Slice the Atlas - slicedAtlas = new RawAtlasCountrySlicer(isoCountries, boundaries) - .slice(rawAtlas); - } - catch (final Throwable e) - { - throw new CoreException("Slicing raw Atlas {} failed!", rawAtlas.getName(), - e); - } - - // Report on memory usage - logger.info("Printing memory after loading sliced raw Atlas {}", - rawAtlas.getName()); - Memory.printCurrentMemory(); - - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>(tuple._1(), slicedAtlas); - return result; - }).filter(tuple -> tuple._2() != null); - - // Persist the RDD to avoid re-computing and save each sliced raw atlas - countrySlicedRawAtlasShardsRDD.cache().saveAsHadoopFile( - getAlternateSubFolderOutput(outputDirectory, SLICED_RAW_ATLAS_FOLDER), Text.class, - Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); - logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n"); - - return countrySlicedRawAtlasShardsRDD; + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), slicedAtlas); + return result; + }; } } From 205d5c9125b82298827936d18f8a3d8e21672ee1 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Mon, 9 Apr 2018 15:23:18 -0700 Subject: [PATCH 3/9] Adding metrics and atlas deltas to new flow, moving calculations to helper functions --- .../atlas/generator/AtlasGenerator.java | 162 ++++++++++++------ 1 file changed, 111 insertions(+), 51 deletions(-) diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index ed1a1676..273e3d60 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -16,6 +16,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; 
import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import org.openstreetmap.atlas.exception.CoreException; import org.openstreetmap.atlas.generator.persistence.AbstractMultipleAtlasBasedOutputFormat; @@ -416,7 +417,47 @@ public void start(final CommandMap command) MultipleAtlasOutputFormat.class, new JobConf(configuration())); logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n"); - // TODO Metrics, Deltas and Statistics for final Atlas + // Create the metrics + final JavaPairRDD statisticsRDD = countryAtlasShardsRDD + .mapToPair(generateAtlasStatistics(sharding)); + + // Persist the RDD and save + statisticsRDD.cache(); + statisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n"); + + // Aggregate the metrics + final JavaPairRDD reducedStatisticsRDD = statisticsRDD + .mapToPair(tuple -> + { + final String countryShardName = tuple._1(); + final String countryName = StringList.split(countryShardName, "_").get(0); + return new Tuple2<>(countryName, tuple._2()); + }).reduceByKey(AtlasStatistics::merge); + + // Save aggregated metrics + reducedStatisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasCountryStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE COUNTRY STATISTICS **********\n"); + + // Compute the deltas, if needed + if (!previousOutputForDelta.isEmpty()) + { + final JavaPairRDD deltasRDD = countryAtlasShardsRDD + .flatMapToPair( + this.computeAtlasDelta(sparkContext, previousOutputForDelta)); + + // Save the deltas + 
deltasRDD.saveAsHadoopFile(getAlternateSubFolderOutput(output, SHARD_DELTAS_FOLDER), + Text.class, AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE DELTAS **********\n"); + } } else { @@ -497,24 +538,7 @@ public void start(final CommandMap command) // Run the metrics final JavaPairRDD statisticsRDD = countryNonNullAtlasShardsRDD - .mapToPair(tuple -> - { - final Counter counter = new Counter().withSharding(sharding); - counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); - final AtlasStatistics statistics; - try - { - statistics = counter.processAtlas(tuple._2()); - } - catch (final Exception e) - { - throw new CoreException("Building Atlas Statistics for {} failed!", - tuple._1(), e); - } - final Tuple2 result = new Tuple2<>(tuple._1(), - statistics); - return result; - }); + .mapToPair(generateAtlasStatistics(sharding)); // Cache the statistics statisticsRDD.cache(); @@ -556,38 +580,7 @@ public void start(final CommandMap command) { // Compute the deltas final JavaPairRDD deltasRDD = countryNonNullAtlasShardsRDD - .flatMapToPair(tuple -> - { - final String countryShardName = tuple._1(); - final Atlas current = tuple._2(); - final List> result = new ArrayList<>(); - try - { - final Optional alter = new AtlasLocator(sparkContext) - .atlasForShard(previousOutputForDelta + "/" - + StringList - .split(countryShardName, - CountryShard.COUNTRY_SHARD_SEPARATOR) - .get(0), - countryShardName); - if (alter.isPresent()) - { - logger.info( - "Printing memory after other Atlas loaded for Delta {}", - countryShardName); - Memory.printCurrentMemory(); - final AtlasDelta delta = new AtlasDelta(current, alter.get()) - .generate(); - result.add(new Tuple2<>(countryShardName, delta)); - } - } - catch (final Exception e) - { - logger.error("Skipping! 
Could not generate deltas for {}", - countryShardName, e); - } - return result; - }); + .flatMapToPair(computeAtlasDelta(sparkContext, previousOutputForDelta)); // deltasRDD.cache(); // logger.info("\n\n********** CACHED THE DELTAS **********\n"); @@ -666,6 +659,73 @@ protected SwitchList switches() PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION, ATLAS_SCHEME, USE_RAW_ATLAS); } + /** + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param previousOutputForDelta + * Previous Atlas generation delta output location + * @return A Spark {@link PairFlatMapFunction} that takes a tuple of a country shard name and + * atlas file and returns all the {@link AtlasDelta} for the country + */ + private PairFlatMapFunction, String, AtlasDelta> computeAtlasDelta( + final Map sparkContext, final String previousOutputForDelta) + { + return tuple -> + { + final String countryShardName = tuple._1(); + final Atlas current = tuple._2(); + final List> result = new ArrayList<>(); + try + { + final Optional alter = new AtlasLocator(sparkContext).atlasForShard( + previousOutputForDelta + "/" + + StringList.split(countryShardName, + CountryShard.COUNTRY_SHARD_SEPARATOR).get(0), + countryShardName); + if (alter.isPresent()) + { + logger.info("Printing memory after other Atlas loaded for Delta {}", + countryShardName); + Memory.printCurrentMemory(); + final AtlasDelta delta = new AtlasDelta(current, alter.get()).generate(); + result.add(new Tuple2<>(countryShardName, delta)); + } + } + catch (final Exception e) + { + logger.error("Skipping! Could not generate deltas for {}", countryShardName, e); + } + return result; + }; + } + + /** + * @param sharding + * The sharding tree + * @return a Spark {@link PairFunction} that processes a shard to Atlas tuple, and constructs a + * {@link AtlasStatistics} for each shard. 
+ */ + private PairFunction, String, AtlasStatistics> generateAtlasStatistics( + final Sharding sharding) + { + return tuple -> + { + final Counter counter = new Counter().withSharding(sharding); + counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); + final AtlasStatistics statistics; + try + { + statistics = counter.processAtlas(tuple._2()); + } + catch (final Exception e) + { + throw new CoreException("Building Atlas Statistics for {} failed!", tuple._1(), e); + } + final Tuple2 result = new Tuple2<>(tuple._1(), statistics); + return result; + }; + } + /** * @param boundaries * The {@link CountryBoundaryMap} to use for pbf to atlas generation From 3bc990cb13c4cd35a6df9911bd1ba168632c1203 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Mon, 9 Apr 2018 15:24:54 -0700 Subject: [PATCH 4/9] Upgrading to atlas 5.0.11 --- dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies.gradle b/dependencies.gradle index 37e23bd1..4d2fb293 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -1,6 +1,6 @@ project.ext.versions = [ checkstyle: '7.6.1', - atlas: '5.0.10', + atlas: '5.0.11', spark: '1.6.0-cdh5.7.0', snappy: '1.1.1.6', ] From cd9db035043213fc52c8314521744d88d5e42554 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Tue, 10 Apr 2018 11:37:53 -0700 Subject: [PATCH 5/9] Updating integration test, accounting for empty Clipped loading area --- .../AtlasGeneratorIntegrationTest.java | 2 ++ .../atlas/generator/PbfLoader.java | 35 +++++++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java b/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java index 83d51194..102a2005 100644 --- a/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java +++ 
b/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java @@ -52,6 +52,8 @@ public void testAtlasGeneration() arguments.add("-pbfSharding=dynamic@" + TREE_14); arguments.add("-pbfScheme=zz/zz-xx-yy.osm.pbf"); arguments.add("-atlasScheme=zz/"); + // TODO - uncomment line below after switching to raw atlas flow + // arguments.add("-useRawAtlas"); arguments.add("-sharding=dynamic@" + TREE_13); arguments.add( "-sparkOptions=fs.resource.impl=" + ResourceFileSystem.class.getCanonicalName()); diff --git a/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java b/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java index c5079a50..e09dd7d0 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java +++ b/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java @@ -109,22 +109,27 @@ public Atlas generateRawAtlas(final String countryName, final Shard shard) { final MultiPolygon pbfLoadingArea = locatedPbf.bounds() .clip(loadingArea.get(), ClipType.AND).getClipMultiPolygon(); - final AtlasMetaData metaData = new AtlasMetaData(null, true, this.codeVersion, - this.dataVersion, countryName, shard.getName(), metaDataTags); - Atlas shardPbfSlice = null; - try - { - shardPbfSlice = new RawAtlasGenerator(locatedPbf.getResource(), pbfLoadingArea) - .withMetaData(metaData).build(); - } - catch (final Exception e) - { - logger.error("Dropping PBF {} for Atlas shard {}", - locatedPbf.getResource().getName(), shard, e); - } - if (shardPbfSlice != null) + + // Guard against an empty loading area + if (!pbfLoadingArea.isEmpty()) { - atlases.add(shardPbfSlice); + final AtlasMetaData metaData = new AtlasMetaData(null, true, this.codeVersion, + this.dataVersion, countryName, shard.getName(), metaDataTags); + Atlas shardPbfSlice = null; + try + { + shardPbfSlice = new RawAtlasGenerator(locatedPbf.getResource(), + pbfLoadingArea).withMetaData(metaData).build(); + } + catch (final Exception e) + { + 
logger.error("Dropping PBF {} for Atlas shard {}", + locatedPbf.getResource().getName(), shard, e); + } + if (shardPbfSlice != null) + { + atlases.add(shardPbfSlice); + } } }); From 551a282400489604c7e0980c37cde6a070378366 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Tue, 10 Apr 2018 13:30:49 -0700 Subject: [PATCH 6/9] Implementing Atlas fetcher policy when sectioning atlases --- .../AtlasGeneratorIntegrationTest.java | 2 - .../atlas/generator/AtlasGenerator.java | 54 +++++- .../atlas/generator/AtlasGeneratorHelper.java | 175 ++++++++++++++++++ 3 files changed, 220 insertions(+), 11 deletions(-) create mode 100644 src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java diff --git a/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java b/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java index 102a2005..83d51194 100644 --- a/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java +++ b/src/integrationTest/java/org/openstreetmap/atlas/generator/AtlasGeneratorIntegrationTest.java @@ -52,8 +52,6 @@ public void testAtlasGeneration() arguments.add("-pbfSharding=dynamic@" + TREE_14); arguments.add("-pbfScheme=zz/zz-xx-yy.osm.pbf"); arguments.add("-atlasScheme=zz/"); - // TODO - uncomment line below after switching to raw atlas flow - // arguments.add("-useRawAtlas"); arguments.add("-sharding=dynamic@" + TREE_13); arguments.add( "-sparkOptions=fs.resource.impl=" + ResourceFileSystem.class.getCanonicalName()); diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index 273e3d60..4bcd76b2 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -28,6 +28,7 @@ import org.openstreetmap.atlas.generator.sharding.AtlasSharding; import 
org.openstreetmap.atlas.generator.tools.filesystem.FileSystemHelper; import org.openstreetmap.atlas.generator.tools.spark.SparkJob; +import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper; import org.openstreetmap.atlas.geography.atlas.Atlas; import org.openstreetmap.atlas.geography.atlas.delta.AtlasDelta; import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; @@ -44,6 +45,7 @@ import org.openstreetmap.atlas.locale.IsoCountry; import org.openstreetmap.atlas.streaming.resource.Resource; import org.openstreetmap.atlas.tags.filters.ConfiguredTaggableFilter; +import org.openstreetmap.atlas.utilities.collections.Iterables; import org.openstreetmap.atlas.utilities.collections.StringList; import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration; import org.openstreetmap.atlas.utilities.conversion.StringConverter; @@ -296,6 +298,20 @@ private static Map extractAtlasLoadingProperties(final CommandMa return propertyMap; } + private static Set getAllShardsForCountry(final CountryBoundaryMap boundaries, + final String country, final Sharding sharding) + { + return boundaries.countryBoundary(country).stream() + .flatMap( + boundary -> boundary.getBoundary().outers() + .stream()) + .flatMap(subBoundary -> Iterables + .asList(Iterables.filter(sharding.shards(subBoundary.bounds()), + shard -> subBoundary.overlaps(shard.bounds()))) + .stream()) + .collect(Collectors.toSet()); + } + private static ConfiguredTaggableFilter getTaggableFilterFrom(final String path, final Map configuration) { @@ -401,17 +417,20 @@ public void start(final CommandMap command) .mapToPair(this.sliceRawAtlas(boundaries)).filter(tuple -> tuple._2() != null); // Persist the RDD and save the intermediary state + final String slicedRawAtlasPath = getAlternateSubFolderOutput(output, + SLICED_RAW_ATLAS_FOLDER); countrySlicedRawAtlasShardsRDD.cache(); - countrySlicedRawAtlasShardsRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, 
SLICED_RAW_ATLAS_FOLDER), Text.class, + countrySlicedRawAtlasShardsRDD.saveAsHadoopFile(slicedRawAtlasPath, Text.class, Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n"); // Section the sliced raw Atlas final JavaPairRDD countryAtlasShardsRDD = countrySlicedRawAtlasShardsRDD - .mapToPair(this.sectionRawAtlas(boundaries, sparkContext, atlasLoadingOptions)); + .mapToPair(this.sectionRawAtlas(boundaries, sharding, sparkContext, + atlasLoadingOptions, slicedRawAtlasPath)); - // Save the result, one for each key + // Persist the RDD and save the final atlas + countryAtlasShardsRDD.cache(); countryAtlasShardsRDD.saveAsHadoopFile( getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); @@ -788,17 +807,22 @@ private PairFunction generateRawAtlas( /** * @param boundaries * The {@link CountryBoundaryMap} required to create an {@link AtlasLoadingOption} + * @param sharding + * The {@link Sharding} strategy * @param sparkContext * Spark context (or configuration) as a key-value map * @param loadingOptions * The basic required properties to create an {@link AtlasLoadingOption} + * @param slicedRawAtlasPath + * The path where the sliced raw atlas files were saved * @return a Spark {@link PairFunction} that processes a tuple of shard-name and sliced raw * atlas, sections the sliced raw atlas and returns the final sectioned (and sliced) raw * atlas for that shard name. 
*/ private PairFunction, String, Atlas> sectionRawAtlas( - final CountryBoundaryMap boundaries, final Map sparkContext, - final Map loadingOptions) + final CountryBoundaryMap boundaries, final Sharding sharding, + final Map sparkContext, final Map loadingOptions, + final String slicedRawAtlasPath) { return tuple -> { @@ -808,14 +832,26 @@ private PairFunction, String, Atlas> sectionRawAtlas( final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, sparkContext, loadingOptions); - // TODO - Leverage sharding and Atlas fetcher here! + // Calculate the shard, country name and possible shards + final String countryShardString = tuple._1(); + final CountryShard countryShard = CountryShard.forName(countryShardString); + final String country = countryShard.getCountry(); + final Set possibleShards = getAllShardsForCountry(boundaries, country, + sharding); + + // Create the fetcher + final Function> slicedRawAtlasFetcher = AtlasGeneratorHelper + .atlasFetcher(SparkFileHelper.combine(slicedRawAtlasPath, country), + System.getProperty("java.io.tmpdir"), country, sparkContext, + possibleShards); // Section the Atlas - atlas = new WaySectionProcessor(tuple._2(), atlasLoadingOption).run(); + atlas = new WaySectionProcessor(countryShard.getShard(), atlasLoadingOption, + sharding, slicedRawAtlasFetcher).run(); } catch (final Throwable e) { - throw new CoreException("Slicing Atlas {} failed!", tuple._2().getName(), e); + throw new CoreException("Sectioning Raw Atlas {} failed!", tuple._2().getName(), e); } // Report on memory usage diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java new file mode 100644 index 00000000..93ba99e2 --- /dev/null +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java @@ -0,0 +1,175 @@ +package org.openstreetmap.atlas.generator; + +import java.io.IOException; +import java.io.Serializable; +import 
java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemCreator; +import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemHelper; +import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper; +import org.openstreetmap.atlas.geography.atlas.Atlas; +import org.openstreetmap.atlas.geography.atlas.AtlasResourceLoader; +import org.openstreetmap.atlas.geography.sharding.Shard; +import org.openstreetmap.atlas.streaming.resource.File; +import org.openstreetmap.atlas.streaming.resource.FileSuffix; +import org.openstreetmap.atlas.streaming.resource.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for {@link AtlasGenerator}. 
+ * + * @author matthieun + * @author mgostintsev + */ +public final class AtlasGeneratorHelper implements Serializable +{ + private static final long serialVersionUID = 1300098384789754747L; + private static final Logger logger = LoggerFactory.getLogger(AtlasGeneratorHelper.class); + + private static final String GZIPPED_ATLAS_EXTENSION = FileSuffix.ATLAS.toString() + + FileSuffix.GZIP.toString(); + private static final String ATLAS_EXTENSION = FileSuffix.ATLAS.toString(); + private static final AtlasResourceLoader ATLAS_LOADER = new AtlasResourceLoader(); + + /** + * @param atlasDirectory + * The path of the folder containing the Atlas files, in format CTRY_z-x-y.atlas.gz + * @param temporaryDirectory + * The path of the temporary folder to download Atlas files if they are not + * downloaded already + * @param country + * The country to look for + * @param sparkContext + * The Spark configuration as a map (to allow the creation of the proper FileSystem) + * @param validShards + * All available shards for given country, to avoid fetching shards that do not exist + * @return A function that returns an {@link Atlas} given a {@link Shard} + */ + public static Function> atlasFetcher(final String atlasDirectory, + final String temporaryDirectory, final String country, + final Map sparkContext, final Set validShards) + { + // & Serializable is very important as that function will be passed around by Spark, and + // functions are not serializable by default. 
+ return (Function> & Serializable) shard -> + { + if (!validShards.contains(shard)) + { + return Optional.empty(); + } + + // Check if non-gzipped file exists in final temporary directory + final String pathFromTemporaryDirectory = SparkFileHelper.combine(temporaryDirectory, + String.format("%s%s", getAtlasName(country, shard), ATLAS_EXTENSION)); + final File fileFromTemporaryDirectory = new File(pathFromTemporaryDirectory); + + // Download file to disk if it is not cached already + if (!fileFromTemporaryDirectory.exists()) + { + try + { + String path = SparkFileHelper.combine(atlasDirectory, + String.format("%s%s", getAtlasName(country, shard), ATLAS_EXTENSION)); + + if (!fileExists(path, sparkContext)) + { + path = SparkFileHelper.combine(atlasDirectory, String.format("%s%s", + getAtlasName(country, shard), GZIPPED_ATLAS_EXTENSION)); + } + + final Resource fileFromNetwork = FileSystemHelper.resource(path, sparkContext); + final File temporaryLocalFile = File + .temporary(getAtlasName(country, shard) + "-", ATLAS_EXTENSION); + + System.out.println("Downloaded atlas from " + path + + " and is found as temp file " + temporaryLocalFile.getAbsolutePath()); + + // FileSystemHelper.resource sets the Decompressor on the Resource for us, so + // this call will gunzip the file + fileFromNetwork.copyTo(temporaryLocalFile); + + // Before making the move, check again if file is there or not + if (!fileFromTemporaryDirectory.exists()) + { + try + { + Files.move(Paths.get(temporaryLocalFile.getPath()), + Paths.get(fileFromTemporaryDirectory.getPath()), + StandardCopyOption.ATOMIC_MOVE); + } + catch (final FileAlreadyExistsException e) + { + logger.warn("Failed to rename file, but file exists already.", e); + } + catch (final Exception e) + { + logger.warn("Failed to rename file on local disk.", e); + } + } + } + catch (final Exception e) + { + logger.warn("Failed to cache file on local disk.", e); + } + } + + // If we were able to find the file on local disk, then load from 
there + if (fileFromTemporaryDirectory.exists()) + { + System.out.println("AtlasExisted - Cache Hit: " + + fileFromTemporaryDirectory.getAbsolutePath()); + return loadAtlas(fileFromTemporaryDirectory); + } + else + { + logger.warn("Falling back to Atlas file hosted on {} for shard {}.", atlasDirectory, + shard.getName()); + final String path = SparkFileHelper.combine(atlasDirectory, + String.format("%s%s", getAtlasName(country, shard), ATLAS_EXTENSION)); + final Resource fileFromNetwork = FileSystemHelper.resource(path, sparkContext); + return loadAtlas(fileFromNetwork); + } + }; + } + + private static boolean fileExists(final String path, final Map configuration) + { + final FileSystem fileSystem = new FileSystemCreator().get(path, configuration); + try + { + return fileSystem.exists(new Path(path)); + } + catch (IllegalArgumentException | IOException e) + { + logger.warn("can't determine if {} exists", path); + return false; + } + } + + private static String getAtlasName(final String country, final Shard shard) + { + return String.format("%s_%s", country, shard.getName()); + } + + private static Optional loadAtlas(final Resource file) + { + return Optional.ofNullable(ATLAS_LOADER.load(file)); + } + + /** + * Hide constructor for this utility class. 
+ */ + private AtlasGeneratorHelper() + { + } +} From 8fdcd9f0b22c3708caa8e6e8a1c4d010dd4fafa9 Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Wed, 11 Apr 2018 15:28:56 -0700 Subject: [PATCH 7/9] Depending on atlas 5.0.12 --- dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies.gradle b/dependencies.gradle index 4d2fb293..76592d0e 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -1,6 +1,6 @@ project.ext.versions = [ checkstyle: '7.6.1', - atlas: '5.0.11', + atlas: '5.0.12', spark: '1.6.0-cdh5.7.0', snappy: '1.1.1.6', ] From 06bfd42cd65069d2327159d7455342c0c8e64b9b Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Thu, 12 Apr 2018 10:47:42 -0700 Subject: [PATCH 8/9] Re-using shard calculation, unpersisting un-needed RDDs, moving more methods to helper class --- .../atlas/generator/AtlasGenerator.java | 373 ++---------------- .../atlas/generator/AtlasGeneratorHelper.java | 343 +++++++++++++++- 2 files changed, 372 insertions(+), 344 deletions(-) diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index 4bcd76b2..ea37132c 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -5,7 +5,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -16,8 +15,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; import org.openstreetmap.atlas.exception.CoreException; import org.openstreetmap.atlas.generator.persistence.AbstractMultipleAtlasBasedOutputFormat; 
import org.openstreetmap.atlas.generator.persistence.MultipleAtlasCountryStatisticsOutputFormat; @@ -26,28 +23,19 @@ import org.openstreetmap.atlas.generator.persistence.delta.RemovedMultipleAtlasDeltaOutputFormat; import org.openstreetmap.atlas.generator.persistence.scheme.SlippyTilePersistenceScheme; import org.openstreetmap.atlas.generator.sharding.AtlasSharding; -import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemHelper; import org.openstreetmap.atlas.generator.tools.spark.SparkJob; -import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper; import org.openstreetmap.atlas.geography.atlas.Atlas; import org.openstreetmap.atlas.geography.atlas.delta.AtlasDelta; import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; -import org.openstreetmap.atlas.geography.atlas.raw.sectioning.WaySectionProcessor; -import org.openstreetmap.atlas.geography.atlas.raw.slicing.RawAtlasCountrySlicer; import org.openstreetmap.atlas.geography.atlas.statistics.AtlasStatistics; -import org.openstreetmap.atlas.geography.atlas.statistics.Counter; import org.openstreetmap.atlas.geography.boundary.CountryBoundary; import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMap; import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMapArchiver; import org.openstreetmap.atlas.geography.sharding.CountryShard; import org.openstreetmap.atlas.geography.sharding.Shard; import org.openstreetmap.atlas.geography.sharding.Sharding; -import org.openstreetmap.atlas.locale.IsoCountry; import org.openstreetmap.atlas.streaming.resource.Resource; -import org.openstreetmap.atlas.tags.filters.ConfiguredTaggableFilter; -import org.openstreetmap.atlas.utilities.collections.Iterables; import org.openstreetmap.atlas.utilities.collections.StringList; -import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration; import org.openstreetmap.atlas.utilities.conversion.StringConverter; import 
org.openstreetmap.atlas.utilities.maps.MultiMapWithSet; import org.openstreetmap.atlas.utilities.runtime.CommandMap; @@ -225,59 +213,6 @@ protected static List generateTasks(final StringList countr return tasks; } - protected static StandardConfiguration getStandardConfigurationFrom(final String path, - final Map configuration) - { - return new StandardConfiguration(FileSystemHelper.resource(path, configuration)); - } - - private static AtlasLoadingOption buildAtlasLoadingOption(final CountryBoundaryMap boundaries, - final Map sparkContext, final Map properties) - { - final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption - .createOptionWithAllEnabled(boundaries); - - // Apply all configurations - final String edgeConfiguration = properties.get(EDGE_CONFIGURATION.getName()); - if (edgeConfiguration != null) - { - atlasLoadingOption - .setEdgeFilter(getTaggableFilterFrom(edgeConfiguration, sparkContext)); - } - - final String waySectioningConfiguration = properties - .get(WAY_SECTIONING_CONFIGURATION.getName()); - if (waySectioningConfiguration != null) - { - atlasLoadingOption.setWaySectionFilter( - getTaggableFilterFrom(waySectioningConfiguration, sparkContext)); - } - - final String pbfNodeConfiguration = properties.get(PBF_NODE_CONFIGURATION.getName()); - if (pbfNodeConfiguration != null) - { - atlasLoadingOption - .setOsmPbfNodeFilter(getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); - } - - final String pbfWayConfiguration = properties.get(PBF_WAY_CONFIGURATION.getName()); - if (pbfWayConfiguration != null) - { - atlasLoadingOption - .setOsmPbfWayFilter(getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); - } - - final String pbfRelationConfiguration = properties - .get(PBF_RELATION_CONFIGURATION.getName()); - if (pbfRelationConfiguration != null) - { - atlasLoadingOption.setOsmPbfRelationFilter( - getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); - } - - return atlasLoadingOption; - } - private static Map 
extractAtlasLoadingProperties(final CommandMap command) { final Map propertyMap = Maps.newHashMap(); @@ -298,26 +233,6 @@ private static Map extractAtlasLoadingProperties(final CommandMa return propertyMap; } - private static Set getAllShardsForCountry(final CountryBoundaryMap boundaries, - final String country, final Sharding sharding) - { - return boundaries.countryBoundary(country).stream() - .flatMap( - boundary -> boundary.getBoundary().outers() - .stream()) - .flatMap(subBoundary -> Iterables - .asList(Iterables.filter(sharding.shards(subBoundary.bounds()), - shard -> subBoundary.overlaps(shard.bounds()))) - .stream()) - .collect(Collectors.toSet()); - } - - private static ConfiguredTaggableFilter getTaggableFilterFrom(final String path, - final Map configuration) - { - return new ConfiguredTaggableFilter(getStandardConfigurationFrom(path, configuration)); - } - @Override public String getName() { @@ -401,8 +316,9 @@ public void start(final CommandMap command) // Generate the raw Atlas and filter any null atlases final JavaPairRDD countryRawAtlasShardsRDD = getContext() - .parallelize(tasks, tasks.size()).mapToPair(this.generateRawAtlas(boundaries, - sparkContext, atlasLoadingOptions, pbfContext, atlasScheme)) + .parallelize(tasks, tasks.size()) + .mapToPair(AtlasGeneratorHelper.generateRawAtlas(boundaries, sparkContext, + atlasLoadingOptions, pbfContext, atlasScheme)) .filter(tuple -> tuple._2() != null); // Persist the RDD and save the intermediary state @@ -414,7 +330,8 @@ public void start(final CommandMap command) // Slice the raw Atlas and filter any null atlases final JavaPairRDD countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD - .mapToPair(this.sliceRawAtlas(boundaries)).filter(tuple -> tuple._2() != null); + .mapToPair(AtlasGeneratorHelper.sliceRawAtlas(boundaries)) + .filter(tuple -> tuple._2() != null); // Persist the RDD and save the intermediary state final String slicedRawAtlasPath = getAlternateSubFolderOutput(output, @@ -424,10 
+341,13 @@ public void start(final CommandMap command) Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n"); + // Remove the raw atlas RDD from cache since we've cached the sliced RDD + countryRawAtlasShardsRDD.unpersist(); + // Section the sliced raw Atlas final JavaPairRDD countryAtlasShardsRDD = countrySlicedRawAtlasShardsRDD - .mapToPair(this.sectionRawAtlas(boundaries, sharding, sparkContext, - atlasLoadingOptions, slicedRawAtlasPath)); + .mapToPair(AtlasGeneratorHelper.sectionRawAtlas(boundaries, sharding, + sparkContext, atlasLoadingOptions, slicedRawAtlasPath, tasks)); // Persist the RDD and save the final atlas countryAtlasShardsRDD.cache(); @@ -436,9 +356,12 @@ public void start(final CommandMap command) MultipleAtlasOutputFormat.class, new JobConf(configuration())); logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n"); + // Remove the sliced atlas RDD from cache since we've cached the final RDD + countrySlicedRawAtlasShardsRDD.unpersist(); + // Create the metrics final JavaPairRDD statisticsRDD = countryAtlasShardsRDD - .mapToPair(generateAtlasStatistics(sharding)); + .mapToPair(AtlasGeneratorHelper.generateAtlasStatistics(sharding)); // Persist the RDD and save statisticsRDD.cache(); @@ -468,8 +391,8 @@ public void start(final CommandMap command) if (!previousOutputForDelta.isEmpty()) { final JavaPairRDD deltasRDD = countryAtlasShardsRDD - .flatMapToPair( - this.computeAtlasDelta(sparkContext, previousOutputForDelta)); + .flatMapToPair(AtlasGeneratorHelper.computeAtlasDelta(sparkContext, + previousOutputForDelta)); // Save the deltas deltasRDD.saveAsHadoopFile(getAlternateSubFolderOutput(output, SHARD_DELTAS_FOLDER), @@ -497,28 +420,29 @@ public void start(final CommandMap command) // Apply all configurations if (edgeConfiguration != null) { - atlasLoadingOption.setEdgeFilter( - getTaggableFilterFrom(edgeConfiguration, sparkContext)); + 
atlasLoadingOption.setEdgeFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(edgeConfiguration, sparkContext)); } if (waySectioningConfiguration != null) { - atlasLoadingOption.setWaySectionFilter(getTaggableFilterFrom( - waySectioningConfiguration, sparkContext)); + atlasLoadingOption + .setWaySectionFilter(AtlasGeneratorHelper.getTaggableFilterFrom( + waySectioningConfiguration, sparkContext)); } if (pbfNodeConfiguration != null) { - atlasLoadingOption.setOsmPbfNodeFilter( - getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); + atlasLoadingOption.setOsmPbfNodeFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); } if (pbfWayConfiguration != null) { - atlasLoadingOption.setOsmPbfWayFilter( - getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); + atlasLoadingOption.setOsmPbfWayFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); } if (pbfRelationConfiguration != null) { - atlasLoadingOption.setOsmPbfRelationFilter( - getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); + atlasLoadingOption.setOsmPbfRelationFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); } // Build the appropriate PbfLoader @@ -547,7 +471,7 @@ public void start(final CommandMap command) return result; }); - // Filter out null Atlas. 
+ // Filter out null Atlas final JavaPairRDD countryNonNullAtlasShardsRDD = countryAtlasShardsRDD .filter(tuple -> tuple._2() != null); @@ -557,7 +481,7 @@ public void start(final CommandMap command) // Run the metrics final JavaPairRDD statisticsRDD = countryNonNullAtlasShardsRDD - .mapToPair(generateAtlasStatistics(sharding)); + .mapToPair(AtlasGeneratorHelper.generateAtlasStatistics(sharding)); // Cache the statistics statisticsRDD.cache(); @@ -599,7 +523,8 @@ public void start(final CommandMap command) { // Compute the deltas final JavaPairRDD deltasRDD = countryNonNullAtlasShardsRDD - .flatMapToPair(computeAtlasDelta(sparkContext, previousOutputForDelta)); + .flatMapToPair(AtlasGeneratorHelper.computeAtlasDelta(sparkContext, + previousOutputForDelta)); // deltasRDD.cache(); // logger.info("\n\n********** CACHED THE DELTAS **********\n"); @@ -677,240 +602,4 @@ protected SwitchList switches() EDGE_CONFIGURATION, WAY_SECTIONING_CONFIGURATION, PBF_NODE_CONFIGURATION, PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION, ATLAS_SCHEME, USE_RAW_ATLAS); } - - /** - * @param sparkContext - * Spark context (or configuration) as a key-value map - * @param previousOutputForDelta - * Previous Atlas generation delta output location - * @return A Spark {@link PairFlatMapFunction} that takes a tuple of a country shard name and - * atlas file and returns all the {@link AtlasDelta} for the country - */ - private PairFlatMapFunction, String, AtlasDelta> computeAtlasDelta( - final Map sparkContext, final String previousOutputForDelta) - { - return tuple -> - { - final String countryShardName = tuple._1(); - final Atlas current = tuple._2(); - final List> result = new ArrayList<>(); - try - { - final Optional alter = new AtlasLocator(sparkContext).atlasForShard( - previousOutputForDelta + "/" - + StringList.split(countryShardName, - CountryShard.COUNTRY_SHARD_SEPARATOR).get(0), - countryShardName); - if (alter.isPresent()) - { - logger.info("Printing memory after other Atlas loaded 
for Delta {}", - countryShardName); - Memory.printCurrentMemory(); - final AtlasDelta delta = new AtlasDelta(current, alter.get()).generate(); - result.add(new Tuple2<>(countryShardName, delta)); - } - } - catch (final Exception e) - { - logger.error("Skipping! Could not generate deltas for {}", countryShardName, e); - } - return result; - }; - } - - /** - * @param sharding - * The sharding tree - * @return a Spark {@link PairFunction} that processes a shard to Atlas tuple, and constructs a - * {@link AtlasStatistics} for each shard. - */ - private PairFunction, String, AtlasStatistics> generateAtlasStatistics( - final Sharding sharding) - { - return tuple -> - { - final Counter counter = new Counter().withSharding(sharding); - counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); - final AtlasStatistics statistics; - try - { - statistics = counter.processAtlas(tuple._2()); - } - catch (final Exception e) - { - throw new CoreException("Building Atlas Statistics for {} failed!", tuple._1(), e); - } - final Tuple2 result = new Tuple2<>(tuple._1(), statistics); - return result; - }; - } - - /** - * @param boundaries - * The {@link CountryBoundaryMap} to use for pbf to atlas generation - * @param sparkContext - * Spark context (or configuration) as a key-value map - * @param loadingOptions - * The basic required properties to create an {@link AtlasLoadingOption} - * @param pbfContext - * The context explaining where to find the PBFs - * @param atlasScheme - * The folder structure of the output atlas - * @return a Spark {@link PairFunction} that processes an {@link AtlasGenerationTask}, loads the - * PBF for the task's shard, generates the raw atlas for the shard and outputs a shard - * name to raw atlas tuple. 
- */ - private PairFunction generateRawAtlas( - final CountryBoundaryMap boundaries, final Map sparkContext, - final Map loadingOptions, final PbfContext pbfContext, - final SlippyTilePersistenceScheme atlasScheme) - { - return task -> - { - final String countryName = task.getCountry(); - final Shard shard = task.getShard(); - - // Set the country code that is being processed! - final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, - sparkContext, loadingOptions); - atlasLoadingOption.setAdditionalCountryCodes(countryName); - - // Build the PbfLoader - final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, - atlasLoadingOption, loadingOptions.get(CODE_VERSION.getName()), - loadingOptions.get(DATA_VERSION.getName()), task.getAllShards()); - final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR - + shard.getName(); - - // Generate the raw Atlas for this shard - final Atlas atlas; - try - { - atlas = loader.generateRawAtlas(countryName, shard); - } - catch (final Throwable e) - { - throw new CoreException("Building raw Atlas {} failed!", name, e); - } - - // Report on memory usage - logger.info("Printing memory after loading raw Atlas {}", name); - Memory.printCurrentMemory(); - - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>( - name + CountryShard.COUNTRY_SHARD_SEPARATOR + atlasScheme.getScheme(), atlas); - return result; - }; - } - - /** - * @param boundaries - * The {@link CountryBoundaryMap} required to create an {@link AtlasLoadingOption} - * @param sharding - * The {@link Sharding} strategy - * @param sparkContext - * Spark context (or configuration) as a key-value map - * @param loadingOptions - * The basic required properties to create an {@link AtlasLoadingOption} - * @param slicedRawAtlasPath - * The path where the sliced raw atlas files were saved - * @return a Spark {@link PairFunction} that processes a tuple of shard-name and sliced raw - * atlas, sections the sliced 
raw atlas and returns the final sectioned (and sliced) raw - * atlas for that shard name. - */ - private PairFunction, String, Atlas> sectionRawAtlas( - final CountryBoundaryMap boundaries, final Sharding sharding, - final Map sparkContext, final Map loadingOptions, - final String slicedRawAtlasPath) - { - return tuple -> - { - final Atlas atlas; - try - { - final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, - sparkContext, loadingOptions); - - // Calculate the shard, country name and possible shards - final String countryShardString = tuple._1(); - final CountryShard countryShard = CountryShard.forName(countryShardString); - final String country = countryShard.getCountry(); - final Set possibleShards = getAllShardsForCountry(boundaries, country, - sharding); - - // Create the fetcher - final Function> slicedRawAtlasFetcher = AtlasGeneratorHelper - .atlasFetcher(SparkFileHelper.combine(slicedRawAtlasPath, country), - System.getProperty("java.io.tmpdir"), country, sparkContext, - possibleShards); - - // Section the Atlas - atlas = new WaySectionProcessor(countryShard.getShard(), atlasLoadingOption, - sharding, slicedRawAtlasFetcher).run(); - } - catch (final Throwable e) - { - throw new CoreException("Sectioning Raw Atlas {} failed!", tuple._2().getName(), e); - } - - // Report on memory usage - logger.info("Printing memory after loading final Atlas {}", tuple._2().getName()); - Memory.printCurrentMemory(); - - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>(tuple._1(), atlas); - return result; - }; - } - - /** - * @param boundaries - * The {@link CountryBoundaryMap} to use for slicing - * @return a Spark {@link PairFunction} that processes a tuple of shard-name and raw atlas, - * slices the raw atlas and returns the sliced raw atlas for that shard name. 
- */ - private PairFunction, String, Atlas> sliceRawAtlas( - final CountryBoundaryMap boundaries) - { - return tuple -> - { - final Atlas slicedAtlas; - - // Grab the tuple contents - final String shardName = tuple._1(); - final Atlas rawAtlas = tuple._2(); - - try - { - // Extract the country code - final Set isoCountries = new HashSet<>(); - final Optional countryName = IsoCountry - .forCountryCode(shardName.split(CountryShard.COUNTRY_SHARD_SEPARATOR)[0]); - if (countryName.isPresent()) - { - isoCountries.add(countryName.get()); - } - else - { - logger.error("Unable to extract valid IsoCountry code from {}", shardName); - } - - // Slice the Atlas - slicedAtlas = new RawAtlasCountrySlicer(isoCountries, boundaries).slice(rawAtlas); - } - catch (final Throwable e) - { - throw new CoreException("Slicing raw Atlas {} failed!", rawAtlas.getName(), e); - } - - // Report on memory usage - logger.info("Printing memory after loading sliced raw Atlas {}", rawAtlas.getName()); - Memory.printCurrentMemory(); - - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>(tuple._1(), slicedAtlas); - return result; - }; - } } diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java index 93ba99e2..faf1ac4c 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java @@ -6,6 +6,10 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -13,18 +17,38 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import 
org.apache.spark.api.java.function.PairFunction; +import org.openstreetmap.atlas.exception.CoreException; +import org.openstreetmap.atlas.generator.persistence.scheme.SlippyTilePersistenceScheme; import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemCreator; import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemHelper; import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper; import org.openstreetmap.atlas.geography.atlas.Atlas; import org.openstreetmap.atlas.geography.atlas.AtlasResourceLoader; +import org.openstreetmap.atlas.geography.atlas.delta.AtlasDelta; +import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; +import org.openstreetmap.atlas.geography.atlas.raw.sectioning.WaySectionProcessor; +import org.openstreetmap.atlas.geography.atlas.raw.slicing.RawAtlasCountrySlicer; +import org.openstreetmap.atlas.geography.atlas.statistics.AtlasStatistics; +import org.openstreetmap.atlas.geography.atlas.statistics.Counter; +import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMap; +import org.openstreetmap.atlas.geography.sharding.CountryShard; import org.openstreetmap.atlas.geography.sharding.Shard; +import org.openstreetmap.atlas.geography.sharding.Sharding; +import org.openstreetmap.atlas.locale.IsoCountry; import org.openstreetmap.atlas.streaming.resource.File; import org.openstreetmap.atlas.streaming.resource.FileSuffix; import org.openstreetmap.atlas.streaming.resource.Resource; +import org.openstreetmap.atlas.tags.filters.ConfiguredTaggableFilter; +import org.openstreetmap.atlas.utilities.collections.StringList; +import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration; +import org.openstreetmap.atlas.utilities.runtime.system.memory.Memory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple2; + /** * Utility class for {@link AtlasGenerator}. 
* @@ -55,7 +79,7 @@ public final class AtlasGeneratorHelper implements Serializable * All available shards for given country, to avoid fetching shards that do not exist * @return A function that returns an {@link Atlas} given a {@link Shard} */ - public static Function> atlasFetcher(final String atlasDirectory, + protected static Function> atlasFetcher(final String atlasDirectory, final String temporaryDirectory, final String country, final Map sparkContext, final Set validShards) { @@ -63,7 +87,7 @@ public static Function> atlasFetcher(final String atlasDi // functions are not serializable by default. return (Function> & Serializable) shard -> { - if (!validShards.contains(shard)) + if (!validShards.isEmpty() && !validShards.contains(shard)) { return Optional.empty(); } @@ -142,6 +166,306 @@ public static Function> atlasFetcher(final String atlasDi }; } + /** + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param previousOutputForDelta + * Previous Atlas generation delta output location + * @return A Spark {@link PairFlatMapFunction} that takes a tuple of a country shard name and + * atlas file and returns all the {@link AtlasDelta} for the country + */ + protected static PairFlatMapFunction, String, AtlasDelta> computeAtlasDelta( + final Map sparkContext, final String previousOutputForDelta) + { + return tuple -> + { + final String countryShardName = tuple._1(); + final Atlas current = tuple._2(); + final List> result = new ArrayList<>(); + try + { + final Optional alter = new AtlasLocator(sparkContext).atlasForShard( + previousOutputForDelta + "/" + + StringList.split(countryShardName, + CountryShard.COUNTRY_SHARD_SEPARATOR).get(0), + countryShardName); + if (alter.isPresent()) + { + logger.info("Printing memory after other Atlas loaded for Delta {}", + countryShardName); + Memory.printCurrentMemory(); + final AtlasDelta delta = new AtlasDelta(current, alter.get()).generate(); + result.add(new Tuple2<>(countryShardName, 
delta)); + } + } + catch (final Exception e) + { + logger.error("Skipping! Could not generate deltas for {}", countryShardName, e); + } + return result; + }; + } + + /** + * @param sharding + * The sharding tree + * @return a Spark {@link PairFunction} that processes a shard to Atlas tuple, and constructs a + * {@link AtlasStatistics} for each shard. + */ + protected static PairFunction, String, AtlasStatistics> generateAtlasStatistics( + final Sharding sharding) + { + return tuple -> + { + final Counter counter = new Counter().withSharding(sharding); + counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); + final AtlasStatistics statistics; + try + { + statistics = counter.processAtlas(tuple._2()); + } + catch (final Exception e) + { + throw new CoreException("Building Atlas Statistics for {} failed!", tuple._1(), e); + } + final Tuple2 result = new Tuple2<>(tuple._1(), statistics); + return result; + }; + } + + /** + * @param boundaries + * The {@link CountryBoundaryMap} to use for pbf to atlas generation + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param loadingOptions + * The basic required properties to create an {@link AtlasLoadingOption} + * @param pbfContext + * The context explaining where to find the PBFs + * @param atlasScheme + * The folder structure of the output atlas + * @return a Spark {@link PairFunction} that processes an {@link AtlasGenerationTask}, loads the + * PBF for the task's shard, generates the raw atlas for the shard and outputs a shard + * name to raw atlas tuple. + */ + protected static PairFunction generateRawAtlas( + final CountryBoundaryMap boundaries, final Map sparkContext, + final Map loadingOptions, final PbfContext pbfContext, + final SlippyTilePersistenceScheme atlasScheme) + { + return task -> + { + final String countryName = task.getCountry(); + final Shard shard = task.getShard(); + + // Set the country code that is being processed! 
+ final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, + sparkContext, loadingOptions); + atlasLoadingOption.setAdditionalCountryCodes(countryName); + + // Build the PbfLoader + final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, + atlasLoadingOption, loadingOptions.get(AtlasGenerator.CODE_VERSION.getName()), + loadingOptions.get(AtlasGenerator.DATA_VERSION.getName()), task.getAllShards()); + final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR + + shard.getName(); + + // Generate the raw Atlas for this shard + final Atlas atlas; + try + { + atlas = loader.generateRawAtlas(countryName, shard); + } + catch (final Throwable e) + { + throw new CoreException("Building raw Atlas {} failed!", name, e); + } + + // Report on memory usage + logger.info("Printing memory after loading raw Atlas {}", name); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>( + name + CountryShard.COUNTRY_SHARD_SEPARATOR + atlasScheme.getScheme(), atlas); + return result; + }; + } + + protected static StandardConfiguration getStandardConfigurationFrom(final String path, + final Map configuration) + { + return new StandardConfiguration(FileSystemHelper.resource(path, configuration)); + } + + protected static ConfiguredTaggableFilter getTaggableFilterFrom(final String path, + final Map configuration) + { + return new ConfiguredTaggableFilter(getStandardConfigurationFrom(path, configuration)); + } + + /** + * @param boundaries + * The {@link CountryBoundaryMap} required to create an {@link AtlasLoadingOption} + * @param sharding + * The {@link Sharding} strategy + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param loadingOptions + * The basic required properties to create an {@link AtlasLoadingOption} + * @param slicedRawAtlasPath + * The path where the sliced raw atlas files were saved + * @param tasks + * The list of {@link 
AtlasGenerationTask}s used to grab all possible {@link Shard}s + * for a country + * @return a Spark {@link PairFunction} that processes a tuple of shard-name and sliced raw + * atlas, sections the sliced raw atlas and returns the final sectioned (and sliced) raw + * atlas for that shard name. + */ + protected static PairFunction, String, Atlas> sectionRawAtlas( + final CountryBoundaryMap boundaries, final Sharding sharding, + final Map sparkContext, final Map loadingOptions, + final String slicedRawAtlasPath, final List tasks) + { + return tuple -> + { + final Atlas atlas; + try + { + final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, + sparkContext, loadingOptions); + + // Calculate the shard, country name and possible shards + final String countryShardString = tuple._1(); + final CountryShard countryShard = CountryShard.forName(countryShardString); + final String country = countryShard.getCountry(); + final Set possibleShards = getAllShardsForCountry(tasks, country); + + // Create the fetcher + final Function> slicedRawAtlasFetcher = AtlasGeneratorHelper + .atlasFetcher(SparkFileHelper.combine(slicedRawAtlasPath, country), + System.getProperty("java.io.tmpdir"), country, sparkContext, + possibleShards); + + // Section the Atlas + atlas = new WaySectionProcessor(countryShard.getShard(), atlasLoadingOption, + sharding, slicedRawAtlasFetcher).run(); + } + catch (final Throwable e) + { + throw new CoreException("Sectioning Raw Atlas {} failed!", tuple._2().getName(), e); + } + + // Report on memory usage + logger.info("Printing memory after loading final Atlas {}", tuple._2().getName()); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), atlas); + return result; + }; + } + + /** + * @param boundaries + * The {@link CountryBoundaryMap} to use for slicing + * @return a Spark {@link PairFunction} that processes a tuple of shard-name and raw atlas, + * slices the raw atlas 
and returns the sliced raw atlas for that shard name. + */ + protected static PairFunction, String, Atlas> sliceRawAtlas( + final CountryBoundaryMap boundaries) + { + return tuple -> + { + final Atlas slicedAtlas; + + // Grab the tuple contents + final String shardName = tuple._1(); + final Atlas rawAtlas = tuple._2(); + + try + { + // Extract the country code + final Set isoCountries = new HashSet<>(); + final Optional countryName = IsoCountry + .forCountryCode(shardName.split(CountryShard.COUNTRY_SHARD_SEPARATOR)[0]); + if (countryName.isPresent()) + { + isoCountries.add(countryName.get()); + } + else + { + logger.error("Unable to extract valid IsoCountry code from {}", shardName); + } + + // Slice the Atlas + slicedAtlas = new RawAtlasCountrySlicer(isoCountries, boundaries).slice(rawAtlas); + } + catch (final Throwable e) + { + throw new CoreException("Slicing raw Atlas {} failed!", rawAtlas.getName(), e); + } + + // Report on memory usage + logger.info("Printing memory after loading sliced raw Atlas {}", rawAtlas.getName()); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), slicedAtlas); + return result; + }; + } + + private static AtlasLoadingOption buildAtlasLoadingOption(final CountryBoundaryMap boundaries, + final Map sparkContext, final Map properties) + { + final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption + .createOptionWithAllEnabled(boundaries); + + // Apply all configurations + final String edgeConfiguration = properties + .get(AtlasGenerator.EDGE_CONFIGURATION.getName()); + if (edgeConfiguration != null) + { + atlasLoadingOption + .setEdgeFilter(getTaggableFilterFrom(edgeConfiguration, sparkContext)); + } + + final String waySectioningConfiguration = properties + .get(AtlasGenerator.WAY_SECTIONING_CONFIGURATION.getName()); + if (waySectioningConfiguration != null) + { + atlasLoadingOption.setWaySectionFilter( + getTaggableFilterFrom(waySectioningConfiguration, 
sparkContext)); + } + + final String pbfNodeConfiguration = properties + .get(AtlasGenerator.PBF_NODE_CONFIGURATION.getName()); + if (pbfNodeConfiguration != null) + { + atlasLoadingOption + .setOsmPbfNodeFilter(getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); + } + + final String pbfWayConfiguration = properties + .get(AtlasGenerator.PBF_WAY_CONFIGURATION.getName()); + if (pbfWayConfiguration != null) + { + atlasLoadingOption + .setOsmPbfWayFilter(getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); + } + + final String pbfRelationConfiguration = properties + .get(AtlasGenerator.PBF_RELATION_CONFIGURATION.getName()); + if (pbfRelationConfiguration != null) + { + atlasLoadingOption.setOsmPbfRelationFilter( + getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); + } + + return atlasLoadingOption; + } + private static boolean fileExists(final String path, final Map configuration) { final FileSystem fileSystem = new FileSystemCreator().get(path, configuration); @@ -156,6 +480,21 @@ private static boolean fileExists(final String path, final Map c } } + private static Set getAllShardsForCountry(final List tasks, + final String country) + { + for (final AtlasGenerationTask task : tasks) + { + if (task.getCountry().equals(country)) + { + // We found the target country, return its shards + return task.getAllShards(); + } + } + logger.debug("Could not find shards for {}", country); + return Collections.emptySet(); + } + private static String getAtlasName(final String country, final Shard shard) { return String.format("%s_%s", country, shard.getName()); From 45fd4600783a0039ac537fc9efc018aac054b20e Mon Sep 17 00:00:00 2001 From: Michael Gostintsev Date: Thu, 12 Apr 2018 13:10:18 -0700 Subject: [PATCH 9/9] Removing google common Maps dependency --- .../org/openstreetmap/atlas/generator/AtlasGenerator.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java 
b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index ea37132c..a82dbf82 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -2,6 +2,7 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -45,8 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - import scala.Tuple2; /** @@ -215,7 +214,7 @@ protected static List generateTasks(final StringList countr private static Map extractAtlasLoadingProperties(final CommandMap command) { - final Map propertyMap = Maps.newHashMap(); + final Map propertyMap = new HashMap<>(); propertyMap.put(CODE_VERSION.getName(), (String) command.get(CODE_VERSION)); propertyMap.put(DATA_VERSION.getName(), (String) command.get(DATA_VERSION)); propertyMap.put(EDGE_CONFIGURATION.getName(), (String) command.get(EDGE_CONFIGURATION));