Skip to content

subprocess Add support for terminating processes instead of killing them #624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,5 @@
}
]
},
"generated_at": "2025-06-05T07:39:32Z"
"generated_at": "2025-06-04T19:30:24Z"
}
1 change: 1 addition & 0 deletions changes/20250530171020.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: `subprocess` Add support for terminating processes instead of killing them
1 change: 1 addition & 0 deletions changes/20250604202816.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: Introducing [diodes] module which is a copy of [cloud foundary's](https://github.com/cloudfoundry/go-diodes) library.
2 changes: 2 additions & 0 deletions utils/diodes/README
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Vendoring [cloud foundary](https://github.com/cloudfoundry/go-diodes)) diode libraries to avoid importing test dependencies.

130 changes: 130 additions & 0 deletions utils/diodes/many_to_one.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package diodes

import (
"log"
"sync/atomic"
"unsafe"
)

// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
writeIndex uint64
buffer []unsafe.Pointer
readIndex uint64
alerter Alerter
}

// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
// is optimzed for many writers (on go-routines B-n) and a single reader
// (on go-routine A). The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
if alerter == nil {
alerter = AlertFunc(func(int) {})
}

d := &ManyToOne{
buffer: make([]unsafe.Pointer, size),
alerter: alerter,
}

// Start write index at the value before 0
// to allow the first write to use AddUint64
// and still have a beginning index of 0
d.writeIndex = ^d.writeIndex
return d
}

// Set sets the data in the next slot of the ring buffer.
func (d *ManyToOne) Set(data GenericDataType) {
for {
writeIndex := atomic.AddUint64(&d.writeIndex, 1)
idx := writeIndex % uint64(len(d.buffer))
old := atomic.LoadPointer(&d.buffer[idx])

if old != nil &&
(*bucket)(old) != nil &&
(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
log.Println("Diode set collision: consider using a larger diode")
continue
}

newBucket := &bucket{
data: data,
seq: writeIndex,
}

if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
log.Println("Diode set collision: consider using a larger diode")
continue
}

return
}
}

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
// Read a value from the ring buffer based on the readIndex.
idx := d.readIndex % uint64(len(d.buffer))
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

// When the result is nil that means the writer has not had the
// opportunity to write a value into the diode. This value must be ignored
// and the read head must not increment.
if result == nil {
return nil, false
}

// When the seq value is less than the current read index that means a
// value was read from idx that was previously written but has since has
// been dropped. This value must be ignored and the read head must not
// increment.
//
// The simulation for this scenario assumes the fast forward occurred as
// detailed below.
//
// 5. The reader reads again getting seq 5. It then reads again expecting
// seq 6 but gets seq 2. This is a read of a stale value that was
// effectively "dropped" so the read fails and the read head stays put.
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
//
if result.seq < d.readIndex {
return nil, false
}

// When the seq value is greater than the current read index that means a
// value was read from idx that overwrote the value that was expected to
// be at this idx. This happens when the writer has lapped the reader. The
// reader needs to catch up to the writer so it moves its write head to
// the new seq, effectively dropping the messages that were not read in
// between the two values.
//
// Here is a simulation of this scenario:
//
// 1. Both the read and write heads start at 0.
// `| nil | nil | nil | nil |` r: 0, w: 0
// 2. The writer fills the buffer.
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
// 3. The writer laps the read head.
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
// this forces the reader to fast forward to 5.
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
//
if result.seq > d.readIndex {
dropped := result.seq - d.readIndex
d.readIndex = result.seq
d.alerter.Alert(int(dropped)) // nolint:gosec
}

// Only increment read index if a regular read occurred (where seq was
// equal to readIndex) or a value was read that caused a fast forward
// (where seq was greater than readIndex).
//
d.readIndex++
return result.data, true
}
129 changes: 129 additions & 0 deletions utils/diodes/one_to_one.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package diodes

import (
"sync/atomic"
"unsafe"
)

// GenericDataType is the data type the diodes operate on.
type GenericDataType unsafe.Pointer

// Alerter is used to report how many values were overwritten since the
// last write.
type Alerter interface {
Alert(missed int)
}

// AlertFunc type is an adapter to allow the use of ordinary functions as
// Alert handlers.
type AlertFunc func(missed int)

// Alert calls f(missed)
func (f AlertFunc) Alert(missed int) {
f(missed)
}

type bucket struct {
data GenericDataType
seq uint64 // seq is the recorded write index at the time of writing
}

