diff --git a/PendingReleaseNotes b/PendingReleaseNotes index dfc6294b36a2c..0ecec83ec81f9 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -121,6 +121,15 @@ CephFS: Disallow delegating preallocated inode ranges to clients. Config trim the log every second (`mds_log_trim_upkeep_interval` config). Also, a couple of configs govern how much time the MDS spends in trimming its logs. These configs are `mds_log_trim_threshold` and `mds_log_trim_decay_rate`. +* RGW: Notification topics are now owned by the user that created them. + By default, only the owner can read/write their topics. Topic policy documents + are now supported to grant these permissions to other users. Preexisting topics + are treated as if they have no owner, and any user can read/write them using the SNS API. + If such a topic is recreated with CreateTopic, the issuing user becomes the new owner. + For backward compatibility, all users still have permission to publish bucket + notifications to topics owned by other users. A new configuration parameter: + ``rgw_topic_require_publish_policy`` can be enabled to deny ``sns:Publish`` + permissions unless explicitly granted by topic policy. >=18.0.0 diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index f4ef0079dd6be..6fab43e5589ff 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -3917,15 +3917,15 @@ options: services: - rgw with_legacy: true -- name: mandatory_topic_permissions +- name: rgw_topic_require_publish_policy type: bool level: basic - desc: Whether to validate user permissions to access notification topics. + desc: Whether to validate user permissions to publish notifications to topics. long_desc: If true, all users (other then the owner of the topic) will need - to have a policy to access topics. + to have a policy to publish notifications to topics. The topic policy can be set by owner via CreateTopic() or SetTopicAttribute(). Following permissions can be granted "sns:Publish", "sns:GetTopicAttributes", - "sns:SetTopicAttributes" and "sns:DeleteTopic" via Policy. + "sns:SetTopicAttributes", "sns:DeleteTopic" and "sns:CreateTopic" via Policy. NOTE that even if set to "false" topics will still follow the policies if set on them. default: false services: diff --git a/src/rgw/rgw_iam_policy.cc b/src/rgw/rgw_iam_policy.cc index 76b24034d6149..813b78f161e11 100644 --- a/src/rgw/rgw_iam_policy.cc +++ b/src/rgw/rgw_iam_policy.cc @@ -161,6 +161,7 @@ static const actpair actpairs[] = { "sns:DeleteTopic", snsDeleteTopic}, { "sns:Publish", snsPublish}, { "sns:SetTopicAttributes", snsSetTopicAttributes}, + { "sns:CreateTopic", snsCreateTopic}, }; struct PolicyParser; @@ -1476,6 +1477,9 @@ const char* action_bit_string(uint64_t action) { case snsPublish: return "sns:Publish"; + + case snsCreateTopic: + return "sns:CreateTopic"; } return "s3Invalid"; } diff --git a/src/rgw/rgw_iam_policy.h b/src/rgw/rgw_iam_policy.h index e528d1515c774..5d6f334c176eb 100644 --- a/src/rgw/rgw_iam_policy.h +++ b/src/rgw/rgw_iam_policy.h @@ -145,7 +145,8 @@ static constexpr std::uint64_t snsGetTopicAttributes = stsAll + 1; static constexpr std::uint64_t snsDeleteTopic = stsAll + 2; static constexpr std::uint64_t snsPublish = stsAll + 3; static constexpr std::uint64_t snsSetTopicAttributes = stsAll + 4; -static constexpr std::uint64_t snsAll = stsAll + 5; +static constexpr std::uint64_t snsCreateTopic = stsAll + 5; +static constexpr std::uint64_t snsAll = stsAll + 6; static constexpr std::uint64_t s3Count = s3All; static constexpr std::uint64_t allCount = snsAll + 1; diff --git a/src/rgw/rgw_rest_pubsub.cc b/src/rgw/rgw_rest_pubsub.cc index 611589d721f37..191f535d82bd4 100644 --- a/src/rgw/rgw_rest_pubsub.cc +++ b/src/rgw/rgw_rest_pubsub.cc @@ -4,6 +4,7 @@ #include #include #include +#include "rgw_iam_policy.h" #include "rgw_rest_pubsub.h" #include "rgw_pubsub_push.h" #include "rgw_pubsub.h" @@ -75,8 +76,8 @@ std::optional get_policy_from_text(req_state* const s, s->cct, s->owner.id.tenant, bl, s->cct->_conf.get_val("rgw_policy_reject_invalid_principals")); } catch (rgw::IAM::PolicyParseException& e) { - ldout(s->cct, 1) << "failed to parse policy:' " << policy_text - << " ' with error: " << e.what() << dendl; + ldout(s->cct, 1) << "failed to parse policy: '" << policy_text + << "' with error: " << e.what() << dendl; s->err.message = e.what(); return std::nullopt; } @@ -91,13 +92,18 @@ int verify_topic_owner_or_policy(req_state* const s, } // no policy set. if (topic.policy_text.empty()) { - // if mandatory_topic_permissions is true, then validate all users for - // permission. - if (s->cct->_conf->mandatory_topic_permissions) { - return -EACCES; - } else { + // if rgw_topic_require_publish_policy is "false" dont validate "publish" policies + if (op == rgw::IAM::snsPublish && !s->cct->_conf->rgw_topic_require_publish_policy) { + return 0; + } + if (topic.user.empty()) { + // if we don't know the original user and there is no policy + // we will not reject the request. + // this is for compatibility with versions that did not store the user in the topic return 0; } + s->err.message = "Topic was created by another user."; + return -EACCES; } // bufferlist::static_from_string wants non const string std::string policy_text(topic.policy_text); @@ -107,7 +113,7 @@ int verify_topic_owner_or_policy(req_state* const s, s->user->get_tenant(), topic.name); if (!p || p->eval(s->env, *s->auth.identity, op, arn, princ_type) != rgw::IAM::Effect::Allow) { - ldout(s->cct, 1) << "topic_policy failed validation, topic_policy: " << p + ldout(s->cct, 1) << "topic policy failed validation, topic policy: " << p << dendl; return -EACCES; } @@ -195,13 +201,17 @@ class RGWPSCreateTopicOp : public RGWOp { return 0; } if (ret == 0) { - if (result.user == s->owner.id || - !s->cct->_conf->mandatory_topic_permissions) { + ret = verify_topic_owner_or_policy( + s, result, driver->get_zone()->get_zonegroup().get_name(), + rgw::IAM::snsCreateTopic); + if (ret == 0) + { return 0; } - ldpp_dout(this, 1) << "failed to create topic '" << topic_name + + ldpp_dout(this, 1) << "no permission to modify topic '" << topic_name << "', topic already exist." << dendl; - return -EPERM; + return -EACCES; } ldpp_dout(this, 1) << "failed to read topic '" << topic_name << "', with error:" << ret << dendl; @@ -408,8 +418,8 @@ void RGWPSGetTopicOp::execute(optional_yield y) { s, result, driver->get_zone()->get_zonegroup().get_name(), rgw::IAM::snsGetTopicAttributes); if (op_ret != 0) { - ldpp_dout(this, 1) << "failed to get topic '" << topic_name - << "', topic owned by other user" << dendl; + ldpp_dout(this, 1) << "no permission to get topic '" << topic_name + << "'" << dendl; return; } ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl; @@ -492,8 +502,8 @@ void RGWPSGetTopicAttributesOp::execute(optional_yield y) { s, result, driver->get_zone()->get_zonegroup().get_name(), rgw::IAM::snsGetTopicAttributes); if (op_ret != 0) { - ldpp_dout(this, 1) << "failed to get topic '" << topic_name - << "', topic owned by other user" << dendl; + ldpp_dout(this, 1) << "no permission to get topic '" << topic_name + << "'" << dendl; return; } ldpp_dout(this, 1) << "successfully got topic '" << topic_name << "'" << dendl; @@ -617,8 +627,8 @@ class RGWPSSetTopicAttributesOp : public RGWOp { s, result, driver->get_zone()->get_zonegroup().get_name(), rgw::IAM::snsSetTopicAttributes); if (ret != 0) { - ldpp_dout(this, 1) << "failed to set attributes for topic '" << topic_name - << "', topic owned by other user" << dendl; + ldpp_dout(this, 1) << "no permission to set attributes for topic '" << topic_name + << "'" << dendl; return ret; } @@ -750,8 +760,8 @@ void RGWPSDeleteTopicOp::execute(optional_yield y) { s, result, driver->get_zone()->get_zonegroup().get_name(), rgw::IAM::snsDeleteTopic); if (op_ret != 0) { - ldpp_dout(this, 1) << "failed to remove topic '" << topic_name - << "' topic owned by other user" << dendl; + ldpp_dout(this, 1) << "no permission to remove topic '" << topic_name + << "'" << dendl; return; } } else { @@ -1025,9 +1035,8 @@ void RGWPSCreateNotifOp::execute(optional_yield y) { s, topic_info, driver->get_zone()->get_zonegroup().get_name(), rgw::IAM::snsPublish); if (op_ret != 0) { - ldpp_dout(this, 1) << "failed to create notification for topic '" - << topic_name << "' topic owned by other user" - << dendl; + ldpp_dout(this, 1) << "no permission to create notification for topic '" + << topic_name << "'" << dendl; return; } // make sure that full topic configuration match diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 6e9248a3669b5..30cbfdfe7865b 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -4395,6 +4395,7 @@ def test_ps_s3_multiple_topics_notification(): conn.delete_bucket(bucket_name) http_server.close() + @attr('basic_test') def test_ps_s3_topic_permissions(): """ test s3 topic set/get/delete permissions """ @@ -4410,7 +4411,7 @@ def test_ps_s3_topic_permissions(): "Sid": "Statement", "Effect": "Deny", "Principal": "*", - "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes"], + "Action": ["sns:Publish", "sns:SetTopicAttributes", "sns:GetTopicAttributes", "sns:DeleteTopic", "sns:CreateTopic"], "Resource": f"arn:aws:sns:{zonegroup}::{topic_name}" } ] @@ -4421,10 +4422,23 @@ def test_ps_s3_topic_permissions(): topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy) topic_arn = topic_conf.set_config() - # 2nd user tries to fetch the topic topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args) + try: + # 2nd user tries to override the topic + topic_arn = topic_conf2.set_config() + assert False, "'AccessDenied' error is expected" + except ClientError as err: + if 'Error' in err.response: + assert_equal(err.response['Error']['Code'], 'AccessDenied') + else: + assert_equal(err.response['Code'], 'AccessDenied') + except Exception as err: + print('unexpected error type: '+type(err).__name__) + + # 2nd user tries to fetch the topic _, status = topic_conf2.get_config(topic_arn=topic_arn) assert_equal(status, 403) + try: # 2nd user tries to set the attribute status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn) @@ -4455,6 +4469,18 @@ def test_ps_s3_topic_permissions(): except Exception as err: print('unexpected error type: '+type(err).__name__) + try: + # 2nd user tries to delete the topic + status = topic_conf2.del_config(topic_arn=topic_arn) + assert False, "'AccessDenied' error is expected" + except ClientError as err: + if 'Error' in err.response: + assert_equal(err.response['Error']['Code'], 'AccessDenied') + else: + assert_equal(err.response['Code'], 'AccessDenied') + except Exception as err: + print('unexpected error type: '+type(err).__name__) + # Topic policy is now added by the 1st user to allow 2nd user. topic_policy = topic_policy.replace("Deny", "Allow") topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args, policy_text=topic_policy) @@ -4469,6 +4495,82 @@ def test_ps_s3_topic_permissions(): s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) _, status = s3_notification_conf2.set_config() assert_equal(status, 200) + # 2nd user tries to delete the topic again + status = topic_conf2.del_config(topic_arn=topic_arn) + assert_equal(status, 200) + + # cleanup + s3_notification_conf2.del_config() + # delete the bucket + conn2.delete_bucket(bucket_name) + + +@attr('basic_test') +def test_ps_s3_topic_no_permissions(): + """ test s3 topic set/get/delete permissions """ + conn1 = connection() + conn2 = another_user() + zonegroup = 'default' + bucket_name = gen_bucket_name() + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic without policy + endpoint_address = 'amqp://127.0.0.1:7001' + endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none' + topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args) + topic_arn = topic_conf.set_config() + + topic_conf2 = PSTopicS3(conn2, topic_name, zonegroup, endpoint_args=endpoint_args) + try: + # 2nd user tries to override the topic + topic_arn = topic_conf2.set_config() + assert False, "'AccessDenied' error is expected" + except ClientError as err: + if 'Error' in err.response: + assert_equal(err.response['Error']['Code'], 'AccessDenied') + else: + assert_equal(err.response['Code'], 'AccessDenied') + except Exception as err: + print('unexpected error type: '+type(err).__name__) + + # 2nd user tries to fetch the topic + _, status = topic_conf2.get_config(topic_arn=topic_arn) + assert_equal(status, 403) + + try: + # 2nd user tries to set the attribute + status = topic_conf2.set_attributes(attribute_name="persistent", attribute_val="false", topic_arn=topic_arn) + assert False, "'AccessDenied' error is expected" + except ClientError as err: + if 'Error' in err.response: + assert_equal(err.response['Error']['Code'], 'AccessDenied') + else: + assert_equal(err.response['Code'], 'AccessDenied') + except Exception as err: + print('unexpected error type: '+type(err).__name__) + + # create bucket for conn2 publish notification to topic + # should be allowed based on the default value of rgw_topic_require_publish_policy=false + _ = conn2.create_bucket(bucket_name) + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, 'TopicArn': topic_arn, + 'Events': [] + }] + s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) + _, status = s3_notification_conf2.set_config() + assert_equal(status, 200) + + try: + # 2nd user tries to delete the topic + status = topic_conf2.del_config(topic_arn=topic_arn) + assert False, "'AccessDenied' error is expected" + except ClientError as err: + if 'Error' in err.response: + assert_equal(err.response['Error']['Code'], 'AccessDenied') + else: + assert_equal(err.response['Code'], 'AccessDenied') + except Exception as err: + print('unexpected error type: '+type(err).__name__) # cleanup s3_notification_conf2.del_config() @@ -4476,6 +4578,7 @@ def test_ps_s3_topic_permissions(): # delete the bucket conn2.delete_bucket(bucket_name) + def kafka_security(security_type, mechanism='PLAIN'): """ test pushing kafka s3 notification securly to master """ conn = connection()