Skip to content

Commit

Permalink
Merge branch 'feat/bulk-import-base' into feat/bulk-import-1
Browse files Browse the repository at this point in the history
  • Loading branch information
anku255 committed Apr 29, 2024
2 parents 03e6e9d + a3b9428 commit a7d48ab
Show file tree
Hide file tree
Showing 10 changed files with 788 additions and 141 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

- Adds queries for Bulk Import

## [7.0.1] - 2024-04-17

- Fixes issues with partial failures during tenant creation

## [7.0.0] - 2024-03-13

- Replace `TotpNotEnabledError` with `UnknownUserIdTotpError`.
Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion pluginInterfaceSupported.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_comment": "contains a list of plugin interfaces branch names that this core supports",
"versions": [
"6.0"
"6.1"
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public class BulkImportProxyStorage extends Start {
private BulkImportProxyConnection connection;

public synchronized Connection getTransactionConnection() throws SQLException {
public synchronized Connection getTransactionConnection() throws SQLException, StorageQueryException {
if (this.connection == null) {
Connection con = ConnectionPool.getConnectionForProxyStorage(this);
this.connection = new BulkImportProxyConnection(con);
Expand Down
36 changes: 29 additions & 7 deletions src/main/java/io/supertokens/storage/mysql/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.supertokens.pluginInterface.exceptions.DbInitException;
import io.supertokens.pluginInterface.exceptions.StorageQueryException;
import io.supertokens.storage.mysql.config.Config;
import io.supertokens.storage.mysql.config.MySQLConfig;
import io.supertokens.storage.mysql.output.Logging;
Expand All @@ -36,12 +37,14 @@ public class ConnectionPool extends ResourceDistributor.SingletonResource {
private HikariDataSource hikariDataSource = null;

private final Start start;
private PostConnectCallback postConnectCallback;

private ConnectionPool(Start start) {
private ConnectionPool(Start start, PostConnectCallback postConnectCallback) {
this.start = start;
this.postConnectCallback = postConnectCallback;
}

private synchronized void initialiseHikariDataSource() throws SQLException {
private synchronized void initialiseHikariDataSource() throws SQLException, StorageQueryException {
if (this.hikariDataSource != null) {
return;
}
Expand Down Expand Up @@ -98,6 +101,19 @@ private synchronized void initialiseHikariDataSource() throws SQLException {
} catch (Exception e) {
throw new SQLException(e);
}

try {
try (Connection con = hikariDataSource.getConnection()) {
this.postConnectCallback.apply(con);
}
} catch (StorageQueryException e) {
// if an exception happens here, we want to set the hikariDataSource to null once again so that
// whenever the getConnection is called again, we want to re-attempt creation of tables and tenant
// entries for this storage
hikariDataSource.close();
hikariDataSource = null;
throw e;
}
}

private static int getTimeToWaitToInit(Start start) {
Expand Down Expand Up @@ -132,7 +148,8 @@ static boolean isAlreadyInitialised(Start start) {
return getInstance(start) != null && getInstance(start).hikariDataSource != null;
}

static void initPool(Start start, boolean shouldWait) throws DbInitException, SQLException {
static void initPool(Start start, boolean shouldWait, PostConnectCallback postConnectCallback)
throws DbInitException, SQLException, StorageQueryException {
if (isAlreadyInitialised(start)) {
return;
}
Expand All @@ -143,7 +160,7 @@ static void initPool(Start start, boolean shouldWait) throws DbInitException, SQ
+ "you have"
+ " specified the correct values for ('mysql_host' and 'mysql_port') or for 'mysql_connection_uri'";
try {
ConnectionPool con = new ConnectionPool(start);
ConnectionPool con = new ConnectionPool(start, postConnectCallback);
start.getResourceDistributor().setResource(RESOURCE_KEY, con);
while (true) {
try {
Expand Down Expand Up @@ -185,7 +202,7 @@ static void initPool(Start start, boolean shouldWait) throws DbInitException, SQ
}
}

private static Connection getNewConnection(Start start) throws SQLException {
private static Connection getNewConnection(Start start) throws SQLException, StorageQueryException {
if (getInstance(start) == null) {
throw new IllegalStateException("Please call initPool before getConnection");
}
Expand All @@ -198,11 +215,11 @@ private static Connection getNewConnection(Start start) throws SQLException {
return getInstance(start).hikariDataSource.getConnection();
}

public static Connection getConnectionForProxyStorage(Start start) throws SQLException {
public static Connection getConnectionForProxyStorage(Start start) throws SQLException, StorageQueryException {
return getNewConnection(start);
}

public static Connection getConnection(Start start) throws SQLException {
public static Connection getConnection(Start start) throws SQLException, StorageQueryException {
if (start instanceof BulkImportProxyStorage) {
return ((BulkImportProxyStorage) start).getTransactionConnection();
}
Expand All @@ -223,4 +240,9 @@ static void close(Start start) {
}
}
}

@FunctionalInterface
public static interface PostConnectCallback {
void apply(Connection connection) throws StorageQueryException;
}
}
36 changes: 32 additions & 4 deletions src/main/java/io/supertokens/storage/mysql/Start.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void stopLogging() {
}

@Override
public void initStorage(boolean shouldWait) throws DbInitException {
public void initStorage(boolean shouldWait, List<TenantIdentifier> tenantIdentifiers) throws DbInitException {
if (ConnectionPool.isAlreadyInitialised(this)) {
return;
}
Expand All @@ -253,8 +253,20 @@ public void initStorage(boolean shouldWait) throws DbInitException {
mainThread = Thread.currentThread();
}
try {
ConnectionPool.initPool(this, shouldWait);
GeneralQueries.createTablesIfNotExists(this);
ConnectionPool.initPool(this, shouldWait, (con) -> {
try {
GeneralQueries.createTablesIfNotExists(this, con);
} catch (SQLException e) {
throw new StorageQueryException(e);
}
for (TenantIdentifier tenantIdentifier : tenantIdentifiers) {
try {
this.addTenantIdInTargetStorage_Transaction(con, tenantIdentifier);
} catch (DuplicateTenantException e) {
// ignore
}
}
});
} catch (Exception e) {
throw new DbInitException(e);
}
Expand Down Expand Up @@ -459,7 +471,7 @@ public void deleteAllInformation() throws StorageQueryException {
}
ProcessState.getInstance(this).clear();
try {
initStorage(false);
initStorage(false, new ArrayList<>());
enabled = true; // Allow get connection to work, to delete the data
GeneralQueries.deleteAllTables(this);

Expand Down Expand Up @@ -2252,6 +2264,22 @@ public void addTenantIdInTargetStorage(TenantIdentifier tenantIdentifier)
}
}

public void addTenantIdInTargetStorage_Transaction(Connection con, TenantIdentifier tenantIdentifier)
throws DuplicateTenantException, StorageQueryException {
try {
MultitenancyQueries.addTenantIdInTargetStorage_Transaction(this, con, tenantIdentifier);
} catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
String errorMessage = e.getMessage();
if (isPrimaryKeyError(errorMessage, Config.getConfig(this).getTenantsTable())) {
throw new DuplicateTenantException();
}
}
throw new StorageQueryException(e);
}
}


@Override
public void overwriteTenantConfig(TenantConfig tenantConfig)
throws TenantOrAppNotFoundException, StorageQueryException, DuplicateThirdPartyIdException,
Expand Down
Loading

0 comments on commit a7d48ab

Please sign in to comment.