Skip to content

Commit

Permalink
use channels for loop waits instead
Browse files Browse the repository at this point in the history
  • Loading branch information
Son Roy Almerol committed Nov 11, 2024
1 parent c708a3a commit 45abc06
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 139 deletions.
47 changes: 47 additions & 0 deletions cmd/windows_agent/local_drives.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//go:build windows

package main

import (
"fmt"
"os"

"github.com/sonroyaalmerol/pbs-plus/internal/agent/sftp"
"github.com/sonroyaalmerol/pbs-plus/internal/utils"
)

type Drive struct {
Letter string
ErrorChan chan string
}

func getLocalDrives() (r []Drive) {
for _, drive := range "ABCDEFGHIJKLMNOPQRSTUVWXYZ" {
f, err := os.Open(string(drive) + ":\\")
if err == nil {
r = append(r, Drive{Letter: string(drive)})
f.Close()
}
}
return
}

func (drive *Drive) serveSFTP(p *agentService) error {
rune := []rune(drive.Letter)[0]
sftpConfig, err := sftp.InitializeSFTPConfig(p.svc, drive.Letter)
if err != nil {
return fmt.Errorf("Unable to initialize SFTP config: %s", err)
}
if err := sftpConfig.PopulateKeys(); err != nil {
return fmt.Errorf("Unable to populate SFTP keys: %s", err)
}

port, err := utils.DriveLetterPort(rune)
if err != nil {
return fmt.Errorf("Unable to map letter to port: %s", err)
}

go sftp.Serve(p.ctx, drive.ErrorChan, sftpConfig, "0.0.0.0", port, drive.Letter)

return nil
}
141 changes: 60 additions & 81 deletions cmd/windows_agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
_ "embed"
"fmt"
"net/http"
"sync"
"time"

