diff --git a/dependencies.gradle b/dependencies.gradle index 37e23bd1..76592d0e 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -1,6 +1,6 @@ project.ext.versions = [ checkstyle: '7.6.1', - atlas: '5.0.10', + atlas: '5.0.12', spark: '1.6.0-cdh5.7.0', snappy: '1.1.1.6', ] diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java index 69a366d2..a82dbf82 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGenerator.java @@ -2,10 +2,10 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -24,13 +24,11 @@ import org.openstreetmap.atlas.generator.persistence.delta.RemovedMultipleAtlasDeltaOutputFormat; import org.openstreetmap.atlas.generator.persistence.scheme.SlippyTilePersistenceScheme; import org.openstreetmap.atlas.generator.sharding.AtlasSharding; -import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemHelper; import org.openstreetmap.atlas.generator.tools.spark.SparkJob; import org.openstreetmap.atlas.geography.atlas.Atlas; import org.openstreetmap.atlas.geography.atlas.delta.AtlasDelta; import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; import org.openstreetmap.atlas.geography.atlas.statistics.AtlasStatistics; -import org.openstreetmap.atlas.geography.atlas.statistics.Counter; import org.openstreetmap.atlas.geography.boundary.CountryBoundary; import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMap; import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMapArchiver; @@ -38,9 +36,7 @@ import org.openstreetmap.atlas.geography.sharding.Shard; import org.openstreetmap.atlas.geography.sharding.Sharding; import org.openstreetmap.atlas.streaming.resource.Resource; -import org.openstreetmap.atlas.tags.filters.ConfiguredTaggableFilter; import org.openstreetmap.atlas.utilities.collections.StringList; -import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration; import org.openstreetmap.atlas.utilities.conversion.StringConverter; import org.openstreetmap.atlas.utilities.maps.MultiMapWithSet; import org.openstreetmap.atlas.utilities.runtime.CommandMap; @@ -53,13 +49,13 @@ import scala.Tuple2; /** - * Generate {@link Atlas} Shards for a specific version and a specific set of Countries + * Generate {@link Atlas} Shards for a specific version and a specific set of countries * * @author matthieun + * @author mgostintsev */ public class AtlasGenerator extends SparkJob { - /** * @author matthieun */ @@ -79,6 +75,8 @@ public String apply(final String key) private static final Logger logger = LoggerFactory.getLogger(AtlasGenerator.class); public static final String ATLAS_FOLDER = "atlas"; + public static final String RAW_ATLAS_FOLDER = "rawAtlas"; + public static final String SLICED_RAW_ATLAS_FOLDER = "slicedRawAtlas"; public static final String SHARD_STATISTICS_FOLDER = "shardStats"; public static final String COUNTRY_STATISTICS_FOLDER = "countryStats"; public static final String SHARD_DELTAS_FOLDER = "deltas"; @@ -137,6 +135,9 @@ public String apply(final String key) "osmPbfRelationConfiguration", "The path to the configuration file that defines which PBF Relations becomes an Atlas Entity", StringConverter.IDENTITY, Optionality.OPTIONAL); + // TODO - once this is fully baked and tested, remove flag and old flow + private static final Flag USE_RAW_ATLAS = new Flag("useRawAtlas", + "Allow PBF to Atlas process to use Raw Atlas flow"); public static void main(final String[] args) { @@ -175,7 +176,7 @@ protected static List generateTasks(final StringList countr } // Queue boundary for processing - countryBoundaries.forEach(countryBoundary -> queue.add(countryBoundary)); + countryBoundaries.forEach(queue::add); }); // Use all available processors except one (used by main thread) @@ -211,16 +212,24 @@ protected static List generateTasks(final StringList countr return tasks; } - protected static StandardConfiguration getStandardConfigurationFrom(final String path, - final Map configuration) - { - return new StandardConfiguration(FileSystemHelper.resource(path, configuration)); - } - - private static ConfiguredTaggableFilter getTaggableFilterFrom(final String path, - final Map configuration) + private static Map extractAtlasLoadingProperties(final CommandMap command) { - return new ConfiguredTaggableFilter(getStandardConfigurationFrom(path, configuration)); + final Map propertyMap = new HashMap<>(); + propertyMap.put(CODE_VERSION.getName(), (String) command.get(CODE_VERSION)); + propertyMap.put(DATA_VERSION.getName(), (String) command.get(DATA_VERSION)); + propertyMap.put(EDGE_CONFIGURATION.getName(), (String) command.get(EDGE_CONFIGURATION)); + propertyMap.put(WAY_SECTIONING_CONFIGURATION.getName(), + (String) command.get(WAY_SECTIONING_CONFIGURATION)); + propertyMap.put(PBF_NODE_CONFIGURATION.getName(), + (String) command.get(PBF_NODE_CONFIGURATION)); + propertyMap.put(PBF_WAY_CONFIGURATION.getName(), + (String) command.get(PBF_WAY_CONFIGURATION)); + propertyMap.put(PBF_RELATION_CONFIGURATION.getName(), + (String) command.get(PBF_RELATION_CONFIGURATION)); + propertyMap.put(CODE_VERSION.getName(), (String) command.get(CODE_VERSION)); + propertyMap.put(DATA_VERSION.getName(), (String) command.get(DATA_VERSION)); + + return propertyMap; } @Override @@ -272,6 +281,8 @@ public void start(final CommandMap command) final String pbfNodeConfiguration = (String) command.get(PBF_NODE_CONFIGURATION); final String pbfWayConfiguration = (String) command.get(PBF_WAY_CONFIGURATION); final String pbfRelationConfiguration = (String) command.get(PBF_RELATION_CONFIGURATION); + final boolean useRawAtlas = (boolean) command.get(USE_RAW_ATLAS); + final String output = output(command); final Map sparkContext = configurationMap(); @@ -295,222 +306,276 @@ public void start(final CommandMap command) final List tasks = generateTasks(countries, boundaries, sharding); logger.debug("Generated {} tasks in {}.", tasks.size(), timer.elapsedSince()); - // Transform the map country name to shard to country name to Atlas - // This is not enforced, but it has to be a 1-1 mapping here. - final Map configurationMap = configurationMap(); - final JavaPairRDD countryAtlasShardsRDD = getContext() - .parallelize(tasks, tasks.size()).mapToPair(task -> - { - // Get the country name - final String countryName = task.getCountry(); - // Get the country shard - final Shard shard = task.getShard(); - // Build the AtlasLoadingOption - final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption - .createOptionWithAllEnabled(boundaries) - .setAdditionalCountryCodes(countryName); - - // Apply all configurations - if (edgeConfiguration != null) - { - atlasLoadingOption.setEdgeFilter( - getTaggableFilterFrom(edgeConfiguration, configurationMap)); - } - if (waySectioningConfiguration != null) - { - atlasLoadingOption.setWaySectionFilter(getTaggableFilterFrom( - waySectioningConfiguration, configurationMap)); - } - if (pbfNodeConfiguration != null) - { - atlasLoadingOption.setOsmPbfNodeFilter( - getTaggableFilterFrom(pbfNodeConfiguration, configurationMap)); - } - if (pbfWayConfiguration != null) - { - atlasLoadingOption.setOsmPbfWayFilter( - getTaggableFilterFrom(pbfWayConfiguration, configurationMap)); - } - if (pbfRelationConfiguration != null) - { - atlasLoadingOption.setOsmPbfRelationFilter( - getTaggableFilterFrom(pbfRelationConfiguration, configurationMap)); - } - - // Build the appropriate PbfLoader - final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, - atlasLoadingOption, codeVersion, dataVersion, task.getAllShards()); - final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR - + shard.getName(); - final Atlas atlas; - try - { - // Generate the Atlas for this shard - atlas = loader.load(countryName, shard); - } - catch (final Throwable e) - { - throw new CoreException("Building Atlas {} failed!", name, e); - } - - // Report on memory usage - logger.info("Printing memory after loading Atlas {}", name); - Memory.printCurrentMemory(); - // Output the Name/Atlas couple - final Tuple2 result = new Tuple2<>( - name + CountryShard.COUNTRY_SHARD_SEPARATOR + atlasScheme.getScheme(), - atlas); - return result; - }); - - // Filter out null Atlas. - final JavaPairRDD countryNonNullAtlasShardsRDD = countryAtlasShardsRDD - .filter(tuple -> tuple._2() != null); - - // Cache the Atlas - countryNonNullAtlasShardsRDD.cache(); - logger.info("\n\n********** CACHED THE ATLAS **********\n"); - - // Run the metrics - final JavaPairRDD statisticsRDD = countryNonNullAtlasShardsRDD - .mapToPair(tuple -> - { - final Counter counter = new Counter().withSharding(sharding); - counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); - final AtlasStatistics statistics; - try - { - statistics = counter.processAtlas(tuple._2()); - } - catch (final Exception e) - { - throw new CoreException("Building Atlas Statistics for {} failed!", - tuple._1(), e); - } - final Tuple2 result = new Tuple2<>(tuple._1(), - statistics); - return result; - }); - - // Cache the statistics - statisticsRDD.cache(); - logger.info("\n\n********** CACHED THE SHARD STATISTICS **********\n"); - - // Save the metrics - // splitAndSaveAsHadoopFile(statisticsRDD, - // getAlternateParallelFolderOutput(output, SHARD_STATISTICS_FOLDER), - // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, - // new CountrySplitter()); - statisticsRDD.saveAsHadoopFile(getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), - Text.class, AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, - new JobConf(configuration())); - logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n"); - - // Aggregate the metrics - final JavaPairRDD reducedStatisticsRDD = statisticsRDD - .mapToPair(tuple -> - { - final String countryShardName = tuple._1(); - final String countryName = StringList.split(countryShardName, "_").get(0); - return new Tuple2<>(countryName, tuple._2()); - }).reduceByKey(AtlasStatistics::merge); - - // Save the aggregated metrics - // reducedStatisticsRDD.saveAsHadoopFile( - // getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, - // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, - // new JobConf(configuration())); - reducedStatisticsRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, - AtlasStatistics.class, MultipleAtlasCountryStatisticsOutputFormat.class, - new JobConf(configuration())); - logger.info("\n\n********** SAVED THE COUNTRY STATISTICS **********\n"); - - // Compute the deltas if needed - if (!previousOutputForDelta.isEmpty()) + if (useRawAtlas) { - // Compute the deltas - final JavaPairRDD deltasRDD = countryNonNullAtlasShardsRDD - .flatMapToPair(tuple -> + // AtlasLoadingOption isn't serializable, neither is command map. To avoid duplicating + // boiler-plate code for creating the AtlasLoadingOption, extract the properties we need + // from the command map and pass those around to create the AtlasLoadingOption + final Map atlasLoadingOptions = extractAtlasLoadingProperties(command); + + // Generate the raw Atlas and filter any null atlases + final JavaPairRDD countryRawAtlasShardsRDD = getContext() + .parallelize(tasks, tasks.size()) + .mapToPair(AtlasGeneratorHelper.generateRawAtlas(boundaries, sparkContext, + atlasLoadingOptions, pbfContext, atlasScheme)) + .filter(tuple -> tuple._2() != null); + + // Persist the RDD and save the intermediary state + countryRawAtlasShardsRDD.cache(); + countryRawAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, RAW_ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE RAW ATLAS **********\n"); + + // Slice the raw Atlas and filter any null atlases + final JavaPairRDD countrySlicedRawAtlasShardsRDD = countryRawAtlasShardsRDD + .mapToPair(AtlasGeneratorHelper.sliceRawAtlas(boundaries)) + .filter(tuple -> tuple._2() != null); + + // Persist the RDD and save the intermediary state + final String slicedRawAtlasPath = getAlternateSubFolderOutput(output, + SLICED_RAW_ATLAS_FOLDER); + countrySlicedRawAtlasShardsRDD.cache(); + countrySlicedRawAtlasShardsRDD.saveAsHadoopFile(slicedRawAtlasPath, Text.class, + Atlas.class, MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE SLICED RAW ATLAS **********\n"); + + // Remove the raw atlas RDD from cache since we've cached the sliced RDD + countryRawAtlasShardsRDD.unpersist(); + + // Section the sliced raw Atlas + final JavaPairRDD countryAtlasShardsRDD = countrySlicedRawAtlasShardsRDD + .mapToPair(AtlasGeneratorHelper.sectionRawAtlas(boundaries, sharding, + sparkContext, atlasLoadingOptions, slicedRawAtlasPath, tasks)); + + // Persist the RDD and save the final atlas + countryAtlasShardsRDD.cache(); + countryAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE FINAL ATLAS **********\n"); + + // Remove the sliced atlas RDD from cache since we've cached the final RDD + countrySlicedRawAtlasShardsRDD.unpersist(); + + // Create the metrics + final JavaPairRDD statisticsRDD = countryAtlasShardsRDD + .mapToPair(AtlasGeneratorHelper.generateAtlasStatistics(sharding)); + + // Persist the RDD and save + statisticsRDD.cache(); + statisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n"); + + // Aggregate the metrics + final JavaPairRDD reducedStatisticsRDD = statisticsRDD + .mapToPair(tuple -> { final String countryShardName = tuple._1(); - final Atlas current = tuple._2(); - final List> result = new ArrayList<>(); + final String countryName = StringList.split(countryShardName, "_").get(0); + return new Tuple2<>(countryName, tuple._2()); + }).reduceByKey(AtlasStatistics::merge); + + // Save aggregated metrics + reducedStatisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasCountryStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE COUNTRY STATISTICS **********\n"); + + // Compute the deltas, if needed + if (!previousOutputForDelta.isEmpty()) + { + final JavaPairRDD deltasRDD = countryAtlasShardsRDD + .flatMapToPair(AtlasGeneratorHelper.computeAtlasDelta(sparkContext, + previousOutputForDelta)); + + // Save the deltas + deltasRDD.saveAsHadoopFile(getAlternateSubFolderOutput(output, SHARD_DELTAS_FOLDER), + Text.class, AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE DELTAS **********\n"); + } + } + else + { + // Transform the map country name to shard to country name to Atlas + // This is not enforced, but it has to be a 1-1 mapping here. + final JavaPairRDD countryAtlasShardsRDD = getContext() + .parallelize(tasks, tasks.size()).mapToPair(task -> + { + // Get the country name + final String countryName = task.getCountry(); + // Get the country shard + final Shard shard = task.getShard(); + // Build the AtlasLoadingOption + final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption + .createOptionWithAllEnabled(boundaries) + .setAdditionalCountryCodes(countryName); + + // Apply all configurations + if (edgeConfiguration != null) + { + atlasLoadingOption.setEdgeFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(edgeConfiguration, sparkContext)); + } + if (waySectioningConfiguration != null) + { + atlasLoadingOption + .setWaySectionFilter(AtlasGeneratorHelper.getTaggableFilterFrom( + waySectioningConfiguration, sparkContext)); + } + if (pbfNodeConfiguration != null) + { + atlasLoadingOption.setOsmPbfNodeFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); + } + if (pbfWayConfiguration != null) + { + atlasLoadingOption.setOsmPbfWayFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); + } + if (pbfRelationConfiguration != null) + { + atlasLoadingOption.setOsmPbfRelationFilter(AtlasGeneratorHelper + .getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); + } + + // Build the appropriate PbfLoader + final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, + atlasLoadingOption, codeVersion, dataVersion, task.getAllShards()); + final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR + + shard.getName(); + final Atlas atlas; try { - final Optional alter = new AtlasLocator(sparkContext) - .atlasForShard(previousOutputForDelta + "/" - + StringList - .split(countryShardName, - CountryShard.COUNTRY_SHARD_SEPARATOR) - .get(0), - countryShardName); - if (alter.isPresent()) - { - logger.info("Printing memory after other Atlas loaded for Delta {}", - countryShardName); - Memory.printCurrentMemory(); - final AtlasDelta delta = new AtlasDelta(current, alter.get()) - .generate(); - result.add(new Tuple2<>(countryShardName, delta)); - } + // Generate the Atlas for this shard + atlas = loader.load(countryName, shard); } - catch (final Exception e) + catch (final Throwable e) { - logger.error("Skipping! Could not generate deltas for {}", - countryShardName, e); + throw new CoreException("Building Atlas {} failed!", name, e); } + + // Report on memory usage + logger.info("Printing memory after loading Atlas {}", name); + Memory.printCurrentMemory(); + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(name + + CountryShard.COUNTRY_SHARD_SEPARATOR + atlasScheme.getScheme(), + atlas); return result; }); - // deltasRDD.cache(); - // logger.info("\n\n********** CACHED THE DELTAS **********\n"); + // Filter out null Atlas + final JavaPairRDD countryNonNullAtlasShardsRDD = countryAtlasShardsRDD + .filter(tuple -> tuple._2() != null); - // Save the deltas - // splitAndSaveAsHadoopFile(deltasRDD, getAlternateOutput(output, SHARD_DELTAS_FOLDER), - // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, - // COUNTRY_NAME_FROM_COUNTRY_SHARD); - // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_FOLDER), - // Text.class, - // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, - // new JobConf(configuration())); - // logger.info("\n\n********** SAVED THE DELTAS **********\n"); - // - // splitAndSaveAsHadoopFile(deltasRDD, - // getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), AtlasDelta.class, - // AddedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); - // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), - // Text.class, AtlasDelta.class, AddedMultipleAtlasDeltaOutputFormat.class, - // new JobConf(configuration())); - // logger.info("\n\n********** SAVED THE DELTAS ADDED **********\n"); - // - // splitAndSaveAsHadoopFile(deltasRDD, - // getAlternateOutput(output, SHARD_DELTAS_CHANGED_FOLDER), AtlasDelta.class, - // ChangedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); - // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_CHANGED_FOLDER), - // Text.class, AtlasDelta.class, ChangedMultipleAtlasDeltaOutputFormat.class, - // new JobConf(configuration())); - // logger.info("\n\n********** SAVED THE DELTAS CHANGED **********\n"); + // Cache the Atlas + countryNonNullAtlasShardsRDD.cache(); + logger.info("\n\n********** CACHED THE ATLAS **********\n"); - // splitAndSaveAsHadoopFile(deltasRDD, - // getAlternateParallelFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), - // AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + // Run the metrics + final JavaPairRDD statisticsRDD = countryNonNullAtlasShardsRDD + .mapToPair(AtlasGeneratorHelper.generateAtlasStatistics(sharding)); + + // Cache the statistics + statisticsRDD.cache(); + logger.info("\n\n********** CACHED THE SHARD STATISTICS **********\n"); + + // Save the metrics + // splitAndSaveAsHadoopFile(statisticsRDD, + // getAlternateParallelFolderOutput(output, SHARD_STATISTICS_FOLDER), + // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, // new CountrySplitter()); - deltasRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), Text.class, - AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + statisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SHARD_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, new JobConf(configuration())); - logger.info("\n\n********** SAVED THE DELTAS REMOVED **********\n"); - } + logger.info("\n\n********** SAVED THE SHARD STATISTICS **********\n"); + + // Aggregate the metrics + final JavaPairRDD reducedStatisticsRDD = statisticsRDD + .mapToPair(tuple -> + { + final String countryShardName = tuple._1(); + final String countryName = StringList.split(countryShardName, "_").get(0); + return new Tuple2<>(countryName, tuple._2()); + }).reduceByKey(AtlasStatistics::merge); + + // Save the aggregated metrics + // reducedStatisticsRDD.saveAsHadoopFile( + // getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, + // AtlasStatistics.class, MultipleAtlasStatisticsOutputFormat.class, + // new JobConf(configuration())); + reducedStatisticsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, COUNTRY_STATISTICS_FOLDER), Text.class, + AtlasStatistics.class, MultipleAtlasCountryStatisticsOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE COUNTRY STATISTICS **********\n"); + + // Compute the deltas if needed + if (!previousOutputForDelta.isEmpty()) + { + // Compute the deltas + final JavaPairRDD deltasRDD = countryNonNullAtlasShardsRDD + .flatMapToPair(AtlasGeneratorHelper.computeAtlasDelta(sparkContext, + previousOutputForDelta)); + + // deltasRDD.cache(); + // logger.info("\n\n********** CACHED THE DELTAS **********\n"); + + // Save the deltas + // splitAndSaveAsHadoopFile(deltasRDD, getAlternateOutput(output, + // SHARD_DELTAS_FOLDER), + // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, + // COUNTRY_NAME_FROM_COUNTRY_SHARD); + // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_FOLDER), + // Text.class, + // AtlasDelta.class, MultipleAtlasDeltaOutputFormat.class, + // new JobConf(configuration())); + // logger.info("\n\n********** SAVED THE DELTAS **********\n"); + // + // splitAndSaveAsHadoopFile(deltasRDD, + // getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), AtlasDelta.class, + // AddedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); + // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, SHARD_DELTAS_ADDED_FOLDER), + // Text.class, AtlasDelta.class, AddedMultipleAtlasDeltaOutputFormat.class, + // new JobConf(configuration())); + // logger.info("\n\n********** SAVED THE DELTAS ADDED **********\n"); + // + // splitAndSaveAsHadoopFile(deltasRDD, + // getAlternateOutput(output, SHARD_DELTAS_CHANGED_FOLDER), AtlasDelta.class, + // ChangedMultipleAtlasDeltaOutputFormat.class, COUNTRY_NAME_FROM_COUNTRY_SHARD); + // deltasRDD.saveAsHadoopFile(getAlternateOutput(output, + // SHARD_DELTAS_CHANGED_FOLDER), + // Text.class, AtlasDelta.class, ChangedMultipleAtlasDeltaOutputFormat.class, + // new JobConf(configuration())); + // logger.info("\n\n********** SAVED THE DELTAS CHANGED **********\n"); + + // splitAndSaveAsHadoopFile(deltasRDD, + // getAlternateParallelFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), + // AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + // new CountrySplitter()); + deltasRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, SHARD_DELTAS_REMOVED_FOLDER), + Text.class, AtlasDelta.class, RemovedMultipleAtlasDeltaOutputFormat.class, + new JobConf(configuration())); + logger.info("\n\n********** SAVED THE DELTAS REMOVED **********\n"); + } - // Save the result as Atlas files, one for each key. - // splitAndSaveAsHadoopFile(countryNonNullAtlasShardsRDD, - // getAlternateParallelFolderOutput(output, ATLAS_FOLDER), Atlas.class, - // MultipleAtlasOutputFormat.class, new CountrySplitter()); - countryNonNullAtlasShardsRDD.saveAsHadoopFile( - getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, - MultipleAtlasOutputFormat.class, new JobConf(configuration())); - logger.info("\n\n********** SAVED THE ATLAS **********\n"); + // Save the result as Atlas files, one for each key. + // splitAndSaveAsHadoopFile(countryNonNullAtlasShardsRDD, + // getAlternateParallelFolderOutput(output, ATLAS_FOLDER), Atlas.class, + // MultipleAtlasOutputFormat.class, new CountrySplitter()); + countryNonNullAtlasShardsRDD.saveAsHadoopFile( + getAlternateSubFolderOutput(output, ATLAS_FOLDER), Text.class, Atlas.class, + MultipleAtlasOutputFormat.class, new JobConf(configuration())); + logger.info("\n\n********** SAVED THE ATLAS **********\n"); + } } @Override @@ -534,6 +599,6 @@ protected SwitchList switches() return super.switches().with(COUNTRIES, COUNTRY_SHAPES, SHARDING_TYPE, PBF_PATH, PBF_SCHEME, PBF_SHARDING, PREVIOUS_OUTPUT_FOR_DELTA, CODE_VERSION, DATA_VERSION, EDGE_CONFIGURATION, WAY_SECTIONING_CONFIGURATION, PBF_NODE_CONFIGURATION, - PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION, ATLAS_SCHEME); + PBF_WAY_CONFIGURATION, PBF_RELATION_CONFIGURATION, ATLAS_SCHEME, USE_RAW_ATLAS); } } diff --git a/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java b/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java new file mode 100644 index 00000000..faf1ac4c --- /dev/null +++ b/src/main/java/org/openstreetmap/atlas/generator/AtlasGeneratorHelper.java @@ -0,0 +1,514 @@ +package org.openstreetmap.atlas.generator; + +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.api.java.function.PairFunction; +import org.openstreetmap.atlas.exception.CoreException; +import org.openstreetmap.atlas.generator.persistence.scheme.SlippyTilePersistenceScheme; +import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemCreator; +import org.openstreetmap.atlas.generator.tools.filesystem.FileSystemHelper; +import org.openstreetmap.atlas.generator.tools.spark.utilities.SparkFileHelper; +import org.openstreetmap.atlas.geography.atlas.Atlas; +import org.openstreetmap.atlas.geography.atlas.AtlasResourceLoader; +import org.openstreetmap.atlas.geography.atlas.delta.AtlasDelta; +import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; +import org.openstreetmap.atlas.geography.atlas.raw.sectioning.WaySectionProcessor; +import org.openstreetmap.atlas.geography.atlas.raw.slicing.RawAtlasCountrySlicer; +import org.openstreetmap.atlas.geography.atlas.statistics.AtlasStatistics; +import org.openstreetmap.atlas.geography.atlas.statistics.Counter; +import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMap; +import org.openstreetmap.atlas.geography.sharding.CountryShard; +import org.openstreetmap.atlas.geography.sharding.Shard; +import org.openstreetmap.atlas.geography.sharding.Sharding; +import org.openstreetmap.atlas.locale.IsoCountry; +import org.openstreetmap.atlas.streaming.resource.File; +import org.openstreetmap.atlas.streaming.resource.FileSuffix; +import org.openstreetmap.atlas.streaming.resource.Resource; +import org.openstreetmap.atlas.tags.filters.ConfiguredTaggableFilter; +import org.openstreetmap.atlas.utilities.collections.StringList; +import org.openstreetmap.atlas.utilities.configuration.StandardConfiguration; +import org.openstreetmap.atlas.utilities.runtime.system.memory.Memory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Tuple2; + +/** + * Utility class for {@link AtlasGenerator}. + * + * @author matthieun + * @author mgostintsev + */ +public final class AtlasGeneratorHelper implements Serializable +{ + private static final long serialVersionUID = 1300098384789754747L; + private static final Logger logger = LoggerFactory.getLogger(AtlasGeneratorHelper.class); + + private static final String GZIPPED_ATLAS_EXTENSION = FileSuffix.ATLAS.toString() + + FileSuffix.GZIP.toString(); + private static final String ATLAS_EXTENSION = FileSuffix.ATLAS.toString(); + private static final AtlasResourceLoader ATLAS_LOADER = new AtlasResourceLoader(); + + /** + * @param atlasDirectory + * The path of the folder containing the Atlas files, in format CTRY_z-x-y.atlas.gz + * @param temporaryDirectory + * The path of the temporary folder to download Atlas files if they are not + * downloaded already + * @param country + * The country to look for + * @param sparkContext + * The Spark configuration as a map (to allow the creation of the proper FileSystem) + * @param validShards + * All available shards for given country, to avoid fetching shards that do not exist + * @return A function that returns an {@link Atlas} given a {@link Shard} + */ + protected static Function> atlasFetcher(final String atlasDirectory, + final String temporaryDirectory, final String country, + final Map sparkContext, final Set validShards) + { + // & Serializable is very important as that function will be passed around by Spark, and + // functions are not serializable by default. + return (Function> & Serializable) shard -> + { + if (!validShards.isEmpty() && !validShards.contains(shard)) + { + return Optional.empty(); + } + + // Check if non-gzipped file exists in final temporary directory + final String pathFromTemporaryDirectory = SparkFileHelper.combine(temporaryDirectory, + String.format("%s%s", getAtlasName(country, shard), ATLAS_EXTENSION)); + final File fileFromTemporaryDirectory = new File(pathFromTemporaryDirectory); + + // Download file to disk if it is not cached already + if (!fileFromTemporaryDirectory.exists()) + { + try + { + String path = SparkFileHelper.combine(atlasDirectory, + String.format("%s%s", getAtlasName(country, shard), ATLAS_EXTENSION)); + + if (!fileExists(path, sparkContext)) + { + path = SparkFileHelper.combine(atlasDirectory, String.format("%s%s", + getAtlasName(country, shard), GZIPPED_ATLAS_EXTENSION)); + } + + final Resource fileFromNetwork = FileSystemHelper.resource(path, sparkContext); + final File temporaryLocalFile = File + .temporary(getAtlasName(country, shard) + "-", ATLAS_EXTENSION); + + System.out.println("Downloaded atlas from " + path + + " and is found as temp file " + temporaryLocalFile.getAbsolutePath()); + + // FileSystemHelper.resource sets the Decompressor on the Resource for us, so + // this call will gunzip the file + fileFromNetwork.copyTo(temporaryLocalFile); + + // Before making the move, check again if file is there or not + if (!fileFromTemporaryDirectory.exists()) + { + try + { + Files.move(Paths.get(temporaryLocalFile.getPath()), + Paths.get(fileFromTemporaryDirectory.getPath()), + StandardCopyOption.ATOMIC_MOVE); + } + catch (final FileAlreadyExistsException e) + { + logger.warn("Failed to rename file, but file exists already.", e); + } + catch (final Exception e) + { + logger.warn("Failed to rename file on local disk.", e); + } + } + } + catch (final Exception e) + { + logger.warn("Failed to cache file on local disk.", e); + } + } + + // If we were able to find the file on local disk, then load from there + if (fileFromTemporaryDirectory.exists()) + { + System.out.println("AtlasExisted - Cache Hit: " + + fileFromTemporaryDirectory.getAbsolutePath()); + return loadAtlas(fileFromTemporaryDirectory); + } + else + { + logger.warn("Falling back to Atlas file hosted on {} for shard {}.", atlasDirectory, + shard.getName()); + final String path = SparkFileHelper.combine(atlasDirectory, + String.format("%s%s", getAtlasName(country, shard), ATLAS_EXTENSION)); + final Resource fileFromNetwork = FileSystemHelper.resource(path, sparkContext); + return loadAtlas(fileFromNetwork); + } + }; + } + + /** + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param previousOutputForDelta + * Previous Atlas generation delta output location + * @return A Spark {@link PairFlatMapFunction} that takes a tuple of a country shard name and + * atlas file and returns all the {@link AtlasDelta} for the country + */ + protected static PairFlatMapFunction, String, AtlasDelta> computeAtlasDelta( + final Map sparkContext, final String previousOutputForDelta) + { + return tuple -> + { + final String countryShardName = tuple._1(); + final Atlas current = tuple._2(); + final List> result = new ArrayList<>(); + try + { + final Optional alter = new AtlasLocator(sparkContext).atlasForShard( + previousOutputForDelta + "/" + + StringList.split(countryShardName, + CountryShard.COUNTRY_SHARD_SEPARATOR).get(0), + countryShardName); + if (alter.isPresent()) + { + logger.info("Printing memory after other Atlas loaded for Delta {}", + countryShardName); + Memory.printCurrentMemory(); + final AtlasDelta delta = new AtlasDelta(current, alter.get()).generate(); + result.add(new Tuple2<>(countryShardName, delta)); + } + } + catch (final Exception e) + { + logger.error("Skipping! Could not generate deltas for {}", countryShardName, e); + } + return result; + }; + } + + /** + * @param sharding + * The sharding tree + * @return a Spark {@link PairFunction} that processes a shard to Atlas tuple, and constructs a + * {@link AtlasStatistics} for each shard. + */ + protected static PairFunction, String, AtlasStatistics> generateAtlasStatistics( + final Sharding sharding) + { + return tuple -> + { + final Counter counter = new Counter().withSharding(sharding); + counter.setCountsDefinition(Counter.POI_COUNTS_DEFINITION.getDefault()); + final AtlasStatistics statistics; + try + { + statistics = counter.processAtlas(tuple._2()); + } + catch (final Exception e) + { + throw new CoreException("Building Atlas Statistics for {} failed!", tuple._1(), e); + } + final Tuple2 result = new Tuple2<>(tuple._1(), statistics); + return result; + }; + } + + /** + * @param boundaries + * The {@link CountryBoundaryMap} to use for pbf to atlas generation + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param loadingOptions + * The basic required properties to create an {@link AtlasLoadingOption} + * @param pbfContext + * The context explaining where to find the PBFs + * @param atlasScheme + * The folder structure of the output atlas + * @return a Spark {@link PairFunction} that processes an {@link AtlasGenerationTask}, loads the + * PBF for the task's shard, generates the raw atlas for the shard and outputs a shard + * name to raw atlas tuple. + */ + protected static PairFunction generateRawAtlas( + final CountryBoundaryMap boundaries, final Map sparkContext, + final Map loadingOptions, final PbfContext pbfContext, + final SlippyTilePersistenceScheme atlasScheme) + { + return task -> + { + final String countryName = task.getCountry(); + final Shard shard = task.getShard(); + + // Set the country code that is being processed! + final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, + sparkContext, loadingOptions); + atlasLoadingOption.setAdditionalCountryCodes(countryName); + + // Build the PbfLoader + final PbfLoader loader = new PbfLoader(pbfContext, sparkContext, boundaries, + atlasLoadingOption, loadingOptions.get(AtlasGenerator.CODE_VERSION.getName()), + loadingOptions.get(AtlasGenerator.DATA_VERSION.getName()), task.getAllShards()); + final String name = countryName + CountryShard.COUNTRY_SHARD_SEPARATOR + + shard.getName(); + + // Generate the raw Atlas for this shard + final Atlas atlas; + try + { + atlas = loader.generateRawAtlas(countryName, shard); + } + catch (final Throwable e) + { + throw new CoreException("Building raw Atlas {} failed!", name, e); + } + + // Report on memory usage + logger.info("Printing memory after loading raw Atlas {}", name); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>( + name + CountryShard.COUNTRY_SHARD_SEPARATOR + atlasScheme.getScheme(), atlas); + return result; + }; + } + + protected static StandardConfiguration getStandardConfigurationFrom(final String path, + final Map configuration) + { + return new StandardConfiguration(FileSystemHelper.resource(path, configuration)); + } + + protected static ConfiguredTaggableFilter getTaggableFilterFrom(final String path, + final Map configuration) + { + return new ConfiguredTaggableFilter(getStandardConfigurationFrom(path, configuration)); + } + + /** + * @param boundaries + * The {@link CountryBoundaryMap} required to create an {@link AtlasLoadingOption} + * @param sharding + * The {@link Sharding} strategy + * @param sparkContext + * Spark context (or configuration) as a key-value map + * @param loadingOptions + * The basic required properties to create an {@link AtlasLoadingOption} + * @param slicedRawAtlasPath + * The path where the sliced raw atlas files were saved + * @param tasks + * The list of {@link AtlasGenerationTask}s used to grab all possible {@link Shard}s + * for a country + * @return a Spark {@link PairFunction} that processes a tuple of shard-name and sliced raw + * atlas, sections the sliced raw atlas and returns the final sectioned (and sliced) raw + * atlas for that shard name. + */ + protected static PairFunction, String, Atlas> sectionRawAtlas( + final CountryBoundaryMap boundaries, final Sharding sharding, + final Map sparkContext, final Map loadingOptions, + final String slicedRawAtlasPath, final List tasks) + { + return tuple -> + { + final Atlas atlas; + try + { + final AtlasLoadingOption atlasLoadingOption = buildAtlasLoadingOption(boundaries, + sparkContext, loadingOptions); + + // Calculate the shard, country name and possible shards + final String countryShardString = tuple._1(); + final CountryShard countryShard = CountryShard.forName(countryShardString); + final String country = countryShard.getCountry(); + final Set possibleShards = getAllShardsForCountry(tasks, country); + + // Create the fetcher + final Function> slicedRawAtlasFetcher = AtlasGeneratorHelper + .atlasFetcher(SparkFileHelper.combine(slicedRawAtlasPath, country), + System.getProperty("java.io.tmpdir"), country, sparkContext, + possibleShards); + + // Section the Atlas + atlas = new WaySectionProcessor(countryShard.getShard(), atlasLoadingOption, + sharding, slicedRawAtlasFetcher).run(); + } + catch (final Throwable e) + { + throw new CoreException("Sectioning Raw Atlas {} failed!", tuple._2().getName(), e); + } + + // Report on memory usage + logger.info("Printing memory after loading final Atlas {}", tuple._2().getName()); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), atlas); + return result; + }; + } + + /** + * @param boundaries + * The {@link CountryBoundaryMap} to use for slicing + * @return a Spark {@link PairFunction} that processes a tuple of shard-name and raw atlas, + * slices the raw atlas and returns the sliced raw atlas for that shard name. + */ + protected static PairFunction, String, Atlas> sliceRawAtlas( + final CountryBoundaryMap boundaries) + { + return tuple -> + { + final Atlas slicedAtlas; + + // Grab the tuple contents + final String shardName = tuple._1(); + final Atlas rawAtlas = tuple._2(); + + try + { + // Extract the country code + final Set isoCountries = new HashSet<>(); + final Optional countryName = IsoCountry + .forCountryCode(shardName.split(CountryShard.COUNTRY_SHARD_SEPARATOR)[0]); + if (countryName.isPresent()) + { + isoCountries.add(countryName.get()); + } + else + { + logger.error("Unable to extract valid IsoCountry code from {}", shardName); + } + + // Slice the Atlas + slicedAtlas = new RawAtlasCountrySlicer(isoCountries, boundaries).slice(rawAtlas); + } + catch (final Throwable e) + { + throw new CoreException("Slicing raw Atlas {} failed!", rawAtlas.getName(), e); + } + + // Report on memory usage + logger.info("Printing memory after loading sliced raw Atlas {}", rawAtlas.getName()); + Memory.printCurrentMemory(); + + // Output the Name/Atlas couple + final Tuple2 result = new Tuple2<>(tuple._1(), slicedAtlas); + return result; + }; + } + + private static AtlasLoadingOption buildAtlasLoadingOption(final CountryBoundaryMap boundaries, + final Map sparkContext, final Map properties) + { + final AtlasLoadingOption atlasLoadingOption = AtlasLoadingOption + .createOptionWithAllEnabled(boundaries); + + // Apply all configurations + final String edgeConfiguration = properties + .get(AtlasGenerator.EDGE_CONFIGURATION.getName()); + if (edgeConfiguration != null) + { + atlasLoadingOption + .setEdgeFilter(getTaggableFilterFrom(edgeConfiguration, sparkContext)); + } + + final String waySectioningConfiguration = properties + .get(AtlasGenerator.WAY_SECTIONING_CONFIGURATION.getName()); + if (waySectioningConfiguration != null) + { + atlasLoadingOption.setWaySectionFilter( + getTaggableFilterFrom(waySectioningConfiguration, sparkContext)); + } + + final String pbfNodeConfiguration = properties + .get(AtlasGenerator.PBF_NODE_CONFIGURATION.getName()); + if (pbfNodeConfiguration != null) + { + atlasLoadingOption + .setOsmPbfNodeFilter(getTaggableFilterFrom(pbfNodeConfiguration, sparkContext)); + } + + final String pbfWayConfiguration = properties + .get(AtlasGenerator.PBF_WAY_CONFIGURATION.getName()); + if (pbfWayConfiguration != null) + { + atlasLoadingOption + .setOsmPbfWayFilter(getTaggableFilterFrom(pbfWayConfiguration, sparkContext)); + } + + final String pbfRelationConfiguration = properties + .get(AtlasGenerator.PBF_RELATION_CONFIGURATION.getName()); + if (pbfRelationConfiguration != null) + { + atlasLoadingOption.setOsmPbfRelationFilter( + getTaggableFilterFrom(pbfRelationConfiguration, sparkContext)); + } + + return atlasLoadingOption; + } + + private static boolean fileExists(final String path, final Map configuration) + { + final FileSystem fileSystem = new FileSystemCreator().get(path, configuration); + try + { + return fileSystem.exists(new Path(path)); + } + catch (IllegalArgumentException | IOException e) + { + logger.warn("can't determine if {} exists", path); + return false; + } + } + + private static Set getAllShardsForCountry(final List tasks, + final String country) + { + for (final AtlasGenerationTask task : tasks) + { + if (task.getCountry().equals(country)) + { + // We found the target country, return its shards + return task.getAllShards(); + } + } + logger.debug("Could not find shards for {}", country); + return Collections.emptySet(); + } + + private static String getAtlasName(final String country, final Shard shard) + { + return String.format("%s_%s", country, shard.getName()); + } + + private static Optional loadAtlas(final Resource file) + { + return Optional.ofNullable(ATLAS_LOADER.load(file)); + } + + /** + * Hide constructor for this utility class. + */ + private AtlasGeneratorHelper() + { + } +} diff --git a/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java b/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java index e78d9799..e09dd7d0 100644 --- a/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java +++ b/src/main/java/org/openstreetmap/atlas/generator/PbfLoader.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -15,6 +16,7 @@ import org.openstreetmap.atlas.geography.atlas.packed.PackedAtlas; import org.openstreetmap.atlas.geography.atlas.pbf.AtlasLoadingOption; import org.openstreetmap.atlas.geography.atlas.pbf.OsmPbfLoader; +import org.openstreetmap.atlas.geography.atlas.raw.creation.RawAtlasGenerator; import org.openstreetmap.atlas.geography.boundary.CountryBoundary; import org.openstreetmap.atlas.geography.boundary.CountryBoundaryMap; import org.openstreetmap.atlas.geography.clipping.Clip.ClipType; @@ -61,7 +63,7 @@ public static void setAtlasSaveFolder(final File atlasSaveFolder) * @param sparkContext * The context from Spark * @param boundaries - * The country boundary map + * The {@link CountryBoundaryMap} * @param atlasLoadingOption * The loading options for the {@link OsmPbfLoader} * @param codeVersion @@ -84,6 +86,88 @@ public PbfLoader(final PbfContext pbfContext, final Map sparkCon this.countryShards = countryShards; } + public Atlas generateRawAtlas(final String countryName, final Shard shard) + { + final Optional loadingArea = calculateLoadingArea(countryName, shard); + if (loadingArea.isPresent()) + { + final Iterable pbfPool = this.locator.pbfsCovering(loadingArea.get()); + + final List atlases = new ArrayList<>(); + final Map metaDataTags = Maps.hashMap(); + + // Add shard information to the meta data + metaDataTags.put("countryShards", + this.countryShards + .stream().map(countryShard -> countryName + + CountryShard.COUNTRY_SHARD_SEPARATOR + countryShard.getName()) + .collect(Collectors.joining(","))); + metaDataTags.put(shard.getName() + "_boundary", loadingArea.toString()); + + // For each PBF, create an atlas + pbfPool.forEach(locatedPbf -> + { + final MultiPolygon pbfLoadingArea = locatedPbf.bounds() + .clip(loadingArea.get(), ClipType.AND).getClipMultiPolygon(); + + // Guard against an empty loading area + if (!pbfLoadingArea.isEmpty()) + { + final AtlasMetaData metaData = new AtlasMetaData(null, true, this.codeVersion, + this.dataVersion, countryName, shard.getName(), metaDataTags); + Atlas shardPbfSlice = null; + try + { + shardPbfSlice = new RawAtlasGenerator(locatedPbf.getResource(), + pbfLoadingArea).withMetaData(metaData).build(); + } + catch (final Exception e) + { + logger.error("Dropping PBF {} for Atlas shard {}", + locatedPbf.getResource().getName(), shard, e); + } + if (shardPbfSlice != null) + { + atlases.add(shardPbfSlice); + } + } + }); + + // Save, if needed + if (ATLAS_SAVE_FOLDER != null) + { + int index = 0; + for (final Atlas atlas : atlases) + { + atlas.save( + ATLAS_SAVE_FOLDER.child(shard.getName() + "_" + index++ + ".atlas.gz")); + } + } + + if (atlases.size() > 1) + { + // Concatenate many PBFs in one single Atlas + logger.info("Concatenating {} PBF-made Atlas into one Atlas Shard {}", + atlases.size(), shard); + return PackedAtlas.cloneFrom(new MultiAtlas(atlases)); + } + else if (atlases.size() == 1) + { + // Only one PBF was used + return atlases.get(0); + } + else + { + // There are no PBF resources. + return null; + } + } + else + { + return null; + } + } + /** * Generate the {@link Atlas} for a {@link Shard}. * @@ -91,10 +175,33 @@ public PbfLoader(final PbfContext pbfContext, final Map sparkCon * The Country to process * @param shard * The shard to output - * @return The build {@link Atlas} for the specified {@link Shard}. null if there is no Atlas to - * be build (because no PBF or empty PBFs or no overlap) + * @return The built {@link Atlas} for the specified {@link Shard}. {@code null} if there is no + * Atlas to be built (because no PBF or empty PBFs or no overlap) */ public Atlas load(final String countryName, final Shard shard) + { + final Optional loadingArea = calculateLoadingArea(countryName, shard); + if (loadingArea.isPresent()) + { + final Iterable pbfPool = this.locator.pbfsCovering(loadingArea.get()); + return loadFromPool(pbfPool, loadingArea.get(), countryName, shard); + } + else + { + return null; + } + } + + /** + * Calculates the loading area given a country name and working {@link Shard}. + * + * @param countryName + * The country being built + * @param shard + * The shard being processed + * @return the intersection of given country's boundary clipped with the given shard's bounds + */ + private Optional calculateLoadingArea(final String countryName, final Shard shard) { final List countryBoundaries = this.boundaries .countryBoundary(countryName); @@ -109,15 +216,12 @@ public Atlas load(final String countryName, final Shard shard) } if (boundary != null) { - final MultiPolygon loadingArea = shard.bounds().clip(boundary, ClipType.AND) - .getClipMultiPolygon(); - final Iterable pbfPool = this.locator.pbfsCovering(loadingArea); - return loadFromPool(pbfPool, loadingArea, countryName, shard); + return Optional.of(shard.bounds().clip(boundary, ClipType.AND).getClipMultiPolygon()); } else { - logger.error("Can't found shard {} for country {}", shard, countryName); - return null; + logger.error("Can't find shard {} for country {}", shard, countryName); + return Optional.empty(); } }