Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bulk import changelog #137

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,35 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [7.3.0]

- Adds queries for Bulk Import
- Adds support for multithreaded bulk import

### Migration

```sql
CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT UNSIGNED NOT NULL,
updated_at BIGINT UNSIGNED NOT NULL,
PRIMARY KEY (app_id, id),
FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);

CREATE INDEX bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);

CREATE INDEX bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);

CREATE INDEX bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```

## [7.2.0] - 2024-10-03

- Compatible with plugin interface version 6.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface QueryExecutorTemplate {

Expand All @@ -44,6 +45,25 @@ static <T> T execute(Connection con, String QUERY, PreparedStatementValueSetter
}
}

static void executeBatch(Connection connection, String QUERY, List<PreparedStatementValueSetter> setters)
throws SQLException, StorageQueryException {
assert setters != null;
assert !setters.isEmpty();
try (PreparedStatement pst = connection.prepareStatement(QUERY)) {
int counter = 0;
for(PreparedStatementValueSetter setter: setters) {
setter.setValues(pst);
pst.addBatch();
counter++;

if(counter % 100 == 0) {
pst.executeBatch();
}
}
pst.executeBatch(); //for the possible remaining ones
}
}

static int update(Start start, String QUERY, PreparedStatementValueSetter setter)
throws SQLException, StorageQueryException {
try (Connection con = ConnectionPool.getConnection(start)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;

public class BulkImportQueries {
static String getQueryToCreateBulkImportUsersTable(Start start) {
Expand Down Expand Up @@ -317,28 +316,24 @@ public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentif

public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier,
@Nonnull Map<String,String> bulkImportUserIdToErrorMessage)
throws SQLException {
throws SQLException, StorageQueryException {
BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED;
String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable()
+ " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?";

PreparedStatement setErrorStatement = con.prepareStatement(query);
List<PreparedStatementValueSetter> errorSetters = new ArrayList<>();

int counter = 0;
for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){
setErrorStatement.setString(1, errorStatus.toString());
setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
setErrorStatement.setLong(3, System.currentTimeMillis());
setErrorStatement.setString(4, appIdentifier.getAppId());
setErrorStatement.setString(5, bulkImportUserId);
setErrorStatement.addBatch();

if(counter % 100 == 0) {
setErrorStatement.executeBatch();
}
errorSetters.add(pst -> {
pst.setString(1, errorStatus.toString());
pst.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId));
pst.setLong(3, System.currentTimeMillis());
pst.setString(4, appIdentifier.getAppId());
pst.setString(5, bulkImportUserId);
});
}

setErrorStatement.executeBatch();
executeBatch(con, query, errorSetters);
}

private static class BulkImportUserRowMapper implements RowMapper<BulkImportUser, ResultSet> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException;
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;

import static io.supertokens.pluginInterface.RECIPE_ID.EMAIL_PASSWORD;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;
import static io.supertokens.storage.mysql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -330,60 +329,53 @@ public static void signUpMultipleForBulkImport_Transaction(Start start, Connecti
"INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable()
+ "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)";

PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY);
PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY);
PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY);
PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY);
List<PreparedStatementValueSetter> appIdToUserIdSetters = new ArrayList<>();
List<PreparedStatementValueSetter> allAuthRecipeUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailPasswordUsersToTenantSetters = new ArrayList<>();;

int counter = 0;
for (EmailPasswordImportUser user : usersToSignUp) {
String userId = user.userId;
TenantIdentifier tenantIdentifier = user.tenantIdentifier;

appIdToUserId.setString(1, tenantIdentifier.getAppId());
appIdToUserId.setString(2, userId);
appIdToUserId.setString(3, userId);
appIdToUserId.setString(4, EMAIL_PASSWORD.toString());
appIdToUserId.addBatch();


allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId());
allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId());
allAuthRecipeUsers.setString(3, userId);
allAuthRecipeUsers.setString(4, userId);
allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString());
allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch);
allAuthRecipeUsers.addBatch();

emailPasswordUsers.setString(1, tenantIdentifier.getAppId());
emailPasswordUsers.setString(2, userId);
emailPasswordUsers.setString(3, user.email);
emailPasswordUsers.setString(4, user.passwordHash);
emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch);
emailPasswordUsers.addBatch();

emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId());
emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId());
emailPasswordUsersToTenant.setString(3, userId);
emailPasswordUsersToTenant.setString(4, user.email);
emailPasswordUsersToTenant.addBatch();
counter++;
if (counter % 100 == 0) {
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
}
appIdToUserIdSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, userId);
pst.setString(4, EMAIL_PASSWORD.toString());
});

allAuthRecipeUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, userId);
pst.setString(5, EMAIL_PASSWORD.toString());
pst.setLong(6, user.timeJoinedMSSinceEpoch);
pst.setLong(7, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, userId);
pst.setString(3, user.email);
pst.setString(4, user.passwordHash);
pst.setLong(5, user.timeJoinedMSSinceEpoch);
});