// OneToOne diode is meant to be used by a single reader and a single writer.
// It is not thread safe if used otherwise.
type OneToOne struct {
buffer []unsafe.Pointer
writeIndex uint64
readIndex uint64
alerter Alerter
}

// NewOneToOne creates a new diode is meant to be used by a single reader and
// a single writer. The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewOneToOne(size int, alerter Alerter) *OneToOne {
if alerter == nil {
alerter = AlertFunc(func(int) {})
}

return &OneToOne{
buffer: make([]unsafe.Pointer, size),
alerter: alerter,
}
}

// Set sets the data in the next slot of the ring buffer.
func (d *OneToOne) Set(data GenericDataType) {
idx := d.writeIndex % uint64(len(d.buffer))

newBucket := &bucket{
data: data,
seq: d.writeIndex,
}
d.writeIndex++

atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket))
}

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is no data available, it will return (nil, false).
func (d *OneToOne) TryNext() (data GenericDataType, ok bool) {
// Read a value from the ring buffer based on the readIndex.
idx := d.readIndex % uint64(len(d.buffer))
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

// When the result is nil that means the writer has not had the
// opportunity to write a value into the diode. This value must be ignored
// and the read head must not increment.
if result == nil {
return nil, false
}

// When the seq value is less than the current read index that means a
// value was read from idx that was previously written but has since has
// been dropped. This value must be ignored and the read head must not
// increment.
//
// The simulation for this scenario assumes the fast forward occurred as
// detailed below.
//
// 5. The reader reads again getting seq 5. It then reads again expecting
// seq 6 but gets seq 2. This is a read of a stale value that was
// effectively "dropped" so the read fails and the read head stays put.
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
//
if result.seq < d.readIndex {
return nil, false
}

// When the seq value is greater than the current read index that means a
// value was read from idx that overwrote the value that was expected to
// be at this idx. This happens when the writer has lapped the reader. The
// reader needs to catch up to the writer so it moves its write head to
// the new seq, effectively dropping the messages that were not read in
// between the two values.
//
// Here is a simulation of this scenario:
//
// 1. Both the read and write heads start at 0.
// `| nil | nil | nil | nil |` r: 0, w: 0
// 2. The writer fills the buffer.
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
// 3. The writer laps the read head.
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
// this forces the reader to fast forward to 5.
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
//
if result.seq > d.readIndex {
dropped := result.seq - d.readIndex
d.readIndex = result.seq
d.alerter.Alert(int(dropped)) // nolint:gosec
}

// Only increment read index if a regular read occurred (where seq was
// equal to readIndex) or a value was read that caused a fast forward
// (where seq was greater than readIndex).
d.readIndex++
return result.data, true
}
80 changes: 80 additions & 0 deletions utils/diodes/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package diodes

import (
"context"
"time"
)

// Diode is any implementation of a diode.
type Diode interface {
Set(GenericDataType)
TryNext() (GenericDataType, bool)
}

// Poller will poll a diode until a value is available.
type Poller struct {
Diode
interval time.Duration
ctx context.Context
}

// PollerConfigOption can be used to setup the poller.
type PollerConfigOption func(*Poller)

// WithPollingInterval sets the interval at which the diode is queried
// for new data. The default is 10ms.
func WithPollingInterval(interval time.Duration) PollerConfigOption {
return PollerConfigOption(func(c *Poller) {
c.interval = interval
})
}

// WithPollingContext sets the context to cancel any retrieval (Next()). It
// will not change any results for adding data (Set()). Default is
// context.Background().
func WithPollingContext(ctx context.Context) PollerConfigOption {
return PollerConfigOption(func(c *Poller) {
c.ctx = ctx
})
}

// NewPoller returns a new Poller that wraps the given diode.
func NewPoller(d Diode, opts ...PollerConfigOption) *Poller {
p := &Poller{
Diode: d,
interval: 10 * time.Millisecond,
ctx: context.Background(),
}

for _, o := range opts {
o(p)
}

return p
}

// Next polls the diode until data is available or until the context is done.
// If the context is done, then nil will be returned.
func (p *Poller) Next() GenericDataType {
for {
data, ok := p.Diode.TryNext() // nolint:staticcheck
if !ok {
if p.IsDone() {
return nil
}

time.Sleep(p.interval)
continue
}
return data
}
}

func (p *Poller) IsDone() bool {
select {
case <-p.ctx.Done():
return true
default:
return false
}
}
Loading
Loading