Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect Cassandra client and reinit session when NoHostAvailableException #2476

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.google.common.collect.Lists;
import org.commonjava.indy.action.IndyLifecycleException;
import org.commonjava.indy.action.StartupAction;
Expand Down Expand Up @@ -302,7 +304,7 @@ private void update( String ga, Set<String> set )
BoundStatement bound = preparedStoresIncrement.bind();
bound.setSet( 0, set );
bound.setString( 1, ga );
session.execute( bound );
executeSession( bound );
inMemoryCache.remove( ga ); // clear to force reloading
}

Expand All @@ -313,11 +315,11 @@ public void reduce( String ga, Set<String> set, boolean isAsync )
bound.setString( 1, ga );
if ( isAsync )
{
session.executeAsync( bound );
executeSession ( bound, true, ResultSetFuture.class );
}
else
{
session.execute( bound );
executeSession( bound );
}
inMemoryCache.remove( ga ); // clear to force reloading
}
Expand Down Expand Up @@ -393,7 +395,7 @@ public Set<String> getStoresContaining( String gaPath )
}
// query db
BoundStatement bound = preparedQueryByGA.bind( gaPath );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
if ( row != null )
{
Expand Down Expand Up @@ -426,4 +428,41 @@ public boolean matchAny( List<ArtifactStore> concreteStores )
}
return false;
}

private ResultSet executeSession ( BoundStatement bind )
{
return executeSession ( bind, false, ResultSet.class );
}

private <T> T executeSession ( BoundStatement bind, boolean isAsync, Class<T> type )
{
boolean exception = false;
T trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
cassandraClient.close();
cassandraClient.init();
this.init();
}
trackingRecord = type.cast( isAsync ? session.executeAsync( bind ) : session.execute( bind ) );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
cassandraClient.close();
cassandraClient.init();
this.init();
trackingRecord = type.cast( isAsync ? session.executeAsync( bind ) : session.execute( bind ) );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import org.commonjava.indy.conf.IndyConfiguration;
Expand Down Expand Up @@ -224,7 +225,7 @@ private Date calculateExpirationTime( Date scheduleTime, Long timeout)
public DtxSchedule querySchedule( String storeKey, String jobName )
{
BoundStatement bound = preparedSingleScheduleQuery.bind( storeKey, jobName );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );

Row row = resultSet.one();

Expand All @@ -240,7 +241,7 @@ public Collection<DtxExpiration> queryExpirations( Date date )
Collection<DtxExpiration> expirations = new ArrayList<>( );

