Skip to content

Commit

Permalink
Updated to use latest nuklaivm whereby we are now retrieving transact…
Browse files Browse the repository at this point in the history
…ions with both sender and receiver
  • Loading branch information
kpachhai committed Dec 13, 2024
1 parent 716ea5e commit 9e53f5f
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 85 deletions.
14 changes: 13 additions & 1 deletion api/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,18 @@ func GetActionsByUser(db *sql.DB) gin.HandlerFunc {
SELECT COUNT(*)
FROM actions
INNER JOIN transactions ON actions.tx_hash = transactions.tx_hash
WHERE transactions.sponsor ILIKE $1
WHERE
transactions.sponsor ILIKE $1
OR EXISTS (
SELECT 1
FROM unnest(transactions.actors) AS actor
WHERE actor ILIKE $1
)
OR EXISTS (
SELECT 1
FROM unnest(transactions.receivers) AS receiver
WHERE receiver ILIKE $1
)
`, "%"+user+"%").Scan(&totalCount)
if err != nil {
log.Printf("Error fetching actions: %v", err)
Expand All @@ -165,6 +176,7 @@ func GetActionsByUser(db *sql.DB) gin.HandlerFunc {
// Fetch paginated actions for the user
actions, err := models.FetchActionsByUser(db, user, limit, offset)
if err != nil {
log.Printf("Error fetching actions: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Unable to retrieve actions for user"})
return
}
Expand Down
22 changes: 17 additions & 5 deletions api/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,31 @@ func GetTransactionsByBlock(db *sql.DB) gin.HandlerFunc {
}
}

// GetTransactionsByUser retrieves transactions for a specific user (sponsor)
// GetTransactionsByUser retrieves transactions for a specific user (sponsor or actor or receiver)
func GetTransactionsByUser(db *sql.DB) gin.HandlerFunc {
return func(c *gin.Context) {
user := c.Param("user")
limit := c.DefaultQuery("limit", "10")
offset := c.DefaultQuery("offset", "0")

// Normalize the user identifier by removing "0x" prefix if present
user = strings.TrimPrefix(user, "0x")

// Get total count of user's transactions
var totalCount int
err := db.QueryRow(`SELECT COUNT(*) FROM transactions WHERE sponsor ILIKE $1`, "%"+user+"%").Scan(&totalCount)
err := db.QueryRow(`
SELECT COUNT(*)
FROM transactions
WHERE
sponsor ILIKE $1
OR EXISTS (
SELECT 1
FROM unnest(actors) AS actor
WHERE actor ILIKE $1
)
OR EXISTS (
SELECT 1
FROM unnest(receivers) AS receiver
WHERE receiver ILIKE $1
)
`, "%"+user+"%").Scan(&totalCount)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Unable to count transactions for user"})
return
Expand Down
7 changes: 7 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func InitDB(connStr string) (*sql.DB, error) {
// CreateSchema creates the database schema if it doesn't already exist
func CreateSchema(db *sql.DB) error {
schema := `
-- Ensure the pg_trgm extension is enabled for GIN indexes
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE IF NOT EXISTS blocks (
block_height BIGINT PRIMARY KEY,
block_hash TEXT NOT NULL,
Expand All @@ -66,6 +69,8 @@ func CreateSchema(db *sql.DB) error {
tx_hash TEXT UNIQUE NOT NULL,
block_hash TEXT,
sponsor TEXT,
actors TEXT[],
receivers TEXT[],
max_fee NUMERIC,
success BOOLEAN,
fee NUMERIC,
Expand Down Expand Up @@ -116,6 +121,8 @@ func CreateSchema(db *sql.DB) error {
CREATE INDEX IF NOT EXISTS idx_transactions_block_hash ON transactions(block_hash);
CREATE INDEX IF NOT EXISTS idx_sponsor ON transactions(sponsor);
CREATE INDEX IF NOT EXISTS idx_transactions_timestamp ON transactions (timestamp);
CREATE INDEX IF NOT EXISTS idx_actors ON transactions USING GIN (actors);
CREATE INDEX IF NOT EXISTS idx_receivers ON transactions USING GIN (receivers);
CREATE INDEX IF NOT EXISTS idx_action_type ON actions(action_type);
CREATE INDEX IF NOT EXISTS idx_action_name_lower ON actions (LOWER(action_name));
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gin-contrib/cors v1.7.2
github.com/gin-gonic/gin v1.9.1
github.com/lib/pq v1.10.9
github.com/nuklai/nuklaivm v0.1.3-0.20241210141437-9fd87b8d476e
github.com/nuklai/nuklaivm v0.1.3-0.20241213173252-dc06e8f28de2
google.golang.org/grpc v1.62.0
google.golang.org/protobuf v1.35.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ github.com/near/borsh-go v0.3.1 h1:ukNbhJlPKxfua0/nIuMZhggSU8zvtRP/VyC25LLqPUA=
github.com/near/borsh-go v0.3.1/go.mod h1:NeMochZp7jN/pYFuxLkrZtmLqbADmnp/y1+/dL+AsyQ=
github.com/neilotoole/errgroup v0.1.6 h1:PODGqPXdT5BC/zCYIMoTrwV+ujKcW+gBXM6Ye9Ve3R8=
github.com/neilotoole/errgroup v0.1.6/go.mod h1:Q2nLGf+594h0CLBs/Mbg6qOr7GtqDK7C2S41udRnToE=
github.com/nuklai/nuklaivm v0.1.3-0.20241210141437-9fd87b8d476e h1:mqm0AU4KJM0bx/vu4pKHgjlNO12JO3qC0ZDBxjijoiE=
github.com/nuklai/nuklaivm v0.1.3-0.20241210141437-9fd87b8d476e/go.mod h1:IN0kXhI+yce6CD9+rSI27cOJ1QUZYJK1TsBB/AvVIhQ=
github.com/nuklai/nuklaivm v0.1.3-0.20241213173252-dc06e8f28de2 h1:0WLM6da3HOPJPORMdh/EbfpRczr0XK2CwlYVRe58F9w=
github.com/nuklai/nuklaivm v0.1.3-0.20241213173252-dc06e8f28de2/go.mod h1:IN0kXhI+yce6CD9+rSI27cOJ1QUZYJK1TsBB/AvVIhQ=
github.com/oasisprotocol/curve25519-voi v0.0.0-20230110094441-db37f07504ce h1:/pEpMk55wH0X+E5zedGEMOdLuWmV8P4+4W3+LZaM6kg=
github.com/oasisprotocol/curve25519-voi v0.0.0-20230110094441-db37f07504ce/go.mod h1:hVoHR2EVESiICEMbg137etN/Lx+lSrHPTD39Z/uE+2s=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down
106 changes: 45 additions & 61 deletions grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/ava-labs/hypersdk/chain"
"github.com/ava-labs/hypersdk/codec"
pb "github.com/ava-labs/hypersdk/proto/pb/externalsubscriber"
"github.com/lib/pq"
"github.com/nuklai/nuklaivm-external-subscriber/consts"
subscriberDB "github.com/nuklai/nuklaivm-external-subscriber/db"
"github.com/nuklai/nuklaivm/vm"
Expand Down Expand Up @@ -209,6 +210,8 @@ func (s *Server) AcceptBlock(ctx context.Context, req *pb.BlockRequest) (*emptyp
success := false

actions := []map[string]interface{}{}
actors := make(map[string]struct{})
receivers := make(map[string]struct{})

if i < len(executedBlock.Results) {
result := executedBlock.Results[i]
Expand All @@ -226,6 +229,16 @@ func (s *Server) AcceptBlock(ctx context.Context, req *pb.BlockRequest) (*emptyp
var outputMap map[string]interface{}
json.Unmarshal(outputJSON, &outputMap)
outputs = append(outputs, outputMap)

// Add actor and receiver to uniqueParticipants and individual maps
if actor, ok := outputMap["actor"].(string); ok && actor != "" {
uniqueParticipants[actor] = struct{}{}
actors[actor] = struct{}{}
}
if receiver, ok := outputMap["receiver"].(string); ok && receiver != "" {
uniqueParticipants[receiver] = struct{}{}
receivers[receiver] = struct{}{}
}
}
}
}
Expand Down Expand Up @@ -274,60 +287,16 @@ func (s *Server) AcceptBlock(ctx context.Context, req *pb.BlockRequest) (*emptyp

// Save the action in the actions table
_, err := s.db.Exec(`
INSERT INTO actions (tx_hash, action_type, action_name, action_index, input, output, timestamp)
VALUES ($1, $2, $3, $4, $5::json, $6::json, $7)
ON CONFLICT (tx_hash, action_type, action_index) DO UPDATE
SET input = EXCLUDED.input,
output = EXCLUDED.output,
timestamp = EXCLUDED.timestamp`,
INSERT INTO actions (tx_hash, action_type, action_name, action_index, input, output, timestamp)
VALUES ($1, $2, $3, $4, $5::json, $6::json, $7)
ON CONFLICT (tx_hash, action_type, action_index) DO UPDATE
SET input = EXCLUDED.input,
output = EXCLUDED.output,
timestamp = EXCLUDED.timestamp`,
txID, actionType, actionName, j, actionInputJSON, actionOutputsJSON, timestamp)
if err != nil {
log.Printf("Error saving action to database: %v\n", err)
}

if actionType == 4 { // create_asset action
// Parse actionInputJSON into map[string]interface{}
var actionInput map[string]interface{}
err := json.Unmarshal([]byte(actionInputJSON), &actionInput)
if err != nil {
log.Printf("Error unmarshaling action input: %v\n", err)
continue
}
actionOutput := outputs[j]
assetID := actionOutput["asset_address"].(string)
assetTypeID := actionInput["asset_type"].(float64)
assetType := map[float64]string{0: "fungible", 1: "non-fungible", 2: "fractional"}[assetTypeID]

// Insert asset into the assets table
_, err = s.db.Exec(`
INSERT INTO assets (
asset_address, asset_type_id, asset_type, asset_creator, tx_hash, name, symbol, decimals, metadata, max_supply, mint_admin, pause_unpause_admin, freeze_unfreeze_admin, enable_disable_kyc_account_admin, timestamp
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
ON CONFLICT (asset_address) DO UPDATE
SET asset_type_id = EXCLUDED.asset_type_id,
asset_type = EXCLUDED.asset_type,
asset_creator = EXCLUDED.asset_creator,
tx_hash = EXCLUDED.tx_hash,
name = EXCLUDED.name,
symbol = EXCLUDED.symbol,
decimals = EXCLUDED.decimals,
metadata = EXCLUDED.metadata,
max_supply = EXCLUDED.max_supply,
mint_admin = EXCLUDED.mint_admin,
pause_unpause_admin = EXCLUDED.pause_unpause_admin,
freeze_unfreeze_admin = EXCLUDED.freeze_unfreeze_admin,
enable_disable_kyc_account_admin = EXCLUDED.enable_disable_kyc_account_admin,
timestamp = EXCLUDED.timestamp
`, assetID, assetTypeID, assetType, sponsor, txID,
actionInput["name"], actionInput["symbol"], actionInput["decimals"],
actionInput["metadata"], actionInput["max_supply"], actionInput["mint_admin"],
actionInput["pause_unpause_admin"], actionInput["freeze_unfreeze_admin"],
actionInput["enable_disable_kyc_account_admin"], timestamp)
if err != nil {
log.Printf("Error saving asset to database: %v\n", err)
}
}
}

// Convert actions to JSON for storing in the transactions table
Expand All @@ -337,19 +306,26 @@ func (s *Server) AcceptBlock(ctx context.Context, req *pb.BlockRequest) (*emptyp
continue
}

// Convert actors and receivers to slices of strings
actorsSlice := getKeysFromMap(actors)
receiversSlice := getKeysFromMap(receivers)

// Save the transaction with aggregated actions
_, err = s.db.Exec(`
INSERT INTO transactions (tx_hash, block_hash, sponsor, max_fee, success, fee, actions, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7::json, $8)
ON CONFLICT (tx_hash) DO UPDATE
SET block_hash = EXCLUDED.block_hash,
sponsor = EXCLUDED.sponsor,
max_fee = EXCLUDED.max_fee,
success = EXCLUDED.success,
fee = EXCLUDED.fee,
actions = EXCLUDED.actions,
timestamp = EXCLUDED.timestamp`,
txID, blockHash, sponsor, tx.MaxFee(), success, fee, actionsJSON, timestamp)
INSERT INTO transactions (tx_hash, block_hash, sponsor, actors, receivers, max_fee, success, fee, actions, timestamp)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::json, $10)
ON CONFLICT (tx_hash) DO UPDATE
SET block_hash = EXCLUDED.block_hash,
sponsor = EXCLUDED.sponsor,
actors = EXCLUDED.actors,
receivers = EXCLUDED.receivers,
max_fee = EXCLUDED.max_fee,
success = EXCLUDED.success,
fee = EXCLUDED.fee,
actions = EXCLUDED.actions,
timestamp = EXCLUDED.timestamp`,
txID, blockHash, sponsor, pq.Array(actorsSlice), pq.Array(receiversSlice),
tx.MaxFee(), success, fee, actionsJSON, timestamp)
if err != nil {
log.Printf("Error saving transaction to database: %v\n", err)
}
Expand Down Expand Up @@ -377,3 +353,11 @@ func (s *Server) AcceptBlock(ctx context.Context, req *pb.BlockRequest) (*emptyp

return &emptypb.Empty{}, nil
}

func getKeysFromMap(m map[string]struct{}) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
return keys
}
18 changes: 16 additions & 2 deletions models/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"log"
"strconv"
"strings"
)

type Action struct {
Expand Down Expand Up @@ -107,14 +108,27 @@ func FetchActionsByName(db *sql.DB, actionName, limit, offset string) ([]Action,

// FetchActionsByUser retrieves actions by user with pagination
func FetchActionsByUser(db *sql.DB, user, limit, offset string) ([]Action, error) {
normalizedUser := "%" + strings.TrimPrefix(user, "0x") + "%"

rows, err := db.Query(`
SELECT actions.*
FROM actions
INNER JOIN transactions ON actions.tx_hash = transactions.tx_hash
WHERE transactions.sponsor ILIKE $1
WHERE
transactions.sponsor ILIKE $1
OR EXISTS (
SELECT 1
FROM unnest(transactions.actors) AS actor
WHERE actor ILIKE $1
)
OR EXISTS (
SELECT 1
FROM unnest(transactions.receivers) AS receiver
WHERE receiver ILIKE $1
)
ORDER BY actions.timestamp DESC
LIMIT $2 OFFSET $3
`, "%"+user+"%", limit, offset)
`, normalizedUser, limit, offset)
if err != nil {
log.Printf("Database query error: %v", err)
return nil, err
Expand Down
Loading

0 comments on commit 9e53f5f

Please sign in to comment.