Skip to content

Commit

Permalink
Handle IOException and SocketException (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
lausannel authored Aug 28, 2024
1 parent 8d9cc80 commit 5211c9c
Showing 1 changed file with 65 additions and 25 deletions.
90 changes: 65 additions & 25 deletions src/Apache.IoTDB/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,27 @@ public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<T
{
if (retryOnFailure)
{
client = await Reconnect(client);
try
try{
client = await Reconnect(client);
return await operation(client);
} catch (TException retryEx)
{
var resp = await operation(client);
return resp;
throw new TException(errMsg, retryEx);
}
catch (TException retryEx)
}
else
{
throw new TException(errMsg, ex);
}
}
catch (Exception ex)
{
if (retryOnFailure)
{
try{
client = await Reconnect(client);
return await operation(client);
} catch (TException retryEx)
{
throw new TException(errMsg, retryEx);
}
Expand Down Expand Up @@ -179,7 +193,17 @@ public async Task Open(CancellationToken cancellationToken = default)
{
for (var index = 0; index < _poolSize; index++)
{
_clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, cancellationToken));
try
{
_clients.Add(await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, cancellationToken));
}
catch (Exception e)
{
if (_debugMode)
{
_logger.LogWarning(e, "Currently connecting to {0}:{1} failed", _host, _port);
}
}
}
}
else
Expand Down Expand Up @@ -217,47 +241,63 @@ public async Task Open(CancellationToken cancellationToken = default)

if (_clients.ClientQueue.Count != _poolSize)
{
throw new TException(string.Format("Error occurs when opening session pool. Client pool size is not equal to the expected size. Client pool size: {0}, expected size: {1}", _clients.ClientQueue.Count, _poolSize), null);
throw new TException(string.Format("Error occurs when opening session pool. Client pool size is not equal to the expected size. Client pool size: {0}, expected size: {1}, Please check the server status", _clients.ClientQueue.Count, _poolSize), null);
}
_isClose = false;
}


public async Task<Client> Reconnect(Client originalClient = null, CancellationToken cancellationToken = default)
{
if (_nodeUrls.Count == 0)
{
await Open(_enableRpcCompression);
return _clients.Take();
}

originalClient.Transport.Close();
originalClient?.Transport.Close();

int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
if (startIndex == -1)
{
throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
}

for (int attempt = 1; attempt <= RetryNum; attempt++)
if (_nodeUrls.Count == 0)
{
for (int i = 0; i < _endPoints.Count; i++)
for (int attempt = 1; attempt <= RetryNum; attempt++)
{
int j = (startIndex + i) % _endPoints.Count;
try
{
var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, cancellationToken);
return client;
}
catch (Exception e)
{
if (_debugMode)
{
_logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
_logger.LogWarning(e, "Attempt reconnecting to {0}:{1} failed", _host, _port);
}
}
}
}
else
{
int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
if (startIndex == -1)
{
throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
}

for (int attempt = 1; attempt <= RetryNum; attempt++)
{
for (int i = 0; i < _endPoints.Count; i++)
{
int j = (startIndex + i) % _endPoints.Count;
try
{
var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
return client;
}
catch (Exception e)
{
if (_debugMode)
{
_logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
}
}
}
}
}

throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null);
}

Expand Down

0 comments on commit 5211c9c

Please sign in to comment.