Skip to content

Commit

Permalink
Refactored how stream, dfuseClient are initialized, added --plaintext…
Browse files Browse the repository at this point in the history
… support
  • Loading branch information
maoueh committed Mar 1, 2022
1 parent c8abb25 commit 4900f5c
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 143 deletions.
35 changes: 0 additions & 35 deletions cmd/sf/cmd/eth.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"crypto/tls"
"fmt"
"os"
"strings"
Expand All @@ -10,15 +9,10 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
dfuse "github.com/streamingfast/client-go"
"github.com/streamingfast/dgrpc"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v1"
sf "github.com/streamingfast/streamingfast-client"
pbcodec "github.com/streamingfast/streamingfast-client/pb/sf/ethereum/codec/v1"
pbtransforms "github.com/streamingfast/streamingfast-client/pb/sf/ethereum/transforms/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
Expand Down Expand Up @@ -66,7 +60,6 @@ func ethSfRunE(cmd *cobra.Command, args []string) error {
fantomNetwork := viper.GetBool("eth-cmd-fantom")
xdaiNetwork := viper.GetBool("eth-cmd-xdai")
outputFlag := viper.GetString("global-output")
skipAuth := viper.GetBool("global-skip-auth")

inputs, err := checkArgs(startCursor, args)
if err != nil {
Expand Down Expand Up @@ -94,31 +87,6 @@ func ethSfRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("unable to resolve endpoint")
}

var clientOptions []dfuse.ClientOption
apiKey := os.Getenv("STREAMINGFAST_API_KEY")
if apiKey == "" {
apiKey = os.Getenv("SF_API_KEY")
if apiKey == "" {
clientOptions = []dfuse.ClientOption{dfuse.WithoutAuthentication()}
skipAuth = true
}
}

dfuse, err := dfuse.NewClient(endpoint, apiKey, clientOptions...)
if err != nil {
return fmt.Errorf("unable to create streamingfast client")
}

var dialOptions []grpc.DialOption
if viper.GetBool("global-insecure") {
dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))}
}

conn, err := dgrpc.NewExternalClient(endpoint, dialOptions...)
if err != nil {
return fmt.Errorf("unable to create external gRPC client")
}

