HDFS close from async to sync and roll back after 3 retries #345

Open · wants to merge 1 commit into base: flume-1.9
@@ -308,7 +308,7 @@ public Void call() throws Exception {
* sink that owns it. This method should be used only when size or count
* based rolling closes this file.
*/
- public void close() throws InterruptedException {
+ public void close() throws InterruptedException, IOException {
close(false);
}

@@ -338,7 +338,7 @@ public Void call() throws Exception {
* If all close attempts were unsuccessful we try to recover the lease.
* @param immediate An immediate close is required
*/
- public void close(boolean immediate) {
+ public void close(boolean immediate) throws IOException, InterruptedException {
closeTries++;
boolean shouldRetry = closeTries < maxRetries && !immediate;
try {
@@ -349,7 +349,11 @@ public void close(boolean immediate) {
"retry again in " + retryInterval + " seconds.", e);
if (timedRollerPool != null && !timedRollerPool.isTerminated()) {
if (shouldRetry) {
-   timedRollerPool.schedule(this, retryInterval, TimeUnit.SECONDS);
+   close(false);
+ } else {
+   LOG.error("Closing file: " + path + " failed. beyond max retry " +
+       maxRetries, e);
+   throw e;
}
} else {
LOG.warn("Cannot retry close any more timedRollerPool is null or terminated");
@@ -406,7 +410,7 @@ private synchronized void recoverLease() {
}
}

- public void close(boolean callCloseCallback) throws InterruptedException {
+ public void close(boolean callCloseCallback) throws InterruptedException, IOException {
close(callCloseCallback, false);
}

@@ -415,7 +419,7 @@ public void close(boolean callCloseCallback) throws InterruptedException {
* Safe to call multiple times. Logs HDFSWriter.close() exceptions.
*/
public void close(boolean callCloseCallback, boolean immediate)
-     throws InterruptedException {
+     throws InterruptedException, IOException {
if (callCloseCallback) {
if (closed.compareAndSet(false, true)) {
runCloseAction(); //remove from the cache as soon as possible
Expand All @@ -427,7 +431,7 @@ public void close(boolean callCloseCallback, boolean immediate)
}

private synchronized void doClose(boolean immediate)
-     throws InterruptedException {
+     throws InterruptedException, IOException {
checkAndThrowInterruptedException();
try {
flush();
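Because close() now declares IOException, every caller that closes a BucketWriter has to handle a checked exception it previously could not see. A hedged sketch of that caller-side contract is below, reusing the SyncCloseSketch class from the sketch above; only the signature change itself comes from the diff, the rest is hypothetical.

```java
import java.io.IOException;

// Hypothetical caller: illustrates the new contract only, not code from this PR.
public class CloseCallerSketch {
  public void closeWriter(SyncCloseSketch writer) {
    try {
      writer.close(false);
    } catch (IOException e) {
      // The close failed even after the bounded retries; the caller decides
      // whether to drop the writer, roll back its transaction, or back off.
      // The sink-side diff below takes exactly that route.
      System.err.println("close failed after retries: " + e.getMessage());
    }
  }
}
```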
@@ -81,7 +81,7 @@ public interface WriterCallback {
// Time between close retries, in seconds
private static final long defaultRetryInterval = 180;
// Retry forever.
- private static final int defaultTryCount = Integer.MAX_VALUE;
+ private static final int defaultTryCount = 3;

public static final String IN_USE_SUFFIX_PARAM_NAME = "hdfs.inUseSuffix";

@@ -163,7 +163,7 @@ protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
// return true
try {
eldest.getValue().close();
- } catch (InterruptedException e) {
+ } catch (InterruptedException | IOException e) {
LOG.warn(eldest.getKey().toString(), e);
Thread.currentThread().interrupt();
}
@@ -412,14 +412,20 @@ public void run(String bucketPath) {
bucketWriter.append(event);
} catch (BucketClosedException ex) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
+ } catch (IOException e) {
+   transaction.rollback();
+   LOG.warn("HDFS IO error", e);
+   sinkCounter.incrementEventWriteFail();
+   sfWriters.remove(lookupPath);
+   return Status.BACKOFF;
}

// track the buckets getting written in this transaction
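The second file wires the new failure mode into the sink: when an append fails with IOException, the transaction is rolled back, the write failure is counted, the stale writer is evicted from sfWriters, and the sink returns BACKOFF so the runner slows down before retrying. A condensed sketch of that error path is below; Transaction, Writer and Status are minimal stand-ins for the Flume types, and the field and method names are placeholders patterned on the diff, not the complete sink process() logic.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Condensed, illustrative sketch of the sink-side error path added above.
public class SinkErrorPathSketch {
  interface Transaction { void commit(); void rollback(); }
  interface Writer { void append(byte[] event) throws IOException; }
  enum Status { READY, BACKOFF }

  private final Map<String, Writer> sfWriters = new HashMap<>();
  private long eventWriteFail = 0;   // stands in for sinkCounter.incrementEventWriteFail()

  Status writeEvent(Transaction tx, String lookupPath, byte[] event) {
    Writer writer = sfWriters.get(lookupPath);   // assume a writer was opened for this path earlier
    try {
      writer.append(event);
      tx.commit();
      return Status.READY;
    } catch (IOException e) {
      tx.rollback();                  // undo the channel take for this batch
      eventWriteFail++;               // count the failed write
      sfWriters.remove(lookupPath);   // drop the writer so a fresh one is created next time
      return Status.BACKOFF;          // ask the sink runner to back off before retrying
    }
  }
}
```

Evicting the writer forces the next event for that path to reinitialize the bucket, which matches the BucketClosedException handling earlier in the same hunk.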