Skip to content

Commit

Permalink
Support Cross-database references in views (babelfish-for-postgresql#…
Browse files Browse the repository at this point in the history
…2899)

Support execution of views which references objects (tables/views/functions) from
across the databases. Here we are talking about Babelfish logical database (T-SQL database)
which is different from a physical Postgres database.
To support this, perform permission checks for cross database objects using session user (login)
instead of current_user (user of current T-SQL database). The reason login can be used for
permission check is since login is member of all it’s users, so it inherits all their permissions so it
will be able execute any cross database objects owned by its users.

This commit handles functions and tables/views separately for cross database permission
checks. For functions/procedures, a new hook `ExecFuncProc_AclCheck_hook` and for tables/views
existing `ExecutorStart_hook` will be used to decide whether to use session user or current_user
for permission check depending upon whether the object is from same or different database.
We will be using `is_schema_from_db` function to identify if the object is from different database
which performs a lookup into `babelfish_namespace_ext` catalog table which can be expensive as will
be doing it pretty frequently. So, added this table into SYSCACHE for better performance.
Tables/views permissions are handled slightly different than functions as we do not blindly want to check
the permissions against session user (current login) since permissions of RTEs inside a view are checked
against that view's owner which can very well be a user of some different database. So if we blindly check
permission against session user instead of view's owner then it would break view's ownership chaining.
Instead, we will replace `checkAsUser` with it's corresponding mapped login if present and only in cases
where `checkAsUser` is not set, we will replace it with session user (login). We are using login to allow cross
database queries since login can access all its objects across the databases. Getting mapped login to a
user require lookup into sys.babelfish_authid_user_ext catalog table using its primary key column (rolname) so
added this table is also into SYSCACHE.

Additionally, remove previous code to globally set current user to session user since newer logic
takes care of the permission check now.

Task: BABEL-5206
Signed-off-by: Rishabh Tanwar <[email protected]>

Engine PR: babelfish-for-postgresql/postgresql_modified_for_babelfish#434
  • Loading branch information
rishabhtanwar29 authored Sep 20, 2024
1 parent 62115bd commit 868472a
Show file tree
Hide file tree
Showing 29 changed files with 1,425 additions and 395 deletions.
229 changes: 105 additions & 124 deletions contrib/babelfishpg_tsql/src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,28 @@ static struct cachedesc my_cacheinfo[] = {
0
},
16
},
{-1, /* SYSNAMESPACENAME */
-1,
1,
{
Anum_namespace_ext_namespace,
0,
0,
0
},
16
},
{-1, /* AUTHIDUSEREXTROLENAME */
-1,
1,
{
Anum_bbf_authid_user_ext_rolname,
0,
0,
0
},
16
}
};

Expand All @@ -208,24 +230,29 @@ init_catalog(PG_FUNCTION_ARGS)
bbf_function_ext_oid = get_relname_relid(BBF_FUNCTION_EXT_TABLE_NAME, sys_schema_oid);
bbf_function_ext_idx_oid = get_relname_relid(BBF_FUNCTION_EXT_IDX_NAME, sys_schema_oid);

/* user ext */
bbf_authid_user_ext_oid = get_relname_relid(BBF_AUTHID_USER_EXT_TABLE_NAME,
sys_schema_oid);
bbf_authid_user_ext_idx_oid = get_relname_relid(BBF_AUTHID_USER_EXT_IDX_NAME,
sys_schema_oid);

/* syscache info */
my_cacheinfo[0].reloid = sysdatabases_oid;
my_cacheinfo[0].indoid = sysdatabaese_idx_oid_oid;
my_cacheinfo[1].reloid = sysdatabases_oid;
my_cacheinfo[1].indoid = sysdatabaese_idx_name_oid;
my_cacheinfo[2].reloid = bbf_function_ext_oid;
my_cacheinfo[2].indoid = bbf_function_ext_idx_oid;
my_cacheinfo[3].reloid = namespace_ext_oid;
my_cacheinfo[3].indoid = namespace_ext_idx_oid_oid;
my_cacheinfo[4].reloid = bbf_authid_user_ext_oid;
my_cacheinfo[4].indoid = bbf_authid_user_ext_idx_oid;

