diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 7a15c414aa8c4..03898b032b403 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -154,11 +154,17 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config.getProvider().getDriver(), config.getServiceEndpoint(), config.getBucket(), config.getRegion()); - blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore()); this.offloaderStats = offloaderStats; log.info("The ledger offloader was created."); } + private BlobStore getBlobStore(BlobStoreLocation blobStoreLocation) { + return blobStores.computeIfAbsent(blobStoreLocation, location -> { + log.info("Creating blob store for location {}", location); + return config.getBlobStore(); + }); + } + @Override public String getOffloadDriverName() { return config.getDriver(); @@ -179,11 +185,11 @@ public CompletableFuture offload(ReadHandle readHandle, Map extraMetadata) { final String managedLedgerName = extraMetadata.get(MANAGED_LEDGER_NAME); final String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName); - final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation()); - log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata, - config.getBlobStoreLocation(), writeBlobStore); CompletableFuture promise = new CompletableFuture<>(); scheduler.chooseThread(readHandle.getId()).execute(() -> { + final BlobStore writeBlobStore = getBlobStore(config.getBlobStoreLocation()); + log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata, + config.getBlobStoreLocation(), writeBlobStore); if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) { promise.completeExceptionally( new IllegalArgumentException("An empty or open ledger should never be offloaded")); @@ -330,7 +336,7 @@ public CompletableFuture streamingOffload(@NonNull ManagedLedger driverMetadata); log.debug("begin offload with {}:{}", beginLedger, beginEntry); this.offloadResult = new CompletableFuture<>(); - blobStore = blobStores.get(config.getBlobStoreLocation()); + blobStore = getBlobStore(config.getBlobStoreLocation()); streamingIndexBuilder = OffloadIndexBlockV2Builder.create(); streamingDataBlockKey = segmentInfo.uuid.toString(); streamingDataIndexKey = String.format("%s-index", segmentInfo.uuid); @@ -536,13 +542,13 @@ public CompletableFuture readOffloaded(long ledgerId, UUID uid, BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata); String readBucket = bsKey.getBucket(); - BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation()); CompletableFuture promise = new CompletableFuture<>(); String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid); String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid); scheduler.chooseThread(ledgerId).execute(() -> { try { + BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); promise.complete(BlobStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), readBlobstore, readBucket, key, indexKey, @@ -562,7 +568,6 @@ public CompletableFuture readOffloaded(long ledgerId, MLDataFormats. Map offloadDriverMetadata) { BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata); String readBucket = bsKey.getBucket(); - BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation()); CompletableFuture promise = new CompletableFuture<>(); final List offloadSegmentList = ledgerContext.getOffloadSegmentList(); List keys = Lists.newLinkedList(); @@ -577,6 +582,7 @@ public CompletableFuture readOffloaded(long ledgerId, MLDataFormats. scheduler.chooseThread(ledgerId).execute(() -> { try { + BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); promise.complete(BlobStoreBackedReadHandleImplV2.open(scheduler.chooseThread(ledgerId), readBlobstore, readBucket, keys, indexKeys, @@ -596,11 +602,11 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, Map offloadDriverMetadata) { BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata); String readBucket = bsKey.getBucket(offloadDriverMetadata); - BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation()); CompletableFuture promise = new CompletableFuture<>(); scheduler.chooseThread(ledgerId).execute(() -> { try { + BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); readBlobstore.removeBlobs(readBucket, ImmutableList.of(DataBlockUtils.dataBlockOffloadKey(ledgerId, uid), DataBlockUtils.indexBlockOffloadKey(ledgerId, uid))); @@ -623,11 +629,11 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, public CompletableFuture deleteOffloaded(UUID uid, Map offloadDriverMetadata) { BlobStoreLocation bsKey = getBlobStoreLocation(offloadDriverMetadata); String readBucket = bsKey.getBucket(offloadDriverMetadata); - BlobStore readBlobstore = blobStores.get(config.getBlobStoreLocation()); CompletableFuture promise = new CompletableFuture<>(); scheduler.execute(() -> { try { + BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); readBlobstore.removeBlobs(readBucket, ImmutableList.of(uid.toString(), DataBlockUtils.indexBlockOffloadKey(uid))); @@ -667,7 +673,7 @@ public void scanLedgers(OffloadedLedgerMetadataConsumer consumer, Map