emailPasswordUsersToTenantSetters.add(pst -> {
pst.setString(1, tenantIdentifier.getAppId());
pst.setString(2, tenantIdentifier.getTenantId());
pst.setString(3, userId);
pst.setString(4, user.email);
});
}

//execute the remaining ones
appIdToUserId.executeBatch();
allAuthRecipeUsers.executeBatch();
emailPasswordUsers.executeBatch();
emailPasswordUsersToTenant.executeBatch();
executeBatch(sqlCon, app_id_to_user_id_QUERY, appIdToUserIdSetters);
executeBatch(sqlCon, all_auth_recipe_users_QUERY, allAuthRecipeUsersSetters);
executeBatch(sqlCon, emailpassword_users_QUERY, emailPasswordUsersSetters);
executeBatch(sqlCon, emailpassword_users_to_tenant_QUERY, emailPasswordUsersToTenantSetters);

//sqlCon.commit();
} catch (SQLException throwables) {
throw new StorageTransactionLogicException(throwables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@
import io.supertokens.pluginInterface.multitenancy.AppIdentifier;
import io.supertokens.pluginInterface.multitenancy.TenantIdentifier;
import io.supertokens.pluginInterface.sqlStorage.TransactionConnection;
import io.supertokens.storage.mysql.PreparedStatementValueSetter;
import io.supertokens.storage.mysql.Start;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.utils.Utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;

import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.update;
import static io.supertokens.storage.mysql.QueryExecutorTemplate.*;
import static io.supertokens.storage.mysql.config.Config.getConfig;
import static java.lang.System.currentTimeMillis;

Expand Down Expand Up @@ -500,41 +499,25 @@ public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, C
boolean isEmailVerified)
throws SQLException, StorageQueryException {

String QUERY = "";
if (isEmailVerified) {
String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable()
+ "(app_id, user_id, email) VALUES(?, ?, ?)";
PreparedStatement insertQuery = con.prepareStatement(QUERY);
int counter = 0;
for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
insertQuery.setString(1, appIdentifier.getAppId());
insertQuery.setString(2, emailToUser.getKey());
insertQuery.setString(3, emailToUser.getValue());
insertQuery.addBatch();

counter++;
if (counter % 100 == 0) {
insertQuery.executeBatch();
}
}
insertQuery.executeBatch();
} else {
String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable()
+ " WHERE app_id = ? AND user_id = ? AND email = ?";
PreparedStatement deleteQuery = con.prepareStatement(QUERY);
int counter = 0;
for (Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()) {
deleteQuery.setString(1, appIdentifier.getAppId());
deleteQuery.setString(2, emailToUser.getValue());
deleteQuery.setString(3, emailToUser.getKey());
deleteQuery.addBatch();

counter++;
if (counter % 100 == 0) {
deleteQuery.executeBatch();
}
}
deleteQuery.executeBatch();
}
List<PreparedStatementValueSetter> setters = new ArrayList<>();

for(Map.Entry<String, String> emailToUser : emailToUserIds.entrySet()){
setters.add(pst -> {
pst.setString(1, appIdentifier.getAppId());
pst.setString(2, emailToUser.getKey());
pst.setString(3, emailToUser.getValue());
});
}

executeBatch(con, QUERY, setters);
}

public static Set<String> findUserIdsBeingUsedForEmailVerification(Start start, AppIdentifier appIdentifier, List<String> userIds)
Expand Down Expand Up @@ -587,29 +570,29 @@ public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, A
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable()
+ " SET user_id = ? WHERE app_id = ? AND user_id = ?";
PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query);
PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query);

int counter = 0;
List<PreparedStatementValueSetter> emailVerificationSetters = new ArrayList<>();
List<PreparedStatementValueSetter> emailVerificationTokensSetters = new ArrayList<>();

for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){
updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationQuery.setString(3, supertokensUserId);
updateEmailVerificationQuery.addBatch();

updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId());
updateEmailVerificationTokensQuery.setString(3, supertokensUserId);
updateEmailVerificationTokensQuery.addBatch();

counter++;
if(counter % 100 == 0) {
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
}
emailVerificationSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});

emailVerificationTokensSetters.add(pst -> {
pst.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId));
pst.setString(2, appIdentifier.getAppId());
pst.setString(3, supertokensUserId);
});
}
updateEmailVerificationQuery.executeBatch();
updateEmailVerificationTokensQuery.executeBatch();
if(emailVerificationSetters.isEmpty()){
return null;
}

executeBatch(sqlCon, update_email_verification_table_query, emailVerificationSetters);
executeBatch(sqlCon, update_email_verification_tokens_table_query, emailVerificationTokensSetters);

} catch (SQLException e) {
throw new StorageTransactionLogicException(e);
Expand Down
Loading
Loading