diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 022cecf8d57b5..0cd9fc0d54cc8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2247,6 +2247,8 @@ public void operationFailed(ManagedLedgerException exception) { if (State.NoLedger.equals(STATE_UPDATER.get(this))) { if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) { + log.error("[{}][{}] Metadata ledger creation failed, try to persist the position in the metadata" + + " store.", ledger.getName(), name); persistPositionToMetaStore(mdEntry, cb); } else { cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed")); @@ -2969,9 +2971,7 @@ public void operationComplete() { @Override public void operationFailed(ManagedLedgerException exception) { - log.error("[{}][{}] Metadata ledger creation failed {}, try to persist the position in the metadata" - + " store.", ledger.getName(), name, exception); - + log.error("[{}][{}] Metadata ledger creation failed {}", ledger.getName(), name, exception); synchronized (pendingMarkDeleteOps) { // At this point we don't have a ledger ready STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 603a4503dc8bb..4c24aa5938b93 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -32,6 +32,7 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStore; @@ -201,10 +202,15 @@ protected void batchOperation(List ops) { Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1))) .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries") .collect(Collectors.joining(", ")); + List opsForLog = ops.stream() + .filter(item -> item.size() > 256 * 1024) + .map(op -> Pair.of(op.getPath(), op.size())) + .collect(Collectors.toList()); Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size)); log.warn("Connection loss while executing batch operation of {} " + "of total data size of {}. " - + "Retrying individual operations one-by-one.", countsByType, totalSize); + + "Retrying individual operations one-by-one. ops whose size > 256KB: {}", + countsByType, totalSize, opsForLog); // Retry with the individual operations executor.schedule(() -> { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java index abf60f7b7245c..06ff425372b58 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/MetadataOp.java @@ -51,4 +51,6 @@ default OpGetChildren asGetChildren() { default OpPut asPut() { return (OpPut) this; } + + String getPath(); }