Skip to content

Commit

Permalink
Merge pull request #97 from maxpert/pprof
Browse files Browse the repository at this point in the history
Adding pprof flag to support better tracing & rewriting file watching
  • Loading branch information
maxpert authored Aug 12, 2024
2 parents 59bcd19 + 2e0e668 commit fe252af
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
1 change: 1 addition & 0 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ var SaveSnapshotFlag = flag.Bool("save-snapshot", false, "Only take snapshot and
var ClusterAddrFlag = flag.String("cluster-addr", "", "Cluster listening address")
var ClusterPeersFlag = flag.String("cluster-peers", "", "Comma separated list of clusters")
var LeafServerFlag = flag.String("leaf-servers", "", "Comma separated list of leaf servers")
var ProfServer = flag.String("pprof", "", "PProf listening address")

var DataRootDir = os.TempDir()
var Config = &Configuration{
Expand Down
28 changes: 24 additions & 4 deletions db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,34 +253,54 @@ func (conn *SqliteStreamDB) initTriggers(tableName string) error {
return nil
}

func (conn *SqliteStreamDB) filterChangesTo(changed chan fsnotify.Event, watcher *fsnotify.Watcher) {
for {
select {
case ev, ok := <-watcher.Events:
if !ok {
close(changed)
return
}

if ev.Op == fsnotify.Chmod {
continue
}

changed <- ev
}
}
}

func (conn *SqliteStreamDB) watchChanges(watcher *fsnotify.Watcher, path string) {
shmPath := path + "-shm"
walPath := path + "-wal"

errDB := watcher.Add(path)
errShm := watcher.Add(shmPath)
errWal := watcher.Add(walPath)
dbChanged := make(chan fsnotify.Event)

tickerDur := time.Duration(cfg.Config.PollingInterval) * time.Millisecond
changeLogTicker := utils.NewTimeoutPublisher(tickerDur)

// Publish change logs for any residual change logs before starting watcher
conn.publishChangeLog()
go conn.filterChangesTo(dbChanged, watcher)

for {
changeLogTicker.Reset()

err := conn.WithReadTx(func(_tx *sql.Tx) error {
select {
case ev, ok := <-watcher.Events:
case ev, ok := <-dbChanged:
if !ok {
return ErrEndOfWatch
}

if ev.Op != fsnotify.Chmod {
conn.publishChangeLog()
}
log.Debug().Int("change", int(ev.Op)).Msg("Change detected")
conn.publishChangeLog()
case <-changeLogTicker.Channel():
log.Debug().Dur("timeout", tickerDur).Msg("Change polling timeout")
conn.publishChangeLog()
}

Expand Down
20 changes: 19 additions & 1 deletion marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"flag"
"io"
"net/http"
"net/http/pprof"
_ "net/http/pprof"
"os"
"time"

Expand All @@ -22,7 +25,6 @@ import (

func main() {
flag.Parse()

err := cfg.Load(*cfg.ConfigPathFlag)
if err != nil {
panic(err)
Expand All @@ -44,6 +46,22 @@ func main() {
log.Logger = gLog.Level(zerolog.InfoLevel)
}

if *cfg.ProfServer != "" {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

err := http.ListenAndServe(*cfg.ProfServer, mux)
if err != nil {
log.Error().Err(err).Msg("unable to bind profiler server")
}
}()
}

log.Debug().Msg("Initializing telemetry")
telemetry.InitializeTelemetry()

Expand Down

0 comments on commit fe252af

Please sign in to comment.