Skip to content

Commit

Permalink
rgw: modify topic owner check when creating
Browse files Browse the repository at this point in the history
add tests to cover topic policies
as well as behavior when no policies are defined

Fixes: https://tracker.ceph.com/issues/64124

Signed-off-by: Zhipeng Li <[email protected]>
  • Loading branch information
qiuxinyidian authored and yuvalif committed Feb 6, 2024
1 parent 4c0fc4a commit 4581d0d
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 30 deletions.
9 changes: 9 additions & 0 deletions PendingReleaseNotes
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ CephFS: Disallow delegating preallocated inode ranges to clients. Config
trim the log every second (`mds_log_trim_upkeep_interval` config). Also,
a couple of configs govern how much time the MDS spends in trimming its
logs. These configs are `mds_log_trim_threshold` and `mds_log_trim_decay_rate`.
* RGW: Notification topics are now owned by the user that created them.
By default, only the owner can read/write their topics. Topic policy documents
are now supported to grant these permissions to other users. Preexisting topics
are treated as if they have no owner, and any user can read/write them using the SNS API.
If such a topic is recreated with CreateTopic, the issuing user becomes the new owner.
For backward compatibility, all users still have permission to publish bucket
notifications to topics owned by other users. A new configuration parameter:
``rgw_topic_require_publish_policy`` can be enabled to deny ``sns:Publish``
permissions unless explicitly granted by topic policy.

>=18.0.0

Expand Down
8 changes: 4 additions & 4 deletions src/common/options/rgw.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -3917,15 +3917,15 @@ options:
services:
- rgw
with_legacy: true
- name: mandatory_topic_permissions
- name: rgw_topic_require_publish_policy
type: bool
level: basic
desc: Whether to validate user permissions to access notification topics.
desc: Whether to validate user permissions to publish notifications to topics.
long_desc: If true, all users (other then the owner of the topic) will need
to have a policy to access topics.
to have a policy to publish notifications to topics.
The topic policy can be set by owner via CreateTopic() or SetTopicAttribute().
Following permissions can be granted "sns:Publish", "sns:GetTopicAttributes",
"sns:SetTopicAttributes" and "sns:DeleteTopic" via Policy.
"sns:SetTopicAttributes", "sns:DeleteTopic" and "sns:CreateTopic" via Policy.
NOTE that even if set to "false" topics will still follow the policies if set on them.
default: false
services:
Expand Down
4 changes: 4 additions & 0 deletions src/rgw/rgw_iam_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ static const actpair actpairs[] =
{ "sns:DeleteTopic", snsDeleteTopic},
{ "sns:Publish", snsPublish},
{ "sns:SetTopicAttributes", snsSetTopicAttributes},
{ "sns:CreateTopic", snsCreateTopic},
};

struct PolicyParser;
Expand Down Expand Up @@ -1476,6 +1477,9 @@ const char* action_bit_string(uint64_t action) {

case snsPublish:
return "sns:Publish";

case snsCreateTopic:
return "sns:CreateTopic";
}
return "s3Invalid";
}
Expand Down
3 changes: 2 additions & 1 deletion src/rgw/rgw_iam_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ static constexpr std::uint64_t snsGetTopicAttributes = stsAll + 1;
static constexpr std::uint64_t snsDeleteTopic = stsAll + 2;
static constexpr std::uint64_t snsPublish = stsAll + 3;
static constexpr std::uint64_t snsSetTopicAttributes = stsAll + 4;
static constexpr std::uint64_t snsAll = stsAll + 5;
static constexpr std::uint64_t snsCreateTopic = stsAll + 5;
static constexpr std::uint64_t snsAll = stsAll + 6;

