From 1f5d3274c12a6f48ea48b3829047f56cc351d861 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Wed, 20 Mar 2024 12:59:19 +0530 Subject: [PATCH 01/11] feat: Add BulkImport APIs and cron --- .../postgresql/BulkImportProxyStorage.java | 181 ++++++++++++ .../storage/postgresql/ConnectionPool.java | 21 +- .../postgresql/QueryExecutorTemplate.java | 30 +- .../supertokens/storage/postgresql/Start.java | 82 +++++- .../postgresql/config/PostgreSQLConfig.java | 4 + .../postgresql/queries/BulkImportQueries.java | 271 ++++++++++++++++++ .../queries/EmailPasswordQueries.java | 2 +- .../postgresql/queries/GeneralQueries.java | 20 +- .../queries/PasswordlessQueries.java | 2 +- .../postgresql/queries/ThirdPartyQueries.java | 9 +- 10 files changed, 608 insertions(+), 14 deletions(-) create mode 100644 src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java create mode 100644 src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java new file mode 100644 index 00000000..e12f341c --- /dev/null +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.supertokens.storage.postgresql; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.SQLTransactionRollbackException; +import java.util.Set; + +import org.postgresql.util.PSQLException; + +import com.google.gson.JsonObject; + +import io.supertokens.pluginInterface.LOG_LEVEL; +import io.supertokens.pluginInterface.exceptions.InvalidConfigException; +import io.supertokens.pluginInterface.exceptions.StorageQueryException; +import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; +import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; +import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; +import io.supertokens.storage.postgresql.config.Config; + + +/** +* BulkImportProxyStorage is a class extending Start, serving as a Storage instance in the bulk import user cronjob. +* This cronjob extensively utilizes existing queries to import users, all of which internally operate within transactions. +* +* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures. +* To achieve this, we override the startTransaction method to utilize the same connection and prevent automatic query commits even upon transaction success. +* Subsequently, the cronjob is responsible for committing the transaction after ensuring the successful execution of all queries. +*/ + +public class BulkImportProxyStorage extends Start { + private Connection transactionConnection; + + public Connection getTransactionConnection() throws SQLException { + if (this.transactionConnection == null || this.transactionConnection.isClosed()) { + this.transactionConnection = ConnectionPool.getConnectionForProxyStorage(this); + } + return this.transactionConnection; + } + + @Override + public T startTransaction(TransactionLogic logic) + throws StorageTransactionLogicException, StorageQueryException { + return startTransaction(logic, TransactionIsolationLevel.SERIALIZABLE); + } + + @Override + public T startTransaction(TransactionLogic logic, TransactionIsolationLevel isolationLevel) + throws StorageTransactionLogicException, StorageQueryException { + final int NUM_TRIES = 50; + int tries = 0; + while (true) { + tries++; + try { + return startTransactionHelper(logic, isolationLevel); + } catch (SQLException | StorageQueryException | StorageTransactionLogicException e) { + Throwable actualException = e; + if (e instanceof StorageQueryException) { + actualException = e.getCause(); + } else if (e instanceof StorageTransactionLogicException) { + actualException = ((StorageTransactionLogicException) e).actualException; + } + String exceptionMessage = actualException.getMessage(); + if (exceptionMessage == null) { + exceptionMessage = ""; + } + + // see: https://github.com/supertokens/supertokens-postgresql-plugin/pull/3 + + // We set this variable to the current (or cause) exception casted to + // PSQLException if we can safely cast it + PSQLException psqlException = actualException instanceof PSQLException ? (PSQLException) actualException + : null; + + // PSQL error class 40 is transaction rollback. See: + // https://www.postgresql.org/docs/12/errcodes-appendix.html + boolean isPSQLRollbackException = psqlException != null + && psqlException.getServerErrorMessage().getSQLState().startsWith("40"); + + // We keep the old exception detection logic to ensure backwards compatibility. + // We could get here if the new logic hits a false negative, + // e.g., in case someone renamed constraints/tables + boolean isDeadlockException = actualException instanceof SQLTransactionRollbackException + || exceptionMessage.toLowerCase().contains("concurrent update") + || exceptionMessage.toLowerCase().contains("concurrent delete") + || exceptionMessage.toLowerCase().contains("the transaction might succeed if retried") || + + // we have deadlock as well due to the DeadlockTest.java + exceptionMessage.toLowerCase().contains("deadlock"); + + if ((isPSQLRollbackException || isDeadlockException) && tries < NUM_TRIES) { + try { + Thread.sleep((long) (10 + (250 + Math.min(Math.pow(2, tries), 3000)) * Math.random())); + } catch (InterruptedException ignored) { + } + ProcessState.getInstance(this).addState(ProcessState.PROCESS_STATE.DEADLOCK_FOUND, e); + // this because deadlocks are not necessarily a result of faulty logic. They can + // happen + continue; + } + + if ((isPSQLRollbackException || isDeadlockException) && tries == NUM_TRIES) { + ProcessState.getInstance(this).addState(ProcessState.PROCESS_STATE.DEADLOCK_NOT_RESOLVED, e); + } + if (e instanceof StorageQueryException) { + throw (StorageQueryException) e; + } else if (e instanceof StorageTransactionLogicException) { + throw (StorageTransactionLogicException) e; + } + throw new StorageQueryException(e); + } + } + } + + private T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) + throws StorageQueryException, StorageTransactionLogicException, SQLException { + Connection con = null; + try { + con = (Connection) getTransactionConnection(); + int libIsolationLevel = Connection.TRANSACTION_SERIALIZABLE; + switch (isolationLevel) { + case SERIALIZABLE: + libIsolationLevel = Connection.TRANSACTION_SERIALIZABLE; + break; + case REPEATABLE_READ: + libIsolationLevel = Connection.TRANSACTION_REPEATABLE_READ; + break; + case READ_COMMITTED: + libIsolationLevel = Connection.TRANSACTION_READ_COMMITTED; + break; + case READ_UNCOMMITTED: + libIsolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED; + break; + case NONE: + libIsolationLevel = Connection.TRANSACTION_NONE; + break; + } + + if (libIsolationLevel != Connection.TRANSACTION_SERIALIZABLE) { + con.setTransactionIsolation(libIsolationLevel); + } + con.setAutoCommit(false); + return logic.mainLogicAndCommit(new TransactionConnection(con)); + } catch (Exception e) { + if (con != null) { + con.rollback(); + } + throw e; + } + } + + @Override + public void commitTransaction(TransactionConnection con) throws StorageQueryException { + // We do not want to commit the queries when using the BulkImportProxyStorage to be able to rollback everything + // if any query fails while importing the user + } + + @Override + public void loadConfig(JsonObject configJson, Set logLevels, TenantIdentifier tenantIdentifier) + throws InvalidConfigException { + // We are overriding the loadConfig method to set the connection pool size + // to 1 to avoid creating many connections for the bulk import cronjob + configJson.addProperty("postgresql_connection_pool_size", 1); + Config.loadConfig(this, configJson, logLevels, tenantIdentifier); + } +} diff --git a/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java b/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java index d0b8c6f0..0008b4c3 100644 --- a/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java +++ b/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java @@ -189,7 +189,7 @@ static void initPool(Start start, boolean shouldWait) throws DbInitException { } } - public static Connection getConnection(Start start) throws SQLException { + private static Connection getNewConnection(Start start) throws SQLException { if (getInstance(start) == null) { throw new IllegalStateException("Please call initPool before getConnection"); } @@ -202,6 +202,17 @@ public static Connection getConnection(Start start) throws SQLException { return getInstance(start).hikariDataSource.getConnection(); } + public static Connection getConnectionForProxyStorage(Start start) throws SQLException { + return getNewConnection(start); + } + + public static Connection getConnection(Start start) throws SQLException { + if (start instanceof BulkImportProxyStorage) { + return ((BulkImportProxyStorage) start).getTransactionConnection(); + } + return getNewConnection(start); + } + static void close(Start start) { if (getInstance(start) == null) { return; @@ -216,4 +227,12 @@ static void close(Start start) { } } } + + public static void closeConnection(Start start, Connection con) throws SQLException { + if (start instanceof BulkImportProxyStorage) { + // Keep the connection open for future queries + } else { + con.close(); + } + } } diff --git a/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java b/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java index db0c9785..b20163a9 100644 --- a/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java +++ b/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java @@ -27,8 +27,12 @@ public interface QueryExecutorTemplate { static T execute(Start start, String QUERY, PreparedStatementValueSetter setter, ResultSetValueExtractor mapper) throws SQLException, StorageQueryException { - try (Connection con = ConnectionPool.getConnection(start)) { + + Connection con = ConnectionPool.getConnection(start); + try { return execute(con, QUERY, setter, mapper); + } finally { + ConnectionPool.closeConnection(start, con); } } @@ -44,15 +48,31 @@ static T execute(Connection con, String QUERY, PreparedStatementValueSetter } } - static int update(Start start, String QUERY, PreparedStatementValueSetter setter) - throws SQLException, StorageQueryException { - try (Connection con = ConnectionPool.getConnection(start)) { + static int update(Start start, String QUERY, PreparedStatementValueSetter setter) throws SQLException { + Connection con = ConnectionPool.getConnection(start); + try { return update(con, QUERY, setter); + } finally { + ConnectionPool.closeConnection(start, con); } } - static int update(Connection con, String QUERY, PreparedStatementValueSetter setter) + static T update(Start start, String QUERY, PreparedStatementValueSetter setter, ResultSetValueExtractor mapper) throws SQLException, StorageQueryException { + Connection con = ConnectionPool.getConnection(start); + try { + try (PreparedStatement pst = con.prepareStatement(QUERY)) { + setter.setValues(pst); + try (ResultSet result = pst.executeQuery()) { + return mapper.extract(result); + } + } + } finally { + ConnectionPool.closeConnection(start, con); + } + } + + static int update(Connection con, String QUERY, PreparedStatementValueSetter setter) throws SQLException { try (PreparedStatement pst = con.prepareStatement(QUERY)) { setter.setValues(pst); return pst.executeUpdate(); diff --git a/src/main/java/io/supertokens/storage/postgresql/Start.java b/src/main/java/io/supertokens/storage/postgresql/Start.java index 4e2ee20e..024e29c6 100644 --- a/src/main/java/io/supertokens/storage/postgresql/Start.java +++ b/src/main/java/io/supertokens/storage/postgresql/Start.java @@ -25,6 +25,9 @@ import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.authRecipe.LoginMethod; import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; +import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; +import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; import io.supertokens.pluginInterface.dashboard.DashboardSearchTags; import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo; import io.supertokens.pluginInterface.dashboard.DashboardUser; @@ -109,7 +112,7 @@ public class Start implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage, JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage, UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage, - ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage { + ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, BulkImportSQLStorage { // these configs are protected from being modified / viewed by the dev using the SuperTokens // SaaS. If the core is not running in SuperTokens SaaS, this array has no effect. @@ -152,6 +155,11 @@ public STORAGE_TYPE getType() { return STORAGE_TYPE.SQL; } + @Override + public Storage createBulkImportProxyStorageInstance() { + return new BulkImportProxyStorage(); + } + @Override public void loadConfig(JsonObject configJson, Set logLevels, TenantIdentifier tenantIdentifier) throws InvalidConfigException { @@ -2359,7 +2367,7 @@ public boolean addUserIdToTenant_Transaction(TenantIdentifier tenantIdentifier, throw new IllegalStateException("Should never come here!"); } - sqlCon.commit(); + this.commitTransaction(con); return added; } catch (SQLException throwables) { PostgreSQLConfig config = Config.getConfig(this); @@ -3046,4 +3054,74 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx return -1; }); } + + @Override + public void addBulkImportUsers(AppIdentifier appIdentifier, List users) + throws StorageQueryException, + TenantOrAppNotFoundException, + io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException { + try { + BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users); + } catch (SQLException e) { + if (e instanceof PSQLException) { + ServerErrorMessage serverErrorMessage = ((PSQLException) e).getServerErrorMessage(); + if (isPrimaryKeyError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable())) { + throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException(); + } + if (isForeignKeyConstraintError(serverErrorMessage, Config.getConfig(this).getBulkImportUsersTable(), "app_id")) { + throw new TenantOrAppNotFoundException(appIdentifier); + } + } + throw new StorageQueryException(e); + } + } + + @Override + public List getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + @Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException { + try { + return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) + throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserIds, status, errorMessage); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public List deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException { + try { + return BulkImportQueries.deleteBulkImportUsers(this, appIdentifier, bulkImportUserIds); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public List getBulkImportUsersForProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException { + try { + return BulkImportQueries.getBulkImportUsersForProcessing(this, appIdentifier, limit); + } catch (StorageTransactionLogicException e) { + throw new StorageQueryException(e.actualException); + } + } + + @Override + public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId) throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + BulkImportQueries.deleteBulkImportUser_Transaction(this, sqlCon, appIdentifier, bulkImportUserId); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } } diff --git a/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java b/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java index e0a0c682..bd206366 100644 --- a/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java +++ b/src/main/java/io/supertokens/storage/postgresql/config/PostgreSQLConfig.java @@ -326,6 +326,10 @@ public String getTotpUsedCodesTable() { return addSchemaAndPrefixToTableName("totp_used_codes"); } + public String getBulkImportUsersTable() { + return addSchemaAndPrefixToTableName("bulk_import_users"); + } + private String addSchemaAndPrefixToTableName(String tableName) { return addSchemaToTableName(postgresql_table_names_prefix + tableName); } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java new file mode 100644 index 00000000..8dc41e35 --- /dev/null +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.supertokens.storage.postgresql.queries; + +import static io.supertokens.storage.postgresql.QueryExecutorTemplate.update; +import static io.supertokens.storage.postgresql.QueryExecutorTemplate.execute; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.supertokens.pluginInterface.RowMapper; +import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.exceptions.StorageQueryException; +import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; +import io.supertokens.pluginInterface.multitenancy.AppIdentifier; +import io.supertokens.storage.postgresql.Start; +import io.supertokens.storage.postgresql.config.Config; +import io.supertokens.storage.postgresql.utils.Utils; + +public class BulkImportQueries { + static String getQueryToCreateBulkImportUsersTable(Start start) { + String schema = Config.getConfig(start).getTableSchema(); + String tableName = Config.getConfig(start).getBulkImportUsersTable(); + return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + + "id CHAR(36)," + + "app_id VARCHAR(64) NOT NULL DEFAULT 'public'," + + "raw_data TEXT NOT NULL," + + "status VARCHAR(128) DEFAULT 'NEW'," + + "error_msg TEXT," + + "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000," + + "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000," + + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey") + + " PRIMARY KEY(app_id, id)," + + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " " + + "FOREIGN KEY(app_id) " + + "REFERENCES " + Config.getConfig(start).getAppsTable() + " (app_id) ON DELETE CASCADE" + + " );"; + } + + public static String getQueryToCreateStatusUpdatedAtIndex(Start start) { + return "CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)"; + } + + public static String getQueryToCreateCreatedAtIndex(Start start) { + return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)"; + } + + public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) + throws SQLException, StorageQueryException { + StringBuilder queryBuilder = new StringBuilder( + "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES "); + + int userCount = users.size(); + + for (int i = 0; i < userCount; i++) { + queryBuilder.append(" (?, ?, ?)"); + + if (i < userCount - 1) { + queryBuilder.append(","); + } + } + + update(start, queryBuilder.toString(), pst -> { + int parameterIndex = 1; + for (BulkImportUser user : users) { + pst.setString(parameterIndex++, user.id); + pst.setString(parameterIndex++, appIdentifier.getAppId()); + pst.setString(parameterIndex++, user.toRawDataForDbStorage()); + } + }); + } + + public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) + throws SQLException, StorageQueryException { + if (bulkImportUserIds.length == 0) { + return; + } + + String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ?"; + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + parameters.add(status.toString()); + parameters.add(errorMessage); + parameters.add(System.currentTimeMillis()); + parameters.add(appIdentifier.getAppId()); + + queryBuilder.append(" AND id IN ("); + for (int i = 0; i < bulkImportUserIds.length; i++) { + if (i != 0) { + queryBuilder.append(", "); + } + queryBuilder.append("?"); + parameters.add(bulkImportUserIds[i]); + } + queryBuilder.append(")"); + + String query = queryBuilder.toString(); + + update(con, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }); + } + + public static List getBulkImportUsersForProcessing(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit) + throws StorageQueryException, StorageTransactionLogicException { + + return start.startTransaction(con -> { + Connection sqlCon = (Connection) con.getConnection(); + try { + String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() + + " WHERE status = 'NEW' AND app_id = ? " + + " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) " + + " LIMIT ? FOR UPDATE SKIP LOCKED"; + + List bulkImportUsers = new ArrayList<>(); + + execute(sqlCon, selectQuery, pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setInt(2, limit); + }, result -> { + while (result.next()) { + bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result)); + } + return null; + }); + + String[] bulkImportUserIds = bulkImportUsers.stream().map(user -> user.id).toArray(String[]::new); + + updateBulkImportUserStatus_Transaction(start, sqlCon, appIdentifier, bulkImportUserIds, BULK_IMPORT_USER_STATUS.PROCESSING, null); + return bulkImportUsers; + } catch (SQLException throwables) { + throw new StorageTransactionLogicException(throwables); + } + }); + } + + public static List getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + @Nullable String bulkImportUserId, @Nullable Long createdAt) + throws SQLException, StorageQueryException { + + String baseQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable(); + + StringBuilder queryBuilder = new StringBuilder(baseQuery); + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + if (status != null) { + queryBuilder.append(" AND status = ?"); + parameters.add(status.toString()); + } + + if (bulkImportUserId != null && createdAt != null) { + queryBuilder + .append(" AND created_at < ? OR (created_at = ? AND id <= ?)"); + parameters.add(createdAt); + parameters.add(createdAt); + parameters.add(bulkImportUserId); + } + + queryBuilder.append(" ORDER BY created_at DESC, id DESC LIMIT ?"); + parameters.add(limit); + + String query = queryBuilder.toString(); + + return execute(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + List bulkImportUsers = new ArrayList<>(); + while (result.next()) { + bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result)); + } + return bulkImportUsers; + }); + } + + public static List deleteBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws SQLException, StorageQueryException { + if (bulkImportUserIds.length == 0) { + return new ArrayList<>(); + } + + String baseQuery = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable(); + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + queryBuilder.append(" AND id IN ("); + for (int i = 0; i < bulkImportUserIds.length; i++) { + if (i != 0) { + queryBuilder.append(", "); + } + queryBuilder.append("?"); + parameters.add(bulkImportUserIds[i]); + } + queryBuilder.append(") RETURNING id"); + + String query = queryBuilder.toString(); + + return update(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + List deletedIds = new ArrayList<>(); + while (result.next()) { + deletedIds.add(result.getString("id")); + } + return deletedIds; + }); + } + + public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId) throws SQLException, StorageQueryException { + String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ? AND id = ?"; + + update(con, query, pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, bulkImportUserId); + }); + } + + private static class BulkImportUserRowMapper implements RowMapper { + private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper(); + + private BulkImportUserRowMapper() { + } + + private static BulkImportUserRowMapper getInstance() { + return INSTANCE; + } + + @Override + public BulkImportUser map(ResultSet result) throws Exception { + return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"), + BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")), + result.getString("error_msg"), result.getLong("created_at"), result.getLong("updated_at")); + } + } +} diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java index 55bb51c4..cedbe01f 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java @@ -326,7 +326,7 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined); fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); - sqlCon.commit(); + start.commitTransaction(con); return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod()); } catch (SQLException throwables) { throw new StorageTransactionLogicException(throwables); diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java index 8bc2d561..5e8574a2 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java @@ -541,6 +541,14 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto update(start, TOTPQueries.getQueryToCreateTenantIdIndexForUsedCodesTable(start), NO_OP_SETTER); } + if (!doesTableExists(start, Config.getConfig(start).getBulkImportUsersTable())) { + getInstance(start).addState(CREATING_NEW_TABLE, null); + update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER); + // index: + update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER); + update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER); + } + } catch (Exception e) { if (e.getMessage().contains("schema") && e.getMessage().contains("does not exist") && numberOfRetries < 1) { @@ -576,7 +584,14 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer String DROP_QUERY = "DROP INDEX IF EXISTS all_auth_recipe_users_pagination_index"; update(start, DROP_QUERY, NO_OP_SETTER); } - + { + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_status_updated_at_index"; + update(start, DROP_QUERY, NO_OP_SETTER); + } + { + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index"; + update(start, DROP_QUERY, NO_OP_SETTER); + } { String DROP_QUERY = "DROP TABLE IF EXISTS " + getConfig(start).getAppsTable() + "," @@ -613,7 +628,8 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer + getConfig(start).getDashboardSessionsTable() + "," + getConfig(start).getTotpUsedCodesTable() + "," + getConfig(start).getTotpUserDevicesTable() + "," - + getConfig(start).getTotpUsersTable(); + + getConfig(start).getTotpUsersTable() + "," + + getConfig(start).getBulkImportUsersTable(); update(start, DROP_QUERY, NO_OP_SETTER); } } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java index 8f8df3d6..a8a697bc 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java @@ -447,7 +447,7 @@ public static AuthRecipeUserInfo createUser(Start start, TenantIdentifier tenant UserInfoPartial userInfo = new UserInfoPartial(id, email, phoneNumber, timeJoined); fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); - sqlCon.commit(); + start.commitTransaction(con); return AuthRecipeUserInfo.create(id, false, userInfo.toLoginMethod()); } catch (SQLException throwables) { diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java index 2a37c9dc..91046e5a 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java @@ -160,7 +160,7 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden UserInfoPartial userInfo = new UserInfoPartial(id, email, thirdParty, timeJoined); fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); - sqlCon.commit(); + start.commitTransaction(con); return AuthRecipeUserInfo.create(id, false, userInfo.toLoginMethod()); } catch (SQLException throwables) { @@ -264,9 +264,14 @@ public static List getUsersInfoUsingIdList(Start start, Set return finalResult; }); - try (Connection con = ConnectionPool.getConnection(start)) { + // This query is a part of the BulkImport. To ensure we don't close the connection for + // bulkimport, we are using ConnectionPool.closeConnection instead of auto closing the connection + Connection con = ConnectionPool.getConnection(start); + try { fillUserInfoWithTenantIds_transaction(start, con, appIdentifier, userInfos); fillUserInfoWithVerified_transaction(start, con, appIdentifier, userInfos); + } finally { + ConnectionPool.closeConnection(start, con); } return userInfos.stream().map(UserInfoPartial::toLoginMethod).collect(Collectors.toList()); } From cc2ffe1c50738a15a576ffb2e11414fac75905ab Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Thu, 21 Mar 2024 16:17:31 +0530 Subject: [PATCH 02/11] fix: PR changes --- .../postgresql/BulkImportProxyConnection.java | 325 ++++++++++++++++++ .../postgresql/BulkImportProxyStorage.java | 157 +++------ .../storage/postgresql/ConnectionPool.java | 8 - .../postgresql/QueryExecutorTemplate.java | 34 +- .../supertokens/storage/postgresql/Start.java | 30 +- .../postgresql/queries/BulkImportQueries.java | 77 +++-- .../queries/EmailPasswordQueries.java | 2 +- .../queries/PasswordlessQueries.java | 2 +- .../postgresql/queries/ThirdPartyQueries.java | 9 +- 9 files changed, 450 insertions(+), 194 deletions(-) create mode 100644 src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java new file mode 100644 index 00000000..1972fe0e --- /dev/null +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java @@ -0,0 +1,325 @@ +package io.supertokens.storage.postgresql; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** +* BulkImportProxyConnection is a class implementing the Connection interface, serving as a Connection instance in the bulk import user cronjob. +* This cron extensively utilizes existing queries to import users, all of which internally operate within transactions and those query sometimes +* call the commit/rollback method on the connection. +* +* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures. +* To achieve this, we use our own proxy Connection instance and override the commit/rollback/close methods to do nothing. +*/ + +public class BulkImportProxyConnection implements Connection { + private Connection con = null; + + public BulkImportProxyConnection(Connection con) { + this.con = con; + } + + @Override + public void close() throws SQLException { + // We simply ignore when close is called BulkImportProxyConnection + } + + @Override + public void commit() throws SQLException { + // We simply ignore when commit is called BulkImportProxyConnection + } + + @Override + public void rollback() throws SQLException { + // We simply ignore when rollback is called BulkImportProxyConnection + } + + public void closeForBulkImportProxyStorage() throws SQLException { + this.con.close(); + } + + public void commitForBulkImportProxyStorage() throws SQLException { + this.con.commit(); + } + + public void rollbackForBulkImportProxyStorage() throws SQLException { + this.con.rollback(); + } + + /* Following methods are unchaged */ + + @Override + public Statement createStatement() throws SQLException { + return this.con.createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return this.con.prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return this.con.prepareCall(sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return this.con.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + this.con.setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return this.con.getAutoCommit(); + } + + @Override + public boolean isClosed() throws SQLException { + return this.con.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return this.con.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + this.con.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return this.con.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + this.con.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return this.con.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + this.con.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return this.con.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return this.con.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + this.con.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return this.con.createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return this.con.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map> getTypeMap() throws SQLException { + return this.con.getTypeMap(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + this.con.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + this.con.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return this.con.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return this.con.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return this.con.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + this.con.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + this.con.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + return this.con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return this.con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return this.con.prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return this.con.prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return this.con.prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return this.con.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return this.con.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return this.con.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return this.con.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return this.con.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + this.con.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + this.con.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return this.con.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return this.con.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return this.con.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return this.con.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + this.con.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return this.con.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + this.con.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + this.con.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return this.con.getNetworkTimeout(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return this.con.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return this.con.isWrapperFor(iface); + } +} \ No newline at end of file diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java index e12f341c..9642c6a6 100644 --- a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java @@ -18,11 +18,8 @@ import java.sql.Connection; import java.sql.SQLException; -import java.sql.SQLTransactionRollbackException; import java.util.Set; -import org.postgresql.util.PSQLException; - import com.google.gson.JsonObject; import io.supertokens.pluginInterface.LOG_LEVEL; @@ -39,129 +36,27 @@ * This cronjob extensively utilizes existing queries to import users, all of which internally operate within transactions. * * For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures. -* To achieve this, we override the startTransaction method to utilize the same connection and prevent automatic query commits even upon transaction success. +* To achieve this, we override the startTransactionHelper method to utilize the same connection and prevent automatic query commits even upon transaction success. * Subsequently, the cronjob is responsible for committing the transaction after ensuring the successful execution of all queries. */ public class BulkImportProxyStorage extends Start { - private Connection transactionConnection; - - public Connection getTransactionConnection() throws SQLException { - if (this.transactionConnection == null || this.transactionConnection.isClosed()) { - this.transactionConnection = ConnectionPool.getConnectionForProxyStorage(this); + private BulkImportProxyConnection connection; + + public synchronized Connection getTransactionConnection() throws SQLException { + if (this.connection == null) { + Connection con = ConnectionPool.getConnectionForProxyStorage(this); + this.connection = new BulkImportProxyConnection(con); + connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE); + connection.setAutoCommit(false); } - return this.transactionConnection; - } - - @Override - public T startTransaction(TransactionLogic logic) - throws StorageTransactionLogicException, StorageQueryException { - return startTransaction(logic, TransactionIsolationLevel.SERIALIZABLE); + return this.connection; } @Override - public T startTransaction(TransactionLogic logic, TransactionIsolationLevel isolationLevel) - throws StorageTransactionLogicException, StorageQueryException { - final int NUM_TRIES = 50; - int tries = 0; - while (true) { - tries++; - try { - return startTransactionHelper(logic, isolationLevel); - } catch (SQLException | StorageQueryException | StorageTransactionLogicException e) { - Throwable actualException = e; - if (e instanceof StorageQueryException) { - actualException = e.getCause(); - } else if (e instanceof StorageTransactionLogicException) { - actualException = ((StorageTransactionLogicException) e).actualException; - } - String exceptionMessage = actualException.getMessage(); - if (exceptionMessage == null) { - exceptionMessage = ""; - } - - // see: https://github.com/supertokens/supertokens-postgresql-plugin/pull/3 - - // We set this variable to the current (or cause) exception casted to - // PSQLException if we can safely cast it - PSQLException psqlException = actualException instanceof PSQLException ? (PSQLException) actualException - : null; - - // PSQL error class 40 is transaction rollback. See: - // https://www.postgresql.org/docs/12/errcodes-appendix.html - boolean isPSQLRollbackException = psqlException != null - && psqlException.getServerErrorMessage().getSQLState().startsWith("40"); - - // We keep the old exception detection logic to ensure backwards compatibility. - // We could get here if the new logic hits a false negative, - // e.g., in case someone renamed constraints/tables - boolean isDeadlockException = actualException instanceof SQLTransactionRollbackException - || exceptionMessage.toLowerCase().contains("concurrent update") - || exceptionMessage.toLowerCase().contains("concurrent delete") - || exceptionMessage.toLowerCase().contains("the transaction might succeed if retried") || - - // we have deadlock as well due to the DeadlockTest.java - exceptionMessage.toLowerCase().contains("deadlock"); - - if ((isPSQLRollbackException || isDeadlockException) && tries < NUM_TRIES) { - try { - Thread.sleep((long) (10 + (250 + Math.min(Math.pow(2, tries), 3000)) * Math.random())); - } catch (InterruptedException ignored) { - } - ProcessState.getInstance(this).addState(ProcessState.PROCESS_STATE.DEADLOCK_FOUND, e); - // this because deadlocks are not necessarily a result of faulty logic. They can - // happen - continue; - } - - if ((isPSQLRollbackException || isDeadlockException) && tries == NUM_TRIES) { - ProcessState.getInstance(this).addState(ProcessState.PROCESS_STATE.DEADLOCK_NOT_RESOLVED, e); - } - if (e instanceof StorageQueryException) { - throw (StorageQueryException) e; - } else if (e instanceof StorageTransactionLogicException) { - throw (StorageTransactionLogicException) e; - } - throw new StorageQueryException(e); - } - } - } - - private T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) + protected T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) throws StorageQueryException, StorageTransactionLogicException, SQLException { - Connection con = null; - try { - con = (Connection) getTransactionConnection(); - int libIsolationLevel = Connection.TRANSACTION_SERIALIZABLE; - switch (isolationLevel) { - case SERIALIZABLE: - libIsolationLevel = Connection.TRANSACTION_SERIALIZABLE; - break; - case REPEATABLE_READ: - libIsolationLevel = Connection.TRANSACTION_REPEATABLE_READ; - break; - case READ_COMMITTED: - libIsolationLevel = Connection.TRANSACTION_READ_COMMITTED; - break; - case READ_UNCOMMITTED: - libIsolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED; - break; - case NONE: - libIsolationLevel = Connection.TRANSACTION_NONE; - break; - } - - if (libIsolationLevel != Connection.TRANSACTION_SERIALIZABLE) { - con.setTransactionIsolation(libIsolationLevel); - } - con.setAutoCommit(false); - return logic.mainLogicAndCommit(new TransactionConnection(con)); - } catch (Exception e) { - if (con != null) { - con.rollback(); - } - throw e; - } + return logic.mainLogicAndCommit(new TransactionConnection(getTransactionConnection())); } @Override @@ -178,4 +73,32 @@ public void loadConfig(JsonObject configJson, Set logLevels, TenantId configJson.addProperty("postgresql_connection_pool_size", 1); Config.loadConfig(this, configJson, logLevels, tenantIdentifier); } + + @Override + public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException { + try { + this.connection.close(); + this.connection = null; + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void commitTransactionForBulkImportProxyStorage() throws StorageQueryException { + try { + this.connection.commitForBulkImportProxyStorage(); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void rollbackTransactionForBulkImportProxyStorage() throws StorageQueryException { + try { + this.connection.rollbackForBulkImportProxyStorage(); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } } diff --git a/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java b/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java index 0008b4c3..a8764c5c 100644 --- a/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java +++ b/src/main/java/io/supertokens/storage/postgresql/ConnectionPool.java @@ -227,12 +227,4 @@ static void close(Start start) { } } } - - public static void closeConnection(Start start, Connection con) throws SQLException { - if (start instanceof BulkImportProxyStorage) { - // Keep the connection open for future queries - } else { - con.close(); - } - } } diff --git a/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java b/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java index b20163a9..65b539a5 100644 --- a/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java +++ b/src/main/java/io/supertokens/storage/postgresql/QueryExecutorTemplate.java @@ -27,12 +27,8 @@ public interface QueryExecutorTemplate { static T execute(Start start, String QUERY, PreparedStatementValueSetter setter, ResultSetValueExtractor mapper) throws SQLException, StorageQueryException { - - Connection con = ConnectionPool.getConnection(start); - try { + try (Connection con = ConnectionPool.getConnection(start)) { return execute(con, QUERY, setter, mapper); - } finally { - ConnectionPool.closeConnection(start, con); } } @@ -48,35 +44,29 @@ static T execute(Connection con, String QUERY, PreparedStatementValueSetter } } - static int update(Start start, String QUERY, PreparedStatementValueSetter setter) throws SQLException { - Connection con = ConnectionPool.getConnection(start); - try { + static int update(Start start, String QUERY, PreparedStatementValueSetter setter) + throws SQLException { + try (Connection con = ConnectionPool.getConnection(start)) { return update(con, QUERY, setter); - } finally { - ConnectionPool.closeConnection(start, con); + } + } + + static int update(Connection con, String QUERY, PreparedStatementValueSetter setter) throws SQLException { + try (PreparedStatement pst = con.prepareStatement(QUERY)) { + setter.setValues(pst); + return pst.executeUpdate(); } } static T update(Start start, String QUERY, PreparedStatementValueSetter setter, ResultSetValueExtractor mapper) throws SQLException, StorageQueryException { - Connection con = ConnectionPool.getConnection(start); - try { + try (Connection con = ConnectionPool.getConnection(start)) { try (PreparedStatement pst = con.prepareStatement(QUERY)) { setter.setValues(pst); try (ResultSet result = pst.executeQuery()) { return mapper.extract(result); } } - } finally { - ConnectionPool.closeConnection(start, con); } } - - static int update(Connection con, String QUERY, PreparedStatementValueSetter setter) throws SQLException { - try (PreparedStatement pst = con.prepareStatement(QUERY)) { - setter.setValues(pst); - return pst.executeUpdate(); - } - } - } diff --git a/src/main/java/io/supertokens/storage/postgresql/Start.java b/src/main/java/io/supertokens/storage/postgresql/Start.java index 024e29c6..09c552eb 100644 --- a/src/main/java/io/supertokens/storage/postgresql/Start.java +++ b/src/main/java/io/supertokens/storage/postgresql/Start.java @@ -160,6 +160,24 @@ public Storage createBulkImportProxyStorageInstance() { return new BulkImportProxyStorage(); } + @Override + public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException { + throw new UnsupportedOperationException( + "closeConnectionForBulkImportProxyStorage should only be called from BulkImportProxyStorage"); + } + + @Override + public void commitTransactionForBulkImportProxyStorage() throws StorageQueryException { + throw new UnsupportedOperationException( + "commitTransactionForBulkImportProxyStorage should only be called from BulkImportProxyStorage"); + } + + @Override + public void rollbackTransactionForBulkImportProxyStorage() throws StorageQueryException { + throw new UnsupportedOperationException( + "rollbackTransactionForBulkImportProxyStorage should only be called from BulkImportProxyStorage"); + } + @Override public void loadConfig(JsonObject configJson, Set logLevels, TenantIdentifier tenantIdentifier) throws InvalidConfigException { @@ -317,7 +335,7 @@ public T startTransaction(TransactionLogic logic, TransactionIsolationLev } } - private T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) + protected T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) throws StorageQueryException, StorageTransactionLogicException, SQLException { Connection con = null; Integer defaultTransactionIsolation = null; @@ -2367,7 +2385,7 @@ public boolean addUserIdToTenant_Transaction(TenantIdentifier tenantIdentifier, throw new IllegalStateException("Should never come here!"); } - this.commitTransaction(con); + sqlCon.commit(); return added; } catch (SQLException throwables) { PostgreSQLConfig config = Config.getConfig(this); @@ -3087,11 +3105,11 @@ public List getBulkImportUsers(AppIdentifier appIdentifier, @Non } @Override - public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) + public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) throws StorageQueryException { Connection sqlCon = (Connection) con.getConnection(); try { - BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserIds, status, errorMessage); + BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserId, status, errorMessage); } catch (SQLException e) { throw new StorageQueryException(e); } @@ -3107,9 +3125,9 @@ public List deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull } @Override - public List getBulkImportUsersForProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException { + public List getBulkImportUsersAndChangeStatusToProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException { try { - return BulkImportQueries.getBulkImportUsersForProcessing(this, appIdentifier, limit); + return BulkImportQueries.getBulkImportUsersAndChangeStatusToProcessing(this, appIdentifier, limit); } catch (StorageTransactionLogicException e) { throw new StorageQueryException(e.actualException); } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 8dc41e35..6ba072aa 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -69,7 +69,7 @@ public static String getQueryToCreateCreatedAtIndex(Start start) { } public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) - throws SQLException, StorageQueryException { + throws SQLException { StringBuilder queryBuilder = new StringBuilder( "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES "); @@ -93,14 +93,10 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie }); } - public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) - throws SQLException, StorageQueryException { - if (bulkImportUserIds.length == 0) { - return; - } - - String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ?"; - StringBuilder queryBuilder = new StringBuilder(baseQuery); + public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + @Nonnull String bulkImportUserId, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) + throws SQLException { + String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?"; List parameters = new ArrayList<>(); @@ -108,18 +104,7 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio parameters.add(errorMessage); parameters.add(System.currentTimeMillis()); parameters.add(appIdentifier.getAppId()); - - queryBuilder.append(" AND id IN ("); - for (int i = 0; i < bulkImportUserIds.length; i++) { - if (i != 0) { - queryBuilder.append(", "); - } - queryBuilder.append("?"); - parameters.add(bulkImportUserIds[i]); - } - queryBuilder.append(")"); - - String query = queryBuilder.toString(); + parameters.add(bulkImportUserId); update(con, query, pst -> { for (int i = 0; i < parameters.size(); i++) { @@ -128,7 +113,8 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio }); } - public static List getBulkImportUsersForProcessing(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit) + public static List getBulkImportUsersAndChangeStatusToProcessing(Start start, AppIdentifier appIdentifier, + @Nonnull Integer limit) throws StorageQueryException, StorageTransactionLogicException { return start.startTransaction(con -> { @@ -136,7 +122,7 @@ public static List getBulkImportUsersForProcessing(Start start, try { String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE status = 'NEW' AND app_id = ? " - + " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) " + + " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) " + " LIMIT ? FOR UPDATE SKIP LOCKED"; List bulkImportUsers = new ArrayList<>(); @@ -151,9 +137,32 @@ public static List getBulkImportUsersForProcessing(Start start, return null; }); - String[] bulkImportUserIds = bulkImportUsers.stream().map(user -> user.id).toArray(String[]::new); - - updateBulkImportUserStatus_Transaction(start, sqlCon, appIdentifier, bulkImportUserIds, BULK_IMPORT_USER_STATUS.PROCESSING, null); + String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = ? WHERE app_id = ?"; + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + parameters.add(BULK_IMPORT_USER_STATUS.PROCESSING.toString()); + parameters.add(System.currentTimeMillis()); + parameters.add(appIdentifier.getAppId()); + + queryBuilder.append(" AND id IN ("); + for (int i = 0; i < bulkImportUsers.size(); i++) { + if (i != 0) { + queryBuilder.append(", "); + } + queryBuilder.append("?"); + parameters.add(bulkImportUsers.get(i).id); + } + queryBuilder.append(")"); + + String updateQuery = queryBuilder.toString(); + + update(sqlCon, updateQuery, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }); return bulkImportUsers; } catch (SQLException throwables) { throw new StorageTransactionLogicException(throwables); @@ -161,7 +170,8 @@ public static List getBulkImportUsersForProcessing(Start start, }); } - public static List getBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + public static List getBulkImportUsers(Start start, AppIdentifier appIdentifier, + @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, @Nullable String bulkImportUserId, @Nullable Long createdAt) throws SQLException, StorageQueryException { @@ -180,7 +190,7 @@ public static List getBulkImportUsers(Start start, AppIdentifier if (bulkImportUserId != null && createdAt != null) { queryBuilder - .append(" AND created_at < ? OR (created_at = ? AND id <= ?)"); + .append(" AND created_at < ? OR (created_at = ? AND id <= ?)"); parameters.add(createdAt); parameters.add(createdAt); parameters.add(bulkImportUserId); @@ -204,12 +214,13 @@ public static List getBulkImportUsers(Start start, AppIdentifier }); } - public static List deleteBulkImportUsers(Start start, AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws SQLException, StorageQueryException { + public static List deleteBulkImportUsers(Start start, AppIdentifier appIdentifier, + @Nonnull String[] bulkImportUserIds) throws SQLException, StorageQueryException { if (bulkImportUserIds.length == 0) { return new ArrayList<>(); } - String baseQuery = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable(); + String baseQuery = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable(); StringBuilder queryBuilder = new StringBuilder(baseQuery); List parameters = new ArrayList<>(); @@ -242,8 +253,10 @@ public static List deleteBulkImportUsers(Start start, AppIdentifier appI }); } - public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId) throws SQLException, StorageQueryException { - String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ? AND id = ?"; + public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + @Nonnull String bulkImportUserId) throws SQLException { + String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + + " WHERE app_id = ? AND id = ?"; update(con, query, pst -> { pst.setString(1, appIdentifier.getAppId()); diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java index cedbe01f..55bb51c4 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/EmailPasswordQueries.java @@ -326,7 +326,7 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden UserInfoPartial userInfo = new UserInfoPartial(userId, email, passwordHash, timeJoined); fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); - start.commitTransaction(con); + sqlCon.commit(); return AuthRecipeUserInfo.create(userId, false, userInfo.toLoginMethod()); } catch (SQLException throwables) { throw new StorageTransactionLogicException(throwables); diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java index a8a697bc..8f8df3d6 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/PasswordlessQueries.java @@ -447,7 +447,7 @@ public static AuthRecipeUserInfo createUser(Start start, TenantIdentifier tenant UserInfoPartial userInfo = new UserInfoPartial(id, email, phoneNumber, timeJoined); fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); - start.commitTransaction(con); + sqlCon.commit(); return AuthRecipeUserInfo.create(id, false, userInfo.toLoginMethod()); } catch (SQLException throwables) { diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java index 91046e5a..2a37c9dc 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/ThirdPartyQueries.java @@ -160,7 +160,7 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden UserInfoPartial userInfo = new UserInfoPartial(id, email, thirdParty, timeJoined); fillUserInfoWithTenantIds_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); fillUserInfoWithVerified_transaction(start, sqlCon, tenantIdentifier.toAppIdentifier(), userInfo); - start.commitTransaction(con); + sqlCon.commit(); return AuthRecipeUserInfo.create(id, false, userInfo.toLoginMethod()); } catch (SQLException throwables) { @@ -264,14 +264,9 @@ public static List getUsersInfoUsingIdList(Start start, Set return finalResult; }); - // This query is a part of the BulkImport. To ensure we don't close the connection for - // bulkimport, we are using ConnectionPool.closeConnection instead of auto closing the connection - Connection con = ConnectionPool.getConnection(start); - try { + try (Connection con = ConnectionPool.getConnection(start)) { fillUserInfoWithTenantIds_transaction(start, con, appIdentifier, userInfos); fillUserInfoWithVerified_transaction(start, con, appIdentifier, userInfos); - } finally { - ConnectionPool.closeConnection(start, con); } return userInfos.stream().map(UserInfoPartial::toLoginMethod).collect(Collectors.toList()); } From 38882d7a791af96d4b76ed1a1a8553ed946ecc93 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Fri, 29 Mar 2024 11:49:28 +0530 Subject: [PATCH 03/11] fix: PR changes --- .../supertokens/storage/postgresql/Start.java | 9 +++++ .../postgresql/queries/BulkImportQueries.java | 37 ++++++++++++++----- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/supertokens/storage/postgresql/Start.java b/src/main/java/io/supertokens/storage/postgresql/Start.java index 09c552eb..a14811c8 100644 --- a/src/main/java/io/supertokens/storage/postgresql/Start.java +++ b/src/main/java/io/supertokens/storage/postgresql/Start.java @@ -3142,4 +3142,13 @@ public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, Transa throw new StorageQueryException(e); } } + + @Override + public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException { + try { + BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 6ba072aa..539e5fed 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -45,6 +45,7 @@ static String getQueryToCreateBulkImportUsersTable(Start start) { return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + "id CHAR(36)," + "app_id VARCHAR(64) NOT NULL DEFAULT 'public'," + + "primary_user_id VARCHAR(64)," + "raw_data TEXT NOT NULL," + "status VARCHAR(128) DEFAULT 'NEW'," + "error_msg TEXT," @@ -96,7 +97,8 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) throws SQLException { - String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?"; + String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?"; List parameters = new ArrayList<>(); @@ -113,7 +115,8 @@ public static void updateBulkImportUserStatus_Transaction(Start start, Connectio }); } - public static List getBulkImportUsersAndChangeStatusToProcessing(Start start, AppIdentifier appIdentifier, + public static List getBulkImportUsersAndChangeStatusToProcessing(Start start, + AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException, StorageTransactionLogicException { @@ -137,15 +140,16 @@ public static List getBulkImportUsersAndChangeStatusToProcessing return null; }); - String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = ? WHERE app_id = ?"; + String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET status = ?, updated_at = ? WHERE app_id = ?"; StringBuilder queryBuilder = new StringBuilder(baseQuery); - + List parameters = new ArrayList<>(); - + parameters.add(BULK_IMPORT_USER_STATUS.PROCESSING.toString()); parameters.add(System.currentTimeMillis()); parameters.add(appIdentifier.getAppId()); - + queryBuilder.append(" AND id IN ("); for (int i = 0; i < bulkImportUsers.size(); i++) { if (i != 0) { @@ -155,9 +159,9 @@ public static List getBulkImportUsersAndChangeStatusToProcessing parameters.add(bulkImportUsers.get(i).id); } queryBuilder.append(")"); - + String updateQuery = queryBuilder.toString(); - + update(sqlCon, updateQuery, pst -> { for (int i = 0; i < parameters.size(); i++) { pst.setObject(i + 1, parameters.get(i)); @@ -264,6 +268,20 @@ public static void deleteBulkImportUser_Transaction(Start start, Connection con, }); } + public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier, + @Nonnull String bulkImportUserId, + @Nonnull String primaryUserId) throws SQLException { + String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET primary_user_id = ?, updated_at = ? WHERE app_id = ? and id = ?"; + + update(start, query, pst -> { + pst.setString(1, primaryUserId); + pst.setLong(2, System.currentTimeMillis()); + pst.setString(3, appIdentifier.getAppId()); + pst.setString(4, bulkImportUserId); + }); + } + private static class BulkImportUserRowMapper implements RowMapper { private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper(); @@ -278,7 +296,8 @@ private static BulkImportUserRowMapper getInstance() { public BulkImportUser map(ResultSet result) throws Exception { return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"), BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")), - result.getString("error_msg"), result.getLong("created_at"), result.getLong("updated_at")); + result.getString("primary_user_id"), result.getString("error_msg"), result.getLong("created_at"), + result.getLong("updated_at")); } } } From 6d4da2e4a822f88609f661d7a410df219be27389 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Fri, 29 Mar 2024 19:23:58 +0530 Subject: [PATCH 04/11] fix: PR changes --- .../postgresql/queries/BulkImportQueries.java | 32 ++++++------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 539e5fed..5fde6b87 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -140,31 +140,17 @@ public static List getBulkImportUsersAndChangeStatusToProcessing return null; }); - String baseQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() - + " SET status = ?, updated_at = ? WHERE app_id = ?"; - StringBuilder queryBuilder = new StringBuilder(baseQuery); - - List parameters = new ArrayList<>(); - - parameters.add(BULK_IMPORT_USER_STATUS.PROCESSING.toString()); - parameters.add(System.currentTimeMillis()); - parameters.add(appIdentifier.getAppId()); - - queryBuilder.append(" AND id IN ("); - for (int i = 0; i < bulkImportUsers.size(); i++) { - if (i != 0) { - queryBuilder.append(", "); - } - queryBuilder.append("?"); - parameters.add(bulkImportUsers.get(i).id); - } - queryBuilder.append(")"); - - String updateQuery = queryBuilder.toString(); + String updateQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET status = ?, updated_at = ? WHERE app_id = ? AND id IN (" + Utils + .generateCommaSeperatedQuestionMarks(bulkImportUsers.size()) + ")"; update(sqlCon, updateQuery, pst -> { - for (int i = 0; i < parameters.size(); i++) { - pst.setObject(i + 1, parameters.get(i)); + int index = 1; + pst.setString(index++, BULK_IMPORT_USER_STATUS.PROCESSING.toString()); + pst.setLong(index++, System.currentTimeMillis()); + pst.setString(index++, appIdentifier.getAppId()); + for (BulkImportUser user : bulkImportUsers) { + pst.setObject(index++, user.id); } }); return bulkImportUsers; From c39e8e75e4ba231732a6466f609a62b48dca7e13 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Sat, 30 Mar 2024 00:26:37 +0530 Subject: [PATCH 05/11] fix: PR changes --- .../postgresql/queries/BulkImportQueries.java | 17 +++++++++++------ .../postgresql/queries/GeneralQueries.java | 3 ++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 5fde6b87..75b4f053 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -64,9 +64,14 @@ public static String getQueryToCreateStatusUpdatedAtIndex(Start start) { + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)"; } - public static String getQueryToCreateCreatedAtIndex(Start start) { - return "CREATE INDEX IF NOT EXISTS bulk_import_users_created_at_index ON " - + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at)"; + public static String getQueryToCreatePaginationIndex1(Start start) { + return "CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, created_at DESC, id DESC)"; + } + + public static String getQueryToCreatePaginationIndex2(Start start) { + return "CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at DESC, id DESC)"; } public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) @@ -124,8 +129,8 @@ public static List getBulkImportUsersAndChangeStatusToProcessing Connection sqlCon = (Connection) con.getConnection(); try { String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() - + " WHERE status = 'NEW' AND app_id = ? " - + " OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000) " + + " WHERE app_id = ?" + + " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000))" + " LIMIT ? FOR UPDATE SKIP LOCKED"; List bulkImportUsers = new ArrayList<>(); @@ -180,7 +185,7 @@ public static List getBulkImportUsers(Start start, AppIdentifier if (bulkImportUserId != null && createdAt != null) { queryBuilder - .append(" AND created_at < ? OR (created_at = ? AND id <= ?)"); + .append(" AND (created_at < ? OR (created_at = ? AND id <= ?))"); parameters.add(createdAt); parameters.add(createdAt); parameters.add(bulkImportUserId); diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java index 5e8574a2..2102d0e0 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java @@ -546,7 +546,8 @@ public static void createTablesIfNotExists(Start start) throws SQLException, Sto update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER); // index: update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER); - update(start, BulkImportQueries.getQueryToCreateCreatedAtIndex(start), NO_OP_SETTER); + update(start, BulkImportQueries.getQueryToCreatePaginationIndex1(start), NO_OP_SETTER); + update(start, BulkImportQueries.getQueryToCreatePaginationIndex2(start), NO_OP_SETTER); } } catch (Exception e) { From 5634523607be8b89524a3583a04dd51aab852ea5 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Tue, 2 Apr 2024 17:23:35 +0530 Subject: [PATCH 06/11] fix: PR changes --- .../storage/postgresql/queries/BulkImportQueries.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 75b4f053..25df8c5c 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -128,9 +128,10 @@ public static List getBulkImportUsersAndChangeStatusToProcessing return start.startTransaction(con -> { Connection sqlCon = (Connection) con.getConnection(); try { + // NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again. String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ?" - + " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000 - 60 * 1000))" + + " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */ + " LIMIT ? FOR UPDATE SKIP LOCKED"; List bulkImportUsers = new ArrayList<>(); From 3b0df86fba7322046f199bce022875ffdab090e5 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Thu, 4 Apr 2024 19:24:50 +0530 Subject: [PATCH 07/11] fix: PR changes --- .../storage/postgresql/queries/BulkImportQueries.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 25df8c5c..c8546d8b 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -146,6 +146,10 @@ public static List getBulkImportUsersAndChangeStatusToProcessing return null; }); + if (bulkImportUsers.isEmpty()) { + return new ArrayList<>(); + } + String updateQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + " SET status = ?, updated_at = ? WHERE app_id = ? AND id IN (" + Utils .generateCommaSeperatedQuestionMarks(bulkImportUsers.size()) + ")"; From 737db91b265d8dbb0f66654feac5687d0fab3f12 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Tue, 9 Apr 2024 18:00:57 +0530 Subject: [PATCH 08/11] fix: Update version and changelog --- CHANGELOG.md | 4 ++++ build.gradle | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cdc2ad60..3903d758 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +## [7.1.0] - 2024-04-10 + +- Adds queries for Bulk Import + ## [7.0.0] - 2024-03-13 - Replace `TotpNotEnabledError` with `UnknownUserIdTotpError`. diff --git a/build.gradle b/build.gradle index c1281047..fdcd09f6 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ plugins { id 'java-library' } -version = "7.0.0" +version = "7.1.0" repositories { mavenCentral() From 1375803ff2a6efabc350a04ce3f35dff40b91cee Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Wed, 10 Apr 2024 19:50:16 +0530 Subject: [PATCH 09/11] fix: PR changes --- .../postgresql/BulkImportProxyConnection.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java index 1972fe0e..b25df932 100644 --- a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyConnection.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package io.supertokens.storage.postgresql; import java.sql.Array; From 3ecd8ea7de1e17b70b437566964ab68fbf9a2012 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Mon, 29 Apr 2024 15:54:58 +0530 Subject: [PATCH 10/11] fix: PR changes --- .../storage/postgresql/BulkImportProxyStorage.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java index f077badd..57879344 100644 --- a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java @@ -87,7 +87,9 @@ public void closeConnectionForBulkImportProxyStorage() throws StorageQueryExcept @Override public void commitTransactionForBulkImportProxyStorage() throws StorageQueryException { try { - this.connection.commitForBulkImportProxyStorage(); + if (this.connection != null) { + this.connection.commitForBulkImportProxyStorage(); + } } catch (SQLException e) { throw new StorageQueryException(e); } From ecb078b70f1a03a7a46a8038fe4d89eef5e33c51 Mon Sep 17 00:00:00 2001 From: Ankit Tiwari Date: Wed, 29 May 2024 14:32:39 +0530 Subject: [PATCH 11/11] fix: PR changes --- .../postgresql/BulkImportProxyStorage.java | 28 ++++++++-- .../supertokens/storage/postgresql/Start.java | 9 ++-- .../postgresql/queries/BulkImportQueries.java | 53 +++++++++++++------ .../postgresql/queries/GeneralQueries.java | 6 ++- 4 files changed, 70 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java index 57879344..f4e8dd8e 100644 --- a/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java +++ b/src/main/java/io/supertokens/storage/postgresql/BulkImportProxyStorage.java @@ -18,17 +18,18 @@ import java.sql.Connection; import java.sql.SQLException; +import java.util.List; import java.util.Set; import com.google.gson.JsonObject; import io.supertokens.pluginInterface.LOG_LEVEL; +import io.supertokens.pluginInterface.exceptions.DbInitException; import io.supertokens.pluginInterface.exceptions.InvalidConfigException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; -import io.supertokens.storage.postgresql.config.Config; /** @@ -71,14 +72,33 @@ public void loadConfig(JsonObject configJson, Set logLevels, TenantId // We are overriding the loadConfig method to set the connection pool size // to 1 to avoid creating many connections for the bulk import cronjob configJson.addProperty("postgresql_connection_pool_size", 1); - Config.loadConfig(this, configJson, logLevels, tenantIdentifier); + super.loadConfig(configJson, logLevels, tenantIdentifier); + } + + @Override + public void initStorage(boolean shouldWait, List tenantIdentifiers) throws DbInitException { + super.initStorage(shouldWait, tenantIdentifiers); + + // `BulkImportProxyStorage` uses `BulkImportProxyConnection`, which overrides the `.commit()` method on the Connection object. + // The `initStorage()` method runs `select * from table_name limit 1` queries to check if the tables exist but these queries + // don't get committed due to the overridden `.commit()`, so we need to manually commit the transaction to remove any locks on the tables. + + // Without this commit, a call to `select * from bulk_import_users limit 1` in `doesTableExist()` locks the `bulk_import_users` table, + try { + this.commitTransactionForBulkImportProxyStorage(); + } catch (StorageQueryException e) { + throw new DbInitException(e); + } } @Override public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException { try { - this.connection.close(); - this.connection = null; + if (this.connection != null) { + this.connection.close(); + this.connection = null; + } + ConnectionPool.close(this); } catch (SQLException e) { throw new StorageQueryException(e); } diff --git a/src/main/java/io/supertokens/storage/postgresql/Start.java b/src/main/java/io/supertokens/storage/postgresql/Start.java index 9c10a2ee..411e52b3 100644 --- a/src/main/java/io/supertokens/storage/postgresql/Start.java +++ b/src/main/java/io/supertokens/storage/postgresql/Start.java @@ -3159,19 +3159,18 @@ public List getBulkImportUsersAndChangeStatusToProcessing(AppIde } @Override - public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId) throws StorageQueryException { - Connection sqlCon = (Connection) con.getConnection(); + public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException { try { - BulkImportQueries.deleteBulkImportUser_Transaction(this, sqlCon, appIdentifier, bulkImportUserId); + BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId); } catch (SQLException e) { throw new StorageQueryException(e); } } @Override - public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException { + public long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException { try { - BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId); + return BulkImportQueries.getBulkImportUsersCount(this, appIdentifier, status); } catch (SQLException e) { throw new StorageQueryException(e); } diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java index 165c606b..0a01fd4f 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/BulkImportQueries.java @@ -45,12 +45,12 @@ static String getQueryToCreateBulkImportUsersTable(Start start) { return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + "id CHAR(36)," + "app_id VARCHAR(64) NOT NULL DEFAULT 'public'," - + "primary_user_id VARCHAR(64)," + + "primary_user_id VARCHAR(36)," + "raw_data TEXT NOT NULL," + "status VARCHAR(128) DEFAULT 'NEW'," + "error_msg TEXT," - + "created_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000," - + "updated_at BIGINT DEFAULT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000," + + "created_at BIGINT NOT NULL, " + + "updated_at BIGINT NOT NULL, " + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, null, "pkey") + " PRIMARY KEY(app_id, id)," + "CONSTRAINT " + Utils.getConstraintName(schema, tableName, "app_id", "fkey") + " " @@ -77,12 +77,12 @@ public static String getQueryToCreatePaginationIndex2(Start start) { public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) throws SQLException, StorageQueryException { StringBuilder queryBuilder = new StringBuilder( - "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data) VALUES "); + "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data, created_at, updated_at) VALUES "); int userCount = users.size(); for (int i = 0; i < userCount; i++) { - queryBuilder.append(" (?, ?, ?)"); + queryBuilder.append(" (?, ?, ?, ?, ?)"); if (i < userCount - 1) { queryBuilder.append(","); @@ -95,6 +95,8 @@ public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifie pst.setString(parameterIndex++, user.id); pst.setString(parameterIndex++, appIdentifier.getAppId()); pst.setString(parameterIndex++, user.toRawDataForDbStorage()); + pst.setLong(parameterIndex++, System.currentTimeMillis()); + pst.setLong(parameterIndex++, System.currentTimeMillis()); } }); } @@ -129,6 +131,10 @@ public static List getBulkImportUsersAndChangeStatusToProcessing Connection sqlCon = (Connection) con.getConnection(); try { // NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again. + + // "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously. + // If one process locks the first 1000 rows, others will wait for the lock to be released. + // "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows. String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() + " WHERE app_id = ?" + " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000) - 10 * 60 * 1000))" /* 10 mins */ @@ -253,17 +259,6 @@ public static List deleteBulkImportUsers(Start start, AppIdentifier appI }); } - public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, - @Nonnull String bulkImportUserId) throws SQLException { - String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() - + " WHERE app_id = ? AND id = ?"; - - update(con, query, pst -> { - pst.setString(1, appIdentifier.getAppId()); - pst.setString(2, bulkImportUserId); - }); - } - public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws SQLException, StorageQueryException { @@ -278,6 +273,32 @@ public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier }); } + public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws SQLException, StorageQueryException { + String baseQuery = "SELECT COUNT(*) FROM " + Config.getConfig(start).getBulkImportUsersTable(); + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + if (status != null) { + queryBuilder.append(" AND status = ?"); + parameters.add(status.toString()); + } + + String query = queryBuilder.toString(); + + return execute(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + result.next(); + return result.getLong(1); + }); + } + private static class BulkImportUserRowMapper implements RowMapper { private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper(); diff --git a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java index 1ff9ef7f..d0d9f7de 100644 --- a/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/postgresql/queries/GeneralQueries.java @@ -593,7 +593,11 @@ public static void deleteAllTables(Start start) throws SQLException, StorageQuer update(start, DROP_QUERY, NO_OP_SETTER); } { - String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_created_at_index"; + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_pagination_index1"; + update(start, DROP_QUERY, NO_OP_SETTER); + } + { + String DROP_QUERY = "DROP INDEX IF EXISTS bulk_import_users_pagination_index2"; update(start, DROP_QUERY, NO_OP_SETTER); } {