Skip to content

Commit

Permalink
Revert "Revert "Fix concurrency issues with ENR temp tables. (babelfi…
Browse files Browse the repository at this point in the history
  • Loading branch information
Sairakan and Jason Teng authored Sep 5, 2024
1 parent 8bd47d7 commit 670b113
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 10 deletions.
2 changes: 1 addition & 1 deletion contrib/babelfishpg_tsql/src/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,7 @@ define_custom_variables(void)
gettext_noop("Temp oid buffer size"),
NULL,
&temp_oid_buffer_size,
0, 0, 131072,
65536, 0, 131072,
PGC_SUSET,
GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE | GUC_DISALLOW_IN_AUTO_FILE,
NULL, NULL, NULL);
Expand Down
24 changes: 24 additions & 0 deletions contrib/babelfishpg_tsql/src/hooks.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
#include "parser/scansup.h"
#include "replication/logical.h"
#include "rewrite/rewriteHandler.h"
#include "storage/lock.h"
#include "storage/sinvaladt.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
Expand Down Expand Up @@ -163,6 +165,8 @@ static void pltsql_GetNewObjectId(VariableCache variableCache);
static Oid pltsql_GetNewTempObjectId(void);
static Oid pltsql_GetNewTempOidWithIndex(Relation relation, Oid indexId, AttrNumber oidcolumn);
static bool set_and_persist_temp_oid_buffer_start(Oid new_oid);
static bool pltsql_is_local_only_inval_msg(const SharedInvalidationMessage *msg);
static EphemeralNamedRelation pltsql_get_tsql_enr_from_oid(Oid oid);
static void pltsql_validate_var_datatype_scale(const TypeName *typeName, Type typ);
static bool pltsql_bbfCustomProcessUtility(ParseState *pstate,
PlannedStmt *pstmt,
Expand Down Expand Up @@ -240,6 +244,8 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static GetNewObjectId_hook_type prev_GetNewObjectId_hook = NULL;
static GetNewTempObjectId_hook_type prev_GetNewTempObjectId_hook = NULL;
static GetNewTempOidWithIndex_hook_type prev_GetNewTempOidWithIndex_hook = NULL;
static pltsql_is_local_only_inval_msg_hook_type prev_pltsql_is_local_only_inval_msg_hook = NULL;
static pltsql_get_tsql_enr_from_oid_hook_type prev_pltsql_get_tsql_enr_from_oid_hook = NULL;
static inherit_view_constraints_from_table_hook_type prev_inherit_view_constraints_from_table = NULL;
static bbfViewHasInsteadofTrigger_hook_type prev_bbfViewHasInsteadofTrigger_hook = NULL;
static detect_numeric_overflow_hook_type prev_detect_numeric_overflow_hook = NULL;
Expand Down Expand Up @@ -367,6 +373,12 @@ InstallExtendedHooks(void)
prev_GetNewTempOidWithIndex_hook = GetNewTempOidWithIndex_hook;
GetNewTempOidWithIndex_hook = pltsql_GetNewTempOidWithIndex;

prev_pltsql_is_local_only_inval_msg_hook = pltsql_is_local_only_inval_msg_hook;
pltsql_is_local_only_inval_msg_hook = pltsql_is_local_only_inval_msg;

prev_pltsql_get_tsql_enr_from_oid_hook = pltsql_get_tsql_enr_from_oid_hook;
pltsql_get_tsql_enr_from_oid_hook = pltsql_get_tsql_enr_from_oid;

prev_inherit_view_constraints_from_table = inherit_view_constraints_from_table_hook;
inherit_view_constraints_from_table_hook = preserve_view_constraints_from_base_table;
TriggerRecuresiveCheck_hook = plsql_TriggerRecursiveCheck;
Expand Down Expand Up @@ -4685,6 +4697,18 @@ static bool set_and_persist_temp_oid_buffer_start(Oid new_oid)
return true;
}

static bool
pltsql_is_local_only_inval_msg(const SharedInvalidationMessage *msg)
{
return temp_oid_buffer_size > 0 && (msg->id == SHAREDINVALRELCACHE_ID && msg->rc.local_only);
}

static EphemeralNamedRelation
pltsql_get_tsql_enr_from_oid(const Oid oid)
{
return temp_oid_buffer_size > 0 ? get_ENR_withoid(currentQueryEnv, oid, ENR_TSQL_TEMP) : NULL;
}

