Skip to content

Commit

Permalink
Streamlined management of the verification session codes.
Browse files Browse the repository at this point in the history
We eliminate the pooled locks for simplicity. The verification sessions are
protecting SQLite writes and those must be serial anyway, so there is no
overall performance gain from parallelizing the locking.

We shift the expiry of each verification code into a goroutine that is spawned
during the provisioning process. This guarantees that the expiry happens in a
more timely manner than flushing old codes at regular intervals.
  • Loading branch information
LTLA committed Jan 25, 2025
1 parent b231e27 commit f60e5a3
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 109 deletions.
6 changes: 3 additions & 3 deletions handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestValidatePath(t *testing.T) {
}

func TestCheckVerificationCode(t *testing.T) {
v := newVerificationRegistry(3)
v := newVerificationRegistry(time.Minute)

t.Run("success", func(t * testing.T) {
target, err := os.MkdirTemp("", "")
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestRegisterHandlers(t *testing.T) {
}
defer dbconn.Close()

verifier := newVerificationRegistry(5)
verifier := newVerificationRegistry(time.Minute)

to_add := filepath.Join(tmp, "to_add")
err = mockDirectory(to_add)
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestDeregisterHandlers(t *testing.T) {
}
defer dbconn.Close()

verifier := newVerificationRegistry(5)
verifier := newVerificationRegistry(time.Minute)

tokr, err := newUnicodeTokenizer(false)
if err != nil {
Expand Down
16 changes: 1 addition & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func main() {
os.Exit(1)
}

const num_verification_threads = 64
verifier := newVerificationRegistry(num_verification_threads)
verifier := newVerificationRegistry(time.Duration(*lifetime0) * time.Minute)

prefix := *prefix0
if prefix != "" {
Expand Down Expand Up @@ -91,19 +90,6 @@ func main() {
}()
}

// Adding a hour job that purges various old verification sessions.
{
lifetime := time.Duration(*lifetime0) * time.Minute
ticker := time.NewTicker(lifetime)
defer ticker.Stop()
go func() {
for {
<-ticker.C
verifier.Flush(lifetime)
}
}()
}

// Adding a per-day job that updates the paths.
{
ticker := time.NewTicker(time.Duration(*update0) * time.Hour)
Expand Down
76 changes: 24 additions & 52 deletions verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,16 @@ type verificationSession struct {
Created time.Time
}

// We use a multi-pool approach to improve parallelism across requests.
// The idea is that each path's length (modulo the number of pools) is
// used to determine the pool in which its verification codes are stored.
// This should distribute requests fairly evenly among multiple locks.
type verificationRegistry struct {
NumPools int
Locks []sync.Mutex
Sessions []map[string]verificationSession
Lock sync.Mutex
Sessions map[string]verificationSession
Lifespan time.Duration
}

func newVerificationRegistry(num_pools int) *verificationRegistry {
return &verificationRegistry {
NumPools: num_pools,
Locks: make([]sync.Mutex, num_pools),
Sessions: make([]map[string]verificationSession, num_pools),
func newVerificationRegistry(lifespan time.Duration) *verificationRegistry {
return &verificationRegistry {
Sessions: map[string]verificationSession{},
Lifespan: lifespan,
}
}

Expand Down Expand Up @@ -65,56 +60,33 @@ func (vr *verificationRegistry) Provision(path string) (string, error) {
return "", errors.New("exhausted attempts")
}

i := len(path) % vr.NumPools
vr.Locks[i].Lock()
defer vr.Locks[i].Unlock()

if vr.Sessions[i] == nil {
vr.Sessions[i] = map[string]verificationSession{}
vr.Lock.Lock()
defer vr.Lock.Unlock()
vr.Sessions[path] = verificationSession{
Code: candidate,
Created: time.Now(),
}
vr.Sessions[i][path] = verificationSession{ Code: candidate, Created: time.Now() }

// Automatically deleting it after some time has expired.
go func() {
time.Sleep(vr.Lifespan)
vr.Lock.Lock()
defer vr.Lock.Unlock()
delete(vr.Sessions, path)
}()

return candidate, nil
}

func (vr *verificationRegistry) Pop(path string) (string, bool) {
i := len(path) % vr.NumPools
vr.Locks[i].Lock()
defer vr.Locks[i].Unlock()

if vr.Sessions[i] == nil {
return "", false
}
vr.Lock.Lock()
defer vr.Lock.Unlock()

found, ok := vr.Sessions[i][path]
found, ok := vr.Sessions[path]
if !ok {
return "", false
}

delete(vr.Sessions[i], path)
delete(vr.Sessions, path)
return found.Code, true
}

func (vr *verificationRegistry) Flush(lifespan time.Duration) {
threshold := time.Now().Add(-lifespan)
var wg sync.WaitGroup
wg.Add(vr.NumPools)

for i := 0; i < vr.NumPools; i++ {
go func(i int) {
defer wg.Done()
vr.Locks[i].Lock()
defer vr.Locks[i].Unlock()
if vr.Sessions[i] != nil {
for k, v := range vr.Sessions[i] {
if threshold.After(v.Created) {
delete(vr.Sessions[i], k)
}
}
}
}(i)
}

wg.Wait()
return
}
99 changes: 60 additions & 39 deletions verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,67 @@ package main
import (
"testing"
"strings"
"time"
)

func TestVerificationRegistry(t *testing.T) {
v := newVerificationRegistry(3)

path := "adasdasdasd"
candidate, err := v.Provision(path)
if err != nil {
t.Fatal(err)
}

if !strings.HasPrefix(candidate, ".sewer_") {
t.Fatalf("expected code to have a '.sewer_' prefix")
}
if len(candidate) < 32 {
t.Fatalf("expected code to be at least 32 characters long")
}

reload, ok := v.Pop(path)
if !ok || reload != candidate {
t.Fatal("failed to report the right code")
}

reload, ok = v.Pop(path)
if ok {
t.Fatal("code should have been popped on first use")
}

// Get a different code on another invocation.
candidate2, err := v.Provision(path)
if err != nil {
t.Fatalf(err.Error())
}
if candidate == candidate2 {
t.Fatalf("expected to get different codes")
}

v.Flush(0)
reload, ok = v.Pop(path)
if ok {
t.Fatal("code should have been flushed")
}
t.Run("basic", func(t *testing.T) {
v := newVerificationRegistry(time.Minute)

path := "adasdasdasd"
candidate, err := v.Provision(path)
if err != nil {
t.Fatal(err)
}

if !strings.HasPrefix(candidate, ".sewer_") {
t.Fatalf("expected code to have a '.sewer_' prefix")
}
if len(candidate) < 32 {
t.Fatalf("expected code to be at least 32 characters long")
}

reload, ok := v.Pop(path)
if !ok || reload != candidate {
t.Fatal("failed to report the right code")
}

reload, ok = v.Pop(path)
if ok {
t.Fatal("code should have been popped on first use")
}

// Get a different code on another invocation.
candidate2, err := v.Provision(path)
if err != nil {
t.Fatal(err)
}
if candidate == candidate2 {
t.Fatalf("expected to get different codes")
}
})

t.Run("expired", func(t *testing.T) {
v := newVerificationRegistry(time.Millisecond * 200)

path := "foobar"
candidate, err := v.Provision(path)
if err != nil {
t.Fatal(err)
}

// Positive control works as expected.
reload, ok := v.Pop(path)
if !ok || reload != candidate {
t.Fatal("failed to report the right code")
}

// Expiry works as expected.
_, err = v.Provision(path)
time.Sleep(time.Millisecond * 500)
reload, ok = v.Pop(path)
if ok {
t.Fatal("code should have expired already")
}
})
}

0 comments on commit f60e5a3

Please sign in to comment.