Skip to content

Commit

Permalink
Make sure policies.is_allow_auto_update_schema not null (apache#14409)
Browse files Browse the repository at this point in the history
(cherry picked from commit 7d60795)
  • Loading branch information
wangjialing218 authored and michaeljmarshall committed Feb 24, 2022
1 parent bb50dc6 commit eba2671
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}

return policies;
} catch (RestException re) {
Expand Down Expand Up @@ -545,20 +549,9 @@ protected void validateClusterExists(String cluster) {
}

protected Policies getNamespacePolicies(String property, String cluster, String namespace) {
try {
Policies policies = namespaceResources().get(AdminResource.path(POLICIES, property, cluster, namespace))
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(NamespaceName.get(property, cluster, namespace)).getBundlesData();
policies.bundles = bundleData != null ? bundleData : policies.bundles;
return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}/{}/{}", clientAppId(), property, cluster, namespace, e);
throw new RestException(e);
}
NamespaceName ns = NamespaceName.get(property, cluster, namespace);

return getNamespacePolicies(ns);
}

protected boolean isNamespaceReplicated(NamespaceName namespaceName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ public void namespaces() throws Exception {
policies.bundles = PoliciesUtil.defaultBundle();
policies.auth_policies.getNamespaceAuthentication().put("spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
policies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.allOf(AuthAction.class));
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();

assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/ns1"), policies.auth_policies.getNamespaceAuthentication());
Expand All @@ -750,6 +751,7 @@ public void namespaces() throws Exception {
admin.namespaces().revokePermissionsOnNamespace("prop-xyz/ns1", "my-role");
policies.auth_policies.getNamespaceAuthentication().remove("spiffe://developer/passport-role");
policies.auth_policies.getNamespaceAuthentication().remove("my-role");
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);

assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ public void namespaces() throws Exception {
Policies policies = new Policies();
policies.bundles = PoliciesUtil.defaultBundle();
policies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.allOf(AuthAction.class));
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();

assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/use/ns1"), policies.auth_policies.getNamespaceAuthentication());
Expand All @@ -654,6 +655,7 @@ public void namespaces() throws Exception {

admin.namespaces().revokePermissionsOnNamespace("prop-xyz/use/ns1", "my-role");
policies.auth_policies.getNamespaceAuthentication().remove("my-role");
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();
assertEquals(admin.namespaces().getPolicies("prop-xyz/use/ns1"), policies);

assertNull(admin.namespaces().getPersistence("prop-xyz/use/ns1"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -247,6 +248,7 @@ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy


pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);

ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
Expand All @@ -259,6 +261,9 @@ public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy
}

pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
Policies policies = admin.namespaces().getPolicies(namespaceName.toString());
Assert.assertTrue(policies.is_allow_auto_update_schema);

ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
Expand Down

0 comments on commit eba2671

Please sign in to comment.