Skip to content

Commit

Permalink
fix CloseAllConns (#104)
Browse files Browse the repository at this point in the history
* Fix: fix CloseAllConns

* Fix: probably fix recycling being off and a slew of other bugs (make proxymap a ptr)

* Chore: update kayos/common
  • Loading branch information
yunginnanet authored Oct 28, 2023
1 parent a892a21 commit 2aa4580
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: tests
on:
push:
branches: ["main", "development"]
branches: [ "main", "development" ]
pull_request:
branches: ["main"]
branches: [ "main" ]
jobs:
build:
runs-on: ubuntu-latest
Expand Down
31 changes: 24 additions & 7 deletions conductor.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package prox5

import (
"context"
"errors"
"sync/atomic"
"time"
)

// engineState represents the current state of our ProxyEngine.
Expand Down Expand Up @@ -66,18 +66,35 @@ func (p5 *ProxyEngine) Resume() error {
return nil
}

// CloseAllConns will close all connections in progress by the dialers (including the SOCKS server if in use).
// CloseAllConns will (maybe) close all connections in progress by the dialers (including the SOCKS server if in use).
// Note this does not effect the proxy pool, it will continue to operate as normal.
// this is hacky FIXME
func (p5 *ProxyEngine) CloseAllConns() {
p5.killConns()
p5.mu.Lock()
p5.ctx, p5.killConns = context.WithCancel(context.Background())
p5.mu.Unlock()
timeout := time.NewTicker(500 * time.Millisecond)
p5.conKiller <- struct{}{}
defer func() {
timeout.Stop()
select {
case <-p5.conKiller:
default:
}
}()
for {
select {
case <-p5.ctx.Done():
return
case <-timeout.C:
return
case p5.conKiller <- struct{}{}:
timeout.Reset(500 * time.Millisecond)
p5.DebugLogger.Printf("killed a connection")
}
}
}

func (p5 *ProxyEngine) Close() error {
p5.mu.Lock()
defer p5.mu.Unlock()
p5.killConns()
p5.quit()
return p5.Pause()
}
5 changes: 1 addition & 4 deletions daemons.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ func (p5 *ProxyEngine) jobSpawner() {
// case <-p5.ctx.Done():
// default:
// }
p5.Pending.RLock()
pendingLen := p5.Pending.Len()
p5.Pending.RUnlock()
if pendingLen < 1 {
if p5.Pending.Len() < 1 {
count := p5.recycling()
switch {
case count > 0:
Expand Down
11 changes: 6 additions & 5 deletions defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,8 @@ type ProxyEngine struct {

dispenseMiddleware func(*Proxy) (*Proxy, bool)

conCtx context.Context
killConns context.CancelFunc
ctx context.Context
quit context.CancelFunc
ctx context.Context
quit context.CancelFunc

httpOptsDirty *atomic.Bool
httpClients *sync.Pool
Expand All @@ -90,6 +88,8 @@ type ProxyEngine struct {

// reaper sync.Pool

conKiller chan struct{}

recycleMu *sync.Mutex
mu *sync.RWMutex
pool *ants.Pool
Expand Down Expand Up @@ -215,6 +215,7 @@ func NewProxyEngine() *ProxyEngine {
mu: &sync.RWMutex{},
recycleMu: &sync.Mutex{},
httpOptsDirty: &atomic.Bool{},
conKiller: make(chan struct{}, 1),
Status: uint32(stateNew),
}

Expand All @@ -239,7 +240,7 @@ func NewProxyEngine() *ProxyEngine {
return p, true
}
p5.ctx, p5.quit = context.WithCancel(context.Background())
p5.conCtx, p5.killConns = context.WithCancel(context.Background())
// p5.conCtx, p5.killConns = context.WithCancel(context.Background())
p5.proxyMap = newProxyMap(p5)

atomic.StoreUint32(&p5.Status, uint32(stateNew))
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
git.tcp.direct/kayos/common v0.8.6 h1:lt8nv+PrgAcbiOnbKUt7diza5hifR5fV3un6uIp/YVc=
git.tcp.direct/kayos/common v0.8.6/go.mod h1:QGGn7d2l4xBG7Cs/g84JzItPpHWjtfiyy+PSMnf6TzE=
git.tcp.direct/kayos/common v0.9.2 h1:T7XzvmtPTmhXMTIb8v5W7a4DAV8ekusRkCe1WJ/qGBY=
git.tcp.direct/kayos/common v0.9.2/go.mod h1:rMExTem3JjB5XrwL+i+/QBv3agRFB8mdsoQ97wzBa+8=
git.tcp.direct/kayos/go-socks5 v0.3.0 h1:nCsYM0ttPZHGAVVG8zFEy2ZTxoSyPp5ld1YSy3zyWDQ=
Expand Down
15 changes: 11 additions & 4 deletions mystery_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prox5
import (
"context"
"fmt"
"io"
"net"
"sync/atomic"
"time"
Expand Down Expand Up @@ -121,10 +122,16 @@ func (p5 *ProxyEngine) mysteryDialer(ctx context.Context, network, addr string)
return nil, fmt.Errorf("giving up after %d tries", maxBail)
case ctx.Err() != nil:
return nil, fmt.Errorf("context error: %w", ctx.Err())
case p5.conCtx.Err() != nil:
return nil, fmt.Errorf("context closed")
default:
//
select {
case <-ctx.Done():
return nil, fmt.Errorf("context done: %w", ctx.Err())
case <-p5.ctx.Done():
return nil, fmt.Errorf("prox5 closed: %w", p5.ctx.Err())
case <-p5.conKiller:
return nil, fmt.Errorf("prox5 closed: %w", io.ErrClosedPipe)
default:
}
}
var sock *Proxy
for {
Expand Down Expand Up @@ -162,7 +169,7 @@ func (p5 *ProxyEngine) mysteryDialer(ctx context.Context, network, addr string)
select {
case <-ctx.Done():
_ = conn.Close()
case <-p5.conCtx.Done():
case <-p5.conKiller:
_ = conn.Close()
case <-p5.ctx.Done():
_ = conn.Close()
Expand Down
1 change: 1 addition & 0 deletions prox5_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ testLoop:
t.Fatal("no successful requests")
}
t.Logf("total successful requests: %d", successCountFinal)
p5.CloseAllConns()
break testLoop
case <-ticker.C:
// pre-warm
Expand Down

0 comments on commit 2aa4580

Please sign in to comment.