BoundStatement bound = preparedExpiredQuery.bind( pid );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
expirations.add( toDtxExpiration( row ) );
} );
Expand All @@ -260,7 +261,7 @@ public void queryAndSetExpiredSchedule( Date date )
.equals( expiration.getScheduleUID() ) )
{
BoundStatement boundU = preparedExpiredUpdate.bind( schedule.getStoreKey(), schedule.getJobName() );
session.execute( boundU );
executeSession( boundU );

logger.debug( "Expired entry: {}", schedule );
eventDispatcher.fire( new ScheduleTriggerEvent( schedule.getJobType(), schedule.getPayload() ) );
Expand All @@ -273,7 +274,7 @@ public Collection<DtxSchedule> querySchedulesByJobType( String jobType )
{
Collection<DtxSchedule> schedules = new ArrayList<>( );
BoundStatement bound = preparedScheduleByTypeQuery.bind( jobType );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
schedules.add(toDtxSchedule(row));
} );
Expand All @@ -284,7 +285,7 @@ public Collection<DtxSchedule> querySchedulesByStoreKey( String storeKey )
{
Collection<DtxSchedule> schedules = new ArrayList<>( );
BoundStatement bound = preparedScheduleByStoreKeyQuery.bind( storeKey );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
schedules.add(toDtxSchedule(row));
} );
Expand All @@ -295,7 +296,7 @@ public Collection<DtxSchedule> querySchedules( String storeKey, String jobType,
{
Collection<DtxSchedule> schedules = new ArrayList<>( );
BoundStatement bound = preparedScheduleByStoreKeyAndTypeQuery.bind( storeKey, jobType );
ResultSet resultSet = session.execute( bound );
ResultSet resultSet = executeSession( bound );
resultSet.forEach( row -> {
DtxSchedule schedule = toDtxSchedule( row );
if ( !expired && !schedule.getExpired() )
Expand Down Expand Up @@ -342,4 +343,35 @@ private DtxExpiration toDtxExpiration( Row row )
return null;
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
client.close();
client.init();
this.init();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
client.close();
client.init();
this.init();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import org.commonjava.indy.conf.IndyConfiguration;
import org.commonjava.o11yphant.metrics.annotation.Measure;
import org.commonjava.indy.model.core.StoreKey;
Expand Down Expand Up @@ -177,7 +178,7 @@ public void addMissing( final ConcreteResource resource )

BoundStatement bound = preparedInsert.bind( key.toString(), resource.getPath(), curDate, timeoutDate,
timeoutInSeconds );
session.execute( bound );
executeSession( bound );
inMemoryCache.put( resource, DUMB_CACHE_VALUE, timeoutInSeconds, TimeUnit.SECONDS );
}

Expand All @@ -191,7 +192,7 @@ public boolean isMissing( final ConcreteResource resource )
}
StoreKey key = getResourceKey( resource );
BoundStatement bound = preparedExistQuery.bind( key.toString(), resource.getPath() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
Row row = result.one();
if ( row == null )
{
Expand All @@ -217,7 +218,7 @@ public void clearMissing( final Location location )
{
StoreKey key = ( (KeyedLocation) location ).getKey();
BoundStatement bound = preparedDeleteByStore.bind( key.toString() );
session.execute( bound );
executeSession( bound );
clearInMemoryCache( location );
}

Expand Down Expand Up @@ -246,7 +247,7 @@ public void clearMissing( final ConcreteResource resource )
{
StoreKey key = getResourceKey( resource );
BoundStatement bound = preparedDelete.bind( key.toString(), resource.getPath() );
session.execute( bound );
executeSession( bound );
inMemoryCache.remove( resource );
}

Expand All @@ -272,7 +273,7 @@ public Set<String> getMissing( final Location location )
logger.debug( "[NFC] getMissing for {}", location );
StoreKey key = ( (KeyedLocation) location ).getKey();
BoundStatement bound = preparedQueryByStore.bind( key.toString() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
int count = 0;
Set<String> matches = new HashSet<>();
for ( Row row : result )
Expand Down Expand Up @@ -315,7 +316,7 @@ public Set<String> getMissing( Location location, int pageIndex, int pageSize )
public long getSize( StoreKey storeKey )
{
BoundStatement bound = preparedCountByStore.bind( storeKey.toString() );
ResultSet result = session.execute( bound );
ResultSet result = executeSession( bound );
return result.one().get( 0, Long.class );
}

Expand All @@ -331,4 +332,36 @@ private StoreKey getResourceKey( ConcreteResource resource )
KeyedLocation location = (KeyedLocation) resource.getLocation();
return location.getKey();
}

private ResultSet executeSession ( BoundStatement bind )
{
boolean exception = false;
ResultSet trackingRecord = null;
try
{
if ( session == null || session.isClosed() )
{
cassandraClient.close();
cassandraClient.init();
this.start();
}
trackingRecord = session.execute( bind );
}
catch ( NoHostAvailableException e )
{
exception = true;
logger.error( "Cannot connect to host, reconnect once more with new session.", e );
}
finally
{
if ( exception )
{
cassandraClient.close();
cassandraClient.init();
this.start();
trackingRecord = session.execute( bind );
}
}
return trackingRecord;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public CassandraClient( CassandraConfig config )
}

@PostConstruct
private void init()
public void init()
{
if ( !config.isEnabled() )
{
Expand Down Expand Up @@ -116,18 +116,15 @@ public Session getSession( String keyspace )
} );
}

private volatile boolean closed;

public void close()
{
if ( !closed && cluster != null && sessions != null )
if ( cluster != null && sessions != null )
{
logger.info( "Close cassandra client" );
sessions.entrySet().forEach( e -> e.getValue().close() );
sessions.clear();
cluster.close();
cluster = null;
closed = true;
}
}

Expand Down
Loading