From 256e85f16e8193dd0de67b0cde0907680d3cb705 Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 1 Dec 2023 16:48:46 +0000 Subject: [PATCH 01/15] Add Auth class to handle authentication requests concurrenty --- src/{ => client}/Client.java | 0 src/server/Auth.java | 33 +++++++++++++++++++++++++++++++++ src/{ => server}/Server.java | 2 ++ src/server/Session.java | 2 ++ 4 files changed, 37 insertions(+) rename src/{ => client}/Client.java (100%) create mode 100644 src/server/Auth.java rename src/{ => server}/Server.java (95%) create mode 100644 src/server/Session.java diff --git a/src/Client.java b/src/client/Client.java similarity index 100% rename from src/Client.java rename to src/client/Client.java diff --git a/src/server/Auth.java b/src/server/Auth.java new file mode 100644 index 0000000..8539bdd --- /dev/null +++ b/src/server/Auth.java @@ -0,0 +1,33 @@ +package server; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class Auth { + private final Map registry = new HashMap<>(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock readLock = lock.readLock(); + private final Lock writeLock = lock.writeLock(); + + public boolean authenticate(String username, String password) { + String storedPassword; + readLock.lock(); + try { + storedPassword = registry.get(username); + } finally { + readLock.unlock(); + } + if (storedPassword != null) { + return storedPassword.equals(password); + } + writeLock.lock(); + try { + registry.put(username, password); + } finally { + writeLock.unlock(); + } + return true; + } +} diff --git a/src/Server.java b/src/server/Server.java similarity index 95% rename from src/Server.java rename to src/server/Server.java index 2fe917a..1a6e8de 100644 --- a/src/Server.java +++ b/src/server/Server.java @@ -14,6 +14,8 @@ public static void main(String[] args) throws IOException { try (var serverSocket = new ServerSocket(1337)) { while (true) { var socket = serverSocket.accept(); + var session = new Session(socket); + session.start(); var in = new DataInputStream(socket.getInputStream()); var out = new DataOutputStream(socket.getOutputStream()); diff --git a/src/server/Session.java b/src/server/Session.java new file mode 100644 index 0000000..5116e07 --- /dev/null +++ b/src/server/Session.java @@ -0,0 +1,2 @@ +package server;public class Session { +} From c2e348d0195bca0b45b0e8da8ee81f9c2c8c117b Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 1 Dec 2023 16:50:47 +0000 Subject: [PATCH 02/15] Make server session code concurrent and move it to Session class --- src/server/Server.java | 34 ++++------------------------ src/server/Session.java | 50 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/src/server/Server.java b/src/server/Server.java index 1a6e8de..57d4d33 100644 --- a/src/server/Server.java +++ b/src/server/Server.java @@ -1,3 +1,5 @@ +package server; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -10,40 +12,12 @@ public class Server { public static void main(String[] args) throws IOException { - HashMap auth = new HashMap<>(); + var auth = new Auth(); try (var serverSocket = new ServerSocket(1337)) { while (true) { var socket = serverSocket.accept(); - var session = new Session(socket); + var session = new Thread(new Session(socket, auth)); session.start(); - var in = new DataInputStream(socket.getInputStream()); - var out = new DataOutputStream(socket.getOutputStream()); - - while (true) { - while (Type.deserialize(in) != Type.AUTH_REQUEST) { - in.readAllBytes(); - } - - var login = AuthRequest.deserialize(in); - var username = login.username(); - var password = login.password(); - - if (auth.containsKey(username)) { - if (auth.get(username).equals(password)) { - System.out.println("User " + username + " logged in."); - new AuthReply(true).serialize(out); - break; - } else { - System.out.println("User " + username + " failed to log in."); - new AuthReply(false).serialize(out); - } - } else { - auth.put(username, password); - System.out.println("User " + username + " registered."); - new AuthReply(true).serialize(out); - break; - } - } } } } diff --git a/src/server/Session.java b/src/server/Session.java index 5116e07..c36bbcd 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -1,2 +1,50 @@ -package server;public class Session { +package server; + +import messages.AuthReply; +import messages.AuthRequest; +import messages.Type; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; + +public class Session implements Runnable { + private final Socket socket; + private final Auth auth; + + public Session(Socket socket, Auth auth) { + this.socket = socket; + this.auth = auth; + } + + @Override + public void run() { + try (var in = new DataInputStream(socket.getInputStream()); + var out = new DataOutputStream(socket.getOutputStream())) { + authenticate(in, out); + } catch (IOException e) { + // throw new RuntimeException(e); + System.out.println("Failed to authenticate."); + } + } + + private void authenticate(DataInputStream in, DataOutputStream out) throws IOException { + while (true) { + while (Type.deserialize(in) != Type.AUTH_REQUEST) { + in.readAllBytes(); // this seems problematic + } + + var login = AuthRequest.deserialize(in); + var username = login.username(); + var password = login.password(); + + if (auth.authenticate(username, password)) { + break; + } else { + new AuthReply(false).serialize(out); + } + } + new AuthReply(true).serialize(out); + } } From 0c05f7a069c20ff8080ee1a720856f1999007bb4 Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 1 Dec 2023 16:53:50 +0000 Subject: [PATCH 03/15] Move Client authentication code to authenticate method --- src/client/Client.java | 60 ++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/src/client/Client.java b/src/client/Client.java index e238f1f..fde3277 100644 --- a/src/client/Client.java +++ b/src/client/Client.java @@ -1,3 +1,5 @@ +package client; + import messages.AuthReply; import messages.AuthRequest; import messages.Type; @@ -6,37 +8,43 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import java.util.NoSuchElementException; import java.util.Scanner; public class Client { public static void main(String[] args) { - try (var socket = new Socket("localhost", 1337)) { - var in = new DataInputStream(socket.getInputStream()); - var out = new DataOutputStream(socket.getOutputStream()); - var scanner = new Scanner(System.in); - - while (true) { - System.out.print("Enter username: "); - var username = scanner.nextLine(); - System.out.print("Enter password: "); - var password = scanner.nextLine(); - - new AuthRequest(username, password).serialize(out); - - while (Type.deserialize(in) != messages.Type.AUTH_REPLY) { - in.readAllBytes(); - } - - var authReply = AuthReply.deserialize(in); - if (authReply.success()) { - System.out.println("Login successful."); - break; - } else { - System.out.println("Failed to login."); - } + try (var socket = new Socket("localhost", 1337); + var in = new DataInputStream(socket.getInputStream()); + var out = new DataOutputStream(socket.getOutputStream())) { + authenticate(in, out); + } catch (IOException | NoSuchElementException e) { + // throw new RuntimeException(e); + System.out.println("Failed to connect to server."); + } + } + + private static void authenticate(DataInputStream in, DataOutputStream out) throws IOException, NoSuchElementException { + var scanner = new Scanner(System.in); + + while (true) { + System.out.print("Enter username: "); + var username = scanner.nextLine(); + System.out.print("Enter password: "); + var password = scanner.nextLine(); + + new AuthRequest(username, password).serialize(out); + + while (Type.deserialize(in) != messages.Type.AUTH_REPLY) { + in.readAllBytes(); + } + + var authReply = AuthReply.deserialize(in); + if (authReply.success()) { + System.out.println("Login successful."); + break; + } else { + System.out.println("Failed to login."); } - } catch (IOException e) { - throw new RuntimeException(e); } } } From 63ea97cd07f82cea16009323139f77de7587be50 Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 1 Dec 2023 17:21:23 +0000 Subject: [PATCH 04/15] Change type of lock to ReadWriteLock --- src/server/Auth.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/Auth.java b/src/server/Auth.java index 8539bdd..159562d 100644 --- a/src/server/Auth.java +++ b/src/server/Auth.java @@ -3,11 +3,12 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Auth { private final Map registry = new HashMap<>(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); From 256142d9b1127ac6cf266d6080fc5a61f3ddcac3 Mon Sep 17 00:00:00 2001 From: carlos Date: Sat, 2 Dec 2023 19:44:15 +0000 Subject: [PATCH 05/15] Add job execution in server --- src/server/Scheduler.java | 26 ++++++++++++++++++++++++++ src/server/Server.java | 7 ------- src/server/Session.java | 21 ++++++++++++++++++--- 3 files changed, 44 insertions(+), 10 deletions(-) create mode 100644 src/server/Scheduler.java diff --git a/src/server/Scheduler.java b/src/server/Scheduler.java new file mode 100644 index 0000000..0ef88bf --- /dev/null +++ b/src/server/Scheduler.java @@ -0,0 +1,26 @@ +package server; + +public class Status { + private final int memoryCapacity; + private int memoryInUse = 0; + private int pendingJobs = 0; + + + public Status(int memoryCapacity) { + this.memoryCapacity = memoryCapacity; + } + + public void addJob(byte[] job) { + if (memoryInUse + job.length > memoryCapacity) { + } + else { + memoryInUse += job.length; + pendingJobs++; + } + } + + public void removeJob(byte[] job) { + memoryInUse -= job.length; + pendingJobs--; + } +} diff --git a/src/server/Server.java b/src/server/Server.java index 57d4d33..8195744 100644 --- a/src/server/Server.java +++ b/src/server/Server.java @@ -1,14 +1,7 @@ package server; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.ServerSocket; -import java.util.HashMap; - -import messages.AuthReply; -import messages.AuthRequest; -import messages.Type; public class Server { public static void main(String[] args) throws IOException { diff --git a/src/server/Session.java b/src/server/Session.java index c36bbcd..5cd847a 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -1,14 +1,15 @@ package server; -import messages.AuthReply; -import messages.AuthRequest; -import messages.Type; +import messages.*; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import sd23.JobFunction; +import sd23.JobFunctionException; + public class Session implements Runnable { private final Socket socket; private final Auth auth; @@ -23,6 +24,20 @@ public void run() { try (var in = new DataInputStream(socket.getInputStream()); var out = new DataOutputStream(socket.getOutputStream())) { authenticate(in, out); + while (true) { + switch (Type.deserialize(in)) { + case JOB_REQUEST -> { + var jobRequest = JobRequest.deserialize(in); + System.out.println("Received job:\n" + new String(jobRequest.code())); + try { + new JobReplyOk(JobFunction.execute(jobRequest.code())).serialize(out); + } catch (JobFunctionException e) { + new JobReplyError(e.getCode(), e.getMessage()).serialize(out); + } + } + default -> System.out.println("Received unknown message type"); + } + } } catch (IOException e) { // throw new RuntimeException(e); System.out.println("Failed to authenticate."); From f0208f4fa61bfd1c02f5893063497e6851f75c77 Mon Sep 17 00:00:00 2001 From: carlos Date: Sat, 2 Dec 2023 19:44:56 +0000 Subject: [PATCH 06/15] Add job execution request to client --- src/client/Client.java | 53 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/src/client/Client.java b/src/client/Client.java index fde3277..ed4117f 100644 --- a/src/client/Client.java +++ b/src/client/Client.java @@ -1,13 +1,14 @@ package client; -import messages.AuthReply; -import messages.AuthRequest; -import messages.Type; +import messages.*; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileOutputStream; import java.io.IOException; import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.NoSuchElementException; import java.util.Scanner; @@ -17,9 +18,20 @@ public static void main(String[] args) { var in = new DataInputStream(socket.getInputStream()); var out = new DataOutputStream(socket.getOutputStream())) { authenticate(in, out); + + while (true) { + send(out); + Thread.startVirtualThread(() -> { + try { + receive(in); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } } catch (IOException | NoSuchElementException e) { // throw new RuntimeException(e); - System.out.println("Failed to connect to server."); + System.out.println("Connection ended."); } } @@ -34,7 +46,7 @@ private static void authenticate(DataInputStream in, DataOutputStream out) throw new AuthRequest(username, password).serialize(out); - while (Type.deserialize(in) != messages.Type.AUTH_REPLY) { + while (Type.deserialize(in) != Type.AUTH_REPLY) { in.readAllBytes(); } @@ -47,4 +59,35 @@ private static void authenticate(DataInputStream in, DataOutputStream out) throw } } } + + private static void send(DataOutputStream out) throws IOException { + var scanner = new Scanner(System.in); + var line = scanner.nextLine(); + var tokens = line.split(" "); + + switch (tokens[0]) { + case "exec" -> { + var job = Files.readAllBytes(Path.of(tokens[1])); + new JobRequest(job).serialize(out); + } + default -> System.out.println("Unknown command"); + } + } + + private static void receive(DataInputStream in) throws IOException { + switch (Type.deserialize(in)) { + case JOB_REPLY_OK -> { + var jobReplyOk = JobReplyOk.deserialize(in); + var fileOut = "here.txt"; + try (var fos = new FileOutputStream(fileOut)) { + fos.write(jobReplyOk.output()); + } + } + case JOB_REPLY_ERROR -> { + var jobReplyError = JobReplyError.deserialize(in); + System.out.println("Job failed.\n\tCode: " + jobReplyError.code() + "\n\tMessage: " + jobReplyError.message()); + } + default -> System.out.println("Received unknown message type"); + } + } } From 300469d0bbc7a7cb2201e18f50eff8a7ba03a8b6 Mon Sep 17 00:00:00 2001 From: carlos Date: Sat, 2 Dec 2023 19:45:52 +0000 Subject: [PATCH 07/15] Update SD.iml with the sd23.jar dependency --- SD.iml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/SD.iml b/SD.iml index c90834f..500bb9c 100644 --- a/SD.iml +++ b/SD.iml @@ -7,5 +7,14 @@ + + + + + + + + + \ No newline at end of file From c92782f0e00af1536bcc41d7166180d287fefd4b Mon Sep 17 00:00:00 2001 From: carlos Date: Sat, 2 Dec 2023 21:35:15 +0000 Subject: [PATCH 08/15] Remove unrelated class --- src/server/Scheduler.java | 26 -------------------------- 1 file changed, 26 deletions(-) delete mode 100644 src/server/Scheduler.java diff --git a/src/server/Scheduler.java b/src/server/Scheduler.java deleted file mode 100644 index 0ef88bf..0000000 --- a/src/server/Scheduler.java +++ /dev/null @@ -1,26 +0,0 @@ -package server; - -public class Status { - private final int memoryCapacity; - private int memoryInUse = 0; - private int pendingJobs = 0; - - - public Status(int memoryCapacity) { - this.memoryCapacity = memoryCapacity; - } - - public void addJob(byte[] job) { - if (memoryInUse + job.length > memoryCapacity) { - } - else { - memoryInUse += job.length; - pendingJobs++; - } - } - - public void removeJob(byte[] job) { - memoryInUse -= job.length; - pendingJobs--; - } -} From 296bf414f7eedf953f42c261a30f62c566f4ea93 Mon Sep 17 00:00:00 2001 From: carlos Date: Sat, 2 Dec 2023 23:11:08 +0000 Subject: [PATCH 09/15] Add scheduler to job execution --- src/server/Scheduler.java | 73 +++++++++++++++++++++++++++++++++++++++ src/server/Server.java | 3 +- src/server/Session.java | 45 ++++++++++++++++-------- 3 files changed, 106 insertions(+), 15 deletions(-) create mode 100644 src/server/Scheduler.java diff --git a/src/server/Scheduler.java b/src/server/Scheduler.java new file mode 100644 index 0000000..9665dce --- /dev/null +++ b/src/server/Scheduler.java @@ -0,0 +1,73 @@ +package server; + +import sd23.JobFunction; +import sd23.JobFunctionException; + +import java.util.Optional; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class Scheduler { + private final int memoryCapacity; + private final Lock lock = new ReentrantLock(); + private final Condition cond = lock.newCondition(); + private long turn = 0; + private long nextTicket = 0; + private int memoryInUse = 0; + private int pendingJobs = 0; + + public Scheduler(int memoryCapacity) { + this.memoryCapacity = memoryCapacity; + } + + public Optional addJob(byte[] job) throws InterruptedException, JobFunctionException { + if (job.length > memoryCapacity) { + return Optional.empty(); + } + + lock.lock(); + try { + var ticket = nextTicket++; + while (memoryInUse + job.length > memoryCapacity || ticket > turn) { + System.out.println("suspend: " + Thread.currentThread().threadId()); + cond.await(); + } + turn++; + memoryInUse += job.length; + pendingJobs++; + } finally { + lock.unlock(); + } + + try { + return Optional.of(JobFunction.execute(job)); + } finally { + lock.lock(); + try { + memoryInUse -= job.length; + pendingJobs--; + cond.signalAll(); + } finally { + lock.unlock(); + } + } + } + + public int getPendingJobs() { + try { + lock.lock(); + return pendingJobs; + } finally { + lock.unlock(); + } + } + + public int getMemoryInUse() { + try { + return memoryInUse; + } finally { + lock.unlock(); + } + } +} diff --git a/src/server/Server.java b/src/server/Server.java index 8195744..d0a664d 100644 --- a/src/server/Server.java +++ b/src/server/Server.java @@ -6,10 +6,11 @@ public class Server { public static void main(String[] args) throws IOException { var auth = new Auth(); + var scheduler = new Scheduler(Integer.parseInt(args[0])); try (var serverSocket = new ServerSocket(1337)) { while (true) { var socket = serverSocket.accept(); - var session = new Thread(new Session(socket, auth)); + var session = new Thread(new Session(socket, auth, scheduler)); session.start(); } } diff --git a/src/server/Session.java b/src/server/Session.java index 5cd847a..391d3ce 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -1,22 +1,22 @@ package server; import messages.*; +import sd23.JobFunctionException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; -import sd23.JobFunction; -import sd23.JobFunctionException; - public class Session implements Runnable { private final Socket socket; private final Auth auth; + private final Scheduler scheduler; - public Session(Socket socket, Auth auth) { + public Session(Socket socket, Auth auth, Scheduler scheduler) { this.socket = socket; this.auth = auth; + this.scheduler = scheduler; } @Override @@ -24,23 +24,18 @@ public void run() { try (var in = new DataInputStream(socket.getInputStream()); var out = new DataOutputStream(socket.getOutputStream())) { authenticate(in, out); + while (true) { switch (Type.deserialize(in)) { - case JOB_REQUEST -> { - var jobRequest = JobRequest.deserialize(in); - System.out.println("Received job:\n" + new String(jobRequest.code())); - try { - new JobReplyOk(JobFunction.execute(jobRequest.code())).serialize(out); - } catch (JobFunctionException e) { - new JobReplyError(e.getCode(), e.getMessage()).serialize(out); - } - } + case JOB_REQUEST -> job(in, out); default -> System.out.println("Received unknown message type"); } } } catch (IOException e) { // throw new RuntimeException(e); - System.out.println("Failed to authenticate."); + System.out.println("Connection ended."); + } catch (InterruptedException e) { + throw new RuntimeException(e); } } @@ -62,4 +57,26 @@ private void authenticate(DataInputStream in, DataOutputStream out) throws IOExc } new AuthReply(true).serialize(out); } + + private void job(DataInputStream in, DataOutputStream out) throws IOException, InterruptedException { + var jobRequest = JobRequest.deserialize(in); + System.out.println("Received job:\n" + new String(jobRequest.code())); + try { + scheduler.addJob(jobRequest.code()).ifPresentOrElse(jobReply -> { + try { + new JobReplyOk(jobReply).serialize(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, () -> { + try { + new JobReplyError(0, "Not enough memory").serialize(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (JobFunctionException e) { + new JobReplyError(e.getCode(), e.getMessage()).serialize(out); + } + } } From be9d613864a5477e8426a5feeee3d4400521085d Mon Sep 17 00:00:00 2001 From: carlos Date: Sun, 3 Dec 2023 00:01:31 +0000 Subject: [PATCH 10/15] Change addJob from returning Optional to throwing exception --- src/server/JobTooBigException.java | 4 ++++ src/server/Scheduler.java | 6 +++--- src/server/Session.java | 22 ++++------------------ 3 files changed, 11 insertions(+), 21 deletions(-) create mode 100644 src/server/JobTooBigException.java diff --git a/src/server/JobTooBigException.java b/src/server/JobTooBigException.java new file mode 100644 index 0000000..a332ad7 --- /dev/null +++ b/src/server/JobTooBigException.java @@ -0,0 +1,4 @@ +package server; + +public class JobTooBigException extends Exception { +} diff --git a/src/server/Scheduler.java b/src/server/Scheduler.java index 9665dce..eefdc89 100644 --- a/src/server/Scheduler.java +++ b/src/server/Scheduler.java @@ -21,9 +21,9 @@ public Scheduler(int memoryCapacity) { this.memoryCapacity = memoryCapacity; } - public Optional addJob(byte[] job) throws InterruptedException, JobFunctionException { + public byte[] addJob(byte[] job) throws InterruptedException, JobFunctionException, JobTooBigException { if (job.length > memoryCapacity) { - return Optional.empty(); + throw new JobTooBigException(); } lock.lock(); @@ -41,7 +41,7 @@ public Optional addJob(byte[] job) throws InterruptedException, JobFunct } try { - return Optional.of(JobFunction.execute(job)); + return JobFunction.execute(job); } finally { lock.lock(); try { diff --git a/src/server/Session.java b/src/server/Session.java index 391d3ce..4cf6333 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -46,10 +46,8 @@ private void authenticate(DataInputStream in, DataOutputStream out) throws IOExc } var login = AuthRequest.deserialize(in); - var username = login.username(); - var password = login.password(); - if (auth.authenticate(username, password)) { + if (auth.authenticate(login.username(), login.password())) { break; } else { new AuthReply(false).serialize(out); @@ -59,22 +57,10 @@ private void authenticate(DataInputStream in, DataOutputStream out) throws IOExc } private void job(DataInputStream in, DataOutputStream out) throws IOException, InterruptedException { - var jobRequest = JobRequest.deserialize(in); - System.out.println("Received job:\n" + new String(jobRequest.code())); try { - scheduler.addJob(jobRequest.code()).ifPresentOrElse(jobReply -> { - try { - new JobReplyOk(jobReply).serialize(out); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, () -> { - try { - new JobReplyError(0, "Not enough memory").serialize(out); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + new JobReplyOk(scheduler.addJob(JobRequest.deserialize(in).code())).serialize(out); + } catch (JobTooBigException e) { + new JobReplyError(0, "Not enough memory").serialize(out); } catch (JobFunctionException e) { new JobReplyError(e.getCode(), e.getMessage()).serialize(out); } From 71a5a0473ce79fd783bdb312e02b93aba8448f7d Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 8 Dec 2023 18:51:21 +0000 Subject: [PATCH 11/15] Encapsulate messages and their types in a Message class --- src/client/Client.java | 29 +++++---- src/{ => connection}/messages/AuthReply.java | 13 +++- .../messages/AuthRequest.java | 13 +++- .../messages/JobReplyError.java | 13 +++- src/{ => connection}/messages/JobReplyOk.java | 13 +++- src/{ => connection}/messages/JobRequest.java | 13 +++- .../messages/StatusReply.java | 13 +++- src/connection/messages/StatusRequest.java | 24 +++++++ src/connection/utils/Message.java | 23 +++++++ src/connection/utils/Payload.java | 9 +++ .../utils}/Serializable.java | 2 +- src/connection/utils/Type.java | 62 +++++++++++++++++++ src/messages/StatusRequest.java | 11 ---- src/messages/Type.java | 24 ------- src/server/Session.java | 35 ++++++----- 15 files changed, 212 insertions(+), 85 deletions(-) rename src/{ => connection}/messages/AuthReply.java (62%) rename src/{ => connection}/messages/AuthRequest.java (73%) rename src/{ => connection}/messages/JobReplyError.java (73%) rename src/{ => connection}/messages/JobReplyOk.java (67%) rename src/{ => connection}/messages/JobRequest.java (67%) rename src/{ => connection}/messages/StatusReply.java (74%) create mode 100644 src/connection/messages/StatusRequest.java create mode 100644 src/connection/utils/Message.java create mode 100644 src/connection/utils/Payload.java rename src/{messages => connection/utils}/Serializable.java (85%) create mode 100644 src/connection/utils/Type.java delete mode 100644 src/messages/StatusRequest.java delete mode 100644 src/messages/Type.java diff --git a/src/client/Client.java b/src/client/Client.java index ed4117f..a72a34a 100644 --- a/src/client/Client.java +++ b/src/client/Client.java @@ -1,6 +1,8 @@ package client; -import messages.*; +import connection.messages.*; +import connection.utils.Message; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -30,7 +32,6 @@ public static void main(String[] args) { }); } } catch (IOException | NoSuchElementException e) { - // throw new RuntimeException(e); System.out.println("Connection ended."); } } @@ -44,13 +45,12 @@ private static void authenticate(DataInputStream in, DataOutputStream out) throw System.out.print("Enter password: "); var password = scanner.nextLine(); - new AuthRequest(username, password).serialize(out); + new Message(new AuthRequest(username, password)).send(out); - while (Type.deserialize(in) != Type.AUTH_REPLY) { - in.readAllBytes(); - } + Message message; + while ((message = Message.receive(in)).type() != Type.AUTH_REPLY); + var authReply = (AuthReply) message.payload(); - var authReply = AuthReply.deserialize(in); if (authReply.success()) { System.out.println("Login successful."); break; @@ -67,24 +67,23 @@ private static void send(DataOutputStream out) throws IOException { switch (tokens[0]) { case "exec" -> { - var job = Files.readAllBytes(Path.of(tokens[1])); - new JobRequest(job).serialize(out); + new Message(new JobRequest(Files.readAllBytes(Path.of(tokens[1])))).send(out); } default -> System.out.println("Unknown command"); } } private static void receive(DataInputStream in) throws IOException { - switch (Type.deserialize(in)) { + var message = Message.receive(in); + switch (message.type()) { case JOB_REPLY_OK -> { - var jobReplyOk = JobReplyOk.deserialize(in); - var fileOut = "here.txt"; - try (var fos = new FileOutputStream(fileOut)) { - fos.write(jobReplyOk.output()); + System.out.println("Job finished successfully."); + try (var fos = new FileOutputStream("out.txt")) { + fos.write(((JobReplyOk) message.payload()).output()); } } case JOB_REPLY_ERROR -> { - var jobReplyError = JobReplyError.deserialize(in); + var jobReplyError = (JobReplyError) message.payload(); System.out.println("Job failed.\n\tCode: " + jobReplyError.code() + "\n\tMessage: " + jobReplyError.message()); } default -> System.out.println("Received unknown message type"); diff --git a/src/messages/AuthReply.java b/src/connection/messages/AuthReply.java similarity index 62% rename from src/messages/AuthReply.java rename to src/connection/messages/AuthReply.java index 4ebbf5d..1ad6120 100644 --- a/src/messages/AuthReply.java +++ b/src/connection/messages/AuthReply.java @@ -1,19 +1,26 @@ -package messages; +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record AuthReply(boolean success) implements Serializable { +public record AuthReply(boolean success) implements Payload { public static AuthReply deserialize(DataInputStream in) throws IOException { boolean success = in.readBoolean(); return new AuthReply(success); } + @Override + public Type getType() { + return Type.AUTH_REPLY; + } + @Override public void serialize(DataOutputStream out) throws IOException { - Type.AUTH_REPLY.serialize(out); out.writeBoolean(success); } } diff --git a/src/messages/AuthRequest.java b/src/connection/messages/AuthRequest.java similarity index 73% rename from src/messages/AuthRequest.java rename to src/connection/messages/AuthRequest.java index 7b666c8..a3d0419 100644 --- a/src/messages/AuthRequest.java +++ b/src/connection/messages/AuthRequest.java @@ -1,10 +1,13 @@ -package messages; +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record AuthRequest(String username, String password) implements Serializable { +public record AuthRequest(String username, String password) implements Payload { public static AuthRequest deserialize(DataInputStream in) throws IOException { String username = in.readUTF(); @@ -12,9 +15,13 @@ public static AuthRequest deserialize(DataInputStream in) throws IOException { return new AuthRequest(username, password); } + @Override + public Type getType() { + return Type.AUTH_REQUEST; + } + @Override public void serialize(DataOutputStream out) throws IOException { - Type.AUTH_REQUEST.serialize(out); out.writeUTF(username); out.writeUTF(password); } diff --git a/src/messages/JobReplyError.java b/src/connection/messages/JobReplyError.java similarity index 73% rename from src/messages/JobReplyError.java rename to src/connection/messages/JobReplyError.java index 85bcb20..d96caf8 100644 --- a/src/messages/JobReplyError.java +++ b/src/connection/messages/JobReplyError.java @@ -1,10 +1,13 @@ -package messages; +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record JobReplyError(int code, String message) implements Serializable { +public record JobReplyError(int code, String message) implements Payload { public static JobReplyError deserialize(DataInputStream in) throws IOException { int code = in.readInt(); @@ -12,9 +15,13 @@ public static JobReplyError deserialize(DataInputStream in) throws IOException { return new JobReplyError(code, message); } + @Override + public Type getType() { + return Type.JOB_REPLY_ERROR; + } + @Override public void serialize(DataOutputStream out) throws IOException { - Type.JOB_REPLY_ERROR.serialize(out); out.writeInt(code); out.writeUTF(message); } diff --git a/src/messages/JobReplyOk.java b/src/connection/messages/JobReplyOk.java similarity index 67% rename from src/messages/JobReplyOk.java rename to src/connection/messages/JobReplyOk.java index 9f322fd..5253b55 100644 --- a/src/messages/JobReplyOk.java +++ b/src/connection/messages/JobReplyOk.java @@ -1,10 +1,13 @@ -package messages; +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record JobReplyOk(byte[] output) implements Serializable { +public record JobReplyOk(byte[] output) implements Payload { public static JobReplyOk deserialize(DataInputStream in) throws IOException { int outputLength = in.readInt(); @@ -13,9 +16,13 @@ public static JobReplyOk deserialize(DataInputStream in) throws IOException { return new JobReplyOk(output); } + @Override + public Type getType() { + return Type.JOB_REPLY_OK; + } + @Override public void serialize(DataOutputStream out) throws IOException { - Type.JOB_REPLY_OK.serialize(out); out.writeInt(output.length); out.write(output); } diff --git a/src/messages/JobRequest.java b/src/connection/messages/JobRequest.java similarity index 67% rename from src/messages/JobRequest.java rename to src/connection/messages/JobRequest.java index c8f6e59..e17aa83 100644 --- a/src/messages/JobRequest.java +++ b/src/connection/messages/JobRequest.java @@ -1,10 +1,13 @@ -package messages; +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record JobRequest(byte[] code) implements Serializable { +public record JobRequest(byte[] code) implements Payload { public static JobRequest deserialize(DataInputStream in) throws IOException { int length = in.readInt(); @@ -13,9 +16,13 @@ public static JobRequest deserialize(DataInputStream in) throws IOException { return new JobRequest(data); } + @Override + public Type getType() { + return Type.JOB_REQUEST; + } + @Override public void serialize(DataOutputStream out) throws IOException { - Type.JOB_REQUEST.serialize(out); out.writeInt(code.length); out.write(code); } diff --git a/src/messages/StatusReply.java b/src/connection/messages/StatusReply.java similarity index 74% rename from src/messages/StatusReply.java rename to src/connection/messages/StatusReply.java index de3a5f0..90f9153 100644 --- a/src/messages/StatusReply.java +++ b/src/connection/messages/StatusReply.java @@ -1,10 +1,13 @@ -package messages; +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record StatusReply(int availableMemory, int pendingTasks) implements Serializable { +public record StatusReply(int availableMemory, int pendingTasks) implements Payload { public static StatusReply deserialize(DataInputStream in) throws IOException { int availableMemory = in.readInt(); @@ -12,9 +15,13 @@ public static StatusReply deserialize(DataInputStream in) throws IOException { return new StatusReply(availableMemory, pendingTasks); } + @Override + public Type getType() { + return Type.STATUS_REPLY; + } + @Override public void serialize(DataOutputStream out) throws IOException { - Type.STATUS_REPLY.serialize(out); out.writeInt(availableMemory); out.writeInt(pendingTasks); } diff --git a/src/connection/messages/StatusRequest.java b/src/connection/messages/StatusRequest.java new file mode 100644 index 0000000..d0a962e --- /dev/null +++ b/src/connection/messages/StatusRequest.java @@ -0,0 +1,24 @@ +package connection.messages; + +import connection.utils.Payload; +import connection.utils.Type; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public record StatusRequest() implements Payload { + + public static StatusRequest deserialize(DataInputStream in) throws IOException { + return new StatusRequest(); + } + + @Override + public Type getType() { + return Type.STATUS_REQUEST; + } + + @Override + public void serialize(DataOutputStream out) throws IOException { + } +} diff --git a/src/connection/utils/Message.java b/src/connection/utils/Message.java new file mode 100644 index 0000000..c0f2e8c --- /dev/null +++ b/src/connection/utils/Message.java @@ -0,0 +1,23 @@ +package connection.utils; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public record Message(Type type, Payload payload) { + + public Message(Payload payload) { + this(payload.getType(), payload); + } + + public static Message receive(DataInputStream in) throws IOException { + var type = Type.deserialize(in); + return new Message(type, type.deserializePayload(in)); + } + + public void send(DataOutputStream out) throws IOException { + type.serialize(out); + payload.serialize(out); + out.flush(); + } +} diff --git a/src/connection/utils/Payload.java b/src/connection/utils/Payload.java new file mode 100644 index 0000000..d71aaff --- /dev/null +++ b/src/connection/utils/Payload.java @@ -0,0 +1,9 @@ +package connection.utils; + +import java.io.DataOutputStream; +import java.io.IOException; + +public interface Payload extends Serializable { + Type getType(); + +} diff --git a/src/messages/Serializable.java b/src/connection/utils/Serializable.java similarity index 85% rename from src/messages/Serializable.java rename to src/connection/utils/Serializable.java index b12f17c..b232672 100644 --- a/src/messages/Serializable.java +++ b/src/connection/utils/Serializable.java @@ -1,4 +1,4 @@ -package messages; +package connection.utils; import java.io.DataOutputStream; import java.io.IOException; diff --git a/src/connection/utils/Type.java b/src/connection/utils/Type.java new file mode 100644 index 0000000..4080ef0 --- /dev/null +++ b/src/connection/utils/Type.java @@ -0,0 +1,62 @@ +package connection.utils; + +import connection.messages.*; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public enum Type implements Serializable { + AUTH_REQUEST { + @Override + public AuthRequest deserializePayload(DataInputStream in) throws IOException { + return AuthRequest.deserialize(in); + } + }, + AUTH_REPLY { + @Override + public AuthReply deserializePayload(DataInputStream in) throws IOException { + return AuthReply.deserialize(in); + } + }, + JOB_REQUEST { + @Override + public JobRequest deserializePayload(DataInputStream in) throws IOException { + return JobRequest.deserialize(in); + } + }, + JOB_REPLY_OK { + @Override + public JobReplyOk deserializePayload(DataInputStream in) throws IOException { + return JobReplyOk.deserialize(in); + } + }, + JOB_REPLY_ERROR { + @Override + public JobReplyError deserializePayload(DataInputStream in) throws IOException { + return JobReplyError.deserialize(in); + } + }, + STATUS_REQUEST { + @Override + public StatusRequest deserializePayload(DataInputStream in) { + return new StatusRequest(); + } + }, + STATUS_REPLY { + @Override + public StatusReply deserializePayload(DataInputStream in) throws IOException { + return StatusReply.deserialize(in); + } + }; + + public void serialize(DataOutputStream out) throws IOException { + out.writeChar(ordinal()); + } + + public static Type deserialize(DataInputStream in) throws IOException { + return Type.values()[in.readChar()]; + } + + public abstract Payload deserializePayload(DataInputStream in) throws IOException; +} diff --git a/src/messages/StatusRequest.java b/src/messages/StatusRequest.java deleted file mode 100644 index 1defb02..0000000 --- a/src/messages/StatusRequest.java +++ /dev/null @@ -1,11 +0,0 @@ -package messages; - -import java.io.DataOutputStream; -import java.io.IOException; - -public record StatusRequest() implements Serializable { - @Override - public void serialize(DataOutputStream out) throws IOException { - Type.STATUS_REQUEST.serialize(out); - } -} diff --git a/src/messages/Type.java b/src/messages/Type.java deleted file mode 100644 index b7e482d..0000000 --- a/src/messages/Type.java +++ /dev/null @@ -1,24 +0,0 @@ -package messages; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -public enum Type implements Serializable { - AUTH_REQUEST, - AUTH_REPLY, - JOB_REQUEST, - JOB_REPLY_OK, - JOB_REPLY_ERROR, - STATUS_REQUEST, - STATUS_REPLY; - - @Override - public void serialize(DataOutputStream out) throws IOException { - out.writeChar(ordinal()); - } - - public static Type deserialize(DataInputStream in) throws IOException { - return Type.values()[in.readChar()]; - } -} diff --git a/src/server/Session.java b/src/server/Session.java index 4cf6333..f1d6a8e 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -1,6 +1,9 @@ package server; -import messages.*; +import connection.messages.*; +import connection.utils.Message; +import connection.utils.Payload; +import connection.utils.Type; import sd23.JobFunctionException; import java.io.DataInputStream; @@ -26,13 +29,14 @@ public void run() { authenticate(in, out); while (true) { - switch (Type.deserialize(in)) { - case JOB_REQUEST -> job(in, out); + var message = Message.receive(in); + + switch (message.type()) { + case JOB_REQUEST -> new Message(runJob((JobRequest) message.payload())).send(out); default -> System.out.println("Received unknown message type"); } } } catch (IOException e) { - // throw new RuntimeException(e); System.out.println("Connection ended."); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -41,28 +45,27 @@ public void run() { private void authenticate(DataInputStream in, DataOutputStream out) throws IOException { while (true) { - while (Type.deserialize(in) != Type.AUTH_REQUEST) { - in.readAllBytes(); // this seems problematic - } - - var login = AuthRequest.deserialize(in); + Message message; + while ((message = Message.receive(in)).type() != Type.AUTH_REQUEST); + var authRequest = (AuthRequest) message.payload(); - if (auth.authenticate(login.username(), login.password())) { + if (auth.authenticate(authRequest.username(), authRequest.password())) { break; } else { - new AuthReply(false).serialize(out); + new Message(new AuthReply(false)).send(out); } } - new AuthReply(true).serialize(out); + new Message(new AuthReply(true)).send(out); } - private void job(DataInputStream in, DataOutputStream out) throws IOException, InterruptedException { + private Payload runJob(JobRequest jobRequest) throws IOException, InterruptedException { + System.out.println("Running job"); try { - new JobReplyOk(scheduler.addJob(JobRequest.deserialize(in).code())).serialize(out); + return new JobReplyOk(scheduler.addJob(jobRequest.code())); } catch (JobTooBigException e) { - new JobReplyError(0, "Not enough memory").serialize(out); + return new JobReplyError(0, "Not enough memory"); } catch (JobFunctionException e) { - new JobReplyError(e.getCode(), e.getMessage()).serialize(out); + return new JobReplyError(e.getCode(), e.getMessage()); } } } From 00a3430740c73654a460470f4c54feae8591aad3 Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 8 Dec 2023 20:44:40 +0000 Subject: [PATCH 12/15] Change Payloads to be the Message --- src/client/Client.java | 17 +++++++++-------- src/connection/messages/AuthReply.java | 4 ++-- src/connection/messages/AuthRequest.java | 4 ++-- src/connection/messages/JobReplyError.java | 4 ++-- src/connection/messages/JobReplyOk.java | 4 ++-- src/connection/messages/JobRequest.java | 4 ++-- src/connection/messages/StatusReply.java | 4 ++-- src/connection/messages/StatusRequest.java | 4 ++-- src/connection/utils/Connection.java | 18 ++++++++++++++++++ src/connection/utils/Message.java | 22 ++-------------------- src/connection/utils/Payload.java | 9 --------- src/connection/utils/Type.java | 2 +- src/server/Session.java | 18 +++++++++--------- 13 files changed, 53 insertions(+), 61 deletions(-) create mode 100644 src/connection/utils/Connection.java delete mode 100644 src/connection/utils/Payload.java diff --git a/src/client/Client.java b/src/client/Client.java index a72a34a..aad4d9d 100644 --- a/src/client/Client.java +++ b/src/client/Client.java @@ -1,6 +1,7 @@ package client; import connection.messages.*; +import connection.utils.Connection; import connection.utils.Message; import connection.utils.Type; @@ -45,11 +46,11 @@ private static void authenticate(DataInputStream in, DataOutputStream out) throw System.out.print("Enter password: "); var password = scanner.nextLine(); - new Message(new AuthRequest(username, password)).send(out); + new Connection(new AuthRequest(username, password)).send(out); Message message; - while ((message = Message.receive(in)).type() != Type.AUTH_REPLY); - var authReply = (AuthReply) message.payload(); + while ((message = Connection.receive(in)).getType() != Type.AUTH_REPLY); + var authReply = (AuthReply) message; if (authReply.success()) { System.out.println("Login successful."); @@ -67,23 +68,23 @@ private static void send(DataOutputStream out) throws IOException { switch (tokens[0]) { case "exec" -> { - new Message(new JobRequest(Files.readAllBytes(Path.of(tokens[1])))).send(out); + new Connection(new JobRequest(Files.readAllBytes(Path.of(tokens[1])))).send(out); } default -> System.out.println("Unknown command"); } } private static void receive(DataInputStream in) throws IOException { - var message = Message.receive(in); - switch (message.type()) { + var message = Connection.receive(in); + switch (message.getType()) { case JOB_REPLY_OK -> { System.out.println("Job finished successfully."); try (var fos = new FileOutputStream("out.txt")) { - fos.write(((JobReplyOk) message.payload()).output()); + fos.write(((JobReplyOk) message).output()); } } case JOB_REPLY_ERROR -> { - var jobReplyError = (JobReplyError) message.payload(); + var jobReplyError = (JobReplyError) message; System.out.println("Job failed.\n\tCode: " + jobReplyError.code() + "\n\tMessage: " + jobReplyError.message()); } default -> System.out.println("Received unknown message type"); diff --git a/src/connection/messages/AuthReply.java b/src/connection/messages/AuthReply.java index 1ad6120..e69ca0a 100644 --- a/src/connection/messages/AuthReply.java +++ b/src/connection/messages/AuthReply.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record AuthReply(boolean success) implements Payload { +public record AuthReply(boolean success) implements Message { public static AuthReply deserialize(DataInputStream in) throws IOException { boolean success = in.readBoolean(); diff --git a/src/connection/messages/AuthRequest.java b/src/connection/messages/AuthRequest.java index a3d0419..bc38609 100644 --- a/src/connection/messages/AuthRequest.java +++ b/src/connection/messages/AuthRequest.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record AuthRequest(String username, String password) implements Payload { +public record AuthRequest(String username, String password) implements Message { public static AuthRequest deserialize(DataInputStream in) throws IOException { String username = in.readUTF(); diff --git a/src/connection/messages/JobReplyError.java b/src/connection/messages/JobReplyError.java index d96caf8..793f066 100644 --- a/src/connection/messages/JobReplyError.java +++ b/src/connection/messages/JobReplyError.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record JobReplyError(int code, String message) implements Payload { +public record JobReplyError(int code, String message) implements Message { public static JobReplyError deserialize(DataInputStream in) throws IOException { int code = in.readInt(); diff --git a/src/connection/messages/JobReplyOk.java b/src/connection/messages/JobReplyOk.java index 5253b55..92774f3 100644 --- a/src/connection/messages/JobReplyOk.java +++ b/src/connection/messages/JobReplyOk.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record JobReplyOk(byte[] output) implements Payload { +public record JobReplyOk(byte[] output) implements Message { public static JobReplyOk deserialize(DataInputStream in) throws IOException { int outputLength = in.readInt(); diff --git a/src/connection/messages/JobRequest.java b/src/connection/messages/JobRequest.java index e17aa83..5dd763c 100644 --- a/src/connection/messages/JobRequest.java +++ b/src/connection/messages/JobRequest.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record JobRequest(byte[] code) implements Payload { +public record JobRequest(byte[] code) implements Message { public static JobRequest deserialize(DataInputStream in) throws IOException { int length = in.readInt(); diff --git a/src/connection/messages/StatusReply.java b/src/connection/messages/StatusReply.java index 90f9153..076cdf3 100644 --- a/src/connection/messages/StatusReply.java +++ b/src/connection/messages/StatusReply.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record StatusReply(int availableMemory, int pendingTasks) implements Payload { +public record StatusReply(int availableMemory, int pendingTasks) implements Message { public static StatusReply deserialize(DataInputStream in) throws IOException { int availableMemory = in.readInt(); diff --git a/src/connection/messages/StatusRequest.java b/src/connection/messages/StatusRequest.java index d0a962e..33b3ea2 100644 --- a/src/connection/messages/StatusRequest.java +++ b/src/connection/messages/StatusRequest.java @@ -1,13 +1,13 @@ package connection.messages; -import connection.utils.Payload; +import connection.utils.Message; import connection.utils.Type; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -public record StatusRequest() implements Payload { +public record StatusRequest() implements Message { public static StatusRequest deserialize(DataInputStream in) throws IOException { return new StatusRequest(); diff --git a/src/connection/utils/Connection.java b/src/connection/utils/Connection.java new file mode 100644 index 0000000..d3394fe --- /dev/null +++ b/src/connection/utils/Connection.java @@ -0,0 +1,18 @@ +package connection.utils; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public record Connection(Message message) { + + public static Message receive(DataInputStream in) throws IOException { + return Type.deserialize(in).deserializePayload(in); + } + + public void send(DataOutputStream out) throws IOException { + message.getType().serialize(out); + message.serialize(out); + out.flush(); + } +} diff --git a/src/connection/utils/Message.java b/src/connection/utils/Message.java index c0f2e8c..09246d2 100644 --- a/src/connection/utils/Message.java +++ b/src/connection/utils/Message.java @@ -1,23 +1,5 @@ package connection.utils; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -public record Message(Type type, Payload payload) { - - public Message(Payload payload) { - this(payload.getType(), payload); - } - - public static Message receive(DataInputStream in) throws IOException { - var type = Type.deserialize(in); - return new Message(type, type.deserializePayload(in)); - } - - public void send(DataOutputStream out) throws IOException { - type.serialize(out); - payload.serialize(out); - out.flush(); - } +public interface Message extends Serializable { + Type getType(); } diff --git a/src/connection/utils/Payload.java b/src/connection/utils/Payload.java deleted file mode 100644 index d71aaff..0000000 --- a/src/connection/utils/Payload.java +++ /dev/null @@ -1,9 +0,0 @@ -package connection.utils; - -import java.io.DataOutputStream; -import java.io.IOException; - -public interface Payload extends Serializable { - Type getType(); - -} diff --git a/src/connection/utils/Type.java b/src/connection/utils/Type.java index 4080ef0..8259dc2 100644 --- a/src/connection/utils/Type.java +++ b/src/connection/utils/Type.java @@ -58,5 +58,5 @@ public static Type deserialize(DataInputStream in) throws IOException { return Type.values()[in.readChar()]; } - public abstract Payload deserializePayload(DataInputStream in) throws IOException; + public abstract Message deserializePayload(DataInputStream in) throws IOException; } diff --git a/src/server/Session.java b/src/server/Session.java index f1d6a8e..23d5a97 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -1,8 +1,8 @@ package server; import connection.messages.*; +import connection.utils.Connection; import connection.utils.Message; -import connection.utils.Payload; import connection.utils.Type; import sd23.JobFunctionException; @@ -29,10 +29,10 @@ public void run() { authenticate(in, out); while (true) { - var message = Message.receive(in); + var message = Connection.receive(in); - switch (message.type()) { - case JOB_REQUEST -> new Message(runJob((JobRequest) message.payload())).send(out); + switch (message.getType()) { + case JOB_REQUEST -> new Connection(runJob((JobRequest) message)).send(out); default -> System.out.println("Received unknown message type"); } } @@ -46,19 +46,19 @@ public void run() { private void authenticate(DataInputStream in, DataOutputStream out) throws IOException { while (true) { Message message; - while ((message = Message.receive(in)).type() != Type.AUTH_REQUEST); - var authRequest = (AuthRequest) message.payload(); + while ((message = Connection.receive(in)).getType() != Type.AUTH_REQUEST); + var authRequest = (AuthRequest) message; if (auth.authenticate(authRequest.username(), authRequest.password())) { break; } else { - new Message(new AuthReply(false)).send(out); + new Connection(new AuthReply(false)).send(out); } } - new Message(new AuthReply(true)).send(out); + new Connection(new AuthReply(true)).send(out); } - private Payload runJob(JobRequest jobRequest) throws IOException, InterruptedException { + private Message runJob(JobRequest jobRequest) throws IOException, InterruptedException { System.out.println("Running job"); try { return new JobReplyOk(scheduler.addJob(jobRequest.code())); From 601889c60f84d8fd51a2e13a7e465e22cb894439 Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 8 Dec 2023 21:49:15 +0000 Subject: [PATCH 13/15] Implement connection --- src/client/Client.java | 30 ++++++++++----------- src/connection/messages/AuthReply.java | 2 +- src/connection/messages/AuthRequest.java | 2 +- src/connection/messages/JobReplyError.java | 2 +- src/connection/messages/JobReplyOk.java | 2 +- src/connection/messages/JobRequest.java | 2 +- src/connection/messages/StatusReply.java | 2 +- src/connection/messages/StatusRequest.java | 2 +- src/connection/utils/Connection.java | 31 +++++++++++++++++----- src/connection/utils/Message.java | 2 +- src/connection/utils/Type.java | 16 +++++------ src/server/Session.java | 23 ++++++++-------- 12 files changed, 65 insertions(+), 51 deletions(-) diff --git a/src/client/Client.java b/src/client/Client.java index aad4d9d..3fcf0ea 100644 --- a/src/client/Client.java +++ b/src/client/Client.java @@ -5,8 +5,6 @@ import connection.utils.Message; import connection.utils.Type; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.Socket; @@ -17,16 +15,14 @@ public class Client { public static void main(String[] args) { - try (var socket = new Socket("localhost", 1337); - var in = new DataInputStream(socket.getInputStream()); - var out = new DataOutputStream(socket.getOutputStream())) { - authenticate(in, out); + try (var connection = new Connection(new Socket("localhost", 1337))) { + authenticate(connection); while (true) { - send(out); + send(connection); Thread.startVirtualThread(() -> { try { - receive(in); + receive(connection); } catch (IOException e) { throw new RuntimeException(e); } @@ -34,10 +30,12 @@ public static void main(String[] args) { } } catch (IOException | NoSuchElementException e) { System.out.println("Connection ended."); + } catch (Exception e) { + throw new RuntimeException(e); } } - private static void authenticate(DataInputStream in, DataOutputStream out) throws IOException, NoSuchElementException { + private static void authenticate(Connection connection) throws IOException, NoSuchElementException { var scanner = new Scanner(System.in); while (true) { @@ -46,10 +44,10 @@ private static void authenticate(DataInputStream in, DataOutputStream out) throw System.out.print("Enter password: "); var password = scanner.nextLine(); - new Connection(new AuthRequest(username, password)).send(out); + connection.send(new AuthRequest(username, password)); Message message; - while ((message = Connection.receive(in)).getType() != Type.AUTH_REPLY); + while ((message = connection.receive()).type() != Type.AUTH_REPLY); var authReply = (AuthReply) message; if (authReply.success()) { @@ -61,22 +59,22 @@ private static void authenticate(DataInputStream in, DataOutputStream out) throw } } - private static void send(DataOutputStream out) throws IOException { + private static void send(Connection connection) throws IOException { var scanner = new Scanner(System.in); var line = scanner.nextLine(); var tokens = line.split(" "); switch (tokens[0]) { case "exec" -> { - new Connection(new JobRequest(Files.readAllBytes(Path.of(tokens[1])))).send(out); + connection.send(new JobRequest(Files.readAllBytes(Path.of(tokens[1])))); } default -> System.out.println("Unknown command"); } } - private static void receive(DataInputStream in) throws IOException { - var message = Connection.receive(in); - switch (message.getType()) { + private static void receive(Connection connection) throws IOException { + var message = connection.receive(); + switch (message.type()) { case JOB_REPLY_OK -> { System.out.println("Job finished successfully."); try (var fos = new FileOutputStream("out.txt")) { diff --git a/src/connection/messages/AuthReply.java b/src/connection/messages/AuthReply.java index e69ca0a..898acfe 100644 --- a/src/connection/messages/AuthReply.java +++ b/src/connection/messages/AuthReply.java @@ -15,7 +15,7 @@ public static AuthReply deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.AUTH_REPLY; } diff --git a/src/connection/messages/AuthRequest.java b/src/connection/messages/AuthRequest.java index bc38609..86de79d 100644 --- a/src/connection/messages/AuthRequest.java +++ b/src/connection/messages/AuthRequest.java @@ -16,7 +16,7 @@ public static AuthRequest deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.AUTH_REQUEST; } diff --git a/src/connection/messages/JobReplyError.java b/src/connection/messages/JobReplyError.java index 793f066..faa1f9e 100644 --- a/src/connection/messages/JobReplyError.java +++ b/src/connection/messages/JobReplyError.java @@ -16,7 +16,7 @@ public static JobReplyError deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.JOB_REPLY_ERROR; } diff --git a/src/connection/messages/JobReplyOk.java b/src/connection/messages/JobReplyOk.java index 92774f3..8e36c0f 100644 --- a/src/connection/messages/JobReplyOk.java +++ b/src/connection/messages/JobReplyOk.java @@ -17,7 +17,7 @@ public static JobReplyOk deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.JOB_REPLY_OK; } diff --git a/src/connection/messages/JobRequest.java b/src/connection/messages/JobRequest.java index 5dd763c..fdb0f3c 100644 --- a/src/connection/messages/JobRequest.java +++ b/src/connection/messages/JobRequest.java @@ -17,7 +17,7 @@ public static JobRequest deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.JOB_REQUEST; } diff --git a/src/connection/messages/StatusReply.java b/src/connection/messages/StatusReply.java index 076cdf3..10babca 100644 --- a/src/connection/messages/StatusReply.java +++ b/src/connection/messages/StatusReply.java @@ -16,7 +16,7 @@ public static StatusReply deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.STATUS_REPLY; } diff --git a/src/connection/messages/StatusRequest.java b/src/connection/messages/StatusRequest.java index 33b3ea2..456c8fb 100644 --- a/src/connection/messages/StatusRequest.java +++ b/src/connection/messages/StatusRequest.java @@ -14,7 +14,7 @@ public static StatusRequest deserialize(DataInputStream in) throws IOException { } @Override - public Type getType() { + public Type type() { return Type.STATUS_REQUEST; } diff --git a/src/connection/utils/Connection.java b/src/connection/utils/Connection.java index d3394fe..590578b 100644 --- a/src/connection/utils/Connection.java +++ b/src/connection/utils/Connection.java @@ -3,16 +3,33 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.Socket; -public record Connection(Message message) { +public class Connection implements AutoCloseable { + private final Socket socket; + private final DataInputStream inputStream; + private final DataOutputStream outputStream; - public static Message receive(DataInputStream in) throws IOException { - return Type.deserialize(in).deserializePayload(in); + public Connection(Socket socket) throws IOException { + this.socket = socket; + this.inputStream = new DataInputStream(socket.getInputStream()); + this.outputStream = new DataOutputStream(socket.getOutputStream()); } - public void send(DataOutputStream out) throws IOException { - message.getType().serialize(out); - message.serialize(out); - out.flush(); + public Message receive() throws IOException { + return Type.deserialize(inputStream).deserializeMessage(inputStream); + } + + public void send(Message message) throws IOException { + message.type().serialize(outputStream); + message.serialize(outputStream); + outputStream.flush(); + } + + @Override + public void close() throws Exception { + socket.close(); + inputStream.close(); + outputStream.close(); } } diff --git a/src/connection/utils/Message.java b/src/connection/utils/Message.java index 09246d2..f68a6dc 100644 --- a/src/connection/utils/Message.java +++ b/src/connection/utils/Message.java @@ -1,5 +1,5 @@ package connection.utils; public interface Message extends Serializable { - Type getType(); + Type type(); } diff --git a/src/connection/utils/Type.java b/src/connection/utils/Type.java index 8259dc2..d29e0c4 100644 --- a/src/connection/utils/Type.java +++ b/src/connection/utils/Type.java @@ -9,43 +9,43 @@ public enum Type implements Serializable { AUTH_REQUEST { @Override - public AuthRequest deserializePayload(DataInputStream in) throws IOException { + public AuthRequest deserializeMessage(DataInputStream in) throws IOException { return AuthRequest.deserialize(in); } }, AUTH_REPLY { @Override - public AuthReply deserializePayload(DataInputStream in) throws IOException { + public AuthReply deserializeMessage(DataInputStream in) throws IOException { return AuthReply.deserialize(in); } }, JOB_REQUEST { @Override - public JobRequest deserializePayload(DataInputStream in) throws IOException { + public JobRequest deserializeMessage(DataInputStream in) throws IOException { return JobRequest.deserialize(in); } }, JOB_REPLY_OK { @Override - public JobReplyOk deserializePayload(DataInputStream in) throws IOException { + public JobReplyOk deserializeMessage(DataInputStream in) throws IOException { return JobReplyOk.deserialize(in); } }, JOB_REPLY_ERROR { @Override - public JobReplyError deserializePayload(DataInputStream in) throws IOException { + public JobReplyError deserializeMessage(DataInputStream in) throws IOException { return JobReplyError.deserialize(in); } }, STATUS_REQUEST { @Override - public StatusRequest deserializePayload(DataInputStream in) { + public StatusRequest deserializeMessage(DataInputStream in) { return new StatusRequest(); } }, STATUS_REPLY { @Override - public StatusReply deserializePayload(DataInputStream in) throws IOException { + public StatusReply deserializeMessage(DataInputStream in) throws IOException { return StatusReply.deserialize(in); } }; @@ -58,5 +58,5 @@ public static Type deserialize(DataInputStream in) throws IOException { return Type.values()[in.readChar()]; } - public abstract Message deserializePayload(DataInputStream in) throws IOException; + public abstract Message deserializeMessage(DataInputStream in) throws IOException; } diff --git a/src/server/Session.java b/src/server/Session.java index 23d5a97..c30cd95 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -6,8 +6,6 @@ import connection.utils.Type; import sd23.JobFunctionException; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; @@ -24,15 +22,14 @@ public Session(Socket socket, Auth auth, Scheduler scheduler) { @Override public void run() { - try (var in = new DataInputStream(socket.getInputStream()); - var out = new DataOutputStream(socket.getOutputStream())) { - authenticate(in, out); + try (var connection = new Connection(socket)) { + authenticate(connection); while (true) { - var message = Connection.receive(in); + var message = connection.receive(); - switch (message.getType()) { - case JOB_REQUEST -> new Connection(runJob((JobRequest) message)).send(out); + switch (message.type()) { + case JOB_REQUEST -> connection.send(runJob((JobRequest) message)); default -> System.out.println("Received unknown message type"); } } @@ -40,22 +37,24 @@ public void run() { System.out.println("Connection ended."); } catch (InterruptedException e) { throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); } } - private void authenticate(DataInputStream in, DataOutputStream out) throws IOException { + private void authenticate(Connection connection) throws IOException { while (true) { Message message; - while ((message = Connection.receive(in)).getType() != Type.AUTH_REQUEST); + while ((message = connection.receive()).type() != Type.AUTH_REQUEST); var authRequest = (AuthRequest) message; if (auth.authenticate(authRequest.username(), authRequest.password())) { break; } else { - new Connection(new AuthReply(false)).send(out); + connection.send(new AuthReply(false)); } } - new Connection(new AuthReply(true)).send(out); + connection.send(new AuthReply(true)); } private Message runJob(JobRequest jobRequest) throws IOException, InterruptedException { From a2b375b32fca7617b861c89517202c7105dcd42e Mon Sep 17 00:00:00 2001 From: carlos Date: Sat, 9 Dec 2023 22:51:48 +0000 Subject: [PATCH 14/15] Make server accept messages concurrently --- src/server/Server.java | 8 +++++++- src/server/Session.java | 31 +++++++++++++++++++------------ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/server/Server.java b/src/server/Server.java index d0a664d..87e0436 100644 --- a/src/server/Server.java +++ b/src/server/Server.java @@ -1,5 +1,8 @@ package server; +import concurrentUtils.BoundedBuffer; +import concurrentUtils.ThreadPool; + import java.io.IOException; import java.net.ServerSocket; @@ -7,10 +10,13 @@ public class Server { public static void main(String[] args) throws IOException { var auth = new Auth(); var scheduler = new Scheduler(Integer.parseInt(args[0])); + var boundedBuffer = new BoundedBuffer(1024); + var threadPool = new ThreadPool(8, boundedBuffer); + threadPool.start(); try (var serverSocket = new ServerSocket(1337)) { while (true) { var socket = serverSocket.accept(); - var session = new Thread(new Session(socket, auth, scheduler)); + var session = new Thread(new Session(socket, boundedBuffer, auth, scheduler)); session.start(); } } diff --git a/src/server/Session.java b/src/server/Session.java index c30cd95..a67bf44 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -1,5 +1,6 @@ package server; +import concurrentUtils.BoundedBuffer; import connection.messages.*; import connection.utils.Connection; import connection.utils.Message; @@ -13,11 +14,13 @@ public class Session implements Runnable { private final Socket socket; private final Auth auth; private final Scheduler scheduler; + private final BoundedBuffer boundedBuffer; - public Session(Socket socket, Auth auth, Scheduler scheduler) { + public Session(Socket socket, BoundedBuffer boundedBuffer, Auth auth, Scheduler scheduler) { this.socket = socket; this.auth = auth; this.scheduler = scheduler; + this.boundedBuffer = boundedBuffer; } @Override @@ -28,24 +31,28 @@ public void run() { while (true) { var message = connection.receive(); - switch (message.type()) { - case JOB_REQUEST -> connection.send(runJob((JobRequest) message)); - default -> System.out.println("Received unknown message type"); - } + boundedBuffer.put(() -> { + try { + switch (message.type()) { + case JOB_REQUEST -> connection.send(runJob((JobRequest) message)); + default -> System.out.println("Received unknown message type"); + } + } catch (IOException | InterruptedException e) { + var exceptionMessage = e.getMessage(); + System.out.println("Error processing request" + (exceptionMessage != null ? ": " + exceptionMessage : ".")); + } + }); } - } catch (IOException e) { - System.out.println("Connection ended."); - } catch (InterruptedException e) { - throw new RuntimeException(e); } catch (Exception e) { - throw new RuntimeException(e); + var exceptionMessage = e.getMessage(); + System.out.println("Connection ended" + (exceptionMessage != null ? " with error: " + exceptionMessage : ".")); } } private void authenticate(Connection connection) throws IOException { while (true) { Message message; - while ((message = connection.receive()).type() != Type.AUTH_REQUEST); + while ((message = connection.receive()).type() != Type.AUTH_REQUEST) ; var authRequest = (AuthRequest) message; if (auth.authenticate(authRequest.username(), authRequest.password())) { @@ -57,7 +64,7 @@ private void authenticate(Connection connection) throws IOException { connection.send(new AuthReply(true)); } - private Message runJob(JobRequest jobRequest) throws IOException, InterruptedException { + private Message runJob(JobRequest jobRequest) throws InterruptedException { System.out.println("Running job"); try { return new JobReplyOk(scheduler.addJob(jobRequest.code())); From adfc0ee29f73e334473eb5a4bc4753d42c6f147c Mon Sep 17 00:00:00 2001 From: carlos Date: Sun, 10 Dec 2023 11:52:25 +0000 Subject: [PATCH 15/15] Make client send messages concurrently --- src/client/Client.java | 31 +++++++++++++++++----------- src/connection/utils/Connection.java | 3 +-- src/server/ClientReader.java | 30 +++++++++++++++++++++++++++ src/server/Server.java | 6 +++--- src/server/Session.java | 10 ++++----- 5 files changed, 58 insertions(+), 22 deletions(-) create mode 100644 src/server/ClientReader.java diff --git a/src/client/Client.java b/src/client/Client.java index 3fcf0ea..8b91b69 100644 --- a/src/client/Client.java +++ b/src/client/Client.java @@ -1,9 +1,11 @@ package client; +import concurrentUtils.BoundedBuffer; import connection.messages.*; import connection.utils.Connection; import connection.utils.Message; import connection.utils.Type; +import server.ClientReader; import java.io.FileOutputStream; import java.io.IOException; @@ -14,24 +16,30 @@ import java.util.Scanner; public class Client { + public static void main(String[] args) { + var replyBuffer = new BoundedBuffer(128); + try (var connection = new Connection(new Socket("localhost", 1337))) { authenticate(connection); + var clientReader = new Thread(new ClientReader(connection, replyBuffer)); + clientReader.start(); + + var scanner = new Scanner(System.in); while (true) { - send(connection); + send(connection, scanner.nextLine()); Thread.startVirtualThread(() -> { try { - receive(connection); - } catch (IOException e) { + handleMessage(replyBuffer.get()); + } catch (InterruptedException e) { throw new RuntimeException(e); } }); } } catch (IOException | NoSuchElementException e) { - System.out.println("Connection ended."); - } catch (Exception e) { - throw new RuntimeException(e); + var exceptionMessage = e.getMessage(); + System.out.println("Connection ended" + (exceptionMessage != null ? " with error: " + exceptionMessage : ".")); } } @@ -47,7 +55,7 @@ private static void authenticate(Connection connection) throws IOException, NoSu connection.send(new AuthRequest(username, password)); Message message; - while ((message = connection.receive()).type() != Type.AUTH_REPLY); + while ((message = connection.receive()).type() != Type.AUTH_REPLY) ; var authReply = (AuthReply) message; if (authReply.success()) { @@ -59,9 +67,7 @@ private static void authenticate(Connection connection) throws IOException, NoSu } } - private static void send(Connection connection) throws IOException { - var scanner = new Scanner(System.in); - var line = scanner.nextLine(); + private static void send(Connection connection, String line) throws IOException { var tokens = line.split(" "); switch (tokens[0]) { @@ -72,13 +78,14 @@ private static void send(Connection connection) throws IOException { } } - private static void receive(Connection connection) throws IOException { - var message = connection.receive(); + private static void handleMessage(Message message) { switch (message.type()) { case JOB_REPLY_OK -> { System.out.println("Job finished successfully."); try (var fos = new FileOutputStream("out.txt")) { fos.write(((JobReplyOk) message).output()); + } catch (IOException e) { + throw new RuntimeException(e); } } case JOB_REPLY_ERROR -> { diff --git a/src/connection/utils/Connection.java b/src/connection/utils/Connection.java index 590578b..1fc69ea 100644 --- a/src/connection/utils/Connection.java +++ b/src/connection/utils/Connection.java @@ -26,8 +26,7 @@ public void send(Message message) throws IOException { outputStream.flush(); } - @Override - public void close() throws Exception { + public void close() throws IOException { socket.close(); inputStream.close(); outputStream.close(); diff --git a/src/server/ClientReader.java b/src/server/ClientReader.java new file mode 100644 index 0000000..667f044 --- /dev/null +++ b/src/server/ClientReader.java @@ -0,0 +1,30 @@ +package server; + +import concurrentUtils.BoundedBuffer; +import connection.utils.Connection; +import connection.utils.Message; + +import java.io.IOException; + +public class ClientReader implements Runnable { + private final Connection connection; + private final BoundedBuffer replyBuffer; + + public ClientReader(Connection connection, BoundedBuffer replyBuffer) { + this.connection = connection; + this.replyBuffer = replyBuffer; + } + + @Override + public void run() { + while (true) { + try { + replyBuffer.put(connection.receive()); + } catch (IOException | InterruptedException e) { + var exceptionMessage = e.getMessage(); + System.out.println("Error processing request" + (exceptionMessage != null ? ": " + exceptionMessage : ".")); + break; + } + } + } +} diff --git a/src/server/Server.java b/src/server/Server.java index 87e0436..0cc9d45 100644 --- a/src/server/Server.java +++ b/src/server/Server.java @@ -10,13 +10,13 @@ public class Server { public static void main(String[] args) throws IOException { var auth = new Auth(); var scheduler = new Scheduler(Integer.parseInt(args[0])); - var boundedBuffer = new BoundedBuffer(1024); - var threadPool = new ThreadPool(8, boundedBuffer); + var taskBuffer = new BoundedBuffer(1024); + var threadPool = new ThreadPool(8, taskBuffer); threadPool.start(); try (var serverSocket = new ServerSocket(1337)) { while (true) { var socket = serverSocket.accept(); - var session = new Thread(new Session(socket, boundedBuffer, auth, scheduler)); + var session = new Thread(new Session(socket, taskBuffer, auth, scheduler)); session.start(); } } diff --git a/src/server/Session.java b/src/server/Session.java index a67bf44..a765f23 100644 --- a/src/server/Session.java +++ b/src/server/Session.java @@ -14,13 +14,13 @@ public class Session implements Runnable { private final Socket socket; private final Auth auth; private final Scheduler scheduler; - private final BoundedBuffer boundedBuffer; + private final BoundedBuffer taskBuffer; - public Session(Socket socket, BoundedBuffer boundedBuffer, Auth auth, Scheduler scheduler) { + public Session(Socket socket, BoundedBuffer taskBuffer, Auth auth, Scheduler scheduler) { this.socket = socket; this.auth = auth; this.scheduler = scheduler; - this.boundedBuffer = boundedBuffer; + this.taskBuffer = taskBuffer; } @Override @@ -31,7 +31,7 @@ public void run() { while (true) { var message = connection.receive(); - boundedBuffer.put(() -> { + taskBuffer.put(() -> { try { switch (message.type()) { case JOB_REQUEST -> connection.send(runJob((JobRequest) message)); @@ -43,7 +43,7 @@ public void run() { } }); } - } catch (Exception e) { + } catch (IOException | InterruptedException e) { var exceptionMessage = e.getMessage(); System.out.println("Connection ended" + (exceptionMessage != null ? " with error: " + exceptionMessage : ".")); }