static constexpr std::uint64_t s3Count = s3All;
static constexpr std::uint64_t allCount = snsAll + 1;
Expand Down
55 changes: 32 additions & 23 deletions src/rgw/rgw_rest_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <algorithm>
#include <boost/tokenizer.hpp>
#include <optional>
#include "rgw_iam_policy.h"
#include "rgw_rest_pubsub.h"
#include "rgw_pubsub_push.h"
#include "rgw_pubsub.h"
Expand Down Expand Up @@ -75,8 +76,8 @@ std::optional<rgw::IAM::Policy> get_policy_from_text(req_state* const s,
s->cct, s->owner.id.tenant, bl,
s->cct->_conf.get_val<bool>("rgw_policy_reject_invalid_principals"));
} catch (rgw::IAM::PolicyParseException& e) {
ldout(s->cct, 1) << "failed to parse policy:' " << policy_text
<< " ' with error: " << e.what() << dendl;
ldout(s->cct, 1) << "failed to parse policy: '" << policy_text
<< "' with error: " << e.what() << dendl;
s->err.message = e.what();
return std::nullopt;
}
Expand All @@ -91,13 +92,18 @@ int verify_topic_owner_or_policy(req_state* const s,
}
// no policy set.
if (topic.policy_text.empty()) {
// if mandatory_topic_permissions is true, then validate all users for
// permission.
if (s->cct->_conf->mandatory_topic_permissions) {
return -EACCES;
} else {
// if rgw_topic_require_publish_policy is "false" dont validate "publish" policies
if (op == rgw::IAM::snsPublish && !s->cct->_conf->rgw_topic_require_publish_policy) {
return 0;
}
if (topic.user.empty()) {
// if we don't know the original user and there is no policy
// we will not reject the request.
// this is for compatibility with versions that did not store the user in the topic
return 0;
}
s->err.message = "Topic was created by another user.";
return -EACCES;
}
// bufferlist::static_from_string wants non const string
std::string policy_text(topic.policy_text);
Expand All @@ -107,7 +113,7 @@ int verify_topic_owner_or_policy(req_state* const s,
s->user->get_tenant(), topic.name);
if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) !=
rgw::IAM::Effect::Allow) {
ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p
ldout(s->cct, 1) << "topic policy failed validation, topic policy: " << p
<< dendl;
return -EACCES;
}
Expand Down Expand Up @@ -195,13 +201,17 @@ class RGWPSCreateTopicOp : public RGWOp {
return 0;
}
if (ret == 0) {
if (result.user == s->owner.id ||
!s->cct->_conf->mandatory_topic_permissions) {
ret = verify_topic_owner_or_policy(
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsCreateTopic);
if (ret == 0)
{
return 0;
}
ldpp_dout(this, 1) << "failed to create topic '" << topic_name

ldpp_dout(this, 1) << "no permission to modify topic '" << topic_name
<< "', topic already exist." << dendl;
return -EPERM;
return -EACCES;
}
ldpp_dout(this, 1) << "failed to read topic '" << topic_name
<< "', with error:" << ret << dendl;
Expand Down Expand Up @@ -408,8 +418,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) {
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsGetTopicAttributes);
if (op_ret != 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', topic owned by other user" << dendl;
ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
<< "'" << dendl;
return;
}
ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
Expand Down Expand Up @@ -492,8 +502,8 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) {
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsGetTopicAttributes);
if (op_ret != 0) {
ldpp_dout(this, 1) << "failed to get topic '" << topic_name
<< "', topic owned by other user" << dendl;
ldpp_dout(this, 1) << "no permission to get topic '" << topic_name
<< "'" << dendl;
return;
}
ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl;
Expand Down Expand Up @@ -617,8 +627,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp {
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsSetTopicAttributes);
if (ret != 0) {
ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name
<< "', topic owned by other user" << dendl;
ldpp_dout(this, 1) << "no permission to set attributes for topic '" << topic_name
<< "'" << dendl;
return ret;
}

Expand Down Expand Up @@ -750,8 +760,8 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) {
s, result, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsDeleteTopic);
if (op_ret != 0) {
ldpp_dout(this, 1) << "failed to remove topic '" << topic_name
<< "' topic owned by other user" << dendl;
ldpp_dout(this, 1) << "no permission to remove topic '" << topic_name
<< "'" << dendl;
return;
}
} else {
Expand Down Expand Up @@ -1025,9 +1035,8 @@ void RGWPSCreateNotifOp::execute(optional_yield y) {
s, topic_info, driver->get_zone()->get_zonegroup().get_name(),
rgw::IAM::snsPublish);
if (op_ret != 0) {
ldpp_dout(this, 1) << "failed to create notification for topic '"
<< topic_name << "' topic owned by other user"
<< dendl;
ldpp_dout(this, 1) << "no permission to create notification for topic '"
<< topic_name << "'" << dendl;
return;
}
// make sure that full topic configuration match
Expand Down
107 changes: 105 additions & 2 deletions src/test/rgw/bucket_notification/test_bn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4395,6 +4395,7 @@ def test_ps_s3_multiple_topics_notification():
conn.delete_bucket(bucket_name)
http_server.close()


@attr('basic_test')
def test_ps_s3_topic_permissions():
""" test s3 topic set/get/delete permissions """
Expand All @@ -4410,7 +4411,7 @@ def test_ps_s3_topic_permissions():
"Sid": "Statement",
"Effect": "Deny",
"Principal": "*",
"Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes"],
"Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes", "sns:DeleteTopic", "sns:CreateTopic"],
"Resource": f"arn:aws:sns:{zonegroup}::{topic_name}"
}
]
Expand All @@ -4421,10 +4422,23 @@ def test_ps_s3_topic_permissions():
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
topic_arn = topic_conf.set_config()

