Skip to content

Commit

Permalink
[improve][log] Print ZK path if write to ZK fails due to data being too large to persist (#23652)
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Dec 30, 2024
1 parent 6e3eaf5 commit 5a3a1f1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,8 @@ public void operationFailed(ManagedLedgerException exception) {

if (State.NoLedger.equals(STATE_UPDATER.get(this))) {
if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) {
log.error("[{}][{}] Metadata ledger creation failed, try to persist the position in the metadata"
+ " store.", ledger.getName(), name);
persistPositionToMetaStore(mdEntry, cb);
} else {
cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed"));
Expand Down Expand Up @@ -2969,9 +2971,7 @@ public void operationComplete() {

@Override
public void operationFailed(ManagedLedgerException exception) {
log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata"
+ " store.", ledger.getName(), name, exception);

log.error("[{}][{}] Metadata ledger creation failed {}", ledger.getName(), name, exception);
synchronized (pendingMarkDeleteOps) {
// At this point we don't have a ledger ready
STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -201,10 +202,15 @@ protected void batchOperation(List<MetadataOp> ops) {
Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
.entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
.collect(Collectors.joining(", "));
List<Pair> opsForLog = ops.stream()
.filter(item -> item.size() > 256 * 1024)
.map(op -> Pair.of(op.getPath(), op.size()))
.collect(Collectors.toList());
Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
log.warn("Connection loss while executing batch operation of {} "
+ "of total data size of {}. "
+ "Retrying individual operations one-by-one.", countsByType, totalSize);
+ "Retrying individual operations one-by-one. ops whose size > 256KB: {}",
countsByType, totalSize, opsForLog);

// Retry with the individual operations
executor.schedule(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ default OpGetChildren asGetChildren() {
/**
 * Views this operation as an {@link OpPut}.
 *
 * @return this same instance, typed as {@code OpPut}
 * @throws ClassCastException if this instance is not an {@code OpPut}
 */
default OpPut asPut() {
    final OpPut self = (OpPut) this;
    return self;
}

/**
 * Returns the path associated with this operation.
 *
 * <p>NOTE(review): presumably the metadata-store path the operation targets, so callers can
 * report which entry an operation refers to (e.g. in log messages) — confirm against the
 * implementing classes.
 *
 * @return the path of this operation
 */
String getPath();
}

0 comments on commit 5a3a1f1

Please sign in to comment.