writer, closer, err := blockWriter(inputs.Range, outputFlag)
if err != nil {
return fmt.Errorf("unable to setup writer: %w", err)
Expand Down Expand Up @@ -168,15 +136,12 @@ func ethSfRunE(cmd *cobra.Command, args []string) error {
}

return launchStream(ctx, streamConfig{
client: pbfirehose.NewStreamClient(conn),
dfuseCli: dfuse,
writer: writer,
stats: newStats(),
brange: inputs.Range,
cursor: startCursor,
endpoint: endpoint,
handleForks: viper.GetBool("global-handle-forks"),
skipAuth: skipAuth,
transforms: transforms,
},
func() proto.Message {
Expand Down
9 changes: 1 addition & 8 deletions cmd/sf/cmd/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ package cmd

import (
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

var traceEnabled = logging.IsTraceEnabled("sf", "github.com/streamingfast/streamingfast-client")

var zlog = zap.NewNop()

func init() {
logging.Register("github.com/streamingfast/streamingfast-client/cmd", &zlog)
}
var zlog, tracer = logging.PackageLogger("sf", "github.com/streamingfast/streamingfast-client/cmd/sf/cmd")
40 changes: 2 additions & 38 deletions cmd/sf/cmd/near.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package cmd

import (
"crypto/tls"
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/spf13/viper"
dfuse "github.com/streamingfast/client-go"
"github.com/streamingfast/dgrpc"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v1"
sf "github.com/streamingfast/streamingfast-client"
pbcodec "github.com/streamingfast/streamingfast-client/pb/sf/near/codec/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
)

var nearSfCmd = &cobra.Command{
Use: "near [flags] [<start_block>] [<end_block>]",
Short: `StreamingFast Near client`,
Use: "near [<start_block>] [<end_block>]",
Short: `StreamingFast NEAR Client`,
Long: usage,
RunE: nearSfCmdE,
}
Expand All @@ -41,8 +35,6 @@ func nearSfCmdE(cmd *cobra.Command, args []string) error {
startCursor := viper.GetString("global-start-cursor")
endpoint := viper.GetString("near-cmd-endpoint")
outputFlag := viper.GetString("global-output")
skipAuth := viper.GetBool("global-skip-auth")

testnet := viper.GetBool("near-cmd-testnet")

inputs, err := checkArgs(startCursor, args)
Expand All @@ -57,47 +49,19 @@ func nearSfCmdE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("unable to resolve endpoint")
}

var clientOptions []dfuse.ClientOption
apiKey := os.Getenv("STREAMINGFAST_API_KEY")
if apiKey == "" {
apiKey = os.Getenv("SF_API_KEY")
if apiKey == "" {
clientOptions = []dfuse.ClientOption{dfuse.WithoutAuthentication()}
skipAuth = true
}
}

dfuse, err := dfuse.NewClient(endpoint, apiKey, clientOptions...)
if err != nil {
return fmt.Errorf("unable to create streamingfast client")
}

var dialOptions []grpc.DialOption
if viper.GetBool("global-insecure") {
dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))}
}

conn, err := dgrpc.NewExternalClient(endpoint, dialOptions...)
if err != nil {
return fmt.Errorf("unable to create external gRPC client")
}

writer, closer, err := blockWriter(inputs.Range, outputFlag)
if err != nil {
return fmt.Errorf("unable to setup writer: %w", err)
}
defer closer()

return launchStream(ctx, streamConfig{
client: pbfirehose.NewStreamClient(conn),
dfuseCli: dfuse,
writer: writer,
stats: newStats(),
brange: inputs.Range,
cursor: startCursor,
endpoint: endpoint,
handleForks: viper.GetBool("global-handle-forks"),
skipAuth: skipAuth,
},
func() proto.Message {
return &pbcodec.Block{}
Expand Down
5 changes: 3 additions & 2 deletions cmd/sf/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Examples:
# Look at ALL blocks in a given range on ETH mainnet
$ sf eth 100000 100010
# Stream blocks in a given range on BSC with logs that match a given address and event signature (topic0)
# Stream blocks in a given range on BSC with logs that match a given address and event signature (topic0)
# (transactions that do not match are filtered out of the "transactionTraces" array in the response)
# sf eth --bsc --log-filter-addresses='0xcA143Ce32Fe78f1f7019d7d551a6402fC5350c73' --log-filter-event-sigs='0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' 6500000 6810800
Expand All @@ -56,7 +56,8 @@ func init() {
cobra.OnInitialize(initConfig)

RootCmd.PersistentFlags().Bool("handle-forks", false, "Request notifications type STEP_UNDO when a block was forked out, and STEP_IRREVERSIBLE after a block has seen enough confirmations (200)")
RootCmd.PersistentFlags().BoolP("insecure", "s", false, "Enables Insecure connection, When set, skips certification verification")
RootCmd.PersistentFlags().BoolP("insecure", "s", false, "Use TLS for the connection with the remote server but skips SSL certificate verification, this is an insecure setup")
RootCmd.PersistentFlags().BoolP("plaintext", "p", false, "Use plain text for the connection with the remote server, this is an insecure setup and any middleman is able to see the traffic")
RootCmd.PersistentFlags().BoolP("skip-auth", "a", false, "Skips the authentication")
RootCmd.PersistentFlags().StringP("output", "o", "-", "When set, write each block as one JSON line in the specified file, value '-' writes to standard output otherwise to a file, {range} is replaced by block range in this case")
RootCmd.PersistentFlags().String("start-cursor", "", "Last cursor used to continue where you left off")
Expand Down
35 changes: 0 additions & 35 deletions cmd/sf/cmd/sol.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package cmd

import (
"crypto/tls"
"fmt"
"os"

"github.com/spf13/cobra"
"github.com/spf13/viper"
dfuse "github.com/streamingfast/client-go"
"github.com/streamingfast/dgrpc"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v1"
sf "github.com/streamingfast/streamingfast-client"
pbcodec "github.com/streamingfast/streamingfast-client/pb/sf/solana/codec/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -46,7 +40,6 @@ func solSfCmdE(cmd *cobra.Command, args []string) error {
endpoint := viper.GetString("sol-cmd-endpoint")
testnet := viper.GetBool("sol-cmd-testnet")
outputFlag := viper.GetString("global-output")
skipAuth := viper.GetBool("global-skip-auth")

inputs, err := checkArgs(startCursor, args)
if err != nil {
Expand All @@ -60,47 +53,19 @@ func solSfCmdE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("unable to resolve endpoint")
}

var clientOptions []dfuse.ClientOption
apiKey := os.Getenv("STREAMINGFAST_API_KEY")
if apiKey == "" {
apiKey = os.Getenv("SF_API_KEY")
if apiKey == "" {
clientOptions = []dfuse.ClientOption{dfuse.WithoutAuthentication()}
skipAuth = true
}
}

dfuse, err := dfuse.NewClient(endpoint, apiKey, clientOptions...)
if err != nil {
return fmt.Errorf("unable to create streamingfast client")
}

var dialOptions []grpc.DialOption
if viper.GetBool("global-insecure") {
dialOptions = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{InsecureSkipVerify: true}))}
}

conn, err := dgrpc.NewExternalClient(endpoint, dialOptions...)
if err != nil {
return fmt.Errorf("unable to create external gRPC client")
}

writer, closer, err := blockWriter(inputs.Range, outputFlag)
if err != nil {
return fmt.Errorf("unable to setup writer: %w", err)
}
defer closer()

return launchStream(ctx, streamConfig{
client: pbfirehose.NewStreamClient(conn),
dfuseCli: dfuse,
writer: writer,
stats: newStats(),
brange: inputs.Range,
cursor: startCursor,
endpoint: endpoint,
handleForks: viper.GetBool("global-handle-forks"),
skipAuth: skipAuth,
},
func() proto.Message {
return &pbcodec.Block{}
Expand Down
Loading

0 comments on commit 4900f5c

Please sign in to comment.