Skip to content
This repository has been archived by the owner on Nov 25, 2020. It is now read-only.

Support for submitting bundle transactions #220

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ reset-test-db:
ci:
export MIX_ENV=test PORT=4002 && $(MIX) local.hex --force && $(MIX) local.rebar --force && $(MIX) deps.get && $(MIX) test --trace

testrelease:
export NO_ESCRIPT=1 MIX_ENV=test && $(MIX) distillery.release --env=test

# Build prod docker image
docker-prod:
docker build \
Expand Down
5 changes: 3 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ config :blockchain_api, BlockchainAPI.Repo,
hostname: "localhost",
pool: Ecto.Adapters.SQL.Sandbox,
pool_size: 10,
timeout: 60000,
timeout: 120000,
ownership_timeout: 120000,
log: false

config :blockchain_api,
Expand All @@ -34,4 +35,4 @@ config :blockchain_api,
config :blockchain,
seed_nodes: [],
seed_node_dns: '',
base_dir: String.to_charlist("/tmp/blockchain-api/test/")
base_dir: String.to_charlist("/var/data/blockchain-api/test/")
29 changes: 26 additions & 3 deletions lib/blockchain_api/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,30 @@ defmodule BlockchainAPI.Application do
alias BlockchainAPI.Watcher
alias BlockchainAPI.{PeriodicCleaner, PeriodicUpdater}
alias BlockchainAPI.{Notifier, RewardsNotifier}
alias BlockchainAPI.Job.{SubmitPayment, SubmitGateway, SubmitLocation, SubmitCoinbase, SubmitOUI, SubmitSecExchange}
alias BlockchainAPI.Schema.{PendingPayment, PendingGateway, PendingLocation, PendingCoinbase, PendingOUI, PendingSecExchange}
alias BlockchainAPI.Job.{
SubmitPayment,
SubmitGateway,
SubmitLocation,
SubmitCoinbase,
SubmitOUI,
SubmitSecExchange,
SubmitBundle}
alias BlockchainAPI.Schema.{
PendingPayment,
PendingGateway,
PendingLocation,
PendingCoinbase,
PendingOUI,
PendingSecExchange,
PendingBundle}

import PendingPayment, only: [submit_payment_queue: 0]
import PendingGateway, only: [submit_gateway_queue: 0]
import PendingLocation, only: [submit_location_queue: 0]
import PendingCoinbase, only: [submit_coinbase_queue: 0]
import PendingOUI, only: [submit_oui_queue: 0]
import PendingSecExchange, only: [submit_sec_exchange_queue: 0]
import PendingBundle, only: [submit_bundle_queue: 0]

def start(_type, _args) do
# Blockchain Supervisor Options
Expand Down Expand Up @@ -126,6 +141,14 @@ defmodule BlockchainAPI.Application do

:ok = Honeydew.start_workers(submit_sec_exchange_queue(), SubmitSecExchange)

:ok =
Honeydew.start_queue(submit_bundle_queue(),
queue: {EctoPollQueue, queue_args(env, PendingBundle)},
failure_mode: Honeydew.FailureMode.Abandon
)

:ok = Honeydew.start_workers(submit_bundle_queue(), SubmitBundle)

{:ok, sup}
end

Expand Down Expand Up @@ -154,7 +177,7 @@ defmodule BlockchainAPI.Application do
end

defp queue_args(_, schema) do
# Check for test and dev env pending txns every 30 minutes
# Check for test and dev env pending txns every 30 seconds
# No need for prod level checking here
poll_interval = Application.get_env(:ecto_poll_queue, :interval, 30)
[schema: schema, repo: Repo, poll_interval: poll_interval]
Expand Down
143 changes: 75 additions & 68 deletions lib/blockchain_api/committer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -188,65 +188,70 @@ defmodule BlockchainAPI.Committer do
:ok

