Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to access a SharedQueue with timeout after being waiting by some time #1008

Open
fxgallego opened this issue Sep 2, 2020 · 2 comments

Comments

@fxgallego
Copy link
Contributor

Is your feature request related to a problem? Please describe.
I need to acquire a new connection from my own implementation of a connection pool that uses a SharedQueue. I'd like to be able to specify a maximum time to wait to get a new element from the pool in order to alert the user about some time and not wait forever.

Describe the solution you'd like
I was thinking in a new method SharedQueue>>nextWait: and I've also implemented a working example with some snippet I've found in Dolphin's newsgroup in a post from Esteban Maringolo.

nextWait: milliseconds
	| sem signaler process element |
	sem := Semaphore new.
	signaler := 
			[[(Delay forMilliseconds: milliseconds) wait] ensure: 
					[sem signal.
					process ifNotNil: [process kill]]]
					newProcess.
	process := 
			[signaler resume.
			element := self next.
			sem signal] fork.
	sem wait.
	^element

This is returning nil on timeout. It would be useful to raise a timeout exception. If you think that this is ok to add into the base I could provide a PR with the corresponding tests.
I know that maybe there is a better solution for this as I'm aware of Semaphore>>wait:ret: comments

@blairmcg
Copy link
Contributor

blairmcg commented Sep 4, 2020

I don't think this implementation is suitable for the base system for a number of reasons, e.g. starting two processes is very heavyweight, Process>>kill should never be used in normal code as it doesn't clean up the Process properly, plus the nil check on the process variable highlights a possible race condition.
Given that a Delay works by signalling a Semaphore, you could probably implement a special case solution for this by sharing the queue counter Semaphore with a Delay used for timeout so that it can be signalled by either. i.e. create a new Delay, and replace its own waitSemaphore with the valueAvailable Semaphore, then wait on the Delay. On resumption either there is a valueAvailable, so cancel the Delay, or the Delay fired, so it timed out. The obvious difficulty is differentiating between timeout expiry and the arrival of elements in the queue, complicated by the race condition where the timeout Delay fires and signals the Semaphore after it has already been signalled by the queue's nextPut:, but before the nextWait: logic has had a chance to cancel the Delay. This could lead to an excess signal count on valueAvailable, but that could be ignored by looping around when the queue is empty and just waiting again in the case where the wait hasn't timed out yet. It seems like it would work, but the devil is in the detail and it will require care to get right, and would also break the existing next method (which treats the queue being empty when nextAvailable is signalled as an error - again this could be modified to just loop and wait again).

@fxgallego
Copy link
Contributor Author

fxgallego commented Sep 4, 2020

I was aware about the things you mention here. I moved it to BlockClosure so now it is less dangerous in the sense of inadvertently using it in SharedQueue an also it can be used in other scenarios. In any case I understand that this implementation is not correct for the base system. Still a race condition is theoretically possible in process terminate. I don't like too much the method name. I wrote some tests in milliseconds and it is working as expected. I am going to use it always for 10 seconds timeouts.

timedValue: aDuration onTimeout: aBlock
	"Answer the result of evaluating the receiver in a maximum <aDuration>.
	If timeout answer the result of evaluation <aBlock>."
	
	| sem signaler process value timeout |
	sem := Semaphore new.
	process := nil.
	timeout := false.
	signaler := 
			[
			[(Delay forDuration: aDuration) wait.
			timeout := true] ensure: 
						[process terminate.
						sem signal]]
					newProcess.
	process := 
			[signaler resume.
			value := self value.
			signaler terminate.
			sem signal] fork.
	sem wait.
	^timeout ifTrue: [aBlock value] ifFalse: [value]

I could also try your idea later

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants