Skip to content

Commit

Permalink
Merge branch 'master' into fix_heartbeat_namespace_create_event_topic…
Browse files Browse the repository at this point in the history
…_and_cannot_delete_heartbeat_topic
  • Loading branch information
Technoboy- authored Oct 19, 2023
2 parents 9617145 + c8a2f49 commit bec574e
Show file tree
Hide file tree
Showing 107 changed files with 2,395 additions and 782 deletions.
2 changes: 1 addition & 1 deletion .github/actions/clean-disk/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ runs:
directories=(/usr/local/lib/android /opt/ghc)
if [[ "${{ inputs.mode }}" == "full" ]]; then
# remove these directories only when mode is 'full'
directories+=(/usr/share/dotnet)
directories+=(/usr/share/dotnet /opt/hostedtoolcache/CodeQL)
fi
emptydir=/tmp/empty$$/
mkdir $emptydir
Expand Down
14 changes: 14 additions & 0 deletions .github/workflows/pulsar-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ jobs:
group: BROKER_GROUP_2
- name: Brokers - Broker Group 3
group: BROKER_GROUP_3
- name: Brokers - Broker Group 4
group: BROKER_GROUP_4
- name: Brokers - Client Api
group: BROKER_CLIENT_API
- name: Brokers - Client Impl
Expand Down Expand Up @@ -746,6 +748,8 @@ jobs:

- name: Clean Disk
uses: ./.github/actions/clean-disk
with:
mode: full

- name: Cache local Maven repository
uses: actions/cache@v3
Expand Down Expand Up @@ -861,6 +865,7 @@ jobs:

- name: Pulsar IO
group: PULSAR_IO
clean_disk: true

- name: Sql
group: SQL
Expand All @@ -872,6 +877,10 @@ jobs:
- name: Tune Runner VM
uses: ./.github/actions/tune-runner-vm

- name: Clean Disk when needed
if: ${{ matrix.clean_disk }}
uses: ./.github/actions/clean-disk

- name: Setup ssh access to build runner VM
# ssh access is enabled for builds in own forks
if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }}
Expand Down Expand Up @@ -1072,6 +1081,7 @@ jobs:

- name: Pulsar IO - Oracle
group: PULSAR_IO_ORA
clean_disk: true

steps:
- name: checkout
Expand All @@ -1080,6 +1090,10 @@ jobs:
- name: Tune Runner VM
uses: ./.github/actions/tune-runner-vm

- name: Clean Disk when needed
if: ${{ matrix.clean_disk }}
uses: ./.github/actions/clean-disk

- name: Setup ssh access to build runner VM
# ssh access is enabled for builds in own forks
if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }}
Expand Down
4 changes: 2 additions & 2 deletions .mvn/extensions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
<extension>
<groupId>com.gradle</groupId>
<artifactId>gradle-enterprise-maven-extension</artifactId>
<version>1.17.1</version>
<version>1.19.3</version>
</extension>
<extension>
<groupId>com.gradle</groupId>
<artifactId>common-custom-user-data-maven-extension</artifactId>
<version>1.11.1</version>
<version>1.12.4</version>
</extension>
</extensions>
4 changes: 4 additions & 0 deletions build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ function test_group_broker_group_3() {
mvn_test -pl pulsar-broker -Dgroups='broker-admin'
}

function test_group_broker_group_4() {
mvn_test -pl pulsar-broker -Dgroups='cluster-migration'
}

function test_group_broker_client_api() {
mvn_test -pl pulsar-broker -Dgroups='broker-api'
}
Expand Down
6 changes: 5 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ dispatcherReadFailureBackoffMaxTimeInMs=60000
# The read failure backoff mandatory stop time in milliseconds. By default it is 0s.
dispatcherReadFailureBackoffMandatoryStopTimeInMs=0

# Precise dispathcer flow control according to history message number of each entry
# Precise dispatcher flow control according to history message number of each entry
preciseDispatcherFlowControl=false

# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered
Expand Down Expand Up @@ -1593,6 +1593,10 @@ aggregatePublisherStatsByProducerName=false
# if cluster is marked migrated. Disable with value 0. (Default disabled).
clusterMigrationCheckDurationSeconds=0