/* login ext */
bbf_authid_login_ext_oid = get_relname_relid(BBF_AUTHID_LOGIN_EXT_TABLE_NAME,
sys_schema_oid);
bbf_authid_login_ext_idx_oid = get_relname_relid(BBF_AUTHID_LOGIN_EXT_IDX_NAME,
sys_schema_oid);
/* user ext */
bbf_authid_user_ext_oid = get_relname_relid(BBF_AUTHID_USER_EXT_TABLE_NAME,
sys_schema_oid);
bbf_authid_user_ext_idx_oid = get_relname_relid(BBF_AUTHID_USER_EXT_IDX_NAME,
sys_schema_oid);

/* bbf_view_def */
bbf_view_def_oid = get_relname_relid(BBF_VIEW_DEF_TABLE_NAME, sys_schema_oid);
Expand Down Expand Up @@ -285,7 +312,7 @@ initTsqlSyscache()
/* Initialize info for catcache */
if (!tsql_syscache_inited)
{
InitExtensionCatalogCache(my_cacheinfo, SYSDATABASEOID, 3);
InitExtensionCatalogCache(my_cacheinfo, SYSDATABASEOID, 5);
tsql_syscache_inited = true;
}
}
Expand Down Expand Up @@ -602,90 +629,54 @@ babelfish_helpdb(PG_FUNCTION_ARGS)
const char *
get_logical_schema_name(const char *physical_schema_name, bool missingOk)
{
Relation rel;
HeapTuple tuple;
ScanKeyData scanKey;
SysScanDesc scan;
Datum datum;
const char *logical_name;
TupleDesc dsc;
bool isnull;

if (!physical_schema_name || get_namespace_oid(physical_schema_name, missingOk) == InvalidOid)
return NULL;

rel = table_open(namespace_ext_oid, AccessShareLock);
dsc = RelationGetDescr(rel);

ScanKeyInit(&scanKey,
Anum_namespace_ext_namespace,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(physical_schema_name));

scan = systable_beginscan(rel, namespace_ext_idx_oid_oid, true,
NULL, 1, &scanKey);

tuple = systable_getnext(scan);
tuple = SearchSysCache1(SYSNAMESPACENAME, CStringGetDatum(physical_schema_name));
if (!HeapTupleIsValid(tuple))
{
systable_endscan(scan);
table_close(rel, AccessShareLock);
if (!missingOk)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could find logical schema name for: \"%s\"", physical_schema_name)));
return NULL;
}
datum = heap_getattr(tuple, Anum_namespace_ext_orig_name, dsc, &isnull);
datum = SysCacheGetAttr(SYSNAMESPACENAME, tuple, Anum_namespace_ext_orig_name, &isnull);
logical_name = pstrdup(TextDatumGetCString(datum));
ReleaseSysCache(tuple);

systable_endscan(scan);
table_close(rel, AccessShareLock);
return logical_name;
}

int16
get_dbid_from_physical_schema_name(const char *physical_schema_name, bool missingOk)
{
Relation rel;
HeapTuple tuple;
ScanKeyData scanKey;
SysScanDesc scan;
Datum datum;
int16 dbid;
TupleDesc dsc;
bool isnull;

if (get_namespace_oid(physical_schema_name, false) == InvalidOid)
return InvalidDbid;

rel = table_open(namespace_ext_oid, AccessShareLock);
dsc = RelationGetDescr(rel);

ScanKeyInit(&scanKey,
Anum_namespace_ext_namespace,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(physical_schema_name));

scan = systable_beginscan(rel, namespace_ext_idx_oid_oid, true,
NULL, 1, &scanKey);

tuple = systable_getnext(scan);
tuple = SearchSysCache1(SYSNAMESPACENAME, CStringGetDatum(physical_schema_name));
if (!HeapTupleIsValid(tuple))
{
systable_endscan(scan);
table_close(rel, AccessShareLock);
if (!missingOk)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("Could not find db id for: \"%s\"", physical_schema_name)));
return InvalidDbid;
}
datum = heap_getattr(tuple, Anum_namespace_ext_dbid, dsc, &isnull);
datum = SysCacheGetAttr(SYSNAMESPACENAME, tuple, Anum_namespace_ext_dbid, &isnull);
dbid = DatumGetInt16(datum);
ReleaseSysCache(tuple);