/*
* Modify the Tuple Descriptor to match the expected
* result set. Currently used only for T-SQL OPENQUERY.
Expand Down
2 changes: 1 addition & 1 deletion test/JDBC/expected/temp_oid.out
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ show babelfishpg_tsql.temp_oid_buffer_size
GO
~~START~~
text
0
65536
~~END~~


Expand Down
1 change: 0 additions & 1 deletion test/JDBC/expected/temp_table_jdbc.out
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
Cannot create trigger on a temporary object.
123 changes: 116 additions & 7 deletions test/JDBC/src/main/java/com/sqlsamples/JDBCTempTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public static void runTest(BufferedWriter bw, Logger logger) {
long startTime = System.nanoTime();

try {
// TODO: re-enable the temp table OID tests when the full fix is ready
// check_oids_equal(bw);
// test_oid_buffer(bw, logger);
// concurrency_test(bw);
// psql_test(bw, logger);
if (check_temp_oid_buffer_enabled(bw, logger)) {
check_oids_equal(bw);
test_oid_buffer(bw, logger);
psql_test(bw, logger);
test_lock_contention(bw, logger);
}
concurrency_test(bw);
test_trigger_on_temp_table(bw, logger);
} catch (Exception e) {
try {
Expand All @@ -46,6 +48,21 @@ public static void runTest(BufferedWriter bw, Logger logger) {
curr_exec_time = endTime - startTime;
}

private static boolean check_temp_oid_buffer_enabled(BufferedWriter bw, Logger logger) throws Exception {
String temp_oid_buffer_size = "babelfishpg_tsql.temp_oid_buffer_size";
Connection c = DriverManager.getConnection(connectionString);
JDBCCrossDialect cx = new JDBCCrossDialect(c);
Connection psql = cx.getPsqlConnection("-- psql", bw, logger);
Statement check_guc = psql.createStatement();
ResultSet rs = check_guc.executeQuery("SHOW " + temp_oid_buffer_size);
if (!rs.next()) {
bw.write("Table is missing.");
bw.newLine();
}
int buffer_size = Integer.parseInt(rs.getString(temp_oid_buffer_size));
return buffer_size != 0;
}

/*
* Helper function that creates the specified number of connections, creates a temp table on each connection, and returns whether all the OIDs are equal or not.
*/
Expand Down Expand Up @@ -320,8 +337,14 @@ private static void test_trigger_on_temp_table(BufferedWriter bw, Logger logger)
queryString = "CREATE TABLE #t1(a int)";
s.execute(queryString);
queryString = "CREATE TRIGGER bar ON #t1 FOR INSERT AS BEGIN SELECT 1 END";
s.execute(queryString);

try {
s.execute(queryString);
} catch (Exception e) {
if (!e.getMessage().equals("Cannot create trigger on a temporary object.")) {
bw.write(e.getMessage());
bw.newLine();
}
}
Connection c2 = connections.get(1);
s = c2.createStatement();
queryString = "DROP TRIGGER bar";
Expand All @@ -334,6 +357,65 @@ private static void test_trigger_on_temp_table(BufferedWriter bw, Logger logger)
}
}
}

private static void test_lock_contention(BufferedWriter bw, Logger logger) throws Exception {
String connectionString = initializeConnectionString();

/* Sanity check of pg_locks catalog */
Connection c = DriverManager.getConnection(connectionString);
Statement s = c.createStatement();
ResultSet rs;
s.execute("BEGIN TRAN");
s.execute("CREATE TABLE #t1 (a INT)");
rs = s.executeQuery("SELECT relation FROM pg_locks JOIN sys.babelfish_get_enr_list() ON (relation = reloid) where relname = '#t1'");
if (rs.next()) {
bw.write("Unexpected lock acquisition on a TSQL temp table.\n");
}
s.execute("ALTER TABLE #t1 ADD b AS a + 1");
rs = s.executeQuery("SELECT relation FROM pg_locks JOIN sys.babelfish_get_enr_list() ON (relation = reloid) where relname = '#t1'");
if (rs.next()) {
bw.write("Unexpected lock acquisition on a TSQL temp table.\n");
}
s.execute("ALTER TABLE #t1 DROP COLUMN b");
rs = s.executeQuery("SELECT relation FROM pg_locks JOIN sys.babelfish_get_enr_list() ON (relation = reloid) where relname = '#t1'");
if (rs.next()) {
bw.write("Unexpected lock acquisition on a TSQL temp table.\n");
}
s.execute("DROP TABLE #t1");
rs = s.executeQuery("SELECT relation FROM pg_locks JOIN sys.babelfish_get_enr_list() ON (relation = reloid) where relname = '#t1'");
if (rs.next()) {
bw.write("Unexpected lock acquisition on a TSQL temp table.\n");
}
c.close();

int num_connections = 2;

ArrayList<Connection> cxns = new ArrayList<>();

ArrayList<Thread> threads = new ArrayList<>();

/* Create connections */
for (int i = 0; i < num_connections; i++) {
Connection connection = DriverManager.getConnection(connectionString);
cxns.add(connection);
Thread t = new Thread(new LockContentionWorker(connection, bw));
threads.add(t);
t.start();
}

/*
* Unfortunately, setQueryTimeout (used in the worker thread) does not always work correctly.
* So we need to manaully try to detect a hanging thread.
*/
Thread.sleep(1000);
for (Thread t : threads)
{
if (t.isAlive()) {
bw.write("Lock contention detected.\n");
return;
}
}
}
}

class Worker implements Runnable {
Expand Down Expand Up @@ -381,4 +463,31 @@ public void run() {
e.printStackTrace();
}
}
}

class LockContentionWorker implements Runnable {

private Connection c;
private BufferedWriter bw;

LockContentionWorker(Connection c, BufferedWriter bw) {
this.c = c;
this.bw = bw;
}

public void run() {
try {
try {
Statement s = c.createStatement();
s.setQueryTimeout(1);
s.execute("BEGIN TRAN;");
s.execute("CREATE TABLE #temp_table1 (a int primary key not null identity, b as a + 1, c text);");
} catch (Exception e) {
bw.write(e.getMessage());
bw.newLine();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Binary file added test/JDBC/target/JDBC-testsuite-1.0.0.jar
Binary file not shown.

0 comments on commit 670b113

Please sign in to comment.