# 2nd user tries to fetch the topic
topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
try:
# 2nd user tries to override the topic
topic_arn = topic_conf2.set_config()
assert False, "'AccessDenied' error is expected"
except ClientError as err:
if 'Error' in err.response:
assert_equal(err.response['Error']['Code'], 'AccessDenied')
else:
assert_equal(err.response['Code'], 'AccessDenied')
except Exception as err:
print('unexpected error type: '+type(err).__name__)

# 2nd user tries to fetch the topic
_, status = topic_conf2.get_config(topic_arn=topic_arn)
assert_equal(status, 403)

try:
# 2nd user tries to set the attribute
status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
Expand Down Expand Up @@ -4455,6 +4469,18 @@ def test_ps_s3_topic_permissions():
except Exception as err:
print('unexpected error type: '+type(err).__name__)

try:
# 2nd user tries to delete the topic
status = topic_conf2.del_config(topic_arn=topic_arn)
assert False, "'AccessDenied' error is expected"
except ClientError as err:
if 'Error' in err.response:
assert_equal(err.response['Error']['Code'], 'AccessDenied')
else:
assert_equal(err.response['Code'], 'AccessDenied')
except Exception as err:
print('unexpected error type: '+type(err).__name__)

# Topic policy is now added by the 1st user to allow 2nd user.
topic_policy = topic_policy.replace("Deny", "Allow")
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy)
Expand All @@ -4469,13 +4495,90 @@ def test_ps_s3_topic_permissions():
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
assert_equal(status, 200)
# 2nd user tries to delete the topic again
status = topic_conf2.del_config(topic_arn=topic_arn)
assert_equal(status, 200)

# cleanup
s3_notification_conf2.del_config()
# delete the bucket
conn2.delete_bucket(bucket_name)


@attr('basic_test')
def test_ps_s3_topic_no_permissions():
""" test s3 topic set/get/delete permissions """
conn1 = connection()
conn2 = another_user()
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX

# create s3 topic without policy
endpoint_address = 'amqp://127.0.0.1:7001'
endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
topic_arn = topic_conf.set_config()

topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args)
try:
# 2nd user tries to override the topic
topic_arn = topic_conf2.set_config()
assert False, "'AccessDenied' error is expected"
except ClientError as err:
if 'Error' in err.response:
assert_equal(err.response['Error']['Code'], 'AccessDenied')
else:
assert_equal(err.response['Code'], 'AccessDenied')
except Exception as err:
print('unexpected error type: '+type(err).__name__)

# 2nd user tries to fetch the topic
_, status = topic_conf2.get_config(topic_arn=topic_arn)
assert_equal(status, 403)

try:
# 2nd user tries to set the attribute
status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn)
assert False, "'AccessDenied' error is expected"
except ClientError as err:
if 'Error' in err.response:
assert_equal(err.response['Error']['Code'], 'AccessDenied')
else:
assert_equal(err.response['Code'], 'AccessDenied')
except Exception as err:
print('unexpected error type: '+type(err).__name__)

# create bucket for conn2 publish notification to topic
# should be allowed based on the default value of rgw_topic_require_publish_policy=false
_ = conn2.create_bucket(bucket_name)
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn,
'Events': []
}]
s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
_, status = s3_notification_conf2.set_config()
assert_equal(status, 200)

try:
# 2nd user tries to delete the topic
status = topic_conf2.del_config(topic_arn=topic_arn)
assert False, "'AccessDenied' error is expected"
except ClientError as err:
if 'Error' in err.response:
assert_equal(err.response['Error']['Code'], 'AccessDenied')
else:
assert_equal(err.response['Code'], 'AccessDenied')
except Exception as err:
print('unexpected error type: '+type(err).__name__)

# cleanup
s3_notification_conf2.del_config()
topic_conf.del_config()
# delete the bucket
conn2.delete_bucket(bucket_name)


def kafka_security(security_type, mechanism='PLAIN'):
""" test pushing kafka s3 notification securly to master """
conn = connection()
Expand Down

0 comments on commit 4581d0d

Please sign in to comment.