txns ->
Enum.map(txns, fn txn ->
case :blockchain_txn.type(txn) do
:blockchain_txn_coinbase_v1 ->
insert_transaction(:blockchain_txn_coinbase_v1, txn, height)
Enum.map(txns,
fn txn ->
add_transaction(block, ledger, txn, height)
end)
end
end

:blockchain_txn_payment_v1 ->
insert_transaction(:blockchain_txn_payment_v1, txn, height)
defp add_transaction(block, ledger, txn, height) do
case :blockchain_txn.type(txn) do
:blockchain_txn_coinbase_v1 ->
insert_transaction(:blockchain_txn_coinbase_v1, txn, height)

:blockchain_txn_add_gateway_v1 ->
insert_transaction(:blockchain_txn_add_gateway_v1, txn, height)
upsert_hotspot(:blockchain_txn_add_gateway_v1, txn, ledger)
:blockchain_txn_payment_v1 ->
insert_transaction(:blockchain_txn_payment_v1, txn, height)

:blockchain_txn_gen_gateway_v1 ->
insert_transaction(:blockchain_txn_gen_gateway_v1, txn, height)
upsert_hotspot(:blockchain_txn_gen_gateway_v1, txn, ledger)
:blockchain_txn_add_gateway_v1 ->
insert_transaction(:blockchain_txn_add_gateway_v1, txn, height)
upsert_hotspot(:blockchain_txn_add_gateway_v1, txn, ledger)

:blockchain_txn_poc_request_v1 ->
insert_transaction(:blockchain_txn_poc_request_v1, txn, block, ledger, height)
:blockchain_txn_gen_gateway_v1 ->
insert_transaction(:blockchain_txn_gen_gateway_v1, txn, height)
upsert_hotspot(:blockchain_txn_gen_gateway_v1, txn, ledger)

:blockchain_txn_poc_receipts_v1 ->
insert_transaction(:blockchain_txn_poc_receipts_v1, txn, block, ledger, height)
:blockchain_txn_poc_request_v1 ->
insert_transaction(:blockchain_txn_poc_request_v1, txn, block, ledger, height)

:blockchain_txn_assert_location_v1 ->
insert_transaction(:blockchain_txn_assert_location_v1, txn, height)
# also upsert hotspot
upsert_hotspot(:blockchain_txn_assert_location_v1, txn, ledger)
:blockchain_txn_poc_receipts_v1 ->
insert_transaction(:blockchain_txn_poc_receipts_v1, txn, block, ledger, height)

:blockchain_txn_security_coinbase_v1 ->
insert_transaction(:blockchain_txn_security_coinbase_v1, txn, height)
:blockchain_txn_assert_location_v1 ->
insert_transaction(:blockchain_txn_assert_location_v1, txn, height)
# also upsert hotspot
upsert_hotspot(:blockchain_txn_assert_location_v1, txn, ledger)

:blockchain_txn_security_exchange_v1 ->
insert_transaction(:blockchain_txn_security_exchange_v1, txn, height)
:blockchain_txn_security_coinbase_v1 ->
insert_transaction(:blockchain_txn_security_coinbase_v1, txn, height)

:blockchain_txn_dc_coinbase_v1 ->
insert_transaction(:blockchain_txn_dc_coinbase_v1, txn, height)
:blockchain_txn_security_exchange_v1 ->
insert_transaction(:blockchain_txn_security_exchange_v1, txn, height)

:blockchain_txn_consensus_group_v1 ->
insert_transaction(
:blockchain_txn_consensus_group_v1,
txn,
height,
:blockchain_block.time(block)
)
:blockchain_txn_dc_coinbase_v1 ->
insert_transaction(:blockchain_txn_dc_coinbase_v1, txn, height)

:blockchain_txn_rewards_v1 ->
insert_transaction(
:blockchain_txn_rewards_v1,
txn,
height,
:blockchain_block.time(block)
)
:blockchain_txn_consensus_group_v1 ->
insert_transaction(
:blockchain_txn_consensus_group_v1,
txn,
height,
:blockchain_block.time(block)
)

:blockchain_txn_rewards_v1 ->
insert_transaction(
:blockchain_txn_rewards_v1,
txn,
height,
:blockchain_block.time(block)
)

:blockchain_txn_oui_v1 ->
insert_transaction(:blockchain_txn_oui_v1, txn, height)
:blockchain_txn_oui_v1 ->
insert_transaction(:blockchain_txn_oui_v1, txn, height)

_ ->
:ok
end
end)
_ ->
:ok
end
end

Expand All @@ -259,36 +264,38 @@ defmodule BlockchainAPI.Committer do
:ok

txns ->
Enum.map(txns, fn txn ->
case :blockchain_txn.type(txn) do
:blockchain_txn_coinbase_v1 ->
insert_account_transaction(:blockchain_txn_coinbase_v1, txn)
Enum.map(txns, fn txn -> add_account_transaction(txn) end)
end
end

:blockchain_txn_payment_v1 ->
insert_account_transaction(:blockchain_txn_payment_v1, txn)
defp add_account_transaction(txn) do
case :blockchain_txn.type(txn) do
:blockchain_txn_coinbase_v1 ->
insert_account_transaction(:blockchain_txn_coinbase_v1, txn)

:blockchain_txn_add_gateway_v1 ->
insert_account_transaction(:blockchain_txn_add_gateway_v1, txn)
:blockchain_txn_payment_v1 ->
insert_account_transaction(:blockchain_txn_payment_v1, txn)

:blockchain_txn_assert_location_v1 ->
insert_account_transaction(:blockchain_txn_assert_location_v1, txn)
:blockchain_txn_add_gateway_v1 ->
insert_account_transaction(:blockchain_txn_add_gateway_v1, txn)

:blockchain_txn_gen_gateway_v1 ->
insert_account_transaction(:blockchain_txn_gen_gateway_v1, txn)
:blockchain_txn_assert_location_v1 ->
insert_account_transaction(:blockchain_txn_assert_location_v1, txn)

:blockchain_txn_security_coinbase_v1 ->
insert_account_transaction(:blockchain_txn_security_coinbase_v1, txn)
:blockchain_txn_gen_gateway_v1 ->
insert_account_transaction(:blockchain_txn_gen_gateway_v1, txn)

:blockchain_txn_dc_coinbase_v1 ->
insert_account_transaction(:blockchain_txn_dc_coinbase_v1, txn)
:blockchain_txn_security_coinbase_v1 ->
insert_account_transaction(:blockchain_txn_security_coinbase_v1, txn)

:blockchain_txn_rewards_v1 ->
insert_account_transaction(:blockchain_txn_rewards_v1, txn)
:blockchain_txn_dc_coinbase_v1 ->
insert_account_transaction(:blockchain_txn_dc_coinbase_v1, txn)

_ ->
:ok
end
end)
:blockchain_txn_rewards_v1 ->
insert_account_transaction(:blockchain_txn_rewards_v1, txn)

