Commit

fix: PR changes
anku255 committed May 29, 2024
1 parent a7d48ab commit 6673d87
Showing 3 changed files with 64 additions and 13 deletions.
BulkImportProxyStorage.java

@@ -18,25 +18,26 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;

import com.google.gson.JsonObject;

import io.supertokens.pluginInterface.LOG_LEVEL;
import io.supertokens.pluginInterface.exceptions.DbInitException;
import io.supertokens.pluginInterface.exceptions.InvalidConfigException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.mysql.config.Config;


/**
* BulkImportProxyStorage is a class extending Start, serving as a Storage instance in the bulk import user cronjob.
* This cronjob extensively utilizes existing queries to import users, all of which internally operate within transactions.
*
* For the purpose of the bulk import cronjob, we aim to employ a single connection for all queries and roll back any operations in case of query failures.
* To achieve this, we override the startTransactionHelper method to utilize the same connection and prevent automatic query commits even upon transaction success.
* To achieve this, we override the startTransactionHelper method to utilize the same connection and prevent automatic query commits even upon transaction
* success.
* Subsequently, the cronjob is responsible for committing the transaction after ensuring the successful execution of all queries.
*/
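To make the commit-control described in this doc comment concrete, here is a minimal, hypothetical sketch of the pattern (illustrative names only; SingleConnectionProxyStorage, ImportLogic, runInTransaction, and the jdbcUrl parameter are not the plugin's actual API):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical sketch of the single-connection, deferred-commit pattern.
class SingleConnectionProxyStorage {
    private final Connection connection; // one shared connection for every query

    SingleConnectionProxyStorage(String jdbcUrl) throws SQLException {
        this.connection = DriverManager.getConnection(jdbcUrl);
        this.connection.setAutoCommit(false); // nothing commits implicitly
    }

    interface ImportLogic<T> {
        T run(Connection con) throws SQLException;
    }

    // Analogue of the overridden startTransactionHelper: reuse the shared
    // connection and deliberately skip the commit on success.
    <T> T runInTransaction(ImportLogic<T> logic) throws SQLException {
        return logic.run(connection); // no commit here
    }

    // The cronjob commits once every user has been imported successfully...
    void commitAll() throws SQLException {
        connection.commit();
    }

    // ...or rolls back every query at once if any of them failed.
    void rollbackAll() throws SQLException {
        connection.rollback();
    }
}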

@@ -61,7 +62,7 @@ protected <T> T startTransactionHelper(TransactionLogic<T> logic, TransactionIso

@Override
public void commitTransaction(TransactionConnection con) throws StorageQueryException {
// We do not want to commit the queries when using the BulkImportProxyStorage to be able to rollback everything
// if any query fails while importing the user
}

@@ -71,14 +72,33 @@ public void loadConfig(JsonObject configJson, Set<LOG_LEVEL> logLevels, TenantId
// We are overriding the loadConfig method to set the connection pool size
// to 1 to avoid creating many connections for the bulk import cronjob
configJson.addProperty("mysql_connection_pool_size", 1);
Config.loadConfig(this, configJson, logLevels, tenantIdentifier);
super.loadConfig(configJson, logLevels, tenantIdentifier);
}

@Override
public void initStorage(boolean shouldWait, List<TenantIdentifier> tenantIdentifiers) throws DbInitException {
super.initStorage(shouldWait, tenantIdentifiers);

// `BulkImportProxyStorage` uses `BulkImportProxyConnection`, which overrides the `.commit()` method on the Connection object.
// The `initStorage()` method runs `select * from table_name limit 1` queries to check if the tables exist but these queries
// don't get committed due to the overridden `.commit()`, so we need to manually commit the transaction to remove any locks on the tables.

// Without this commit, a call to `select * from bulk_import_users limit 1` in `doesTableExist()` locks the `bulk_import_users` table,
// and that lock is only released once the transaction commits, blocking later statements against the table.
try {
this.commitTransactionForBulkImportProxyStorage();
} catch (StorageQueryException e) {
throw new DbInitException(e);
}
}
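As a rough illustration of the lock behaviour the comment above describes, the hypothetical snippet below (TableProbeDemo and the JDBC URL are assumptions, not plugin code) shows an existence probe inside a manually-committed transaction; per that comment, the probe's locks on bulk_import_users are only released at the commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical demonstration, not plugin code.
class TableProbeDemo {
    public static void main(String[] args) throws SQLException {
        // Assumed connection URL, for illustration only.
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/supertokens");
        con.setAutoCommit(false);
        try (Statement st = con.createStatement()) {
            // The kind of existence probe initStorage() runs:
            st.executeQuery("SELECT * FROM bulk_import_users LIMIT 1");
        }
        // Per the comment above, locks taken by the probe are held until here.
        con.commit();
    }
}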

@Override
public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException {
try {
this.connection.close();
this.connection = null;
if (this.connection != null) {
this.connection.close();
this.connection = null;
}
ConnectionPool.close(this);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
src/main/java/io/supertokens/storage/mysql/Start.java (9 changes: 4 additions & 5 deletions)
@@ -3107,19 +3107,18 @@ public List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing(AppIde
}

@Override
public void deleteBulkImportUser_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId) throws StorageQueryException {
Connection sqlCon = (Connection) con.getConnection();
public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException {
try {
BulkImportQueries.deleteBulkImportUser_Transaction(this, sqlCon, appIdentifier, bulkImportUserId);
BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
}

@Override
public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException {
public long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException {
try {
BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId);
return BulkImportQueries.getBulkImportUsersCount(this, appIdentifier, status);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
BulkImportQueries.java

@@ -44,7 +44,7 @@ static String getQueryToCreateBulkImportUsersTable(Start start) {
return "CREATE TABLE IF NOT EXISTS " + tableName + " ("
+ "id CHAR(36),"
+ "app_id VARCHAR(64) NOT NULL DEFAULT 'public',"
+ "primary_user_id VARCHAR(64),"
+ "primary_user_id VARCHAR(36),"
+ "raw_data TEXT NOT NULL,"
+ "status VARCHAR(128) DEFAULT 'NEW',"
+ "error_msg TEXT,"
@@ -128,6 +128,10 @@ public static List<BulkImportUser> getBulkImportUsersAndChangeStatusToProcessing
Connection sqlCon = (Connection) con.getConnection();
try {
// NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again.

// "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously.
// If one process locks the first 1000 rows, others will wait for the lock to be released.
// "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows.
String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ?"
+ " AND (status = 'NEW' OR (status = 'PROCESSING' AND updated_at < (UNIX_TIMESTAMP() * 1000) - 10 * 60 * 1000))" /* 10 mins */
@@ -220,6 +224,8 @@ public static List<String> deleteBulkImportUsers(Start start, AppIdentifier appI
return new ArrayList<>();
}

// This function needs to return the IDs of the deleted users. Since the DELETE query doesn't return the IDs of the deleted entries,
// we first perform a SELECT query to find all IDs that actually exist in the database. After deletion, we return these IDs.
String selectQuery = "SELECT id FROM " + Config.getConfig(start).getBulkImportUsersTable()
+ " WHERE app_id = ? AND id IN (" + Utils
.generateCommaSeperatedQuestionMarks(bulkImportUserIds.length) + ")";
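The DELETE that follows is collapsed in the diff; it presumably mirrors the SELECT's IN clause. A hedged sketch, reusing only identifiers visible above:

// Hedged sketch of the collapsed DELETE; the real statement may differ.
String deleteQuery = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable()
        + " WHERE app_id = ? AND id IN ("
        + Utils.generateCommaSeperatedQuestionMarks(bulkImportUserIds.length) + ")";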
@@ -282,6 +288,32 @@ public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier
});
}

public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws SQLException, StorageQueryException {
String baseQuery = "SELECT COUNT(*) FROM " + Config.getConfig(start).getBulkImportUsersTable();
StringBuilder queryBuilder = new StringBuilder(baseQuery);

List<Object> parameters = new ArrayList<>();

queryBuilder.append(" WHERE app_id = ?");
parameters.add(appIdentifier.getAppId());

if (status != null) {
queryBuilder.append(" AND status = ?");
parameters.add(status.toString());
}

String query = queryBuilder.toString();

return execute(start, query, pst -> {
for (int i = 0; i < parameters.size(); i++) {
pst.setObject(i + 1, parameters.get(i));
}
}, result -> {
result.next();
return result.getLong(1);
});
}
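A brief usage sketch of the new counter as exposed through Start (the surrounding setup with `start` and `appIdentifier` is assumed; BULK_IMPORT_USER_STATUS values correspond to the status column's states such as 'NEW'):

// Assumed setup: `start` is an initialized Start and `appIdentifier` names the app.
long pendingUsers = start.getBulkImportUsersCount(appIdentifier, BULK_IMPORT_USER_STATUS.NEW);
long allUsers = start.getBulkImportUsersCount(appIdentifier, null); // null counts every status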

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper();

