From 6c005ce2c69e5bed6315ac4b64772da2359f9f6a Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Sat, 17 Dec 2016 12:32:33 -0800 Subject: [PATCH] Attempt a fix for #823 wherein writes to the annotations byte map were not synchronized, potentially leading to a race condition and stuck threads. Now we'll send a fresh list to the compaction code if notes were found, synchronize on the local byte map before making any modifications. Thanks to @thatsafunnyname. Signed-off-by: Chris Larsen --- src/core/SaltScanner.java | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/core/SaltScanner.java b/src/core/SaltScanner.java index ce4e433c1a..7a7aec7376 100644 --- a/src/core/SaltScanner.java +++ b/src/core/SaltScanner.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; import com.stumbleupon.async.Callback; import com.stumbleupon.async.Deferred; @@ -305,8 +306,10 @@ final class ScannerCB implements Callback kvs = new ArrayList(); private final ByteMap> annotations = new ByteMap>(); - private final Set skips = Collections.newSetFromMap(new ConcurrentHashMap()); - private final Set keepers = Collections.newSetFromMap(new ConcurrentHashMap()); + private final Set skips = Collections.newSetFromMap( + new ConcurrentHashMap()); + private final Set keepers = Collections.newSetFromMap( + new ConcurrentHashMap()); private long scanner_start = -1; /** nanosecond timestamps */ @@ -524,12 +527,6 @@ void processRow(final byte[] key, final ArrayList row) { tsdb.getClient().delete(del); } - List notes = annotations.get(key); - if (notes == null) { - notes = new ArrayList(); - annotations.put(key, notes); - } - // calculate estimated data point count. We don't want to deserialize // the byte arrays so we'll just get a rough estimate of compacted // columns. @@ -565,7 +562,18 @@ void processRow(final byte[] key, final ArrayList row) { // the scanner final long compaction_start = DateTime.nanoTime(); try { + final List notes = Lists.newArrayList(); compacted = tsdb.compact(row, notes); + if (!notes.isEmpty()) { + synchronized (annotations) { + List map_notes = annotations.get(key); + if (map_notes == null) { + annotations.put(key, notes); + } else { + map_notes.addAll(notes); + } + } + } } catch (IllegalDataException idex) { compaction_time += (DateTime.nanoTime() - compaction_start); close(false);