diff --git a/integration/rpctest/rpc_harness.go b/integration/rpctest/rpc_harness.go index 605ca73cd45..6fe4f6e0234 100644 --- a/integration/rpctest/rpc_harness.go +++ b/integration/rpctest/rpc_harness.go @@ -563,6 +563,99 @@ func NextAvailablePort() int { panic("no ports available for listening") } +// NextAvailablePortForProcess returns the first port that is available for +// listening by a new node, using a lock file to make sure concurrent access for +// parallel tasks within the same process don't re-use the same port. It panics +// if no port is found and the maximum available TCP port is reached. +func NextAvailablePortForProcess(pid int) int { + lockFile := fmt.Sprintf( + "%s/rpctest-port-pid-%d.lock", os.TempDir(), pid, + ) + timeout := time.After(time.Second) + for { + // Attempt to acquire the lock file. If it already exists, wait + // for a bit and retry. + _, err := os.OpenFile(lockFile, os.O_CREATE|os.O_EXCL, 0600) + if err == nil { + // Lock acquired. + break + } + + // Wait for a bit and retry. + select { + case <-timeout: + panic("timeout waiting for lock file") + case <-time.After(10 * time.Millisecond): + } + } + + // Release the lock file when we're done. + defer func() { + err := os.Remove(lockFile) + if err != nil { + panic(fmt.Errorf("couldn't remove lock file: %w", err)) + } + }() + + portFile := fmt.Sprintf("%s/rpctest-port-pid-%d", os.TempDir(), pid) + port, err := os.ReadFile(portFile) + if err != nil { + if !os.IsNotExist(err) { + panic(fmt.Errorf("error reading port file: %w", err)) + } + port = []byte(strconv.Itoa(int(defaultNodePort))) + } + + lastPort, err := strconv.Atoi(string(port)) + if err != nil { + panic(fmt.Errorf("error parsing port: %w", err)) + } + + // We take the next one. + lastPort++ + for lastPort < 65535 { + // If there are no errors while attempting to listen on this + // port, close the socket and return it as available. While it + // could be the case that some other process picks up this port + // between the time the socket is closed and it's reopened in + // the harness node, in practice in CI servers this seems much + // less likely than simply some other process already being + // bound at the start of the tests. + addr := fmt.Sprintf(ListenerFormat, lastPort) + l, err := net.Listen("tcp4", addr) + if err == nil { + err := l.Close() + if err == nil { + err := os.WriteFile( + portFile, + []byte(strconv.Itoa(lastPort)), 0600, + ) + if err != nil { + panic(fmt.Errorf("error updating "+ + "port file: %w", err)) + } + + return lastPort + } + } + lastPort++ + } + + // No ports available? Must be a mistake. + panic("no ports available for listening") +} + +// GenerateProcessUniqueListenerAddresses is a function that returns two +// listener addresses with unique ports per the given process id and should be +// used to overwrite rpctest's default generator which is prone to use colliding +// ports. +func GenerateProcessUniqueListenerAddresses(pid int) (string, string) { + port1 := NextAvailablePortForProcess(pid) + port2 := NextAvailablePortForProcess(pid) + return fmt.Sprintf(ListenerFormat, port1), + fmt.Sprintf(ListenerFormat, port2) +} + // baseDir is the directory path of the temp directory for all rpctest files. func baseDir() (string, error) { dirPath := filepath.Join(os.TempDir(), "btcd", "rpctest")