"github.com/kardianos/service"
"github.com/sonroyaalmerol/pbs-plus/internal/agent"
"github.com/sonroyaalmerol/pbs-plus/internal/agent/sftp"
"github.com/sonroyaalmerol/pbs-plus/internal/agent/snapshots"
"github.com/sonroyaalmerol/pbs-plus/internal/syslog"
"github.com/sonroyaalmerol/pbs-plus/internal/utils"
Expand All @@ -30,68 +28,42 @@ type PingResp struct {

type agentService struct {
svc service.Service
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}

func (p *agentService) Start(s service.Service) error {
p.ctx, p.cancel = context.WithCancel(context.Background())

go p.runLoop()
go p.startPing()
go p.run()

return nil
}

func (p *agentService) startPing() {
firstPing := true
lastCheck := time.Now()
for {
select {
case <-p.ctx.Done():
utils.SetEnvironment("PBS_AGENT_STATUS", "Agent service is not running")
return
default:
if time.Since(lastCheck) > time.Second*5 || firstPing {
firstPing = false

var pingResp PingResp
pingErr := agent.ProxmoxHTTPRequest(http.MethodGet, "/api2/json/ping", nil, &pingResp)
if pingErr != nil {
utils.SetEnvironment("PBS_AGENT_STATUS", fmt.Sprintf("Error - (%s)", pingErr.Error()))
} else if !pingResp.Data.Pong {
utils.SetEnvironment("PBS_AGENT_STATUS", "Error - server did not return expected data")
} else {
utils.SetEnvironment("PBS_AGENT_STATUS", "Connected")
}
lastCheck = time.Now()
}
ping := func() {
var pingResp PingResp
pingErr := agent.ProxmoxHTTPRequest(http.MethodGet, "/api2/json/ping", nil, &pingResp)
if pingErr != nil {
utils.SetEnvironment("PBS_AGENT_STATUS", fmt.Sprintf("Error - (%s)", pingErr.Error()))
} else if !pingResp.Data.Pong {
utils.SetEnvironment("PBS_AGENT_STATUS", "Error - server did not return expected data")
} else {
utils.SetEnvironment("PBS_AGENT_STATUS", "Connected")
}
}
}

func (p *agentService) runLoop() {
logger, err := syslog.InitializeLogger(p.svc)
if err != nil {
utils.SetEnvironment("PBS_AGENT_STATUS", fmt.Sprintf("Failed to initialize logger -> %s", err.Error()))
return
}

go p.startPing()
ping()

for {
p.run()
wgDone := utils.WaitChan(&p.wg)

retryWait := utils.WaitChan(time.Second * 5)
select {
case <-p.ctx.Done():
snapshots.CloseAllSnapshots()
utils.SetEnvironment("PBS_AGENT_STATUS", "Agent service is not running")
return
case <-wgDone:
utils.SetEnvironment("PBS_AGENT_STATUS", "Unexpected shutdown - restarting SSH endpoints")
logger.Error("SSH endpoints stopped unexpectedly. Restarting...")
p.wg = sync.WaitGroup{}
time.Sleep(5 * time.Second)
case <-retryWait:
ping()
}
}
}
Expand All @@ -104,54 +76,61 @@ func (p *agentService) run() {
return
}

firstUrlCheck := true
lastCheck := time.Now()
waitUrl:
for {
select {
case <-p.ctx.Done():
return
default:
if time.Since(lastCheck) > time.Second*5 || firstUrlCheck {
firstUrlCheck = false
key, err := registry.OpenKey(registry.LOCAL_MACHINE, `Software\PBSPlus\Config`, registry.QUERY_VALUE)
if err == nil {
defer key.Close()

if serverUrl, _, err := key.GetStringValue("ServerURL"); err == nil && serverUrl != "" {
break waitUrl
}
}
lastCheck = time.Now()
urlExists := func() bool {
key, err := registry.OpenKey(registry.LOCAL_MACHINE, `Software\PBSPlus\Config`, registry.QUERY_VALUE)
if err == nil {
defer key.Close()

if serverUrl, _, err := key.GetStringValue("ServerURL"); err == nil && serverUrl != "" {
return true
}
}

return false
}

drives := utils.GetLocalDrives()
for _, driveLetter := range drives {
rune := []rune(driveLetter)[0]
sftpConfig, err := sftp.InitializeSFTPConfig(p.svc, driveLetter)
if err != nil {
logger.Error(fmt.Sprintf("Unable to initialize SFTP config: %s", err))
continue
}
if err := sftpConfig.PopulateKeys(); err != nil {
logger.Error(fmt.Sprintf("Unable to populate SFTP keys: %s", err))
continue
if !urlExists() {
for !urlExists() {
retryWait := utils.WaitChan(time.Second * 5)
select {
case <-p.ctx.Done():
return
case <-retryWait:
}
}
}

port, err := utils.DriveLetterPort(rune)
if err != nil {
logger.Error(fmt.Sprintf("Unable to map letter to port: %s", err))
continue
drives := getLocalDrives()
for _, drive := range drives {
drive.ErrorChan = make(chan string)
err = drive.serveSFTP(p)
for err != nil {
logger.Errorf("Drive SFTP error: %v", err)
retryWait := utils.WaitChan(time.Second * 5)
select {
case <-p.ctx.Done():
return
case <-retryWait:
err = drive.serveSFTP(p)
}
}

p.wg.Add(1)
go func() {
sftp.Serve(p.ctx, sftpConfig, "0.0.0.0", port, driveLetter)
p.wg.Done()
defer close(drive.ErrorChan)

for {
select {
case <-p.ctx.Done():
return
case err := <-drive.ErrorChan:
logger.Errorf("SFTP %s drive error: %s", drive.Letter, err)
}
}
}()
}

<-p.ctx.Done()
snapshots.CloseAllSnapshots()
}

func (p *agentService) Stop(s service.Service) error {
Expand Down
10 changes: 2 additions & 8 deletions internal/agent/sftp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/kardianos/service"
"github.com/sonroyaalmerol/pbs-plus/internal/syslog"
"github.com/sonroyaalmerol/pbs-plus/internal/utils"
"golang.org/x/crypto/ssh"
"golang.org/x/sys/windows/registry"
Expand All @@ -36,15 +35,10 @@ func (s *SFTPConfig) GetRegistryKey() string {
return fmt.Sprintf("Software\\PBSPlus\\Config\\SFTP-%s", s.BasePath)
}

var logger *syslog.Logger

func InitializeSFTPConfig(svc service.Service, driveLetter string) (*SFTPConfig, error) {
var err error
if logger == nil {
logger, err = syslog.InitializeLogger(svc)
if err != nil {
return nil, fmt.Errorf("InitializeLogger: failed to initialize logger -> %w", err)
}
if err != nil {
return nil, fmt.Errorf("InitializeLogger: failed to initialize logger -> %w", err)
}

baseKey, _, err := registry.CreateKey(registry.LOCAL_MACHINE, "Software\\PBSPlus\\Config", registry.QUERY_VALUE)
Expand Down
Loading

0 comments on commit 45abc06

Please sign in to comment.