_ ->
:ok
end
end

Expand Down
95 changes: 95 additions & 0 deletions lib/blockchain_api/job/submit_bundle.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule BlockchainAPI.Job.SubmitBundle do
alias BlockchainAPI.Query.{
PendingBundle,
PendingPayment,
PendingGateway,
PendingLocation,
PendingOUI,
PendingSecExchange}
alias BlockchainAPI.Util
require Logger

def run(id) do
Logger.debug("running pending_bundle job: #{inspect(id)}")

pending_bundle = PendingBundle.get_by_id!(id)

IO.inspect(pending_bundle, label: :pending_bundle)
IO.inspect(pending_bundle.txn, label: :pending_bundle_txn)

txn = pending_bundle.txn |> :blockchain_txn.deserialize()

IO.inspect(txn, label: :before_submit)

txn
|> :blockchain_worker.submit_txn(fn res ->
case res do
:ok ->
Logger.info("Txn: #{Util.bin_to_string(:blockchain_txn.hash(txn))} accepted!")

# Do this update
update_attrs = %{status: "cleared"}

# Mark the bundle itself
pending_bundle |> PendingBundle.update!(update_attrs)

# Mark all the bundled txns
update_bundled_txns(pending_bundle, update_attrs)

{:error, reason} ->
Logger.error(
"Txn: #{Util.bin_to_string(:blockchain_txn.hash(txn))} failed!, reason: #{
inspect(reason)
}"
)

# Do this update
update_attrs = %{status: "error"}

# Mark the bundle itself
pending_bundle |> PendingBundle.update!(update_attrs)

# Mark all the bundled txns
update_bundled_txns(pending_bundle, update_attrs)
end
end)
end

defp update_bundled_txns(pending_bundle, update_attrs) do
hashes = pending_bundle.txn_hashes
IO.inspect(hashes, label: :hashes)
types = pending_bundle.txn_types
IO.inspect(types, label: :types)

Enum.zip(types, hashes)
|> IO.inspect()
|> Enum.map(fn {type, hash} ->
case type do
"blockchain_txn_payment_v1" ->
PendingPayment.get_all_by_hash(hash)
|> Enum.map(fn(pp) -> PendingPayment.update!(pp, update_attrs) end)

"blockchain_txn_add_gateway_v1" ->
PendingGateway.get_all_by_hash(hash)
|> Enum.map(fn(pg) -> PendingGateway.update!(pg, update_attrs) end)

"blockchain_txn_assert_location_v1" ->
PendingLocation.get_all_by_hash(hash)
|> Enum.map(fn(pl) -> PendingLocation.update!(pl, update_attrs) end)

"blockchain_txn_oui_v1" ->
PendingOUI.get_all_by_hash(hash)
|> Enum.map(fn(poui) -> PendingOUI.update!(poui, update_attrs) end)

"blockchain_txn_security_exchange_v1" ->
PendingSecExchange.get_all_by_hash(hash)
|> Enum.map(fn(psec) -> PendingSecExchange.update!(psec, update_attrs) end)

_ ->
:ok

end
end)
end

end
30 changes: 30 additions & 0 deletions lib/blockchain_api/query/pending_bundle.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule BlockchainAPI.Query.PendingBundle do
@moduledoc false
import Ecto.Query, warn: false

alias BlockchainAPI.{Repo, Schema.PendingBundle}

def create(attrs \\ %{}) do
%PendingBundle{}
|> PendingBundle.changeset(attrs)
|> Repo.insert()
end

def get!(hash) do
PendingBundle
|> where([pbundle], pbundle.hash == ^hash)
|> Repo.one!()
end

def get_by_id!(id) do
PendingBundle
|> where([pbundle], pbundle.id == ^id)
|> Repo.one!()
end

def update!(pbundle, attrs \\ %{}) do
pbundle
|> PendingBundle.changeset(attrs)
|> Repo.update!()
end
end
Loading