From eda932f2b780830098cfb00f428c4d620df85e01 Mon Sep 17 00:00:00 2001 From: Tamas Soltesz Date: Thu, 19 Dec 2024 09:44:06 +0100 Subject: [PATCH 1/3] feat: multithreaded bulk import (#128) * feat: Add bulk import queries * fix: PR changes * fix: PR changes * fix: removing restriction of connection pool size for bulk import * feat: enable test run trigger from ci by hand * fix: enable test run trigger from ci by hand * Updated config.yml * fix: reverting CI changes * "fix: reverting CI config changes" This reverts commit 4586b90927ccbf072c13362c28482571321959b3. * fix: fixing transaction rolled back issues with multithreaded bulk import * fix: changelog * fix: put back accidentally deleted comment * feat: mysql implementation for bulk import * fix: using right transaction isolation level for mysql in bulkimport * fix: review fix, onemillion user test move * fix: fixing imports --------- Co-authored-by: Ankit Tiwari --- CHANGELOG.md | 3 + .../mysql/BulkImportProxyConnection.java | 328 +++++++++++ .../storage/mysql/BulkImportProxyStorage.java | 113 ++++ .../storage/mysql/ConnectionPool.java | 13 +- .../storage/mysql/QueryExecutorTemplate.java | 12 + .../io/supertokens/storage/mysql/Start.java | 543 +++++++++++++++++- .../storage/mysql/config/MySQLConfig.java | 4 + .../mysql/queries/ActiveUsersQueries.java | 31 +- .../mysql/queries/BulkImportQueries.java | 362 ++++++++++++ .../mysql/queries/EmailPasswordQueries.java | 79 +++ .../queries/EmailVerificationQueries.java | 128 +++++ .../storage/mysql/queries/GeneralQueries.java | 141 ++++- .../mysql/queries/PasswordlessQueries.java | 71 +++ .../storage/mysql/queries/SessionQueries.java | 28 + .../storage/mysql/queries/TOTPQueries.java | 74 +++ .../mysql/queries/ThirdPartyQueries.java | 77 +++ .../mysql/queries/UserIdMappingQueries.java | 72 +++ .../mysql/queries/UserMetadataQueries.java | 60 ++ .../mysql/queries/UserRolesQueries.java | 74 +++ .../mysql/test/OneMillionUsersTest.java | 246 +++++++- 20 files changed, 2439 insertions(+), 20 deletions(-) create mode 100644 src/main/java/io/supertokens/storage/mysql/BulkImportProxyConnection.java create mode 100644 src/main/java/io/supertokens/storage/mysql/BulkImportProxyStorage.java create mode 100644 src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a2163cd..936c0cab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +- Adds queries for Bulk Import +- Adds support for multithreaded bulk import + ## [7.2.0] - 2024-10-03 - Compatible with plugin interface version 6.3 diff --git a/src/main/java/io/supertokens/storage/mysql/BulkImportProxyConnection.java b/src/main/java/io/supertokens/storage/mysql/BulkImportProxyConnection.java new file mode 100644 index 00000000..e04dff02 --- /dev/null +++ b/src/main/java/io/supertokens/storage/mysql/BulkImportProxyConnection.java @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + + +package io.supertokens.storage.mysql; + +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** +* BulkImportProxyConnection is a class implementing the Connection interface, serving as a Connection instance in the bulk import user cronjob. +* This cron extensively utilizes existing queries to import users, all of which internally operate within transactions and those query sometimes +* call the commit/rollback method on the connection. +* +* For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures. +* To achieve this, we use our own proxy Connection instance and override the commit/rollback/close methods to do nothing. +*/ + +public class BulkImportProxyConnection implements Connection { + private Connection con = null; + + public BulkImportProxyConnection(Connection con) { + this.con = con; + } + + @Override + public void close() throws SQLException { + // We simply ignore when close is called BulkImportProxyConnection + } + + @Override + public void commit() throws SQLException { + // We simply ignore when commit is called BulkImportProxyConnection + } + + @Override + public void rollback() throws SQLException { + // We simply ignore when rollback is called BulkImportProxyConnection + } + + public void closeForBulkImportProxyStorage() throws SQLException { + this.con.close(); + } + + public void commitForBulkImportProxyStorage() throws SQLException { + this.con.commit(); + } + + public void rollbackForBulkImportProxyStorage() throws SQLException { + this.con.rollback(); + } + + /* Following methods are unchaged */ + + @Override + public Statement createStatement() throws SQLException { + return this.con.createStatement(); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return this.con.prepareStatement(sql); + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return this.con.prepareCall(sql); + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return this.con.nativeSQL(sql); + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + this.con.setAutoCommit(autoCommit); + } + + @Override + public boolean getAutoCommit() throws SQLException { + return this.con.getAutoCommit(); + } + + @Override + public boolean isClosed() throws SQLException { + return this.con.isClosed(); + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return this.con.getMetaData(); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + this.con.setReadOnly(readOnly); + } + + @Override + public boolean isReadOnly() throws SQLException { + return this.con.isReadOnly(); + } + + @Override + public void setCatalog(String catalog) throws SQLException { + this.con.setCatalog(catalog); + } + + @Override + public String getCatalog() throws SQLException { + return this.con.getCatalog(); + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + this.con.setTransactionIsolation(level); + } + + @Override + public int getTransactionIsolation() throws SQLException { + return this.con.getTransactionIsolation(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return this.con.getWarnings(); + } + + @Override + public void clearWarnings() throws SQLException { + this.con.clearWarnings(); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return this.con.createStatement(resultSetType, resultSetConcurrency); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) + throws SQLException { + return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return this.con.prepareCall(sql, resultSetType, resultSetConcurrency); + } + + @Override + public Map> getTypeMap() throws SQLException { + return this.con.getTypeMap(); + } + + @Override + public void setTypeMap(Map> map) throws SQLException { + this.con.setTypeMap(map); + } + + @Override + public void setHoldability(int holdability) throws SQLException { + this.con.setHoldability(holdability); + } + + @Override + public int getHoldability() throws SQLException { + return this.con.getHoldability(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return this.con.setSavepoint(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return this.con.setSavepoint(name); + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + this.con.rollback(savepoint); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + this.con.releaseSavepoint(savepoint); + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + return this.con.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return this.con.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return this.con.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return this.con.prepareStatement(sql, autoGeneratedKeys); + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return this.con.prepareStatement(sql, columnIndexes); + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return this.con.prepareStatement(sql, columnNames); + } + + @Override + public Clob createClob() throws SQLException { + return this.con.createClob(); + } + + @Override + public Blob createBlob() throws SQLException { + return this.con.createBlob(); + } + + @Override + public NClob createNClob() throws SQLException { + return this.con.createNClob(); + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return this.con.createSQLXML(); + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return this.con.isValid(timeout); + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + this.con.setClientInfo(name, value); + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + this.con.setClientInfo(properties); + } + + @Override + public String getClientInfo(String name) throws SQLException { + return this.con.getClientInfo(name); + } + + @Override + public Properties getClientInfo() throws SQLException { + return this.con.getClientInfo(); + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return this.con.createArrayOf(typeName, elements); + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return this.con.createStruct(typeName, attributes); + } + + @Override + public void setSchema(String schema) throws SQLException { + this.con.setSchema(schema); + } + + @Override + public String getSchema() throws SQLException { + return this.con.getSchema(); + } + + @Override + public void abort(Executor executor) throws SQLException { + this.con.abort(executor); + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + this.con.setNetworkTimeout(executor, milliseconds); + } + + @Override + public int getNetworkTimeout() throws SQLException { + return this.con.getNetworkTimeout(); + } + + @Override + public T unwrap(Class iface) throws SQLException { + return this.con.unwrap(iface); + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return this.con.isWrapperFor(iface); + } +} diff --git a/src/main/java/io/supertokens/storage/mysql/BulkImportProxyStorage.java b/src/main/java/io/supertokens/storage/mysql/BulkImportProxyStorage.java new file mode 100644 index 00000000..6b001af9 --- /dev/null +++ b/src/main/java/io/supertokens/storage/mysql/BulkImportProxyStorage.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.supertokens.storage.mysql; + +import io.supertokens.pluginInterface.exceptions.DbInitException; +import io.supertokens.pluginInterface.exceptions.StorageQueryException; +import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; +import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; +import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException; +import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +/** + * BulkImportProxyStorage is a class extending Start, serving as a Storage instance in the bulk import user cronjob. + * This cronjob extensively utilizes existing queries to import users, all of which internally operate within transactions. + * + * For the purpose of bulkimport cronjob, we aim to employ a single connection for all queries and rollback any operations in case of query failures. + * To achieve this, we override the startTransactionHelper method to utilize the same connection and prevent automatic query commits even upon transaction + * success. + * Subsequently, the cronjob is responsible for committing the transaction after ensuring the successful execution of all queries. + */ + +public class BulkImportProxyStorage extends Start { + private BulkImportProxyConnection connection; + + public synchronized Connection getTransactionConnection() throws SQLException, StorageQueryException { + if (this.connection == null) { + Connection con = ConnectionPool.getConnectionForProxyStorage(this); + this.connection = new BulkImportProxyConnection(con); + connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED); + connection.setAutoCommit(false); + } + return this.connection; + } + + @Override + protected T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) + throws StorageQueryException, StorageTransactionLogicException, SQLException, TenantOrAppNotFoundException { + return logic.mainLogicAndCommit(new TransactionConnection(getTransactionConnection())); + } + + @Override + public void commitTransaction(TransactionConnection con) throws StorageQueryException { + // We do not want to commit the queries when using the BulkImportProxyStorage to be able to rollback everything + // if any query fails while importing the user + } + + @Override + public void initStorage(boolean shouldWait, List tenantIdentifiers) throws DbInitException { + super.initStorage(shouldWait, tenantIdentifiers); + + // `BulkImportProxyStorage` uses `BulkImportProxyConnection`, which overrides the `.commit()` method on the Connection object. + // The `initStorage()` method runs `select * from table_name limit 1` queries to check if the tables exist but these queries + // don't get committed due to the overridden `.commit()`, so we need to manually commit the transaction to remove any locks on the tables. + + // Without this commit, a call to `select * from bulk_import_users limit 1` in `doesTableExist()` locks the `bulk_import_users` table, + try { + this.commitTransactionForBulkImportProxyStorage(); + } catch (StorageQueryException e) { + throw new DbInitException(e); + } + } + + @Override + public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException { + try { + if (this.connection != null) { + this.connection.closeForBulkImportProxyStorage(); + this.connection = null; + } + ConnectionPool.close(this); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void commitTransactionForBulkImportProxyStorage() throws StorageQueryException { + try { + if (this.connection != null) { + this.connection.commitForBulkImportProxyStorage(); + } + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void rollbackTransactionForBulkImportProxyStorage() throws StorageQueryException { + try { + this.connection.rollbackForBulkImportProxyStorage(); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } +} diff --git a/src/main/java/io/supertokens/storage/mysql/ConnectionPool.java b/src/main/java/io/supertokens/storage/mysql/ConnectionPool.java index 8f5bd100..8f96d0c7 100644 --- a/src/main/java/io/supertokens/storage/mysql/ConnectionPool.java +++ b/src/main/java/io/supertokens/storage/mysql/ConnectionPool.java @@ -202,7 +202,7 @@ static void initPool(Start start, boolean shouldWait, PostConnectCallback postCo } } - public static Connection getConnection(Start start) throws SQLException, StorageQueryException { + private static Connection getNewConnection(Start start) throws SQLException, StorageQueryException { if (getInstance(start) == null) { throw new IllegalStateException("Please call initPool before getConnection"); } @@ -215,6 +215,17 @@ public static Connection getConnection(Start start) throws SQLException, Storage return getInstance(start).hikariDataSource.getConnection(); } + public static Connection getConnectionForProxyStorage(Start start) throws SQLException, StorageQueryException { + return getNewConnection(start); + } + + public static Connection getConnection(Start start) throws SQLException, StorageQueryException { + if (start instanceof BulkImportProxyStorage) { + return ((BulkImportProxyStorage) start).getTransactionConnection(); + } + return getNewConnection(start); + } + static void close(Start start) { if (getInstance(start) == null) { return; diff --git a/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java b/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java index 50e7325e..17affad3 100644 --- a/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java +++ b/src/main/java/io/supertokens/storage/mysql/QueryExecutorTemplate.java @@ -59,4 +59,16 @@ static int update(Connection con, String QUERY, PreparedStatementValueSetter set } } + static T update(Start start, String QUERY, PreparedStatementValueSetter setter, ResultSetValueExtractor mapper) + throws SQLException, StorageQueryException { + try (Connection con = ConnectionPool.getConnection(start)) { + try (PreparedStatement pst = con.prepareStatement(QUERY)) { + setter.setValues(pst); + try (ResultSet result = pst.executeQuery()) { + return mapper.extract(result); + } + } + } + } + } diff --git a/src/main/java/io/supertokens/storage/mysql/Start.java b/src/main/java/io/supertokens/storage/mysql/Start.java index 388c6c77..460973c2 100644 --- a/src/main/java/io/supertokens/storage/mysql/Start.java +++ b/src/main/java/io/supertokens/storage/mysql/Start.java @@ -25,11 +25,17 @@ import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.authRecipe.LoginMethod; import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.bulkimport.ImportUserBase; +import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportBatchInsertException; +import io.supertokens.pluginInterface.bulkimport.exceptions.BulkImportTransactionRolledBackException; +import io.supertokens.pluginInterface.bulkimport.sqlStorage.BulkImportSQLStorage; import io.supertokens.pluginInterface.dashboard.DashboardSearchTags; import io.supertokens.pluginInterface.dashboard.DashboardSessionInfo; import io.supertokens.pluginInterface.dashboard.DashboardUser; import io.supertokens.pluginInterface.dashboard.exceptions.UserIdNotFoundException; import io.supertokens.pluginInterface.dashboard.sqlStorage.DashboardSQLStorage; +import io.supertokens.pluginInterface.emailpassword.EmailPasswordImportUser; import io.supertokens.pluginInterface.emailpassword.PasswordResetTokenInfo; import io.supertokens.pluginInterface.emailpassword.exceptions.DuplicateEmailException; import io.supertokens.pluginInterface.emailpassword.exceptions.DuplicatePasswordResetTokenException; @@ -64,12 +70,14 @@ import io.supertokens.pluginInterface.oauth.exception.OAuthClientNotFoundException; import io.supertokens.pluginInterface.passwordless.PasswordlessCode; import io.supertokens.pluginInterface.passwordless.PasswordlessDevice; +import io.supertokens.pluginInterface.passwordless.PasswordlessImportUser; import io.supertokens.pluginInterface.passwordless.exception.*; import io.supertokens.pluginInterface.passwordless.sqlStorage.PasswordlessSQLStorage; import io.supertokens.pluginInterface.session.SessionInfo; import io.supertokens.pluginInterface.session.SessionStorage; import io.supertokens.pluginInterface.session.sqlStorage.SessionSQLStorage; import io.supertokens.pluginInterface.sqlStorage.TransactionConnection; +import io.supertokens.pluginInterface.thirdparty.ThirdPartyImportUser; import io.supertokens.pluginInterface.thirdparty.exception.DuplicateThirdPartyUserException; import io.supertokens.pluginInterface.thirdparty.sqlStorage.ThirdPartySQLStorage; import io.supertokens.pluginInterface.totp.TOTPDevice; @@ -101,14 +109,8 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.SQLTransactionRollbackException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Set; +import java.sql.*; +import java.util.*; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; @@ -117,7 +119,7 @@ public class Start implements SessionSQLStorage, EmailPasswordSQLStorage, EmailVerificationSQLStorage, ThirdPartySQLStorage, JWTRecipeSQLStorage, PasswordlessSQLStorage, UserMetadataSQLStorage, UserRolesSQLStorage, UserIdMappingStorage, UserIdMappingSQLStorage, MultitenancyStorage, MultitenancySQLStorage, DashboardSQLStorage, TOTPSQLStorage, - ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, OAuthStorage { + ActiveUsersStorage, ActiveUsersSQLStorage, AuthRecipeSQLStorage, OAuthStorage, BulkImportSQLStorage { // these configs are protected from being modified / viewed by the dev using the SuperTokens // SaaS. If the core is not running in SuperTokens SaaS, this array has no effect. @@ -161,6 +163,29 @@ public STORAGE_TYPE getType() { return STORAGE_TYPE.SQL; } + @Override + public Storage createBulkImportProxyStorageInstance() { + return new BulkImportProxyStorage(); + } + + @Override + public void closeConnectionForBulkImportProxyStorage() throws StorageQueryException { + throw new UnsupportedOperationException( + "closeConnectionForBulkImportProxyStorage should only be called from BulkImportProxyStorage"); + } + + @Override + public void commitTransactionForBulkImportProxyStorage() throws StorageQueryException { + throw new UnsupportedOperationException( + "commitTransactionForBulkImportProxyStorage should only be called from BulkImportProxyStorage"); + } + + @Override + public void rollbackTransactionForBulkImportProxyStorage() throws StorageQueryException { + throw new UnsupportedOperationException( + "rollbackTransactionForBulkImportProxyStorage should only be called from BulkImportProxyStorage"); + } + @Override public void loadConfig(JsonObject configJson, Set logLevels, TenantIdentifier tenantIdentifier) throws InvalidConfigException { @@ -270,13 +295,22 @@ public T startTransaction(TransactionLogic logic, TransactionIsolationLev tries++; try { return startTransactionHelper(logic, isolationLevel); - } catch (SQLException | StorageQueryException | StorageTransactionLogicException e) { + } catch (SQLException | StorageQueryException | StorageTransactionLogicException | + TenantOrAppNotFoundException e) { // check according to: // https://github.com/supertokens/supertokens-mysql-plugin/pull/2 if ((e instanceof SQLTransactionRollbackException || (e.getMessage() != null && e.getMessage().toLowerCase().contains("deadlock"))) && tries < NUM_TRIES) { + try { + if(this instanceof BulkImportProxyStorage){ + throw new StorageTransactionLogicException(new BulkImportTransactionRolledBackException(e)); + // if the current instance is of BulkImportProxyStorage, that means we are doing a bulk import + // which uses nested transactions. With MySQL this retry logic doesn't going to work, we have + // to retry the whole "big" transaction, not just the innermost, current one. + // @see BulkImportTransactionRolledBackException for more explanation. + } Thread.sleep((long) (10 + (250 + Math.min(Math.pow(2, tries), 3000)) * Math.random())); } catch (InterruptedException ignored) { } @@ -293,14 +327,16 @@ public T startTransaction(TransactionLogic logic, TransactionIsolationLev throw (StorageQueryException) e; } else if (e instanceof StorageTransactionLogicException) { throw (StorageTransactionLogicException) e; + } else if (e instanceof TenantOrAppNotFoundException) { + throw new StorageTransactionLogicException(e); } throw new StorageQueryException(e); } } } - private T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) - throws StorageQueryException, StorageTransactionLogicException, SQLException { + protected T startTransactionHelper(TransactionLogic logic, TransactionIsolationLevel isolationLevel) + throws StorageQueryException, StorageTransactionLogicException, SQLException, TenantOrAppNotFoundException { Connection con = null; Integer defaultTransactionIsolation = null; try { @@ -756,6 +792,73 @@ public boolean isUserIdBeingUsedInNonAuthRecipe(AppIdentifier appIdentifier, Str } } + @Override + public Map> findNonAuthRecipesWhereForUserIdsUsed(AppIdentifier appIdentifier, + List userIds) + throws StorageQueryException { + try { + Map> sessionHandlesByUserId = SessionQueries.getAllNonExpiredSessionHandlesForUsers(this, appIdentifier, userIds); + Map> rolesByUserIds = UserRolesQueries.getRolesForUsers(this, appIdentifier, userIds); + Map userMetadatasByIds = UserMetadataQueries.getMultipleUserMetadatas(this, appIdentifier, userIds); + Set userIdsUsedInEmailVerification = EmailVerificationQueries.findUserIdsBeingUsedForEmailVerification(this, appIdentifier, userIds); + Map> devicesByUserIds = TOTPQueries.getDevicesForMultipleUsers(this, appIdentifier, userIds); + Map lastActivesByUserIds = ActiveUsersQueries.getLastActiveByMultipleUserIds(this, appIdentifier, userIds); + + Map> nonAuthRecipeClassnamesByUserIds = new HashMap<>(); + //session recipe + for(String userId: sessionHandlesByUserId.keySet()){ + if(!nonAuthRecipeClassnamesByUserIds.containsKey(userId)){ + nonAuthRecipeClassnamesByUserIds.put(userId, new ArrayList<>()); + } + nonAuthRecipeClassnamesByUserIds.get(userId).add(SessionStorage.class.getName()); + } + + //role recipe + for(String userId: rolesByUserIds.keySet()){ + if(!nonAuthRecipeClassnamesByUserIds.containsKey(userId)){ + nonAuthRecipeClassnamesByUserIds.put(userId, new ArrayList<>()); + } + nonAuthRecipeClassnamesByUserIds.get(userId).add(UserRolesStorage.class.getName()); + } + + //usermetadata recipe + for(String userId: userMetadatasByIds.keySet()){ + if(!nonAuthRecipeClassnamesByUserIds.containsKey(userId)){ + nonAuthRecipeClassnamesByUserIds.put(userId, new ArrayList<>()); + } + nonAuthRecipeClassnamesByUserIds.get(userId).add(UserMetadataStorage.class.getName()); + } + + //emailverification recipe + for(String userId: userIdsUsedInEmailVerification){ + if(!nonAuthRecipeClassnamesByUserIds.containsKey(userId)){ + nonAuthRecipeClassnamesByUserIds.put(userId, new ArrayList<>()); + } + nonAuthRecipeClassnamesByUserIds.get(userId).add(EmailVerificationStorage.class.getName()); + } + + //totp recipe + for(String userId: devicesByUserIds.keySet()){ + if(!nonAuthRecipeClassnamesByUserIds.containsKey(userId)){ + nonAuthRecipeClassnamesByUserIds.put(userId, new ArrayList<>()); + } + nonAuthRecipeClassnamesByUserIds.get(userId).add(TOTPStorage.class.getName()); + } + + //active users + for(String userId: lastActivesByUserIds.keySet()){ + if(!nonAuthRecipeClassnamesByUserIds.containsKey(userId)){ + nonAuthRecipeClassnamesByUserIds.put(userId, new ArrayList<>()); + } + nonAuthRecipeClassnamesByUserIds.get(userId).add(ActiveUsersStorage.class.getName()); + } + + return nonAuthRecipeClassnamesByUserIds; + } catch (SQLException | StorageTransactionLogicException exc) { + throw new StorageQueryException(exc); + } + } + @TestOnly @Override public void addInfoToNonAuthRecipesBasedOnUserId(TenantIdentifier tenantIdentifier, String className, String userId) @@ -910,6 +1013,67 @@ public void deleteEmailPasswordUser_Transaction(TransactionConnection con, AppId } } + @Override + public void signUpMultipleViaBulkImport_Transaction(TransactionConnection connection, + List users) + throws StorageQueryException, StorageTransactionLogicException { + try { + Connection sqlConnection = (Connection) connection.getConnection(); + EmailPasswordQueries.signUpMultipleForBulkImport_Transaction(this, sqlConnection, users); + } catch (StorageQueryException | SQLException | StorageTransactionLogicException e) { + Throwable actual = e.getCause(); + if (actual instanceof BatchUpdateException) { + BatchUpdateException batchUpdateException = (BatchUpdateException) actual; + Map errorByPosition = new HashMap<>(); + + if (actual.getMessage() != null && actual.getMessage().toLowerCase().contains("deadlock")) { + throw new BulkImportTransactionRolledBackException(batchUpdateException); + } + + if (batchUpdateException.getMessage().toLowerCase().contains("duplicate")) { + MySQLConfig config = Config.getConfig(this); + String serverMessage = batchUpdateException.getMessage(); + + Integer position = getErroneousEntryPosition(batchUpdateException, users); + if (position != null) { + if (isUniqueConstraintError(serverMessage, config.getEmailPasswordUserToTenantTable(), + "email")) { + + errorByPosition.put(users.get(position).userId, new DuplicateEmailException()); + + } else if (isPrimaryKeyError(serverMessage, config.getThirdPartyUsersTable()) + || isPrimaryKeyError(serverMessage, config.getUsersTable()) + || isPrimaryKeyError(serverMessage, config.getThirdPartyUserToTenantTable()) + || isPrimaryKeyError(serverMessage, config.getAppIdToUserIdTable())) { + errorByPosition.put(users.get(position).userId, + new io.supertokens.pluginInterface.thirdparty.exception.DuplicateUserIdException()); + } + } + } + throw new StorageTransactionLogicException( + new BulkImportBatchInsertException("emailpassword errors", errorByPosition)); + } + throw new StorageQueryException(e); + } + + } + + //java.sql.BatchUpdateException: (conn=589699) Duplicate entry 'public-public-johndoe+597@gmail.com' for key 'email' + private static Integer getErroneousEntryPosition(BatchUpdateException batchUpdateException, List users) { + String errorMessage = batchUpdateException.getMessage(); + String searchFor = "Duplicate entry '"; + int searchForIndex = errorMessage.indexOf(searchFor); + String appTenantEntry = errorMessage.substring(searchForIndex + searchFor.length(), errorMessage.indexOf("'", searchForIndex + searchFor.length())); + Integer position = null; // Integer for signaling if no entry found with null + for(ImportUserBase user : users) { + if((user.tenantIdentifier.getAppId() + "-" + user.tenantIdentifier.getTenantId() + "-" + user.email).equals(appTenantEntry)){ + position = users.indexOf(user); + break; + } + } + return position; + } + @Override public void addPasswordResetToken(AppIdentifier appIdentifier, PasswordResetTokenInfo passwordResetTokenInfo) throws StorageQueryException, UnknownUserIdException, DuplicatePasswordResetTokenException { @@ -1074,6 +1238,35 @@ public void updateIsEmailVerified_Transaction(AppIdentifier appIdentifier, Trans } } + @Override + public void updateMultipleIsEmailVerified_Transaction(AppIdentifier appIdentifier, TransactionConnection con, + Map emailToUserId, boolean isEmailVerified) + throws StorageQueryException, TenantOrAppNotFoundException { + Connection sqlCon = (Connection) con.getConnection(); + try { + EmailVerificationQueries.updateMultipleUsersIsEmailVerified_Transaction(this, sqlCon, appIdentifier, + emailToUserId, isEmailVerified); + } catch (SQLException e) { + if (e instanceof SQLIntegrityConstraintViolationException) { + MySQLConfig config = Config.getConfig(this); + String serverMessage = e.getMessage(); + + if (isForeignKeyConstraintError(serverMessage, config.getEmailVerificationTable(), "app_id")) { + throw new TenantOrAppNotFoundException(appIdentifier); + } + } + + boolean isPSQLPrimKeyError = e instanceof SQLIntegrityConstraintViolationException && isPrimaryKeyError( + e.getMessage(), + Config.getConfig(this).getEmailVerificationTable()); + + if (!isEmailVerified || !isPSQLPrimKeyError) { + throw new StorageQueryException(e); + } + // we do not throw an error since the email is already verified + } + } + @Override public void deleteEmailVerificationUserInfo_Transaction(TransactionConnection con, AppIdentifier appIdentifier, String userId) @@ -1181,6 +1374,13 @@ public void updateIsEmailVerifiedToExternalUserId(AppIdentifier appIdentifier, S externalUserId); } + @Override + public void updateMultipleIsEmailVerifiedToExternalUserIds(AppIdentifier appIdentifier, + Map supertokensUserIdToExternalUserId) + throws StorageQueryException { + EmailVerificationQueries.updateMultipleIsEmailVerifiedToExternalUserIds(this, appIdentifier, supertokensUserIdToExternalUserId); + } + @Override public void deleteExpiredPasswordResetTokens() throws StorageQueryException { try { @@ -1255,6 +1455,18 @@ public void deleteThirdPartyUser_Transaction(TransactionConnection con, AppIdent } } + @Override + public void importThirdPartyUsers_Transaction(TransactionConnection con, List usersToImport) + throws StorageQueryException, StorageTransactionLogicException, TenantOrAppNotFoundException { + try { + Connection sqlCon = (Connection) con.getConnection(); + ThirdPartyQueries.importUser_Transaction(this, sqlCon, usersToImport); + } catch (SQLException e) { + //TODO!! + throw new StorageTransactionLogicException(e); + } + } + @Override public long getUsersCount(TenantIdentifier tenantIdentifier, RECIPE_ID[] includeRecipeIds) throws StorageQueryException { @@ -1346,6 +1558,16 @@ public boolean doesUserIdExist(TenantIdentifier tenantIdentifier, String userId) } } + @Override + public List findExistingUserIds(AppIdentifier appIdentifier, List userIds) + throws StorageQueryException { + try { + return GeneralQueries.findUserIdsThatExist(this, appIdentifier, userIds); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public AuthRecipeUserInfo getPrimaryUserById(AppIdentifier appIdentifier, String userId) throws StorageQueryException { @@ -1756,6 +1978,62 @@ public void deletePasswordlessUser_Transaction(TransactionConnection con, AppIde } } + @Override + public void importPasswordlessUsers_Transaction(TransactionConnection con, List users) + throws StorageQueryException, TenantOrAppNotFoundException, BulkImportTransactionRolledBackException { + try { + Connection sqlCon = (Connection) con.getConnection(); + PasswordlessQueries.importUsers_Transaction(sqlCon, this, users); + } catch (SQLException e) { + if (e instanceof BatchUpdateException) { + Throwable actual = e.getCause(); + if (actual instanceof BatchUpdateException) { + BatchUpdateException batchUpdateException = (BatchUpdateException) actual; + Map errorByPosition = new HashMap<>(); + if (batchUpdateException != null && + batchUpdateException.getMessage().toLowerCase().contains("deadlock")) { + throw new BulkImportTransactionRolledBackException(batchUpdateException); + } + MySQLConfig config = Config.getConfig(this); + String serverMessage = batchUpdateException.getMessage(); + + Integer position = getErroneousEntryPosition(batchUpdateException, users); + if(position != null) { + if (isPrimaryKeyError(serverMessage, config.getPasswordlessUsersTable()) + || isPrimaryKeyError(serverMessage, config.getUsersTable()) + || isPrimaryKeyError(serverMessage, config.getPasswordlessUserToTenantTable()) + || isPrimaryKeyError(serverMessage, config.getAppIdToUserIdTable())) { + errorByPosition.put(users.get(position).userId, new DuplicateUserIdException()); + } + if (isUniqueConstraintError(serverMessage, config.getPasswordlessUserToTenantTable(), + "email")) { + errorByPosition.put(users.get(position).userId, new DuplicateEmailException()); + + } else if (isUniqueConstraintError(serverMessage, config.getPasswordlessUserToTenantTable(), + "phone_number")) { + errorByPosition.put(users.get(position).userId, new DuplicatePhoneNumberException()); + + } else if (isForeignKeyConstraintError(serverMessage, config.getAppIdToUserIdTable(), + "app_id")) { + throw new TenantOrAppNotFoundException( + users.get(position).tenantIdentifier.toAppIdentifier()); + + } else if (isForeignKeyConstraintError(serverMessage, config.getUsersTable(), + "tenant_id")) { + throw new TenantOrAppNotFoundException( + users.get(position).tenantIdentifier.toAppIdentifier()); + } + } + + + throw new StorageQueryException( + new BulkImportBatchInsertException("passwordless errors", errorByPosition)); + } + throw new StorageQueryException(e); + } + } + } + @Override public PasswordlessDevice getDevice(TenantIdentifier tenantIdentifier, String deviceIdHash) throws StorageQueryException { @@ -1848,6 +2126,19 @@ public JsonObject getUserMetadata_Transaction(AppIdentifier appIdentifier, Trans } } + @Override + public Map getMultipleUsersMetadatas_Transaction(AppIdentifier appIdentifier, + TransactionConnection con, + List userIds) + throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + return UserMetadataQueries.getMultipleUsersMetadatas_Transaction(this, sqlCon, appIdentifier, userIds); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public int setUserMetadata_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String userId, JsonObject metadata) @@ -1869,6 +2160,26 @@ public int setUserMetadata_Transaction(AppIdentifier appIdentifier, TransactionC } } + @Override + public void setMultipleUsersMetadatas_Transaction(AppIdentifier appIdentifier, TransactionConnection con, + Map metadataByUserId) + throws StorageQueryException, TenantOrAppNotFoundException { + Connection sqlCon = (Connection) con.getConnection(); + try { + UserMetadataQueries.setMultipleUsersMetadatas_Transaction(this, sqlCon, appIdentifier, metadataByUserId); + } catch (SQLException e) { + if (e instanceof SQLIntegrityConstraintViolationException) { + MySQLConfig config = Config.getConfig(this); + String serverMessage = e.getMessage(); + + if (isForeignKeyConstraintError(serverMessage, config.getUserMetadataTable(), "app_id")) { + throw new TenantOrAppNotFoundException(appIdentifier); + } + } + throw new StorageQueryException(e); + } + } + @Override public int deleteUserMetadata_Transaction(TransactionConnection con, AppIdentifier appIdentifier, String userId) throws StorageQueryException { @@ -2020,6 +2331,17 @@ public void deleteAllRolesForUser_Transaction(TransactionConnection con, AppIden } } + @Override + public void addRolesToUsers_Transaction(TransactionConnection connection, + Map>> rolesToUserByTenants) + throws StorageQueryException { + try { + UserRolesQueries.addRolesToUsers_Transaction(this, (Connection) connection.getConnection(), rolesToUserByTenants); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public boolean deleteRoleForUser_Transaction(TenantIdentifier tenantIdentifier, TransactionConnection con, String userId, String role) @@ -2114,6 +2436,17 @@ public boolean doesRoleExist_Transaction(AppIdentifier appIdentifier, Transactio } } + @Override + public List doesMultipleRoleExist_Transaction(AppIdentifier appIdentifier, TransactionConnection con, + List roles) throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + return UserRolesQueries.doesMultipleRoleExist_transaction(this, sqlCon, appIdentifier, roles); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public void createUserIdMapping(AppIdentifier appIdentifier, String superTokensUserId, String externalUserId, @org.jetbrains.annotations.Nullable String externalUserIdInfo) @@ -2150,6 +2483,18 @@ public void createUserIdMapping(AppIdentifier appIdentifier, String superTokensU } + @Override + public void createBulkUserIdMapping(AppIdentifier appIdentifier, + Map superTokensUserIdToExternalUserId) + throws StorageQueryException { + try { + + UserIdMappingQueries.createBulkUserIdMapping(this, appIdentifier, superTokensUserIdToExternalUserId); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public boolean deleteUserIdMapping(AppIdentifier appIdentifier, String userId, boolean isSuperTokensUserId) throws StorageQueryException { @@ -2648,6 +2993,24 @@ public TOTPDevice createDevice_Transaction(TransactionConnection con, AppIdentif } } + @Override + public void createDevices_Transaction(TransactionConnection con, AppIdentifier appIdentifier, + List devices) + throws StorageQueryException, TenantOrAppNotFoundException { + Connection sqlCon = (Connection) con.getConnection(); + try { + TOTPQueries.createDevices_Transaction(this, sqlCon, appIdentifier, devices); + } catch (SQLException actualException) { + if (actualException instanceof SQLIntegrityConstraintViolationException) { + String errMsg = actualException.getMessage(); + if (isForeignKeyConstraintError(errMsg, Config.getConfig(this).getTotpUsersTable(), "app_id")) { + throw new TenantOrAppNotFoundException(appIdentifier); + } + } + throw new StorageQueryException(actualException); + } + } + @Override public TOTPDevice getDeviceByName_Transaction(TransactionConnection con, AppIdentifier appIdentifier, String userId, String deviceName) throws StorageQueryException { @@ -2848,6 +3211,18 @@ public AuthRecipeUserInfo getPrimaryUserById_Transaction(AppIdentifier appIdenti } } + @Override + public List getPrimaryUsersByIds_Transaction(AppIdentifier appIdentifier, + TransactionConnection con, List userIds) + throws StorageQueryException { + try { + Connection sqlCon = (Connection) con.getConnection(); + return GeneralQueries.getPrimaryUserInfosForUserIds_Transaction(this, sqlCon, appIdentifier, userIds); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public AuthRecipeUserInfo[] listPrimaryUsersByEmail_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String email) @@ -2860,6 +3235,13 @@ public AuthRecipeUserInfo[] listPrimaryUsersByEmail_Transaction(AppIdentifier ap } } + @Override + public AuthRecipeUserInfo[] listPrimaryUsersByMultipleEmailsOrPhoneNumbersOrThirdparty_Transaction( + AppIdentifier appIdentifier, TransactionConnection con, List emails, List phones, + Map thirdpartyIdToThirdpartyUserId) throws StorageQueryException { + return new AuthRecipeUserInfo[0]; + } + @Override public AuthRecipeUserInfo[] listPrimaryUsersByPhoneNumber_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @@ -2915,6 +3297,17 @@ public void makePrimaryUser_Transaction(AppIdentifier appIdentifier, Transaction } } + @Override + public void makePrimaryUsers_Transaction(AppIdentifier appIdentifier, TransactionConnection con, + List userIds) throws StorageQueryException { + try { + Connection sqlCon = (Connection) con.getConnection(); + GeneralQueries.makePrimaryUsers_Transaction(this, sqlCon, appIdentifier, userIds); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public void linkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String recipeUserId, String primaryUserId) throws StorageQueryException { @@ -2928,6 +3321,18 @@ public void linkAccounts_Transaction(AppIdentifier appIdentifier, TransactionCon } } + @Override + public void linkMultipleAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, + Map recipeUserIdByPrimaryUserId) + throws StorageQueryException { + try { + Connection sqlCon = (Connection) con.getConnection(); + GeneralQueries.linkMultipleAccounts_Transaction(this, sqlCon, appIdentifier, recipeUserIdByPrimaryUserId); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public void unlinkAccounts_Transaction(AppIdentifier appIdentifier, TransactionConnection con, String primaryUserId, String recipeUserId) @@ -3019,6 +3424,27 @@ public UserIdMapping[] getUserIdMapping_Transaction(TransactionConnection con, A } } + @Override + public List getMultipleUserIdMapping_Transaction(TransactionConnection connection, + AppIdentifier appIdentifier, List userIds, + boolean isSupertokensIds) + throws StorageQueryException { + try { + Connection sqlCon = (Connection) connection.getConnection(); + List result; + if(isSupertokensIds){ + result = UserIdMappingQueries.getMultipleUserIdMappingWithSupertokensUserId_Transaction(this, + sqlCon, appIdentifier, userIds); + } else { + result = UserIdMappingQueries.getMultipleUserIdMappingWithExternalUserId_Transaction(this, + sqlCon, appIdentifier, userIds); + } + return result; + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + @Override public int getUsersCountWithMoreThanOneLoginMethodOrTOTPEnabled(AppIdentifier appIdentifier) throws StorageQueryException { @@ -3327,4 +3753,97 @@ public int getDbActivityCount(String dbname) throws SQLException, StorageQueryEx return -1; }); } + + @Override + public void addBulkImportUsers(AppIdentifier appIdentifier, List users) + throws StorageQueryException, + TenantOrAppNotFoundException, + io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException { + try { + BulkImportQueries.insertBulkImportUsers(this, appIdentifier, users); + } catch (SQLException e) { + if (e instanceof SQLIntegrityConstraintViolationException) { + MySQLConfig config = Config.getConfig(this); + String errorMessage = e.getMessage(); + if (isPrimaryKeyError(errorMessage, config.getBulkImportUsersTable())) { + throw new io.supertokens.pluginInterface.bulkimport.exceptions.DuplicateUserIdException(); + } + if (isForeignKeyConstraintError(errorMessage, config.getBulkImportUsersTable(), "app_id")) { + throw new TenantOrAppNotFoundException(appIdentifier); + } + } + throw new StorageQueryException(e); + } + } + + @Override + public List getBulkImportUsers(AppIdentifier appIdentifier, @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + @Nullable String bulkImportUserId, @Nullable Long createdAt) throws StorageQueryException { + try { + return BulkImportQueries.getBulkImportUsers(this, appIdentifier, limit, status, bulkImportUserId, createdAt); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void updateBulkImportUserStatus_Transaction(AppIdentifier appIdentifier, TransactionConnection con, @Nonnull String bulkImportUserId, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) + throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + BulkImportQueries.updateBulkImportUserStatus_Transaction(this, sqlCon, appIdentifier, bulkImportUserId, status, errorMessage); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public void updateMultipleBulkImportUsersStatusToError_Transaction(AppIdentifier appIdentifier, + TransactionConnection con, + @NotNull Map bulkImportUserIdToErrorMessage) + throws StorageQueryException { + Connection sqlCon = (Connection) con.getConnection(); + try { + BulkImportQueries.updateMultipleBulkImportUsersStatusToError_Transaction(this, sqlCon, appIdentifier, + bulkImportUserIdToErrorMessage); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public List deleteBulkImportUsers(AppIdentifier appIdentifier, @Nonnull String[] bulkImportUserIds) throws StorageQueryException { + try { + return BulkImportQueries.deleteBulkImportUsers(this, appIdentifier, bulkImportUserIds); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public List getBulkImportUsersAndChangeStatusToProcessing(AppIdentifier appIdentifier, @Nonnull Integer limit) throws StorageQueryException { + try { + return BulkImportQueries.getBulkImportUsersAndChangeStatusToProcessing(this, appIdentifier, limit); + } catch (StorageTransactionLogicException e) { + throw new StorageQueryException(e.actualException); + } + } + + @Override + public void updateBulkImportUserPrimaryUserId(AppIdentifier appIdentifier, @Nonnull String bulkImportUserId, @Nonnull String primaryUserId) throws StorageQueryException { + try { + BulkImportQueries.updateBulkImportUserPrimaryUserId(this, appIdentifier, bulkImportUserId, primaryUserId); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } + + @Override + public long getBulkImportUsersCount(AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws StorageQueryException { + try { + return BulkImportQueries.getBulkImportUsersCount(this, appIdentifier, status); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } } diff --git a/src/main/java/io/supertokens/storage/mysql/config/MySQLConfig.java b/src/main/java/io/supertokens/storage/mysql/config/MySQLConfig.java index 85d2e74c..ef1b7f19 100644 --- a/src/main/java/io/supertokens/storage/mysql/config/MySQLConfig.java +++ b/src/main/java/io/supertokens/storage/mysql/config/MySQLConfig.java @@ -373,6 +373,10 @@ public String getTotpUsedCodesTable() { return addPrefixToTableName("totp_used_codes"); } + public String getBulkImportUsersTable() { + return addPrefixToTableName("bulk_import_users"); + } + public String getOAuthClientsTable() { return addPrefixToTableName("oauth_clients"); } diff --git a/src/main/java/io/supertokens/storage/mysql/queries/ActiveUsersQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/ActiveUsersQueries.java index 6fb8e35a..e0b7df70 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/ActiveUsersQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/ActiveUsersQueries.java @@ -4,11 +4,14 @@ import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; +import org.jetbrains.annotations.TestOnly; import java.sql.Connection; import java.sql.SQLException; - -import org.jetbrains.annotations.TestOnly; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; @@ -201,4 +204,28 @@ public static int countUsersThatHaveMoreThanOneLoginMethodOrTOTPEnabledAndActive return result.next() ? result.getInt("c") : 0; }); } + + public static Map getLastActiveByMultipleUserIds(Start start, AppIdentifier appIdentifier, List userIds) + throws StorageQueryException { + String QUERY = "SELECT user_id, last_active_time FROM " + Config.getConfig(start).getUserLastActiveTable() + + " WHERE app_id = ? AND user_id IN ( " + Utils.generateCommaSeperatedQuestionMarks(userIds.size())+ " )"; + + try { + return execute(start, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for (int i = 0; i < userIds.size(); i++) { + pst.setString(2+i, userIds.get(i)); + } + }, res -> { + Map lastActiveByUserIds = new HashMap<>(); + if (res.next()) { + String userId = res.getString("user_id"); + lastActiveByUserIds.put(userId, res.getLong("last_active_time")); + } + return lastActiveByUserIds; + }); + } catch (SQLException e) { + throw new StorageQueryException(e); + } + } } diff --git a/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java new file mode 100644 index 00000000..8414886d --- /dev/null +++ b/src/main/java/io/supertokens/storage/mysql/queries/BulkImportQueries.java @@ -0,0 +1,362 @@ +/* + * Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved. + * + * This software is licensed under the Apache License, Version 2.0 (the + * "License") as published by the Apache Software Foundation. + * + * You may not use this file except in compliance with the License. You may + * obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.supertokens.storage.mysql.queries; + +import io.supertokens.pluginInterface.RowMapper; +import io.supertokens.pluginInterface.bulkimport.BulkImportStorage.BULK_IMPORT_USER_STATUS; +import io.supertokens.pluginInterface.bulkimport.BulkImportUser; +import io.supertokens.pluginInterface.exceptions.StorageQueryException; +import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; +import io.supertokens.pluginInterface.multitenancy.AppIdentifier; +import io.supertokens.storage.mysql.Start; +import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; +import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; + +public class BulkImportQueries { + static String getQueryToCreateBulkImportUsersTable(Start start) { + String tableName = Config.getConfig(start).getBulkImportUsersTable(); + return "CREATE TABLE IF NOT EXISTS " + tableName + " (" + + "id CHAR(36)," + + "app_id VARCHAR(64) NOT NULL DEFAULT 'public'," + + "primary_user_id VARCHAR(36)," + + "raw_data TEXT NOT NULL," + + "status VARCHAR(128) DEFAULT 'NEW'," + + "error_msg TEXT," + + "created_at BIGINT NOT NULL, " + + "updated_at BIGINT NOT NULL," + + "PRIMARY KEY (app_id, id)," + + "FOREIGN KEY (app_id)" + + " REFERENCES " + Config.getConfig(start).getAppsTable() + " (app_id) ON DELETE CASCADE" + + " );"; + } + + public static String getQueryToCreateStatusUpdatedAtIndex(Start start) { + return "CREATE INDEX bulk_import_users_status_updated_at_index ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, updated_at)"; + } + + public static String getQueryToCreatePaginationIndex1(Start start) { + return "CREATE INDEX bulk_import_users_pagination_index1 ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, status, created_at DESC, id DESC)"; + } + + public static String getQueryToCreatePaginationIndex2(Start start) { + return "CREATE INDEX bulk_import_users_pagination_index2 ON " + + Config.getConfig(start).getBulkImportUsersTable() + " (app_id, created_at DESC, id DESC)"; + } + + public static void insertBulkImportUsers(Start start, AppIdentifier appIdentifier, List users) + throws SQLException, StorageQueryException { + StringBuilder queryBuilder = new StringBuilder( + "INSERT INTO " + Config.getConfig(start).getBulkImportUsersTable() + " (id, app_id, raw_data, created_at, updated_at) VALUES "); + + int userCount = users.size(); + + for (int i = 0; i < userCount; i++) { + queryBuilder.append(" (?, ?, ?, ?, ?)"); + + if (i < userCount - 1) { + queryBuilder.append(","); + } + } + + update(start, queryBuilder.toString(), pst -> { + int parameterIndex = 1; + for (BulkImportUser user : users) { + pst.setString(parameterIndex++, user.id); + pst.setString(parameterIndex++, appIdentifier.getAppId()); + pst.setString(parameterIndex++, user.toRawDataForDbStorage()); + pst.setLong(parameterIndex++, System.currentTimeMillis()); + pst.setLong(parameterIndex++, System.currentTimeMillis()); + } + }); + } + + public static void updateBulkImportUserStatus_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + @Nonnull String bulkImportUserId, @Nonnull BULK_IMPORT_USER_STATUS status, @Nullable String errorMessage) + throws SQLException, StorageQueryException { + String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?"; + + List parameters = new ArrayList<>(); + + parameters.add(status.toString()); + parameters.add(errorMessage); + parameters.add(System.currentTimeMillis()); + parameters.add(appIdentifier.getAppId()); + parameters.add(bulkImportUserId); + + update(con, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }); + } + + public static List getBulkImportUsersAndChangeStatusToProcessing(Start start, + AppIdentifier appIdentifier, + @Nonnull Integer limit) + throws StorageQueryException, StorageTransactionLogicException { + + return start.startTransaction(con -> { + Connection sqlCon = (Connection) con.getConnection(); + try { + // NOTE: On average, we take about 66 seconds to process 1000 users. If, for any reason, the bulk import users were marked as processing but couldn't be processed within 10 minutes, we'll attempt to process them again. + + // "FOR UPDATE" ensures that multiple cron jobs don't read the same rows simultaneously. + // If one process locks the first 1000 rows, others will wait for the lock to be released. + // "SKIP LOCKED" allows other processes to skip locked rows and select the next 1000 available rows. + String selectQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable() + + " WHERE app_id = ?" + + " AND (status = 'NEW' OR status = 'PROCESSING')" /* 10 mins */ + + " LIMIT ? FOR UPDATE SKIP LOCKED"; + + + List bulkImportUsers = new ArrayList<>(); + + execute(sqlCon, selectQuery, pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setInt(2, limit); + }, result -> { + while (result.next()) { + bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result)); + } + return null; + }); + + if (bulkImportUsers.isEmpty()) { + return new ArrayList<>(); + } + + String updateQuery = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET status = ?, updated_at = ? WHERE app_id = ? AND id IN (" + Utils + .generateCommaSeperatedQuestionMarks(bulkImportUsers.size()) + ")"; + + update(sqlCon, updateQuery, pst -> { + int index = 1; + pst.setString(index++, BULK_IMPORT_USER_STATUS.PROCESSING.toString()); + pst.setLong(index++, System.currentTimeMillis()); + pst.setString(index++, appIdentifier.getAppId()); + for (BulkImportUser user : bulkImportUsers) { + pst.setObject(index++, user.id); + } + }); + return bulkImportUsers; + } catch (SQLException throwables) { + throw new StorageTransactionLogicException(throwables); + } + }); + } + + public static List getBulkImportUsers(Start start, AppIdentifier appIdentifier, + @Nonnull Integer limit, @Nullable BULK_IMPORT_USER_STATUS status, + @Nullable String bulkImportUserId, @Nullable Long createdAt) + throws SQLException, StorageQueryException { + + String baseQuery = "SELECT * FROM " + Config.getConfig(start).getBulkImportUsersTable(); + + StringBuilder queryBuilder = new StringBuilder(baseQuery); + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + if (status != null) { + queryBuilder.append(" AND status = ?"); + parameters.add(status.toString()); + } + + if (bulkImportUserId != null && createdAt != null) { + queryBuilder + .append(" AND (created_at < ? OR (created_at = ? AND id <= ?))"); + parameters.add(createdAt); + parameters.add(createdAt); + parameters.add(bulkImportUserId); + } + + queryBuilder.append(" ORDER BY created_at DESC, id DESC LIMIT ?"); + parameters.add(limit); + + String query = queryBuilder.toString(); + + return execute(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + List bulkImportUsers = new ArrayList<>(); + while (result.next()) { + bulkImportUsers.add(BulkImportUserRowMapper.getInstance().mapOrThrow(result)); + } + return bulkImportUsers; + }); + } + + public static List deleteBulkImportUsers(Start start, AppIdentifier appIdentifier, + @Nonnull String[] bulkImportUserIds) throws SQLException, StorageQueryException { + if (bulkImportUserIds.length == 0) { + return new ArrayList<>(); + } + + // This function needs to return the IDs of the deleted users. Since the DELETE query doesn't return the IDs of the deleted entries, + // we first perform a SELECT query to find all IDs that actually exist in the database. After deletion, we return these IDs. + String selectQuery = "SELECT id FROM " + Config.getConfig(start).getBulkImportUsersTable() + + " WHERE app_id = ? AND id IN (" + Utils + .generateCommaSeperatedQuestionMarks(bulkImportUserIds.length) + ")"; + + List deletedIds = new ArrayList<>(); + + execute(start, selectQuery, pst -> { + int index = 1; + pst.setString(index++, appIdentifier.getAppId()); + for (String id : bulkImportUserIds) { + pst.setObject(index++, id); + } + }, result -> { + while (result.next()) { + deletedIds.add(result.getString("id")); + } + return null; + }); + + if (deletedIds.isEmpty()) { + return new ArrayList<>(); + } + + String deleteQuery = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + + " WHERE app_id = ? AND id IN (" + Utils.generateCommaSeperatedQuestionMarks(deletedIds.size()) + ")"; + + update(start, deleteQuery, pst -> { + int index = 1; + pst.setString(index++, appIdentifier.getAppId()); + for (String id : deletedIds) { + pst.setObject(index++, id); + } + }); + + return deletedIds; + } + + public static void deleteBulkImportUser_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + @Nonnull String bulkImportUserId) throws SQLException, StorageQueryException { + String query = "DELETE FROM " + Config.getConfig(start).getBulkImportUsersTable() + + " WHERE app_id = ? AND id = ?"; + + update(con, query, pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setString(2, bulkImportUserId); + }); + } + + public static void updateBulkImportUserPrimaryUserId(Start start, AppIdentifier appIdentifier, + @Nonnull String bulkImportUserId, + @Nonnull String primaryUserId) throws SQLException, StorageQueryException { + String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET primary_user_id = ?, updated_at = ? WHERE app_id = ? and id = ?"; + + update(start, query, pst -> { + pst.setString(1, primaryUserId); + pst.setLong(2, System.currentTimeMillis()); + pst.setString(3, appIdentifier.getAppId()); + pst.setString(4, bulkImportUserId); + }); + } + + public static long getBulkImportUsersCount(Start start, AppIdentifier appIdentifier, @Nullable BULK_IMPORT_USER_STATUS status) throws SQLException, StorageQueryException { + String baseQuery = "SELECT COUNT(*) FROM " + Config.getConfig(start).getBulkImportUsersTable(); + StringBuilder queryBuilder = new StringBuilder(baseQuery); + + List parameters = new ArrayList<>(); + + queryBuilder.append(" WHERE app_id = ?"); + parameters.add(appIdentifier.getAppId()); + + if (status != null) { + queryBuilder.append(" AND status = ?"); + parameters.add(status.toString()); + } + + String query = queryBuilder.toString(); + + return execute(start, query, pst -> { + for (int i = 0; i < parameters.size(); i++) { + pst.setObject(i + 1, parameters.get(i)); + } + }, result -> { + result.next(); + return result.getLong(1); + }); + } + + public static void updateMultipleBulkImportUsersStatusToError_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + @Nonnull Map bulkImportUserIdToErrorMessage) + throws SQLException { + BULK_IMPORT_USER_STATUS errorStatus = BULK_IMPORT_USER_STATUS.FAILED; + String query = "UPDATE " + Config.getConfig(start).getBulkImportUsersTable() + + " SET status = ?, error_msg = ?, updated_at = ? WHERE app_id = ? and id = ?"; + + PreparedStatement setErrorStatement = con.prepareStatement(query); + + int counter = 0; + for(String bulkImportUserId : bulkImportUserIdToErrorMessage.keySet()){ + setErrorStatement.setString(1, errorStatus.toString()); + setErrorStatement.setString(2, bulkImportUserIdToErrorMessage.get(bulkImportUserId)); + setErrorStatement.setLong(3, System.currentTimeMillis()); + setErrorStatement.setString(4, appIdentifier.getAppId()); + setErrorStatement.setString(5, bulkImportUserId); + setErrorStatement.addBatch(); + + if(counter % 100 == 0) { + setErrorStatement.executeBatch(); + } + } + + setErrorStatement.executeBatch(); + } + + private static class BulkImportUserRowMapper implements RowMapper { + private static final BulkImportUserRowMapper INSTANCE = new BulkImportUserRowMapper(); + + private BulkImportUserRowMapper() { + } + + private static BulkImportUserRowMapper getInstance() { + return INSTANCE; + } + + @Override + public BulkImportUser map(ResultSet result) throws Exception { + return BulkImportUser.fromRawDataFromDbStorage(result.getString("id"), result.getString("raw_data"), + BULK_IMPORT_USER_STATUS.valueOf(result.getString("status")), + result.getString("primary_user_id"), result.getString("error_msg"), result.getLong("created_at"), + result.getLong("updated_at")); + } + } +} \ No newline at end of file diff --git a/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java index 1380efdc..359d6412 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/EmailPasswordQueries.java @@ -19,6 +19,7 @@ import io.supertokens.pluginInterface.RowMapper; import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.authRecipe.LoginMethod; +import io.supertokens.pluginInterface.emailpassword.EmailPasswordImportUser; import io.supertokens.pluginInterface.emailpassword.PasswordResetTokenInfo; import io.supertokens.pluginInterface.emailpassword.exceptions.UnknownUserIdException; import io.supertokens.pluginInterface.exceptions.StorageQueryException; @@ -30,6 +31,7 @@ import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -310,6 +312,83 @@ public static AuthRecipeUserInfo signUp(Start start, TenantIdentifier tenantIden }); } + public static void signUpMultipleForBulkImport_Transaction(Start start, Connection sqlCon, List usersToSignUp) + throws StorageQueryException, StorageTransactionLogicException, SQLException { + try { + String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable() + + "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)"; + + String all_auth_recipe_users_QUERY = "INSERT INTO " + getConfig(start).getUsersTable() + + "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " + + "primary_or_recipe_user_time_joined)" + + " VALUES(?, ?, ?, ?, ?, ?, ?)"; + + String emailpassword_users_QUERY = "INSERT INTO " + getConfig(start).getEmailPasswordUsersTable() + + "(app_id, user_id, email, password_hash, time_joined)" + " VALUES(?, ?, ?, ?, ?)"; + + String emailpassword_users_to_tenant_QUERY = + "INSERT INTO " + getConfig(start).getEmailPasswordUserToTenantTable() + + "(app_id, tenant_id, user_id, email)" + " VALUES(?, ?, ?, ?)"; + + PreparedStatement appIdToUserId = sqlCon.prepareStatement(app_id_to_user_id_QUERY); + PreparedStatement allAuthRecipeUsers = sqlCon.prepareStatement(all_auth_recipe_users_QUERY); + PreparedStatement emailPasswordUsers = sqlCon.prepareStatement(emailpassword_users_QUERY); + PreparedStatement emailPasswordUsersToTenant = sqlCon.prepareStatement(emailpassword_users_to_tenant_QUERY); + + int counter = 0; + for (EmailPasswordImportUser user : usersToSignUp) { + String userId = user.userId; + TenantIdentifier tenantIdentifier = user.tenantIdentifier; + + appIdToUserId.setString(1, tenantIdentifier.getAppId()); + appIdToUserId.setString(2, userId); + appIdToUserId.setString(3, userId); + appIdToUserId.setString(4, EMAIL_PASSWORD.toString()); + appIdToUserId.addBatch(); + + + allAuthRecipeUsers.setString(1, tenantIdentifier.getAppId()); + allAuthRecipeUsers.setString(2, tenantIdentifier.getTenantId()); + allAuthRecipeUsers.setString(3, userId); + allAuthRecipeUsers.setString(4, userId); + allAuthRecipeUsers.setString(5, EMAIL_PASSWORD.toString()); + allAuthRecipeUsers.setLong(6, user.timeJoinedMSSinceEpoch); + allAuthRecipeUsers.setLong(7, user.timeJoinedMSSinceEpoch); + allAuthRecipeUsers.addBatch(); + + emailPasswordUsers.setString(1, tenantIdentifier.getAppId()); + emailPasswordUsers.setString(2, userId); + emailPasswordUsers.setString(3, user.email); + emailPasswordUsers.setString(4, user.passwordHash); + emailPasswordUsers.setLong(5, user.timeJoinedMSSinceEpoch); + emailPasswordUsers.addBatch(); + + emailPasswordUsersToTenant.setString(1, tenantIdentifier.getAppId()); + emailPasswordUsersToTenant.setString(2, tenantIdentifier.getTenantId()); + emailPasswordUsersToTenant.setString(3, userId); + emailPasswordUsersToTenant.setString(4, user.email); + emailPasswordUsersToTenant.addBatch(); + counter++; + if (counter % 100 == 0) { + appIdToUserId.executeBatch(); + allAuthRecipeUsers.executeBatch(); + emailPasswordUsers.executeBatch(); + emailPasswordUsersToTenant.executeBatch(); + } + } + + //execute the remaining ones + appIdToUserId.executeBatch(); + allAuthRecipeUsers.executeBatch(); + emailPasswordUsers.executeBatch(); + emailPasswordUsersToTenant.executeBatch(); + + //sqlCon.commit(); + } catch (SQLException throwables) { + throw new StorageTransactionLogicException(throwables); + } + } + public static void deleteUser_Transaction(Connection sqlCon, Start start, AppIdentifier appIdentifier, String userId, boolean deleteUserIdMappingToo) throws StorageQueryException, SQLException { diff --git a/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java index b612b003..1e96fa5f 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/EmailVerificationQueries.java @@ -28,6 +28,7 @@ import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -494,6 +495,133 @@ public static boolean isUserIdBeingUsedForEmailVerification(Start start, AppIden } } + public static void updateMultipleUsersIsEmailVerified_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + Map emailToUserIds, + boolean isEmailVerified) + throws SQLException, StorageQueryException { + + if (isEmailVerified) { + String QUERY = "INSERT INTO " + getConfig(start).getEmailVerificationTable() + + "(app_id, user_id, email) VALUES(?, ?, ?)"; + PreparedStatement insertQuery = con.prepareStatement(QUERY); + int counter = 0; + for(Map.Entry emailToUser : emailToUserIds.entrySet()){ + insertQuery.setString(1, appIdentifier.getAppId()); + insertQuery.setString(2, emailToUser.getKey()); + insertQuery.setString(3, emailToUser.getValue()); + insertQuery.addBatch(); + + counter++; + if (counter % 100 == 0) { + insertQuery.executeBatch(); + } + } + insertQuery.executeBatch(); + } else { + String QUERY = "DELETE FROM " + getConfig(start).getEmailVerificationTable() + + " WHERE app_id = ? AND user_id = ? AND email = ?"; + PreparedStatement deleteQuery = con.prepareStatement(QUERY); + int counter = 0; + for (Map.Entry emailToUser : emailToUserIds.entrySet()) { + deleteQuery.setString(1, appIdentifier.getAppId()); + deleteQuery.setString(2, emailToUser.getValue()); + deleteQuery.setString(3, emailToUser.getKey()); + deleteQuery.addBatch(); + + counter++; + if (counter % 100 == 0) { + deleteQuery.executeBatch(); + } + } + deleteQuery.executeBatch(); + } + } + + public static Set findUserIdsBeingUsedForEmailVerification(Start start, AppIdentifier appIdentifier, List userIds) + throws SQLException, StorageQueryException { + + Set foundUserIds = new HashSet<>(); + + String email_verificiation_tokens_QUERY = "SELECT * FROM " + getConfig(start).getEmailVerificationTokensTable() + + " WHERE app_id = ? AND user_id IN (" + Utils.generateCommaSeperatedQuestionMarks(userIds.size()) +")"; + + foundUserIds.addAll(execute(start, email_verificiation_tokens_QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for (int i = 0; i < userIds.size(); i++) { + pst.setString(2 + i, userIds.get(i)); + } + }, result -> { + Set userIdsFound = new HashSet<>(); + while (result.next()) { + userIdsFound.add(result.getString("user_id")); + } + return userIdsFound; + })); + + String email_verification_table_QUERY = "SELECT * FROM " + getConfig(start).getEmailVerificationTable() + + " WHERE app_id = ? AND user_id IN (" + Utils.generateCommaSeperatedQuestionMarks(userIds.size()) +")"; + + foundUserIds.addAll(execute(start, email_verification_table_QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for (int i = 0; i < userIds.size(); i++) { + pst.setString(2 + i, userIds.get(i)); + } + }, result -> { + Set userIdsFound = new HashSet<>(); + while (result.next()) { + userIdsFound.add(result.getString("user_id")); + } + return userIdsFound; + })); + return foundUserIds; + } + + public static void updateMultipleIsEmailVerifiedToExternalUserIds(Start start, AppIdentifier appIdentifier, + Map supertokensUserIdToExternalUserId) + throws StorageQueryException { + try { + start.startTransaction((TransactionConnection con) -> { + Connection sqlCon = (Connection) con.getConnection(); + try { + String update_email_verification_table_query = "UPDATE " + getConfig(start).getEmailVerificationTable() + + " SET user_id = ? WHERE app_id = ? AND user_id = ?"; + String update_email_verification_tokens_table_query = "UPDATE " + getConfig(start).getEmailVerificationTokensTable() + + " SET user_id = ? WHERE app_id = ? AND user_id = ?"; + PreparedStatement updateEmailVerificationQuery = sqlCon.prepareStatement(update_email_verification_table_query); + PreparedStatement updateEmailVerificationTokensQuery = sqlCon.prepareStatement(update_email_verification_tokens_table_query); + + int counter = 0; + for (String supertokensUserId : supertokensUserIdToExternalUserId.keySet()){ + updateEmailVerificationQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId)); + updateEmailVerificationQuery.setString(2, appIdentifier.getAppId()); + updateEmailVerificationQuery.setString(3, supertokensUserId); + updateEmailVerificationQuery.addBatch(); + + updateEmailVerificationTokensQuery.setString(1, supertokensUserIdToExternalUserId.get(supertokensUserId)); + updateEmailVerificationTokensQuery.setString(2, appIdentifier.getAppId()); + updateEmailVerificationTokensQuery.setString(3, supertokensUserId); + updateEmailVerificationTokensQuery.addBatch(); + + counter++; + if(counter % 100 == 0) { + updateEmailVerificationQuery.executeBatch(); + updateEmailVerificationTokensQuery.executeBatch(); + } + } + updateEmailVerificationQuery.executeBatch(); + updateEmailVerificationTokensQuery.executeBatch(); + + } catch (SQLException e) { + throw new StorageTransactionLogicException(e); + } + + return null; + }); + } catch (StorageTransactionLogicException e) { + throw new StorageQueryException(e.actualException); + } + } + private static class EmailVerificationTokenInfoRowMapper implements RowMapper { private static final EmailVerificationTokenInfoRowMapper INSTANCE = new EmailVerificationTokenInfoRowMapper(); diff --git a/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java index 956faa02..e3c7e30c 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/GeneralQueries.java @@ -33,10 +33,7 @@ import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; -import java.sql.SQLException; +import java.sql.*; import java.util.*; import java.util.stream.Collectors; @@ -417,6 +414,15 @@ public static void createTablesIfNotExists(Start start, Connection con) throws S update(con, TOTPQueries.getQueryToCreateUsedCodesExpiryTimeIndex(start), NO_OP_SETTER); } + if (!doesTableExists(start, con, Config.getConfig(start).getBulkImportUsersTable())) { + getInstance(start).addState(CREATING_NEW_TABLE, null); + update(start, BulkImportQueries.getQueryToCreateBulkImportUsersTable(start), NO_OP_SETTER); + // index: + update(start, BulkImportQueries.getQueryToCreateStatusUpdatedAtIndex(start), NO_OP_SETTER); + update(start, BulkImportQueries.getQueryToCreatePaginationIndex1(start), NO_OP_SETTER); + update(start, BulkImportQueries.getQueryToCreatePaginationIndex2(start), NO_OP_SETTER); + } + if (!doesTableExists(start, con, Config.getConfig(start).getOAuthClientsTable())) { getInstance(start).addState(CREATING_NEW_TABLE, null); update(con, OAuthQueries.getQueryToCreateOAuthClientTable(start), NO_OP_SETTER); @@ -987,6 +993,37 @@ public static void makePrimaryUser_Transaction(Start start, Connection sqlCon, A } } + public static void makePrimaryUsers_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, + List userIds) + throws SQLException, StorageQueryException { + + String users_update_QUERY = "UPDATE " + Config.getConfig(start).getUsersTable() + + " SET is_linked_or_is_a_primary_user = true WHERE app_id = ? AND user_id = ?"; + String appid_to_userid_update_QUERY = "UPDATE " + Config.getConfig(start).getAppIdToUserIdTable() + + " SET is_linked_or_is_a_primary_user = true WHERE app_id = ? AND user_id = ?"; + + PreparedStatement usersUpdateStatement = sqlCon.prepareStatement(users_update_QUERY); + PreparedStatement appIdToUserIdUpdateStatement = sqlCon.prepareStatement(appid_to_userid_update_QUERY); + int counter = 0; + for(String userId: userIds){ + usersUpdateStatement.setString(1, appIdentifier.getAppId()); + usersUpdateStatement.setString(2, userId); + usersUpdateStatement.addBatch(); + + appIdToUserIdUpdateStatement.setString(1, appIdentifier.getAppId()); + appIdToUserIdUpdateStatement.setString(2, userId); + appIdToUserIdUpdateStatement.addBatch(); + + counter++; + if(counter % 100 == 0) { + usersUpdateStatement.executeBatch(); + appIdToUserIdUpdateStatement.executeBatch(); + } + } + usersUpdateStatement.executeBatch(); + appIdToUserIdUpdateStatement.executeBatch(); + } + public static void linkAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, String recipeUserId, String primaryUserId) throws SQLException, StorageQueryException { @@ -1017,6 +1054,73 @@ public static void linkAccounts_Transaction(Start start, Connection sqlCon, AppI } } + public static void linkMultipleAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, + Map recipeUserIdToPrimaryUserId) + throws SQLException, StorageQueryException { + + if(recipeUserIdToPrimaryUserId == null || recipeUserIdToPrimaryUserId.isEmpty()){ + return; + } + + String update_users_QUERY = "UPDATE " + Config.getConfig(start).getUsersTable() + + " SET is_linked_or_is_a_primary_user = true, primary_or_recipe_user_id = ? WHERE app_id = ? AND " + + "user_id = ?"; + + String update_appid_to_userid_QUERY = "UPDATE " + Config.getConfig(start).getAppIdToUserIdTable() + + " SET is_linked_or_is_a_primary_user = true, primary_or_recipe_user_id = ? WHERE app_id = ? AND " + + "user_id = ?"; + + PreparedStatement updateUsers = sqlCon.prepareStatement(update_users_QUERY); + PreparedStatement updateAppIdToUserId = sqlCon.prepareStatement(update_appid_to_userid_QUERY); + + int counter = 0; + for(Map.Entry linkEntry : recipeUserIdToPrimaryUserId.entrySet()) { + String primaryUserId = linkEntry.getValue(); + String recipeUserId = linkEntry.getKey(); + + updateUsers.setString(1, primaryUserId); + updateUsers.setString(2, appIdentifier.getAppId()); + updateUsers.setString(3, recipeUserId); + updateUsers.addBatch(); + + updateAppIdToUserId.setString(1, primaryUserId); + updateAppIdToUserId.setString(2, appIdentifier.getAppId()); + updateAppIdToUserId.setString(3, recipeUserId); + updateAppIdToUserId.addBatch(); + + counter++; + if (counter % 100 == 0) { + updateUsers.executeBatch(); + updateAppIdToUserId.executeBatch(); + } + } + + updateUsers.executeBatch(); + updateAppIdToUserId.executeBatch(); + + updateTimeJoinedForPrimaryUsers_Transaction(start, sqlCon, appIdentifier, + new ArrayList<>(recipeUserIdToPrimaryUserId.values())); + } + + public static void updateTimeJoinedForPrimaryUsers_Transaction(Start start, Connection sqlCon, + AppIdentifier appIdentifier, List primaryUserIds) + throws SQLException, StorageQueryException { + String QUERY = "UPDATE " + Config.getConfig(start).getUsersTable() + + " SET primary_or_recipe_user_time_joined = (SELECT MIN(time_joined) FROM " + + Config.getConfig(start).getUsersTable() + " WHERE app_id = ? AND primary_or_recipe_user_id = ?) WHERE " + + " app_id = ? AND primary_or_recipe_user_id = ?"; + PreparedStatement updateStatement = sqlCon.prepareStatement(QUERY); + for(String primaryUserId : primaryUserIds) { + updateStatement.setString(1, appIdentifier.getAppId()); + updateStatement.setString(2, primaryUserId); + updateStatement.setString(3, appIdentifier.getAppId()); + updateStatement.setString(4, primaryUserId); + updateStatement.addBatch(); + } + + updateStatement.executeBatch(); + } + public static void unlinkAccounts_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, String primaryUserId, String recipeUserId) throws SQLException, StorageQueryException { @@ -1344,6 +1448,17 @@ private static List getPrimaryUserInfoForUserIds(Start start .collect(Collectors.toList()); } + public static List getPrimaryUserInfosForUserIds_Transaction(Start start, Connection con, + AppIdentifier appIdentifier, List ids) + throws SQLException, StorageQueryException { + + List result = getPrimaryUserInfoForUserIds_Transaction(start, con, appIdentifier, ids); + if (result.isEmpty()) { + return null; + } + return result; + } + private static List getPrimaryUserInfoForUserIds_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, List userIds) @@ -1678,6 +1793,24 @@ public static boolean checkIfUsesAccountLinking(Start start, AppIdentifier appId }); } + public static List findUserIdsThatExist(Start start, AppIdentifier appIdentifier, List userIds) + throws SQLException, StorageQueryException { + String QUERY = "SELECT user_id FROM " + Config.getConfig(start).getAppIdToUserIdTable() + + " WHERE app_id = ? AND user_id IN ("+ Utils.generateCommaSeperatedQuestionMarks(userIds.size()) +")"; + return execute(start, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for(int i = 0; i { + List foundUserIds = new ArrayList<>(); + while(result.next()){ + foundUserIds.add(result.getString(1)); + } + return foundUserIds; + }); + } + public static AccountLinkingInfo getAccountLinkingInfo_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, String userId) throws SQLException, StorageQueryException { diff --git a/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java index ae6b4be2..537fd618 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/PasswordlessQueries.java @@ -26,6 +26,7 @@ import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.pluginInterface.passwordless.PasswordlessCode; import io.supertokens.pluginInterface.passwordless.PasswordlessDevice; +import io.supertokens.pluginInterface.passwordless.PasswordlessImportUser; import io.supertokens.pluginInterface.sqlStorage.SQLStorage.TransactionIsolationLevel; import io.supertokens.storage.mysql.ConnectionPool; import io.supertokens.storage.mysql.Start; @@ -35,6 +36,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -154,6 +156,75 @@ public static void createDeviceWithCode(Start start, TenantIdentifier tenantIden }, TransactionIsolationLevel.REPEATABLE_READ); } + public static void importUsers_Transaction(Connection sqlCon, Start start, + Collection users) throws SQLException { + + String app_id_to_user_id_QUERY = "INSERT INTO " + getConfig(start).getAppIdToUserIdTable() + + "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)"; + PreparedStatement appIdToUserIdStatement = sqlCon.prepareStatement(app_id_to_user_id_QUERY); + + String all_auth_recipe_users_QUERY = "INSERT INTO " + getConfig(start).getUsersTable() + + "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " + + "primary_or_recipe_user_time_joined)" + + " VALUES(?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement allAuthRecipeUsersStatement = sqlCon.prepareStatement(all_auth_recipe_users_QUERY); + + String passwordless_users_QUERY = "INSERT INTO " + getConfig(start).getPasswordlessUsersTable() + + "(app_id, user_id, email, phone_number, time_joined)" + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement passwordlessUsersStatement = sqlCon.prepareStatement(passwordless_users_QUERY); + + String passwordless_user_to_tenant_QUERY = "INSERT INTO " + getConfig(start).getPasswordlessUserToTenantTable() + + "(app_id, tenant_id, user_id, email, phone_number)" + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement passwordlessUserToTenantStatement = sqlCon.prepareStatement(passwordless_user_to_tenant_QUERY); + + int counter = 0; + for (PasswordlessImportUser user: users){ + TenantIdentifier tenantIdentifier = user.tenantIdentifier; + appIdToUserIdStatement.setString(1, tenantIdentifier.getAppId()); + appIdToUserIdStatement.setString(2, user.userId); + appIdToUserIdStatement.setString(3, user.userId); + appIdToUserIdStatement.setString(4, PASSWORDLESS.toString()); + appIdToUserIdStatement.addBatch(); + + allAuthRecipeUsersStatement.setString(1, tenantIdentifier.getAppId()); + allAuthRecipeUsersStatement.setString(2, tenantIdentifier.getTenantId()); + allAuthRecipeUsersStatement.setString(3, user.userId); + allAuthRecipeUsersStatement.setString(4, user.userId); + allAuthRecipeUsersStatement.setString(5, PASSWORDLESS.toString()); + allAuthRecipeUsersStatement.setLong(6, user.timeJoinedMSSinceEpoch); + allAuthRecipeUsersStatement.setLong(7, user.timeJoinedMSSinceEpoch); + allAuthRecipeUsersStatement.addBatch(); + + passwordlessUsersStatement.setString(1, tenantIdentifier.getAppId()); + passwordlessUsersStatement.setString(2, user.userId); + passwordlessUsersStatement.setString(3, user.email); + passwordlessUsersStatement.setString(4, user.phoneNumber); + passwordlessUsersStatement.setLong(5, user.timeJoinedMSSinceEpoch); + passwordlessUsersStatement.addBatch(); + + passwordlessUserToTenantStatement.setString(1, tenantIdentifier.getAppId()); + passwordlessUserToTenantStatement.setString(2, tenantIdentifier.getTenantId()); + passwordlessUserToTenantStatement.setString(3, user.userId); + passwordlessUserToTenantStatement.setString(4, user.email); + passwordlessUserToTenantStatement.setString(5, user.phoneNumber); + passwordlessUserToTenantStatement.addBatch(); + + counter++; + + if(counter % 100 == 0) { + appIdToUserIdStatement.executeBatch(); + allAuthRecipeUsersStatement.executeBatch(); + passwordlessUsersStatement.executeBatch(); + passwordlessUserToTenantStatement.executeBatch(); + } + } + + appIdToUserIdStatement.executeBatch(); + allAuthRecipeUsersStatement.executeBatch(); + passwordlessUsersStatement.executeBatch(); + passwordlessUserToTenantStatement.executeBatch(); + + } public static PasswordlessDevice getDevice_Transaction(Start start, Connection con, TenantIdentifier tenantIdentifier, String deviceIdHash) throws StorageQueryException, SQLException { diff --git a/src/main/java/io/supertokens/storage/mysql/queries/SessionQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/SessionQueries.java index 93fef07f..365929e5 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/SessionQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/SessionQueries.java @@ -26,13 +26,16 @@ import io.supertokens.pluginInterface.session.SessionInfo; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; import javax.annotation.Nullable; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; @@ -393,6 +396,31 @@ public static void removeAccessTokenSigningKeysBefore(Start start, AppIdentifier }); } + public static Map> getAllNonExpiredSessionHandlesForUsers(Start start, AppIdentifier appIdentifier, + List userIds) + throws SQLException, StorageQueryException { + String QUERY = "SELECT user_id, session_handle FROM " + Config.getConfig(start).getSessionInfoTable() + + " WHERE app_id = ? AND expires_at >= ? AND user_id IN ( " + Utils.generateCommaSeperatedQuestionMarks(userIds.size()) + " )"; + + return execute(start, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + pst.setLong(2, currentTimeMillis()); + for(int i = 0; i < userIds.size() ; i++){ + pst.setString(3 + i, userIds.get(i)); + } + }, result -> { + Map> temp = new HashMap<>(); + while (result.next()) { + String userId = result.getString("user_id"); + if(!temp.containsKey(userId)){ + temp.put(userId, new ArrayList<>()); + } + temp.get(userId).add(result.getString("session_handle")); + } + return temp; + }); + } + static class SessionInfoRowMapper { public static final SessionInfoRowMapper INSTANCE = new SessionInfoRowMapper(); diff --git a/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java index 2ccc763c..5d8cdd44 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/TOTPQueries.java @@ -8,12 +8,16 @@ import io.supertokens.pluginInterface.totp.TOTPUsedCode; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; @@ -287,6 +291,76 @@ public static int removeExpiredCodes(Start start, TenantIdentifier tenantIdentif }); } + public static void createDevices_Transaction(Start start, Connection sqlCon, AppIdentifier appIdentifier, + List devices) + throws SQLException, StorageQueryException { + + String insert_user_QUERY = "INSERT INTO " + Config.getConfig(start).getTotpUsersTable() + + " (app_id, user_id) VALUES (?, ?) ON DUPLICATE KEY UPDATE user_id=user_id"; + + String insert_device_QUERY = "INSERT INTO " + Config.getConfig(start).getTotpUserDevicesTable() + + + " (app_id, user_id, device_name, secret_key, period, skew, verified, created_at) VALUES (?, ?, ?, ?, " + + "?, ?, ?, ?) ON DUPLICATE KEY UPDATE secret_key = ?, period = ?, skew = ?, created_at = ?, verified = ?"; + + PreparedStatement insertUserStatement = sqlCon.prepareStatement(insert_user_QUERY); + PreparedStatement insertDeviceStatement = sqlCon.prepareStatement(insert_device_QUERY); + + int counter = 0; + for(TOTPDevice device : devices){ + insertUserStatement.setString(1, appIdentifier.getAppId()); + insertUserStatement.setString(2, device.userId); + insertUserStatement.addBatch(); + + insertDeviceStatement.setString(1, appIdentifier.getAppId()); + insertDeviceStatement.setString(2, device.userId); + insertDeviceStatement.setString(3, device.deviceName); + insertDeviceStatement.setString(4, device.secretKey); + insertDeviceStatement.setInt(5, device.period); + insertDeviceStatement.setInt(6, device.skew); + insertDeviceStatement.setBoolean(7, device.verified); + insertDeviceStatement.setLong(8, device.createdAt); + insertDeviceStatement.setString(9, device.secretKey); + insertDeviceStatement.setInt(10, device.period); + insertDeviceStatement.setInt(11, device.skew); + insertDeviceStatement.setLong(12, device.createdAt); + insertDeviceStatement.setBoolean(13, device.verified); + insertDeviceStatement.addBatch(); + counter++; + if(counter % 100 == 0) { + insertUserStatement.executeBatch(); + insertDeviceStatement.executeBatch(); + } + } + + insertUserStatement.executeBatch(); + insertDeviceStatement.executeBatch(); + } + + public static Map> getDevicesForMultipleUsers(Start start, AppIdentifier appIdentifier, List userIds) + throws StorageQueryException, SQLException { + String QUERY = "SELECT * FROM " + Config.getConfig(start).getTotpUserDevicesTable() + + " WHERE app_id = ? AND user_id IN (" + Utils.generateCommaSeperatedQuestionMarks(userIds.size()) + ");"; + + return execute(start, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for(int i = 0; i < userIds.size(); i++) { + pst.setString(2+i, userIds.get(i)); + } + }, result -> { + Map> devicesByUserIds = new HashMap<>(); + while (result.next()) { + String userId = result.getString("user_id"); + if (!devicesByUserIds.containsKey(userId)){ + devicesByUserIds.put(userId, new ArrayList<>()); + } + devicesByUserIds.get(userId).add(TOTPDeviceRowMapper.getInstance().map(result)); + } + + return devicesByUserIds; + }); + } + private static class TOTPDeviceRowMapper implements RowMapper { private static final TOTPDeviceRowMapper INSTANCE = new TOTPDeviceRowMapper(); diff --git a/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java index ea0bc546..dcb0a1f5 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/ThirdPartyQueries.java @@ -24,12 +24,14 @@ import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; +import io.supertokens.pluginInterface.thirdparty.ThirdPartyImportUser; import io.supertokens.storage.mysql.ConnectionPool; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; @@ -581,6 +583,81 @@ private static List fillUserInfoWithTenantIds_transaction(Start return userInfos; } + + public static void importUser_Transaction(Start start, Connection sqlConnection, Collection users) + throws SQLException { + + String app_id_userid_QUERY = "INSERT INTO " + Config.getConfig(start).getAppIdToUserIdTable() + + "(app_id, user_id, primary_or_recipe_user_id, recipe_id)" + " VALUES(?, ?, ?, ?)"; + + String all_auth_recipe_users_QUERY = "INSERT INTO " + Config.getConfig(start).getUsersTable() + + + "(app_id, tenant_id, user_id, primary_or_recipe_user_id, recipe_id, time_joined, " + + "primary_or_recipe_user_time_joined)" + + " VALUES(?, ?, ?, ?, ?, ?, ?)"; + + String thirdparty_users_QUERY = "INSERT INTO " + Config.getConfig(start).getThirdPartyUsersTable() + + "(app_id, third_party_id, third_party_user_id, user_id, email, time_joined)" + + " VALUES(?, ?, ?, ?, ?, ?)"; + + String thirdparty_user_to_tenant_QUERY = "INSERT INTO " + Config.getConfig(start).getThirdPartyUserToTenantTable() + + "(app_id, tenant_id, user_id, third_party_id, third_party_user_id)" + + " VALUES(?, ?, ?, ?, ?)"; + + PreparedStatement appIdToUserIdStatement = sqlConnection.prepareStatement(app_id_userid_QUERY); + PreparedStatement allAuthRecipeUsersStatement = sqlConnection.prepareStatement(all_auth_recipe_users_QUERY); + PreparedStatement thirdPartyUsersStatement = sqlConnection.prepareStatement(thirdparty_users_QUERY); + PreparedStatement thirdPartyUsersToTenantStatement = sqlConnection.prepareStatement( + thirdparty_user_to_tenant_QUERY); + + int counter = 0; + for (ThirdPartyImportUser user : users) { + TenantIdentifier tenantIdentifier = user.tenantIdentifier; + appIdToUserIdStatement.setString(1, tenantIdentifier.getAppId()); + appIdToUserIdStatement.setString(2, user.userId); + appIdToUserIdStatement.setString(3, user.userId); + appIdToUserIdStatement.setString(4, THIRD_PARTY.toString()); + appIdToUserIdStatement.addBatch(); + + allAuthRecipeUsersStatement.setString(1, tenantIdentifier.getAppId()); + allAuthRecipeUsersStatement.setString(2, tenantIdentifier.getTenantId()); + allAuthRecipeUsersStatement.setString(3, user.userId); + allAuthRecipeUsersStatement.setString(4, user.userId); + allAuthRecipeUsersStatement.setString(5, THIRD_PARTY.toString()); + allAuthRecipeUsersStatement.setLong(6, user.timeJoinedMSSinceEpoch); + allAuthRecipeUsersStatement.setLong(7, user.timeJoinedMSSinceEpoch); + allAuthRecipeUsersStatement.addBatch(); + + thirdPartyUsersStatement.setString(1, tenantIdentifier.getAppId()); + thirdPartyUsersStatement.setString(2, user.thirdpartyId); + thirdPartyUsersStatement.setString(3, user.thirdpartyUserId); + thirdPartyUsersStatement.setString(4, user.userId); + thirdPartyUsersStatement.setString(5, user.email); + thirdPartyUsersStatement.setLong(6, user.timeJoinedMSSinceEpoch); + thirdPartyUsersStatement.addBatch(); + + thirdPartyUsersToTenantStatement.setString(1, tenantIdentifier.getAppId()); + thirdPartyUsersToTenantStatement.setString(2, tenantIdentifier.getTenantId()); + thirdPartyUsersToTenantStatement.setString(3, user.userId); + thirdPartyUsersToTenantStatement.setString(4, user.thirdpartyId); + thirdPartyUsersToTenantStatement.setString(5, user.thirdpartyUserId); + thirdPartyUsersToTenantStatement.addBatch(); + + counter++; + if(counter % 100 == 0) { + appIdToUserIdStatement.executeBatch(); + allAuthRecipeUsersStatement.executeBatch(); + thirdPartyUsersStatement.executeBatch(); + thirdPartyUsersToTenantStatement.executeBatch(); + } + } + + appIdToUserIdStatement.executeBatch(); + allAuthRecipeUsersStatement.executeBatch(); + thirdPartyUsersStatement.executeBatch(); + thirdPartyUsersToTenantStatement.executeBatch(); + } + private static class UserInfoPartial { public final String id; public final String email; diff --git a/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java index 7effa7ad..f0335b3f 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/UserIdMappingQueries.java @@ -20,16 +20,20 @@ import io.supertokens.pluginInterface.exceptions.StorageQueryException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.useridmapping.UserIdMapping; +import io.supertokens.storage.mysql.ConnectionPool; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; import javax.annotation.Nullable; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; @@ -84,6 +88,30 @@ public static UserIdMapping getuseraIdMappingWithSuperTokensUserId(Start start, }); } + public static void createBulkUserIdMapping(Start start, AppIdentifier appIdentifier, + Map superTokensUserIdToExternalUserId) + throws SQLException, StorageQueryException { + String QUERY = "INSERT INTO " + Config.getConfig(start).getUserIdMappingTable() + + " (app_id, supertokens_user_id, external_user_id)" + " VALUES(?, ?, ?)"; + + Connection sqlConnection = ConnectionPool.getConnection(start); + PreparedStatement insertStatement = sqlConnection.prepareStatement(QUERY); + + int counter = 0; + for(String superTokensUserId : superTokensUserIdToExternalUserId.keySet()) { + insertStatement.setString(1, appIdentifier.getAppId()); + insertStatement.setString(2, superTokensUserId); + insertStatement.setString(3, superTokensUserIdToExternalUserId.get(superTokensUserId)); + insertStatement.addBatch(); + + counter++; + if(counter % 100 == 0) { + insertStatement.executeBatch(); + } + } + insertStatement.executeBatch(); + } + public static UserIdMapping getUserIdMappingWithExternalUserId(Start start, AppIdentifier appIdentifier, String userId) throws SQLException, StorageQueryException { @@ -101,6 +129,50 @@ public static UserIdMapping getUserIdMappingWithExternalUserId(Start start, AppI }); } + public static List getMultipleUserIdMappingWithExternalUserId_Transaction(Start start, Connection sqlCon, + AppIdentifier appIdentifier, + List userId) + throws SQLException, StorageQueryException { + String QUERY = "SELECT * FROM " + Config.getConfig(start).getUserIdMappingTable() + + " WHERE app_id = ? AND external_user_id IN ( "+ Utils.generateCommaSeperatedQuestionMarks( + userId.size()) + " )"; + + return execute(sqlCon, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for(int i = 0; i < userId.size(); i++) { + pst.setString(2 + i, userId.get(i)); + } + }, result -> { + List results = new ArrayList<>(); + while (result.next()) { + results.add(UserIdMappingRowMapper.getInstance().mapOrThrow(result)); + } + return results; + }); + } + + public static List getMultipleUserIdMappingWithSupertokensUserId_Transaction(Start start, Connection sqlCon, + AppIdentifier appIdentifier, + List userId) + throws SQLException, StorageQueryException { + String QUERY = "SELECT * FROM " + Config.getConfig(start).getUserIdMappingTable() + + " WHERE app_id = ? AND supertokens_user_id IN ( "+ Utils.generateCommaSeperatedQuestionMarks( + userId.size()) + " )"; + + return execute(sqlCon, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for(int i = 0; i < userId.size(); i++) { + pst.setString(2 + i, userId.get(i)); + } + }, result -> { + List results = new ArrayList<>(); + while (result.next()) { + results.add(UserIdMappingRowMapper.getInstance().mapOrThrow(result)); + } + return results; + }); + } + public static UserIdMapping[] getUserIdMappingWithEitherSuperTokensUserIdOrExternalUserId(Start start, AppIdentifier appIdentifier, String userId) diff --git a/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java index 87e96a76..5e9a0d54 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/UserMetadataQueries.java @@ -19,12 +19,18 @@ import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.supertokens.pluginInterface.exceptions.StorageQueryException; +import io.supertokens.pluginInterface.exceptions.StorageTransactionLogicException; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; @@ -118,4 +124,58 @@ public static JsonObject getUserMetadata(Start start, AppIdentifier appIdentifie return null; }); } + + public static void setMultipleUsersMetadatas_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + Map metadatasByUserId) + throws SQLException, StorageQueryException { + + String QUERY = "INSERT INTO " + getConfig(start).getUserMetadataTable() + + " (app_id, user_id, user_metadata) VALUES(?, ?, ?)" + + " ON DUPLICATE KEY UPDATE user_metadata = ?"; + PreparedStatement insertStatement = con.prepareStatement(QUERY); + + int counter = 0; + for(Map.Entry metadataByUserId : metadatasByUserId.entrySet()){ + insertStatement.setString(1, appIdentifier.getAppId()); + insertStatement.setString(2, metadataByUserId.getKey()); + insertStatement.setString(3, metadataByUserId.getValue().toString()); + insertStatement.setString(4, metadataByUserId.getValue().toString()); + insertStatement.addBatch(); + + counter++; + if(counter % 100 == 0) { + insertStatement.executeBatch(); + } + } + + insertStatement.executeBatch(); + } + + public static Map getMultipleUsersMetadatas_Transaction(Start start, Connection con, AppIdentifier appIdentifier, + List userIds) + throws SQLException, StorageQueryException { + String QUERY = "SELECT user_id, user_metadata FROM " + getConfig(start).getUserMetadataTable() + + " WHERE app_id = ? AND user_id IN (" + Utils.generateCommaSeperatedQuestionMarks(userIds.size()) + + ") FOR UPDATE"; + return execute(con, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for (int i = 0; i< userIds.size(); i++){ + pst.setString(2+i, userIds.get(i)); + } + }, result -> { + Map userMetadataByUserId = new HashMap<>(); + JsonParser jp = new JsonParser(); + if (result.next()) { + userMetadataByUserId.put(result.getString("user_id"), + jp.parse(result.getString("user_metadata")).getAsJsonObject()); + } + return userMetadataByUserId; + }); + } + public static Map getMultipleUserMetadatas(Start start, AppIdentifier appIdentifier, List userIds) + throws StorageQueryException, StorageTransactionLogicException { + return start.startTransaction(con -> { + return getMultipleUsersMetadatas_Transaction(start, (Connection) con.getConnection(), appIdentifier, userIds); + }); + } } diff --git a/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java b/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java index 670c1e77..45154b61 100644 --- a/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java +++ b/src/main/java/io/supertokens/storage/mysql/queries/UserRolesQueries.java @@ -21,11 +21,16 @@ import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.config.Config; +import io.supertokens.storage.mysql.utils.Utils; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static io.supertokens.storage.mysql.QueryExecutorTemplate.execute; import static io.supertokens.storage.mysql.QueryExecutorTemplate.update; @@ -342,4 +347,73 @@ public static boolean deleteAllUserRoleAssociationsForRole(Start start, AppIdent pst.setString(2, role); }) >= 1; } + + public static Map> getRolesForUsers(Start start, AppIdentifier appIdentifier, List userIds) + throws SQLException, StorageQueryException { + String QUERY = "SELECT user_id, role FROM " + Config.getConfig(start).getUserRolesTable() + + " WHERE app_id = ? AND user_id IN ("+ Utils.generateCommaSeperatedQuestionMarks(userIds.size())+") ;"; + + return execute(start, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for(int i = 0; i < userIds.size(); i++) { + pst.setString(2+i, userIds.get(i)); + } + }, result -> { + Map> rolesByUserId = new HashMap<>(); + while (result.next()) { + String userId = result.getString("user_id"); + if(!rolesByUserId.containsKey(userId)) { + rolesByUserId.put(userId, new ArrayList<>()); + } + rolesByUserId.get(userId).add(result.getString("role")); + } + return rolesByUserId; + }); + } + + public static void addRolesToUsers_Transaction(Start start, Connection connection, Map>> rolesToUserByTenants) //tenant -> user -> role + throws SQLException, StorageQueryException { + String QUERY = "INSERT INTO " + Config.getConfig(start).getUserRolesTable() + + "(app_id, tenant_id, user_id, role) VALUES(?, ?, ?, ?);"; + PreparedStatement insertStatement = connection.prepareStatement(QUERY); + + int counter = 0; + for(Map.Entry>> tenantsEntry : rolesToUserByTenants.entrySet()) { + for(Map.Entry> rolesToUser : tenantsEntry.getValue().entrySet()) { + for(String roleForUser : rolesToUser.getValue()){ + insertStatement.setString(1, tenantsEntry.getKey().getAppId()); + insertStatement.setString(2, tenantsEntry.getKey().getTenantId()); + insertStatement.setString(3, rolesToUser.getKey()); + insertStatement.setString(4, roleForUser); + insertStatement.addBatch(); + counter++; + + if(counter % 100 == 0) { + insertStatement.executeBatch(); + } + } + } + } + + insertStatement.executeBatch(); + } + + public static List doesMultipleRoleExist_transaction(Start start, Connection con, AppIdentifier appIdentifier, + List roles) + throws SQLException, StorageQueryException { + String QUERY = "SELECT role FROM " + Config.getConfig(start).getRolesTable() + + " WHERE app_id = ? AND role IN (" +Utils.generateCommaSeperatedQuestionMarks(roles.size())+ ") FOR UPDATE"; + return execute(con, QUERY, pst -> { + pst.setString(1, appIdentifier.getAppId()); + for (int i = 0; i < roles.size(); i++) { + pst.setString(2+i, roles.get(i)); + } + }, result -> { + List rolesFound = new ArrayList<>(); + while(result.next()){ + rolesFound.add(result.getString("role")); + } + return rolesFound; + }); + } } diff --git a/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java b/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java index b8d97ab6..1a23e3bd 100644 --- a/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java +++ b/src/test/java/io/supertokens/storage/mysql/test/OneMillionUsersTest.java @@ -19,31 +19,39 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import io.supertokens.ActiveUsers; import io.supertokens.Main; import io.supertokens.ProcessState; import io.supertokens.authRecipe.AuthRecipe; import io.supertokens.authRecipe.UserPaginationContainer; +import io.supertokens.cronjobs.CronTaskTest; +import io.supertokens.cronjobs.Cronjobs; +import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers; import io.supertokens.emailpassword.EmailPassword; import io.supertokens.emailpassword.ParsedFirebaseSCryptResponse; import io.supertokens.featureflag.EE_FEATURES; import io.supertokens.featureflag.FeatureFlag; import io.supertokens.featureflag.FeatureFlagTestContent; import io.supertokens.passwordless.Passwordless; +import io.supertokens.pluginInterface.RECIPE_ID; +import io.supertokens.pluginInterface.STORAGE_TYPE; import io.supertokens.pluginInterface.Storage; import io.supertokens.pluginInterface.authRecipe.AuthRecipeUserInfo; import io.supertokens.pluginInterface.authRecipe.LoginMethod; import io.supertokens.pluginInterface.authRecipe.sqlStorage.AuthRecipeSQLStorage; +import io.supertokens.pluginInterface.bulkimport.BulkImportStorage; import io.supertokens.pluginInterface.emailpassword.sqlStorage.EmailPasswordSQLStorage; import io.supertokens.pluginInterface.multitenancy.AppIdentifier; import io.supertokens.pluginInterface.multitenancy.TenantIdentifier; +import io.supertokens.pluginInterface.multitenancy.exceptions.TenantOrAppNotFoundException; import io.supertokens.pluginInterface.passwordless.sqlStorage.PasswordlessSQLStorage; import io.supertokens.pluginInterface.thirdparty.sqlStorage.ThirdPartySQLStorage; -import io.supertokens.pluginInterface.RECIPE_ID; import io.supertokens.session.Session; import io.supertokens.session.info.SessionInformationHolder; import io.supertokens.storage.mysql.Start; import io.supertokens.storage.mysql.test.httpRequest.HttpRequestForTesting; +import io.supertokens.storage.mysql.test.httpRequest.HttpResponseException; import io.supertokens.storageLayer.StorageLayer; import io.supertokens.thirdparty.ThirdParty; import io.supertokens.useridmapping.UserIdMapping; @@ -56,6 +64,8 @@ import org.junit.Test; import org.junit.rules.TestRule; +import java.io.IOException; +import java.net.SocketTimeoutException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -974,4 +984,238 @@ private static long measureTime(Supplier function) { // Calculate elapsed time in milliseconds return (endTime - startTime) / 1000000; // Convert to milliseconds } + + @Test + public void testWithOneMillionUsers() throws Exception { + Main main = startCronProcess(String.valueOf(NUM_THREADS)); + + int NUMBER_OF_USERS_TO_UPLOAD = 1000000; // million + + if (StorageLayer.getBaseStorage(main).getType() != STORAGE_TYPE.SQL || StorageLayer.isInMemDb(main)) { + return; + } + + // Create user roles before inserting bulk users + { + UserRoles.createNewRoleOrModifyItsPermissions(main, "role1", null); + UserRoles.createNewRoleOrModifyItsPermissions(main, "role2", null); + } + + // upload a bunch of users through the API + { + for (int i = 0; i < (NUMBER_OF_USERS_TO_UPLOAD / 10000); i++) { + JsonObject request = generateUsersJson(10000, i * 10000); // API allows 10k users upload at once + JsonObject response = uploadBulkImportUsersJson(main, request); + assertEquals("OK", response.get("status").getAsString()); + } + + } + + long processingStarted = System.currentTimeMillis(); + + // wait for the cron job to process them + // periodically check the remaining unprocessed users + // Note1: the cronjob starts the processing automatically + // Note2: the successfully processed users get deleted from the bulk_import_users table + { + long count = NUMBER_OF_USERS_TO_UPLOAD; + while(true) { + try { + JsonObject response = loadBulkImportUsersCountWithStatus(main, null); + assertEquals("OK", response.get("status").getAsString()); + count = response.get("count").getAsLong(); + int newUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.NEW).get("count").getAsInt(); + int processingUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.PROCESSING).get("count").getAsInt(); + int failedUsersNumber = loadBulkImportUsersCountWithStatus(main, + BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + count = newUsersNumber + processingUsersNumber; + + if (count == 0) { + break; + } + } catch (Exception e) { + if(e instanceof SocketTimeoutException) { + //ignore + } else { + throw e; + } + } + Thread.sleep(5000); + } + } + + long processingFinished = System.currentTimeMillis(); + System.out.println("Processed " + NUMBER_OF_USERS_TO_UPLOAD + " users in " + (processingFinished - processingStarted) / 1000 + + " seconds ( or " + (processingFinished - processingStarted) / 60000 + " minutes)"); + + // after processing finished, make sure every user got processed correctly + { + int failedImportedUsersNumber = loadBulkImportUsersCountWithStatus(main, BulkImportStorage.BULK_IMPORT_USER_STATUS.FAILED).get("count").getAsInt(); + int usersInCore = loadUsersCount(main).get("count").getAsInt(); + assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore + failedImportedUsersNumber); + assertEquals(NUMBER_OF_USERS_TO_UPLOAD, usersInCore); + } + } + + private static JsonObject loadBulkImportUsersCountWithStatus(Main main, BulkImportStorage.BULK_IMPORT_USER_STATUS status) + throws HttpResponseException, IOException { + Map params = new HashMap<>(); + if(status!= null) { + params.put("status", status.name()); + } + return HttpRequestForTesting.sendGETRequest(main, "", + "http://localhost:3567/bulk-import/users/count", + params, 10000, 10000, null, SemVer.v5_2.get(), null); + } + + private static JsonObject loadBulkImportUsersWithStatus(Main main, BulkImportStorage.BULK_IMPORT_USER_STATUS status) + throws HttpResponseException, IOException { + Map params = new HashMap<>(); + if(status!= null) { + params.put("status", status.name()); + } + return HttpRequestForTesting.sendGETRequest(main, "", + "http://localhost:3567/bulk-import/users", + params, 10000, 10000, null, SemVer.v5_2.get(), null); + } + + private static JsonObject loadUsersCount(Main main) throws HttpResponseException, IOException { + Map params = new HashMap<>(); + + return HttpRequestForTesting.sendGETRequest(main, "", + "http://localhost:3567/users/count", + params, 10000, 10000, null, SemVer.v5_2.get(), null); + } + + private static JsonObject generateUsersJson(int numberOfUsers, int startIndex) { + JsonObject userJsonObject = new JsonObject(); + JsonParser parser = new JsonParser(); + + JsonArray usersArray = new JsonArray(); + for (int i = 0; i < numberOfUsers; i++) { + JsonObject user = new JsonObject(); + + user.addProperty("externalUserId", UUID.randomUUID().toString()); + user.add("userMetadata", parser.parse("{\"key1\":"+ UUID.randomUUID().toString() + ",\"key2\":{\"key3\":\"value3\"}}")); + user.add("userRoles", parser.parse( + "[{\"role\":\"role1\", \"tenantIds\": [\"public\"]},{\"role\":\"role2\", \"tenantIds\": [\"public\"]}]")); + user.add("totpDevices", parser.parse("[{\"secretKey\":\"secretKey\",\"deviceName\":\"deviceName\"}]")); + + //JsonArray tenanatIds = parser.parse("[\"public\", \"t1\"]").getAsJsonArray(); + JsonArray tenanatIds = parser.parse("[\"public\"]").getAsJsonArray(); + String email = " johndoe+" + (i + startIndex) + "@gmail.com "; + + Random random = new Random(); + + JsonArray loginMethodsArray = new JsonArray(); + //if(random.nextInt(2) == 0){ + loginMethodsArray.add(createEmailLoginMethod(email, tenanatIds)); + //} + if(random.nextInt(2) == 0){ + loginMethodsArray.add(createThirdPartyLoginMethod(email, tenanatIds)); + } + if(random.nextInt(2) == 0){ + loginMethodsArray.add(createPasswordlessLoginMethod(email, tenanatIds, "+910000" + (startIndex + i))); + } + if(loginMethodsArray.size() == 0) { + int methodNumber = random.nextInt(3); + switch (methodNumber) { + case 0: + loginMethodsArray.add(createEmailLoginMethod(email, tenanatIds)); + break; + case 1: + loginMethodsArray.add(createThirdPartyLoginMethod(email, tenanatIds)); + break; + case 2: + loginMethodsArray.add(createPasswordlessLoginMethod(email, tenanatIds, "+911000" + (startIndex + i))); + break; + } + } + user.add("loginMethods", loginMethodsArray); + + usersArray.add(user); + } + + userJsonObject.add("users", usersArray); + return userJsonObject; + } + + private static JsonObject createEmailLoginMethod(String email, JsonArray tenantIds) { + JsonObject loginMethod = new JsonObject(); + loginMethod.add("tenantIds", tenantIds); + loginMethod.addProperty("email", email); + loginMethod.addProperty("recipeId", "emailpassword"); + loginMethod.addProperty("passwordHash", + "$argon2d$v=19$m=12,t=3,p=1$aGI4enNvMmd0Zm0wMDAwMA$r6p7qbr6HD+8CD7sBi4HVw"); + loginMethod.addProperty("hashingAlgorithm", "argon2"); + loginMethod.addProperty("isVerified", true); + loginMethod.addProperty("isPrimary", true); + loginMethod.addProperty("timeJoinedInMSSinceEpoch", 0); + return loginMethod; + } + + private static JsonObject createThirdPartyLoginMethod(String email, JsonArray tenantIds) { + JsonObject loginMethod = new JsonObject(); + loginMethod.add("tenantIds", tenantIds); + loginMethod.addProperty("recipeId", "thirdparty"); + loginMethod.addProperty("email", email); + loginMethod.addProperty("thirdPartyId", "google"); + loginMethod.addProperty("thirdPartyUserId", String.valueOf(email.hashCode())); + loginMethod.addProperty("isVerified", true); + loginMethod.addProperty("isPrimary", false); + loginMethod.addProperty("timeJoinedInMSSinceEpoch", 0); + return loginMethod; + } + + private static JsonObject createPasswordlessLoginMethod(String email, JsonArray tenantIds, String phoneNumber) { + JsonObject loginMethod = new JsonObject(); + loginMethod.add("tenantIds", tenantIds); + loginMethod.addProperty("email", email); + loginMethod.addProperty("recipeId", "passwordless"); + loginMethod.addProperty("phoneNumber", phoneNumber); + loginMethod.addProperty("isVerified", true); + loginMethod.addProperty("isPrimary", false); + loginMethod.addProperty("timeJoinedInMSSinceEpoch", 0); + return loginMethod; + } + + private void setFeatureFlags(Main main, EE_FEATURES[] features) { + FeatureFlagTestContent.getInstance(main).setKeyValue(FeatureFlagTestContent.ENABLED_FEATURES, features); + } + + + private static JsonObject uploadBulkImportUsersJson(Main main, JsonObject request) throws IOException, + HttpResponseException { + return HttpRequestForTesting.sendJsonPOSTRequest(main, "", + "http://localhost:3567/bulk-import/users", + request, 1000, 10000, null, SemVer.v5_2.get(), null); + } + + private Main startCronProcess(String parallelism) throws IOException, InterruptedException, + TenantOrAppNotFoundException { + return startCronProcess(parallelism, 5*60); + } + + private Main startCronProcess(String parallelism, int intervalInSeconds) throws IOException, InterruptedException, TenantOrAppNotFoundException { + String[] args = { "../" }; + + // set processing thread number + Utils.setValueInConfig("bulk_migration_parallelism", parallelism); + + TestingProcessManager.TestingProcess process = TestingProcessManager.start(args, false); + Main main = process.getProcess(); + setFeatureFlags(main, new EE_FEATURES[] { + EE_FEATURES.ACCOUNT_LINKING, EE_FEATURES.MULTI_TENANCY, EE_FEATURES.MFA }); + // We are setting a non-zero initial wait for tests to avoid race condition with the beforeTest process that deletes data in the storage layer + CronTaskTest.getInstance(main).setInitialWaitTimeInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, 5); + CronTaskTest.getInstance(main).setIntervalInSeconds(ProcessBulkImportUsers.RESOURCE_KEY, intervalInSeconds); + + process.startProcess(); + assertNotNull(process.checkOrWaitForEvent(ProcessState.PROCESS_STATE.STARTED)); + + Cronjobs.addCronjob(main, (ProcessBulkImportUsers) main.getResourceDistributor().getResource(new TenantIdentifier(null, null, null), ProcessBulkImportUsers.RESOURCE_KEY)); + return main; + } } From e1784d82e7c0ab50fe3401d33a059fb3cfc393b6 Mon Sep 17 00:00:00 2001 From: Sattvik Chakravarthy Date: Thu, 19 Dec 2024 14:54:40 +0530 Subject: [PATCH 2/3] Update build.gradle --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 92488139..9b16f6be 100644 --- a/build.gradle +++ b/build.gradle @@ -2,7 +2,7 @@ plugins { id 'java-library' } -version = "7.2.0" +version = "7.3.0" repositories { mavenCentral() @@ -129,4 +129,4 @@ tasks.withType(Test) { } } } -} \ No newline at end of file +} From bfed7196e84ecfa3a97e5e8819cba75d8bc80ee9 Mon Sep 17 00:00:00 2001 From: Sattvik Chakravarthy Date: Thu, 19 Dec 2024 14:55:01 +0530 Subject: [PATCH 3/3] Update pluginInterfaceSupported.json --- pluginInterfaceSupported.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pluginInterfaceSupported.json b/pluginInterfaceSupported.json index 25f82381..1f32a8a8 100644 --- a/pluginInterfaceSupported.json +++ b/pluginInterfaceSupported.json @@ -1,6 +1,6 @@ { "_comment": "contains a list of plugin interfaces branch names that this core supports", "versions": [ - "6.3" + "6.4" ] -} \ No newline at end of file +}