forked from planetdecred/dcrlibwallet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
txandblocknotifications.go
159 lines (132 loc) · 5.04 KB
/
txandblocknotifications.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package dcrlibwallet
import (
"encoding/json"
"decred.org/dcrwallet/v2/errors"
)
func (mw *MultiWallet) listenForTransactions(walletID int) {
go func() {
wallet := mw.wallets[walletID]
n := wallet.Internal().NtfnServer.TransactionNotifications()
for {
select {
case v := <-n.C:
if v == nil {
return
}
for _, transaction := range v.UnminedTransactions {
tempTransaction, err := wallet.decodeTransactionWithTxSummary(&transaction, nil)
if err != nil {
log.Errorf("[%d] Error ntfn parse tx: %v", wallet.ID, err)
return
}
overwritten, err := wallet.walletDataDB.SaveOrUpdate(&Transaction{}, tempTransaction)
if err != nil {
log.Errorf("[%d] New Tx save err: %v", wallet.ID, err)
return
}
if !overwritten {
log.Infof("[%d] New Transaction %s", wallet.ID, tempTransaction.Hash)
result, err := json.Marshal(tempTransaction)
if err != nil {
log.Error(err)
} else {
mw.mempoolTransactionNotification(string(result))
}
}
}
for _, block := range v.AttachedBlocks {
blockHash := block.Header.BlockHash()
for _, transaction := range block.Transactions {
tempTransaction, err := wallet.decodeTransactionWithTxSummary(&transaction, &blockHash)
if err != nil {
log.Errorf("[%d] Error ntfn parse tx: %v", wallet.ID, err)
return
}
_, err = wallet.walletDataDB.SaveOrUpdate(&Transaction{}, tempTransaction)
if err != nil {
log.Errorf("[%d] Incoming block replace tx error :%v", wallet.ID, err)
return
}
mw.publishTransactionConfirmed(wallet.ID, transaction.Hash.String(), int32(block.Header.Height))
}
mw.publishBlockAttached(wallet.ID, int32(block.Header.Height))
}
if len(v.AttachedBlocks) > 0 {
mw.checkWalletMixers()
}
case <-mw.syncData.syncCanceled:
n.Done()
}
}
}()
}
// AddTxAndBlockNotificationListener registers a set of functions to be invoked
// when a transaction or block update is processed by the wallet. If async is
// true, the provided callback methods will be called from separate goroutines,
// allowing notification senders to continue their operation without waiting
// for the listener to complete processing the notification. This asyncrhonous
// handling is especially important for cases where the wallet process that
// sends the notification temporarily prevents access to other wallet features
// until all notification handlers finish processing the notification. If a
// notification handler were to try to access such features, it would result
// in a deadlock.
func (mw *MultiWallet) AddTxAndBlockNotificationListener(txAndBlockNotificationListener TxAndBlockNotificationListener, async bool, uniqueIdentifier string) error {
mw.notificationListenersMu.Lock()
defer mw.notificationListenersMu.Unlock()
_, ok := mw.txAndBlockNotificationListeners[uniqueIdentifier]
if ok {
return errors.New(ErrListenerAlreadyExist)
}
if async {
mw.txAndBlockNotificationListeners[uniqueIdentifier] = &asyncTxAndBlockNotificationListener{
l: txAndBlockNotificationListener,
}
} else {
mw.txAndBlockNotificationListeners[uniqueIdentifier] = txAndBlockNotificationListener
}
return nil
}
func (mw *MultiWallet) RemoveTxAndBlockNotificationListener(uniqueIdentifier string) {
mw.notificationListenersMu.Lock()
defer mw.notificationListenersMu.Unlock()
delete(mw.txAndBlockNotificationListeners, uniqueIdentifier)
}
func (mw *MultiWallet) checkWalletMixers() {
for _, wallet := range mw.wallets {
if wallet.IsAccountMixerActive() {
unmixedAccount := wallet.ReadInt32ConfigValueForKey(AccountMixerUnmixedAccount, -1)
hasMixableOutput, err := wallet.accountHasMixableOutput(unmixedAccount)
if err != nil {
log.Errorf("Error checking for mixable outputs: %v", err)
}
if !hasMixableOutput {
log.Infof("[%d] unmixed account does not have a mixable output, stopping account mixer", wallet.ID)
err = mw.StopAccountMixer(wallet.ID)
if err != nil {
log.Errorf("Error stopping account mixer: %v", err)
}
}
}
}
}
func (mw *MultiWallet) mempoolTransactionNotification(transaction string) {
mw.notificationListenersMu.RLock()
defer mw.notificationListenersMu.RUnlock()
for _, txAndBlockNotifcationListener := range mw.txAndBlockNotificationListeners {
txAndBlockNotifcationListener.OnTransaction(transaction)
}
}
func (mw *MultiWallet) publishTransactionConfirmed(walletID int, transactionHash string, blockHeight int32) {
mw.notificationListenersMu.RLock()
defer mw.notificationListenersMu.RUnlock()
for _, txAndBlockNotifcationListener := range mw.txAndBlockNotificationListeners {
txAndBlockNotifcationListener.OnTransactionConfirmed(walletID, transactionHash, blockHeight)
}
}
func (mw *MultiWallet) publishBlockAttached(walletID int, blockHeight int32) {
mw.notificationListenersMu.RLock()
defer mw.notificationListenersMu.RUnlock()
for _, txAndBlockNotifcationListener := range mw.txAndBlockNotificationListeners {
txAndBlockNotifcationListener.OnBlockAttached(walletID, blockHeight)
}
}