systable_endscan(scan);
table_close(rel, AccessShareLock);
return dbid;
}

Expand Down Expand Up @@ -860,59 +851,35 @@ get_authid_login_ext_idx_oid(void)
bool
is_user(Oid role_oid)
{
Relation relation;
bool is_user = true;
ScanKeyData scanKey;
SysScanDesc scan;
HeapTuple tuple;
HeapTuple authtuple;
NameData rolname;
char *type_str = "";

authtuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(role_oid));
if (!HeapTupleIsValid(authtuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("role with OID %u does not exist", role_oid)));
rolname = ((Form_pg_authid) GETSTRUCT(authtuple))->rolname;

relation = table_open(get_authid_user_ext_oid(), AccessShareLock);

ScanKeyInit(&scanKey,
Anum_bbf_authid_user_ext_rolname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(&rolname));

scan = systable_beginscan(relation,
get_authid_user_ext_idx_oid(),
true, NULL, 1, &scanKey);

tuple = systable_getnext(scan);
tuple = SearchSysCache1(AUTHIDUSEREXTROLENAME, NameGetDatum(&rolname));

if (!HeapTupleIsValid(tuple))
is_user = false;
else
{
Datum datum;
bool isnull;
TupleDesc dsc;
BpChar type = ((Form_authid_user_ext) GETSTRUCT(tuple))->type;
char *type_str = bpchar_to_cstring(&type);

dsc = RelationGetDescr(relation);
datum = heap_getattr(tuple, USER_EXT_TYPE + 1, dsc, &isnull);
if (!isnull)
type_str = pstrdup(TextDatumGetCString(datum));
/*
* Only sysadmin can not be dropped. For the rest of the cases i.e., type
* is "S" or "U" etc, we should drop the user
*/
if (strcmp(type_str, "R") == 0)
is_user = false;
ReleaseSysCache(tuple);
}

/*
* Only sysadmin can not be dropped. For the rest of the cases i.e., type
* is "S" or "U" etc, we should drop the user
*/
if (strcmp(type_str, "R") == 0)
is_user = false;

systable_endscan(scan);
table_close(relation, AccessShareLock);

ReleaseSysCache(authtuple);

