Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalized Dispatcher #10

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions src/client/Client.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package client;

import concurrentUtils.BoundedBuffer;
import connection.messages.*;
import connection.utils.Connection;
import connection.utils.Message;
import connection.utils.Type;
import server.ClientReader;

import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -14,24 +16,30 @@
import java.util.Scanner;

public class Client {

public static void main(String[] args) {
var replyBuffer = new BoundedBuffer<Message>(128);

try (var connection = new Connection(new Socket("localhost", 1337))) {
authenticate(connection);

var clientReader = new Thread(new ClientReader(connection, replyBuffer));
clientReader.start();

var scanner = new Scanner(System.in);
while (true) {
send(connection);
send(connection, scanner.nextLine());
Thread.startVirtualThread(() -> {
try {
receive(connection);
} catch (IOException e) {
handleMessage(replyBuffer.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
} catch (IOException | NoSuchElementException e) {
System.out.println("Connection ended.");
} catch (Exception e) {
throw new RuntimeException(e);
var exceptionMessage = e.getMessage();
System.out.println("Connection ended" + (exceptionMessage != null ? " with error: " + exceptionMessage : "."));
}
}

Expand All @@ -47,7 +55,7 @@ private static void authenticate(Connection connection) throws IOException, NoSu
connection.send(new AuthRequest(username, password));

Message message;
while ((message = connection.receive()).type() != Type.AUTH_REPLY);
while ((message = connection.receive()).type() != Type.AUTH_REPLY) ;
var authReply = (AuthReply) message;

if (authReply.success()) {
Expand All @@ -59,9 +67,7 @@ private static void authenticate(Connection connection) throws IOException, NoSu
}
}

private static void send(Connection connection) throws IOException {
var scanner = new Scanner(System.in);
var line = scanner.nextLine();
private static void send(Connection connection, String line) throws IOException {
var tokens = line.split(" ");

switch (tokens[0]) {
Expand All @@ -72,13 +78,14 @@ private static void send(Connection connection) throws IOException {
}
}

private static void receive(Connection connection) throws IOException {
var message = connection.receive();
private static void handleMessage(Message message) {
switch (message.type()) {
case JOB_REPLY_OK -> {
System.out.println("Job finished successfully.");
try (var fos = new FileOutputStream("out.txt")) {
fos.write(((JobReplyOk) message).output());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
case JOB_REPLY_ERROR -> {
Expand Down
2 changes: 1 addition & 1 deletion src/connection/utils/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void send(Message message) throws IOException {
}

@Override
public void close() throws Exception {
public void close() throws IOException {
socket.close();
inputStream.close();
outputStream.close();
Expand Down
30 changes: 30 additions & 0 deletions src/server/ClientReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server;

import concurrentUtils.BoundedBuffer;
import connection.utils.Connection;
import connection.utils.Message;

import java.io.IOException;

public class ClientReader implements Runnable {
private final Connection connection;
private final BoundedBuffer<Message> replyBuffer;

public ClientReader(Connection connection, BoundedBuffer<Message> replyBuffer) {
this.connection = connection;
this.replyBuffer = replyBuffer;
}

@Override
public void run() {
while (true) {
try {
replyBuffer.put(connection.receive());
} catch (IOException | InterruptedException e) {
var exceptionMessage = e.getMessage();
System.out.println("Error processing request" + (exceptionMessage != null ? ": " + exceptionMessage : "."));
break;
}
}
}
}
8 changes: 7 additions & 1 deletion src/server/Server.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package server;

import concurrentUtils.BoundedBuffer;
import concurrentUtils.ThreadPool;

import java.io.IOException;
import java.net.ServerSocket;

public class Server {
public static void main(String[] args) throws IOException {
var auth = new Auth();
var scheduler = new Scheduler(Integer.parseInt(args[0]));
var taskBuffer = new BoundedBuffer<Runnable>(1024);
var threadPool = new ThreadPool(8, taskBuffer);
threadPool.start();
try (var serverSocket = new ServerSocket(1337)) {
while (true) {
var socket = serverSocket.accept();
var session = new Thread(new Session(socket, auth, scheduler));
var session = new Thread(new Session(socket, taskBuffer, auth, scheduler));
session.start();
}
}
Expand Down
33 changes: 20 additions & 13 deletions src/server/Session.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package server;

import concurrentUtils.BoundedBuffer;
import connection.messages.*;
import connection.utils.Connection;
import connection.utils.Message;
Expand All @@ -13,11 +14,13 @@ public class Session implements Runnable {
private final Socket socket;
private final Auth auth;
private final Scheduler scheduler;
private final BoundedBuffer<Runnable> taskBuffer;

public Session(Socket socket, Auth auth, Scheduler scheduler) {
public Session(Socket socket, BoundedBuffer<Runnable> taskBuffer, Auth auth, Scheduler scheduler) {
this.socket = socket;
this.auth = auth;
this.scheduler = scheduler;
this.taskBuffer = taskBuffer;
}

@Override
Expand All @@ -28,24 +31,28 @@ public void run() {
while (true) {
var message = connection.receive();

switch (message.type()) {
case JOB_REQUEST -> connection.send(runJob((JobRequest) message));
default -> System.out.println("Received unknown message type");
}
taskBuffer.put(() -> {
try {
switch (message.type()) {
case JOB_REQUEST -> connection.send(runJob((JobRequest) message));
default -> System.out.println("Received unknown message type");
}
} catch (IOException | InterruptedException e) {
var exceptionMessage = e.getMessage();
System.out.println("Error processing request" + (exceptionMessage != null ? ": " + exceptionMessage : "."));
}
});
}
} catch (IOException e) {
System.out.println("Connection ended.");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
} catch (IOException | InterruptedException e) {
var exceptionMessage = e.getMessage();
System.out.println("Connection ended" + (exceptionMessage != null ? " with error: " + exceptionMessage : "."));
}
}

private void authenticate(Connection connection) throws IOException {
while (true) {
Message message;
while ((message = connection.receive()).type() != Type.AUTH_REQUEST);
while ((message = connection.receive()).type() != Type.AUTH_REQUEST) ;
var authRequest = (AuthRequest) message;

if (auth.authenticate(authRequest.username(), authRequest.password())) {
Expand All @@ -57,7 +64,7 @@ private void authenticate(Connection connection) throws IOException {
connection.send(new AuthReply(true));
}

private Message runJob(JobRequest jobRequest) throws IOException, InterruptedException {
private Message runJob(JobRequest jobRequest) throws InterruptedException {
System.out.println("Running job");
try {
return new JobReplyOk(scheduler.addJob(jobRequest.code()));
Expand Down