# Flag to start cluster migration for topic only after creating all topic's resources
# such as tenant, namespaces, subscriptions at new green cluster. (Default disabled).
clusterMigrationAutoResourceCreation=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
8 changes: 8 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,14 @@ splitTopicAndPartitionLabelInPrometheus=false
# Otherwise, aggregate it by list index.
aggregatePublisherStatsByProducerName=false

# Interval between checks to see if cluster is migrated and marks topic migrated
# if cluster is marked migrated. Disable with value 0. (Default disabled).
clusterMigrationCheckDurationSeconds=0

# Flag to start cluster migration for topic only after creating all topic's resources
# such as tenant, namespaces, subscriptions at new green cluster. (Default disabled).
clusterMigrationAutoResourceCreation=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker.
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
4 changes: 2 additions & 2 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,8 @@ The Apache Software License, Version 2.0
- net.jodah-typetools-0.5.0.jar
- net.jodah-failsafe-2.4.4.jar
* Apache Avro
- org.apache.avro-avro-1.10.2.jar
- org.apache.avro-avro-protobuf-1.10.2.jar
- org.apache.avro-avro-1.11.3.jar
- org.apache.avro-avro-protobuf-1.11.3.jar
* Apache Curator
- org.apache.curator-curator-client-5.1.0.jar
- org.apache.curator-curator-framework-5.1.0.jar
Expand Down
4 changes: 2 additions & 2 deletions distribution/shell/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ The Apache Software License, Version 2.0
* Google Error Prone Annotations - error_prone_annotations-2.5.1.jar
* Javassist -- javassist-3.25.0-GA.jar
* Apache Avro
- avro-1.10.2.jar
- avro-protobuf-1.10.2.jar
- avro-1.11.3.jar
- avro-protobuf-1.11.3.jar

BSD 3-clause "New" or "Revised" License
* JSR305 -- jsr305-3.0.2.jar -- ../licenses/LICENSE-JSR305.txt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ ManagedLedger open(String name, ManagedLedgerConfig config)
* opaque context
*/
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
Supplier<Boolean> mlOwnershipChecker, Object ctx);
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, Object ctx);

/**
* Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2685,32 +2685,47 @@ public void operationComplete(Void result, Stat stat) {
}

@Override
public void operationFailed(MetaStoreException e) {
if (e instanceof MetaStoreException.BadVersionException) {
public void operationFailed(MetaStoreException topLevelException) {
if (topLevelException instanceof MetaStoreException.BadVersionException) {
log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
ledger.name, name, e.getMessage());
ledger.name, name, topLevelException.getMessage());
// it means previous owner of the ml might have updated the version incorrectly. So, check
// the ownership and refresh the version again.
if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
updateCursorLedgerStat(info, stat);
}

@Override
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed to refresh cursor metadata-version for {} due "
+ "to {}", ledger.name, name, e.getMessage());
}
}
});
if (ledger.mlOwnershipChecker != null) {
ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> {
if (t == null && hasOwnership) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
updateCursorLedgerStat(info, stat);
// fail the top level call so that the caller can retry
callback.operationFailed(topLevelException);
}

@Override
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed to refresh cursor metadata-version "
+ "for {} due to {}", ledger.name, name,
e.getMessage());
}
// fail the top level call so that the caller can retry
callback.operationFailed(topLevelException);
}
});
} else {
// fail the top level call so that the caller can retry
callback.operationFailed(topLevelException);
}
});
} else {
callback.operationFailed(topLevelException);
}
} else {
callback.operationFailed(topLevelException);
}
callback.operationFailed(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) {

@Override
public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
Supplier<Boolean> mlOwnershipChecker, final Object ctx) {
Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, final Object ctx) {
if (closed) {
callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;
protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;

Expand Down Expand Up @@ -336,7 +336,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
}
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
final String name, final Supplier<Boolean> mlOwnershipChecker) {
final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
this.factory = factory;
this.bookKeeper = bookKeeper;
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
Expand All @@ -50,7 +51,7 @@ public class ShadowManagedLedgerImpl extends ManagedLedgerImpl {
public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
MetaStore store, ManagedLedgerConfig config,
OrderedScheduler scheduledExecutor,
String name, final Supplier<Boolean> mlOwnershipChecker) {
String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,7 +3389,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
}, checkOwnershipFlag ? () -> true : null, null);
}, checkOwnershipFlag ? () -> CompletableFuture.completedFuture(true) : null, null);
latch.await();
}

Expand Down
Loading

0 comments on commit bec574e

Please sign in to comment.