return is_user;
Expand All @@ -921,50 +888,31 @@ is_user(Oid role_oid)
bool
is_role(Oid role_oid)
{
Relation relation;
bool is_role = true;
ScanKeyData scanKey;
SysScanDesc scan;
HeapTuple tuple;
HeapTuple authtuple;
NameData rolname;
BpChar type;
char *type_str = "";

authtuple = SearchSysCache1(AUTHOID, ObjectIdGetDatum(role_oid));
if (!HeapTupleIsValid(authtuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("role with OID %u does not exist", role_oid)));
rolname = ((Form_pg_authid) GETSTRUCT(authtuple))->rolname;

relation = table_open(get_authid_user_ext_oid(), AccessShareLock);

ScanKeyInit(&scanKey,
Anum_bbf_authid_user_ext_rolname,
BTEqualStrategyNumber, F_NAMEEQ,
NameGetDatum(&rolname));

scan = systable_beginscan(relation,
get_authid_user_ext_idx_oid(),
true, NULL, 1, &scanKey);

tuple = systable_getnext(scan);
tuple = SearchSysCache1(AUTHIDUSEREXTROLENAME, NameGetDatum(&rolname));

if (!HeapTupleIsValid(tuple))
is_role = false;
else
{
type = ((Form_authid_user_ext) GETSTRUCT(tuple))->type;
type_str = bpchar_to_cstring(&type);
BpChar type = ((Form_authid_user_ext) GETSTRUCT(tuple))->type;
char *type_str = bpchar_to_cstring(&type);

if (strcmp(type_str, "R") != 0)
is_role = false;
ReleaseSysCache(tuple);
}

systable_endscan(scan);
table_close(relation, AccessShareLock);

ReleaseSysCache(authtuple);

return is_role;
Expand Down Expand Up @@ -3095,34 +3043,67 @@ guest_role_exists_for_db(const char *dbname)
{
const char *guest_role = get_guest_role_name(dbname);
bool role_exists = false;
Relation bbf_authid_user_ext_rel;
HeapTuple tuple;
ScanKeyData scanKey;
SysScanDesc scan;

/* Fetch the relation */
bbf_authid_user_ext_rel = table_open(get_authid_user_ext_oid(),
RowExclusiveLock);
tuple = SearchSysCache1(AUTHIDUSEREXTROLENAME, CStringGetDatum(guest_role));

/* Search if the role exists */
ScanKeyInit(&scanKey,
Anum_bbf_authid_user_ext_rolname,
BTEqualStrategyNumber, F_NAMEEQ,
CStringGetDatum(guest_role));
if (HeapTupleIsValid(tuple))
{
role_exists = true;
ReleaseSysCache(tuple);
}

scan = systable_beginscan(bbf_authid_user_ext_rel,
get_authid_user_ext_idx_oid(),
true, NULL, 1, &scanKey);
return role_exists;
}

tuple = systable_getnext(scan);
/*
* get_login_for_user
* Get mapped login for given user_id.
* Usually login can be retrived from login_name column of bbf_authid_login_ext
* catalog although sometimes the column can be empty such as when user_id belongs
* to dbo or guest user. In case the user_id is of dbo role then we get owner of
* the respective database which can be deduced from physical_schema_name. For all
* other cases, return InvalidOid since mapped login does not exist for the rest.
*/
Oid
get_login_for_user(Oid user_id, const char *physical_schema_name)
{
HeapTuple tuple;
Oid loginId = InvalidOid;
char *physical_user_name = GetUserNameFromId(user_id, true);

if (!physical_user_name || !physical_schema_name)
return InvalidOid;

/* Search if the role exists */
tuple = SearchSysCache1(AUTHIDUSEREXTROLENAME, CStringGetDatum(physical_user_name));

if (HeapTupleIsValid(tuple))
role_exists = true;
{
Datum datum;
bool isnull;

systable_endscan(scan);
table_close(bbf_authid_user_ext_rel, RowExclusiveLock);
datum = SysCacheGetAttr(AUTHIDUSEREXTROLENAME, tuple, Anum_bbf_authid_user_ext_login_name, &isnull);
Assert(!isnull);
loginId = get_role_oid((DatumGetName(datum)->data), true);

return role_exists;
if (!OidIsValid(loginId))
{
char *orig_username = TextDatumGetCString(SysCacheGetAttr(AUTHIDUSEREXTROLENAME,
tuple, Anum_bbf_authid_user_ext_orig_username, &isnull));

Assert(!isnull);
/* Get owner of the db if the user is dbo */
if (strlen(orig_username) == 3 && pg_strcasecmp(orig_username, "dbo") == 0)
{
int16 dbid = get_dbid_from_physical_schema_name(physical_schema_name, false);
loginId = get_role_oid(get_owner_of_db(get_db_name(dbid)), false);
}
}
ReleaseSysCache(tuple);
}

return loginId;
}

static void
Expand Down
1 change: 1 addition & 0 deletions contrib/babelfishpg_tsql/src/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ extern void update_sysdatabases_db_name(const char *old_db_name, const char *new
extern List *update_babelfish_namespace_ext_nsp_name(int16 db_id, char *new_db_name);
extern List *update_babelfish_authid_user_ext_db_name(const char *old_db_name, const char *new_db_name);
extern void rename_tsql_db(char *old_db_name, char *new_db_name);
extern Oid get_login_for_user(Oid user_id, const char *physical_schema_name);

/* MUST comply with babelfish_authid_user_ext table */
typedef struct FormData_authid_user_ext
Expand Down
Loading

0 comments on commit 868472a

Please sign in to comment.