Skip to content

Commit

Permalink
Added documentation on monitor and rates modules.
Browse files Browse the repository at this point in the history
  • Loading branch information
Claude Muller committed Feb 21, 2019
1 parent 1cbdcbd commit f946eb4
Show file tree
Hide file tree
Showing 20 changed files with 297 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
* Programmatically sets the configuration for cross-origin resource sharing of the core application.
* The origin URLs for which to allow CORS must be specified in the application property
* corresponding to {@link CoreCorsConfiguration#corsUrls}.
*/
@Configuration
public class CoreCorsConfiguration implements WebMvcConfigurer {

Expand Down
72 changes: 59 additions & 13 deletions services/monitor/src/main/java/io/iconator/monitor/BaseMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

import static com.github.rholder.retry.WaitStrategies.randomWait;

/**
* Contains the transaction processing logic that should be used by all blockchain-specific
* monitor implementations (e.g. {@link EthereumMonitor}.
*/
@Component
abstract public class BaseMonitor {

Expand Down Expand Up @@ -56,22 +60,36 @@ public BaseMonitor(MonitorService monitorService,
this.retryer = retryer;
}

/**
* Starts the monitoring and transaction processing.
* Also starts transmitting {@link io.iconator.commons.amqp.model.BlockNrMessage}s, containing
* the latest block numbers, to a message queue.
*/
protected abstract void start() throws Exception;

/**
* Adds the given address to the set of monitored addresses.
* The given address creation time can be used by implementing classes to determine at which
* block the monitoring of the blockchain should start. Obviously, blocks that are older than
* the date of registration of the first investor do not have to be scanned.
* @param address payment address which will be monitored
* @param addressCreationTimestamp Creation time of the address in miliseconds since the epoch.
* Some blockchain implementations can use this to determine
* from which block to start scanning the chain.
*/
abstract protected void addPaymentAddressesForMonitoring(String address, Long addressCreationTimestamp);

/**
* @param receivingAddress the address to which a payment has been made.
* @return TRUE if this address is monitored. False otherwise.
* @param receivingAddress the address for which to check if it is monitored by this monitor.
* @return ture if the given address is monitored. False, otherwise.
*/
abstract protected boolean isAddressMonitored(String receivingAddress);

/**
* Entry point to the processing flow of transactions that are in status pending, i.e. are not
* on a block yet but were seen in the network.
* This must be called by blockchain-specific implementations when a new pending transaction to
* a monitored address is seen in the network.
* @param tx The pending transaction.
*/
protected void processPendingTransactions(TransactionAdapter tx) {
try {
if (!isAddressMonitored(tx.getReceivingAddress())) return;
Expand All @@ -92,6 +110,15 @@ protected void processPendingTransactions(TransactionAdapter tx) {
}
}

/**
* Entry point to the processing flow of transactions that are in status building, i.e. are on
* a block already.
* This must be called by blockchain-specific implementations when a transaction to a monitored
* address has been added to a block. Transactions do not have to be covered by more blocks
* before calling this method. If, in the end, they should not be on the main chain, they will
* never be marked as confirmed transactions.
* @param tx The building transaction.
*/
protected void processBuildingTransaction(TransactionAdapter tx) {
PaymentLog paymentLog = null;
try {
Expand Down Expand Up @@ -141,16 +168,17 @@ private boolean isAmountInsufficient(BigDecimal usdAmount) {
}

/**
* Updates the given payment log (in status {@link PaymentLog.TransactionStatus#BUILDING}) according to the given
* transaction. If all neccessary information can be retrieved, including the exchange rate, the payment log is
* updated and the changes are immediatly commited. If some information cannot be retrieved a refund entry is
* created (also immediatly commited) and the payment log is not updated.
* Updates the given payment log (in status {@link PaymentLog.TransactionStatus#BUILDING})
* according to the given transaction. If all neccessary information can be retrieved, including
* the exchange rate, the payment log is updated and the changes are immediatly commited.
* If some information cannot be retrieved a refund entry is created (also immediatly commited)
* and the payment log is not updated.
* @param tx The transaction corresponding to the payment log.
* @param paymentLog The payment log to update.
* @return the update payment log or null if some transaction information could not be retrieved and a refund entry
* had to be created.
* @throws RefundEntryAlreadyExistsException if a refund entry already exists for this payment log. A refund entry
* is create if some transaction information cannot be retrieved.
* @return the update payment log or null if some transaction information could not be retrieved
* and a refund entry had to be created.
* @throws RefundEntryAlreadyExistsException if a refund entry already exists for this payment
* log. A refund entry is create if some transaction information cannot be retrieved.
*/
private PaymentLog updateBuildingPaymentLog(TransactionAdapter tx, PaymentLog paymentLog) throws RefundEntryAlreadyExistsException {
if (paymentLog == null) return null;
Expand Down Expand Up @@ -190,6 +218,17 @@ private BigDecimal getUSDExchangeRate(Instant blockTimestamp, CurrencyType curre

}

/**
* Entry point to the token allocation.
* A retryer is used because exceptions might arise due to optimistic locking when trying to
* make changes to the sale tiers.
* This is only package-private so that it can be called in unit tests directly.
* @param paymentLog The payment log of the transaction for which to allocate tokens.
* @return The given payment log updated with the allocated amount of tokens if the allocation
* was successful.
* @throws Throwable If an error occured when allocating the tokens. Though, before exiting this
* method a refund entry is created.
*/
PaymentLog allocateTokensWithRetries(PaymentLog paymentLog)
throws Throwable {

Expand All @@ -216,13 +255,20 @@ PaymentLog allocateTokensWithRetries(PaymentLog paymentLog)

return updatedPaymentLog;
} catch (Throwable e) {
LOG.error("Failed to distribute payment to tiers for {} transaction {}.", paymentLog.getCurrency().name(), paymentLog.getTransactionId(), e.getCause());
LOG.error("Failed to distribute payment to tiers for {} transaction {}.",
paymentLog.getCurrency().name(), paymentLog.getTransactionId(), e.getCause());
RefundReason reason = RefundReason.TOKEN_ALLOCATION_FAILED;
monitorService.createRefundEntryForPaymentLogAndCommit(paymentLog, reason);
throw e;
}
}

/**
* Gets the payment log corresponding to the given transaction and set its status to confirmed.
* This method should be called by blockchain-specific implementations when a transaction to a
* monitored address can be confired, i.e. is burried under enough blocks.
* @param tx The confirmed transaction.
*/
protected void confirmTransaction(TransactionAdapter tx) {
try {
LOG.info("Setting status of transaction {} to confirmed.", tx.getTransactionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

import static org.bitcoinj.core.TransactionConfidence.ConfidenceType.DEAD;

/**
* Bitcoin-specific implementation of the {@link BaseMonitor}.
* Uses a {@link Wallet} for storing the set of minitored payment addresses.
*/
public class BitcoinMonitor extends BaseMonitor {

private final static Logger LOG = LoggerFactory.getLogger(BitcoinMonitor.class);
Expand Down Expand Up @@ -206,6 +210,9 @@ protected boolean isAddressMonitored(String receivingAddress) {
a -> a.toBase58().contentEquals(receivingAddress));
}

/**
* Regularly logs information about connected peers and chain height.
*/
@Scheduled(fixedRate = 60000)
public void reportBitcoinPeersConnected() {
int amountConnectedPeers = bitcoinPeerGroup.numConnectedPeers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@

import static java.time.temporal.ChronoUnit.MINUTES;

/**
* Ethereum-specific implementation of the {@link BaseMonitor}.
* Uses a simple {@link Set} for storing the minitored payment addresses.
* Uses another {@link Set} for tracking fully processed transactions that have not yet been
* confirmed (i.e. are not burried under enough blocks yet).
*/
public class EthereumMonitor extends BaseMonitor {

private final static Logger LOG = LoggerFactory.getLogger(EthereumMonitor.class);
Expand Down Expand Up @@ -54,6 +60,7 @@ public EthereumMonitor(FxService fxService,
this.messageService = messageService;
}

@Override
public synchronized void addPaymentAddressesForMonitoring(String addressString, Long addressCreationTimestamp) {
if (!addressString.startsWith("0x"))
addressString = "0x" + addressString;
Expand Down Expand Up @@ -89,6 +96,12 @@ public void start() throws Exception {
monitorProcessedTransactions();
}

/**
* Subscribes a listener that receives new arriving blocks and sends a
* {@link BlockNREthereumMessage} containing the new block number to a message queue.
* @param highestBlockNumber the block number below which no block numbers will be send. This
* should be the latest block number when callling this method.
*/
private void monitorBlockNumbers(BigInteger highestBlockNumber) {
Long startBlock = configHolder.getEthereumNodeStartBlock();
web3j.catchUpToLatestAndSubscribeToNewBlocksObservable(
Expand All @@ -103,6 +116,9 @@ private void monitorBlockNumbers(BigInteger highestBlockNumber) {
});
}

/**
* Subscribes a listener that reacts to new transactions that are not yet on a block.
*/
private void monitorPendingTransactions() {
web3j.pendingTransactionObservable().subscribe(web3jTx -> {
try {
Expand All @@ -113,9 +129,13 @@ private void monitorPendingTransactions() {
LOG.error("Error while processing transaction.", t);
}
}, t -> LOG.error("Error during scanning of pending transactions.", t));

}

/**
* Subscribes a listener that receives all transactions starting at the block given in
* the property {@link MonitorAppConfigHolder#ethereumNodeStartBlock}. Of course only
* transactions directed to a monitored address are processed.
*/
private void monitorBuildingTransactions() {
Long startBlock = configHolder.getEthereumNodeStartBlock();
web3j.catchUpToLatestAndSubscribeToNewTransactionsObservable(new DefaultBlockParameterNumber(startBlock))
Expand All @@ -137,6 +157,13 @@ private void monitorBuildingTransactions() {
}, t -> LOG.error("Error during scanning of transactions.", t));
}

/**
* Subscribes a listener that receives new blocks when they are added to the chain. With each
* block all processed but unconfirmed tarnsactions are checked for their block depth
* ({@link MonitorAppConfigHolder#ethereumNodeStartBlock}) and are
* confirmed if they are burried under enough blocks. When they are confirmed, they are also
* removed from the unconfirmed list.
*/
private void monitorProcessedTransactions() {
web3j.blockObservable(false).subscribe(block -> {
BigInteger currentBlockNr = block.getBlock().getNumber();
Expand All @@ -162,6 +189,9 @@ protected boolean isAddressMonitored(String receivingAddress) {
return monitoredAddresses.contains(receivingAddress);
}

/**
* Regularly logs information about the connected Ethereum full node.
*/
@Scheduled(fixedRate = 60000)
public void reportEthereumFullNode() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@ public class MonitoringInit {

private final static Logger LOG = LoggerFactory.getLogger(MonitoringInit.class);

/**
* The maximum time in miliseconds to wait for a successful connection to the Ethereum full
* node.
*/
private final static int WAIT_FOR_ETH_MILLIS = 60000;
/**
* Time in miliseconds to wait between connection retries to the Ethereum full node.
*/
private final static int WAIT_ETH_RETRY_MILLIS = 2000;
/**
* Number of connection retries to Ethereum full node before givin up.
*/
private final static int NR_RETRIES = WAIT_FOR_ETH_MILLIS / WAIT_ETH_RETRY_MILLIS;

@Autowired
Expand All @@ -35,6 +45,16 @@ public class MonitoringInit {
@Autowired
private InvestorRepository investorRepository;

/**
* Starts the monitors (adds monitored addresses, starts monitoring of blockchains and
* processing of transactionson) on initialization of the Spring Application Context.
*
* Monitors a started according to the setting of the application properties
* {@link MonitorAppConfigHolder#bitcoinNodeEnabled} and
* {@link MonitorAppConfigHolder#ethereumNodeEnabled}.
*
* If the Ethereum full node cannot be reached immediately connection retries are attempted.
*/
@EventListener({ContextRefreshedEvent.class})
void contextRefreshedEvent() {
new Thread(() -> {
Expand All @@ -58,6 +78,13 @@ void contextRefreshedEvent() {
}).start();
}

/**
* Adds all, so far, distributed payment addresses to the sets of monitored addresses of the
* {@link BitcoinMonitor} and {@link EthereumMonitor} and starts the monitoring and processing
* of transactions. Each monitor is only started if the application properties
* {@link MonitorAppConfigHolder#bitcoinNodeEnabled} and
* {@link MonitorAppConfigHolder#ethereumNodeEnabled} are set accordingly.
*/
private void initMonitors() throws Exception {

addExistinPaymentAdresses();
Expand All @@ -74,6 +101,11 @@ private void initMonitors() throws Exception {

}

/**
* Retrieves all investors from the database and adds their payment addresses to the
* {@link BitcoinMonitor} and {@link EthereumMonitor} instances respectively.
* The timestamps provided with the addresses are the creation dates of the investors.
*/
private void addExistinPaymentAdresses() {

List<Investor> listInvestors = investorRepository.findAllByOrderByCreationDateAsc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@

import static com.github.rholder.retry.WaitStrategies.randomWait;

/**
* Monitor-related bean declarations.
*/
@Configuration
@Import(value = {MonitorAppConfigHolder.class, BitcoinConfig.class})
public class MonitorBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
import static io.iconator.commons.amqp.model.constants.RoutingKeyConstants.ADDRESS_SET_WALLET_ROUTING_KEY;
import static java.util.Optional.ofNullable;

/**
* RabbitMQ consumer that consumes messages containing information about newly registered investors.
* The payment addresses of a new investor are added to the {@link BitcoinMonitor} and
* {@link EthereumMonitor} for monitoring.
*/
@Component
public class SetWalletAddressMessageConsumer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
import java.util.Arrays;
import java.util.Optional;

/**
* Service for retrieving exchange rates.
*/
@Service
public class FxService {

@Autowired
private ICOnatorMessageService messageService;

/**
* Fetches the exchange rate in USD for the given {@link CurrencyType}. The request for the rate
* is sent to a message queue from which the rates application is consuming.
* @param blockTimestamp the block's timestamp for which the exchange rate shall be fetched.
* @param currencyType the crypto currency for which the exchange rate shall be fetched.
* @return an optional object containing the exchange rate in USD per crypto currency unit.
* @param currencyType the currency for which the exchange rate shall be fetched.
* @return an optional object containing the exchange rate in USD per currency unit.
* @throws FxException if the exchange rate could could not be fetched.
*/
public Optional<BigDecimal> getUSDExchangeRate(Instant blockTimestamp, CurrencyType currencyType)
Expand Down
Loading

0 comments on commit f946eb4

Please sign in to comment.