Skip to content

Commit

Permalink
Adds ItemPublisher.pause and makes resignation work in Game
Browse files Browse the repository at this point in the history
  • Loading branch information
fathzer committed Nov 14, 2024
1 parent 9e87e46 commit 2aaecae
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 32 deletions.
12 changes: 7 additions & 5 deletions src/main/java/com/fathzer/games/game/Game.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public class Game<M,B extends MoveGenerator<M>> implements Runnable {
private sealed interface IncomingEvent<T> {}

private static record MoveEvent<T>(T move) implements IncomingEvent<T> {}
private static record ResignationEvent<T>(Color player) implements IncomingEvent<T> {}
private static record DrawProposal<T>(Color player) implements IncomingEvent<T> {}
private static record DrawAcceptance<T>(Color player, boolean accepted) implements IncomingEvent<T> {}
private static record ResignationEvent<T>(Color color) implements IncomingEvent<T> {}
private static record DrawProposal<T>(Color color) implements IncomingEvent<T> {}
private static record DrawAcceptance<T>(Color color, boolean accepted) implements IncomingEvent<T> {}
// public static record PauseEvent(boolean paused) implements IncomingEvent {}
private static record TimeUpEvent<T>() implements IncomingEvent<T> {}

Expand Down Expand Up @@ -57,7 +57,9 @@ public Game(B board, Clock clock, Player<M, B> white, Player<M,B> black) {
throw new IllegalArgumentException("Players can't be null");
}
this.white = white;
white.setResignationMethod(this, () -> addEvent(new ResignationEvent<>(Color.WHITE)));
this.black = black;
black.setResignationMethod(this, () -> addEvent(new ResignationEvent<>(Color.BLACK)));
this.history = new GameHistory<>(board);
this.clock = clock;
if (clock!=null) {
Expand Down Expand Up @@ -112,7 +114,7 @@ void doEvent(IncomingEvent<M> event) {
} else if (event instanceof TimeUpEvent) {
doTimeUp();
} else if (event instanceof ResignationEvent<M> resignation) {
doResignation(resignation.player());
doResignation(resignation.color());
} else {
throw new UnsupportedOperationException(event+" is not yet supported"); //TODO
}
Expand Down Expand Up @@ -172,7 +174,7 @@ private void doTimeUp() {
}

private void doResignation(Color player) {
this.getHistory().earlyEnd(Color.WHITE==player?Status.BLACK_WON:Status.WHITE_WON, TerminationCause.TIME_FORFEIT);
this.getHistory().earlyEnd(Color.WHITE==player?Status.BLACK_WON:Status.WHITE_WON, TerminationCause.ABANDONED);
onEndGame();
}

Expand Down
52 changes: 41 additions & 11 deletions src/main/java/com/fathzer/games/game/ItemPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class ItemPublisher<T> implements AutoCloseable, Runnable {
Expand All @@ -18,6 +20,8 @@ default void onComplete(ItemPublisher<T> itemPublisher) {}
private final Queue<T> items;
private final List<ItemListener<T>> subscribers;
private final ExecutorService executor;
private final AtomicBoolean paused;
private final Semaphore pauseLock;
private volatile boolean isClosed = false;
private volatile boolean wasInterrupted;

Expand All @@ -29,6 +33,8 @@ public ItemPublisher(ExecutorService itemProcessor) {
this.items = new LinkedList<>();
this.subscribers = new LinkedList<>();
this.executor = itemProcessor;
this.paused = new AtomicBoolean();
this.pauseLock = new Semaphore(1);
}

public void subscribe(ItemListener<T> subscriber) {
Expand All @@ -53,16 +59,29 @@ public void run() {
item = items.poll();
}
if (item!=null) {
process(item);
pauseLock.acquire();
try {
process(item);
} finally {
pauseLock.release();
}
}
} catch (InterruptedException e) {
// Exit gracefully
this.isClosed = true;
this.items.clear();
close();
Thread.currentThread().interrupt();
doInterrupted(e);
}
}
for (ItemListener<T> sub : subscribers) {
sub.onComplete(this);
}
}

private void doInterrupted(InterruptedException e) {
wasInterrupted = true;
// Exit gracefully
this.isClosed = true;
this.items.clear();
close();
Thread.currentThread().interrupt();
}

private void process(T item) throws InterruptedException {
Expand Down Expand Up @@ -96,15 +115,26 @@ public boolean submit(Collection<T> items) {
}
return true;
}

public boolean pause(boolean pause) {
if (!this.paused.compareAndSet(!pause, pause)) {
return false;
}
if (pause) {
try {
pauseLock.acquire();
} catch (InterruptedException e) {
doInterrupted(e);
}
} else {
pauseLock.release();
}
return true;
}

@Override
public void close() {
this.isClosed = true;
synchronized (subscribers) {
for (ItemListener<T> sub : subscribers) {
sub.onComplete(this);
}
}
synchronized (items) {
items.notifyAll();
}
Expand Down
79 changes: 63 additions & 16 deletions src/test/java/com/fathzer/games/game/GameTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.function.Consumer;

import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

import com.fathzer.games.Color;
import com.fathzer.games.Status;
Expand All @@ -24,6 +23,13 @@
import com.fathzer.jchess.settings.Settings.Variant;

class GameTest {
private static final CoordinatesSystem CS;

static {
Board<Move> board = Variant.STANDARD.getRules().apply(null);
CS = board.getCoordinatesSystem();
}

private static class EventCounter implements Consumer<Game<Move, Board<Move>>>, BiConsumer<Game<Move, Board<Move>>, Move> {
private int moveCounter, endCounter;
@Override
Expand All @@ -36,15 +42,16 @@ public void accept(Game<Move, Board<Move>> game) {
endCounter++;
}
}


private static MoveAction mv(String from, String to) {
return new MoveAction(new BasicMove(CS.getIndex(from), CS.getIndex(to)));
}

@Test
void testNoClock() throws InterruptedException {
LoggerFactory.getLogger(GameTest.class).debug("Here we are");

Board<Move> board = Variant.STANDARD.getRules().apply(null);
CoordinatesSystem cs = board.getCoordinatesSystem();
TestPlayer white = new TestPlayer(Color.WHITE, Arrays.asList(new BasicMove(cs.getIndex("e2"), cs.getIndex("e4")), new BasicMove(cs.getIndex("f1"), cs.getIndex("c4")), new BasicMove(cs.getIndex("d1"), cs.getIndex("h5")), new BasicMove(cs.getIndex("h5"), cs.getIndex("f7"))));
TestPlayer black = new TestPlayer(Color.BLACK, Arrays.asList(new BasicMove(cs.getIndex("e7"), cs.getIndex("e5")), new BasicMove(cs.getIndex("f8"), cs.getIndex("c5")), new BasicMove(cs.getIndex("b8"), cs.getIndex("c6"))));
TestPlayer white = new TestPlayer(Color.WHITE, Arrays.asList(mv("e2","e4"), mv("f1","c4"), mv("d1","h5"), mv("h5","f7")));
TestPlayer black = new TestPlayer(Color.BLACK, Arrays.asList(mv("e7","e5"), mv("f8","c5"), mv("b8", "c6")));
final Game<Move, Board<Move>> game = new Game<>(board, null, white, black);
assertThrows(IllegalStateException.class, () -> game.setStartClockAfterFirstMove(true));
assertTrue(game.isPaused());
Expand All @@ -63,13 +70,38 @@ void testNoClock() throws InterruptedException {
assertEquals(7, counter.moveCounter);
}

@Test
void testResignation() throws InterruptedException {
Board<Move> board = Variant.STANDARD.getRules().apply(null);
TestPlayer white = new TestPlayer(Color.WHITE, Arrays.asList(mv("e2","e4"), mv("f1","c4"), new ResignAction()));
TestPlayer black = new TestPlayer(Color.BLACK, Arrays.asList(mv("e7","e5"), mv("f8","c5"), mv("b8", "c6")));
final Game<Move, Board<Move>> game = new Game<>(board, null, white, black);
final EventCounter counter = new EventCounter();
game.addMoveListener(counter);
game.addEndGameListener(counter);
Thread gameThread = new Thread(game);
gameThread.start();
gameThread.join();
assertTrue(game.isEnded());
assertEquals(Status.BLACK_WON, game.getHistory().getStatus());
assertEquals(TerminationCause.ABANDONED, game.getHistory().getTerminationCause());
assertFalse(white.errorOccured);
assertFalse(black.errorOccured);
assertEquals(1, counter.endCounter);
assertEquals(4, counter.moveCounter);
}


@Test
void test() throws InterruptedException {
void testDrawNegociation() throws InterruptedException {
fail("Not yet implemented");
}

@Test
void testTimeForfeit() throws InterruptedException {
Board<Move> board = Variant.STANDARD.getRules().apply(null);
CoordinatesSystem cs = board.getCoordinatesSystem();
TestPlayer white = new TestPlayer(Color.WHITE, Arrays.asList(new BasicMove(cs.getIndex("e2"), cs.getIndex("e4")), new BasicMove(cs.getIndex("f1"), cs.getIndex("c4")), new BasicMove(cs.getIndex("d1"), cs.getIndex("h5")), new BasicMove(cs.getIndex("h5"), cs.getIndex("f7"))));
TestPlayer black = new TestPlayer(Color.BLACK, Arrays.asList(new BasicMove(cs.getIndex("e7"), cs.getIndex("e5")), new BasicMove(cs.getIndex("f8"), cs.getIndex("c5")), new BasicMove(cs.getIndex("b8"), cs.getIndex("c6"))));
TestPlayer white = new TestPlayer(Color.WHITE, Arrays.asList(mv("e2","e4"), mv("f1","c4"), mv("d1","h5"), mv("h5","f7")));
TestPlayer black = new TestPlayer(Color.BLACK, Arrays.asList(mv("e7","e5"), mv("f8","c5"), mv("b8", "c6")));
black.thinkTime = 550;

ClockSettings settings = new ClockSettings(1);
Expand All @@ -93,17 +125,26 @@ void test() throws InterruptedException {
assertEquals(3, counter.moveCounter);
}

private sealed interface Action {}
private record MoveAction(Move move) implements Action{}
private record ResignAction() implements Action{}

private static final class TestPlayer implements Player<Move, Board<Move>> {
private final Queue<Move> moves;
private final Queue<Action> actions;
private final Color color;
private boolean errorOccured;
private Thread requestThread;
private long thinkTime = 0;
private Runnable resignation;

TestPlayer(Color color, List<Move> moves) {
TestPlayer(Color color, List<Action> actions) {
this.color = color;
this.moves = new LinkedList<>(moves);
this.actions = new LinkedList<>(actions);
}

@Override
public void setResignationMethod(Game<Move, Board<Move>> game, Runnable resignation) {
this.resignation = resignation;
}

@Override
Expand All @@ -113,7 +154,14 @@ public void requestMove(Game<Move, Board<Move>> game, Consumer<Move> callBack) {
if (thinkTime>0) {
Thread.sleep(thinkTime);
}
callBack.accept(moves.poll());
final Action action = actions.poll();
if (action instanceof MoveAction mv) {
callBack.accept(mv.move);
} else if (action instanceof ResignAction) {
resignation.run();
} else {
throw new UnsupportedOperationException();
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
Expand All @@ -131,5 +179,4 @@ private void join() throws InterruptedException {
}
}
}

}
106 changes: 106 additions & 0 deletions src/test/java/com/fathzer/games/game/ItemPublisherTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.fathzer.games.game;

import static org.junit.jupiter.api.Assertions.*;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Test;

import com.fathzer.games.game.ItemPublisher.ItemListener;
import com.fathzer.games.util.exec.CustomThreadFactory;

class ItemPublisherTest {
private static class MyListener implements ItemListener<Long> {
private volatile boolean subscribed;
private volatile boolean completed;
private final List<Long> items = new LinkedList<>();

@Override
public void onSubscribe(ItemPublisher<Long> itemPublisher) {
subscribed = true;
}

@Override
public void onComplete(ItemPublisher<Long> itemPublisher) {
completed = true;
}

@Override
public void accept(Long t) {
if (subscribed && !completed) {
items.add(t);
}
}
}

@Test
void test() throws InterruptedException {
final ItemPublisher<Long> pub = new ItemPublisher<>(Executors.newFixedThreadPool(2, new CustomThreadFactory(()->"publishWorker", true)));
assertFalse(pub.wasInterrupted());
final Thread pubThread = new Thread(pub);
final MyListener listener = new MyListener();
final MyListener other = new MyListener();
assertFalse(listener.subscribed);
assertFalse(listener.completed);
assertTrue(listener.items.isEmpty());
pub.subscribe(listener);
pub.subscribe(other);
pubThread.start();
assertTrue(listener.subscribed);
assertFalse(listener.completed);
assertTrue(listener.items.isEmpty());
assertTrue(pub.submit(Arrays.asList(1L,2L)));
final List<Long> expected = new LinkedList<>();
Thread.sleep(100);
assertFalse(listener.completed);
expected.addAll(Arrays.asList(1L, 2L));
assertEquals(expected, listener.items);
assertEquals(expected, other.items);
assertFalse(pub.pause(false));
// Pause the publisher
assertTrue(pub.pause(true));
assertFalse(pub.pause(true));
assertTrue(pub.submit(Arrays.asList(3L)));
assertTrue(pub.submit(Arrays.asList(4L)));
Thread.sleep(100);
// Nothing received despite event are published
assertEquals(expected, listener.items);
assertEquals(expected, other.items);
// Restart the publisher
assertTrue(pub.pause(false));
Thread.sleep(100);
// Items received during pause should now be received
expected.addAll(Arrays.asList(3L, 4L));
assertEquals(expected, listener.items);
assertEquals(expected, other.items);
// Pause again and publish an item
assertTrue(pub.pause(true));
assertTrue(pub.submit(Arrays.asList(5L)));
// close the paused publisher
pub.close();
Thread.sleep(100);
// listeners should receive nothing
assertEquals(expected, listener.items);
assertEquals(expected, other.items);
assertFalse(listener.completed);
assertFalse(other.completed);
// Publish one more item that should be ignored
assertFalse(pub.submit(Arrays.asList(6L)));
expected.add(5L);
// Restart the publisher
assertTrue(pub.pause(false));
Thread.sleep(100);
// Check 5 was received and not 6
assertEquals(expected, listener.items);
assertEquals(expected, other.items);
// Check complete event was received
assertTrue(listener.completed);
assertTrue(other.completed);
// Check the publisher thread was ended
assertFalse(pubThread.isAlive());
}

}

0 comments on commit 2aaecae

Please sign in to comment.