Skip to content

Commit

Permalink
Handle AggregateErrors on Node.js
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Aug 2, 2023
1 parent af4d7c6 commit 97d9cdd
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 18 deletions.
29 changes: 16 additions & 13 deletions io/js/src/main/scala/fs2/io/IOException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package fs2.io

import fs2.io.file.FileSystemException
import fs2.io.internal.facade.AggregateError
import fs2.io.net.SocketException
import fs2.io.net.SocketTimeoutException
import fs2.io.net.UnknownHostException
Expand All @@ -35,20 +36,22 @@ private class JavaScriptIOException(message: String, cause: js.JavaScriptExcepti
with NoStackTrace

object IOException {
private[io] def unapply(cause: js.JavaScriptException): Option[IOException] =
InterruptedIOException
.unapply(cause)
.orElse(SocketException.unapply(cause))
.orElse(SSLException.unapply(cause))
.orElse(FileSystemException.unapply(cause))
.orElse(UnknownHostException.unapply(cause))
.orElse {
cause.exception match {
case error: js.Error if error.message.contains("EPIPE") =>
Some(new JavaScriptIOException("Broken pipe", cause))
case _ => None
}
private[io] def unapply(cause: js.JavaScriptException): Option[IOException] = cause match {
case js.JavaScriptException(aggregate: AggregateError) =>
// just use the first one. catching aggregated / CompositeFailures is impractical
unapply(js.JavaScriptException(aggregate.errors(0)))
case InterruptedIOException(ex) => Some(ex)
case SocketException(ex) => Some(ex)
case SSLException(ex) => Some(ex)
case FileSystemException(ex) => Some(ex)
case UnknownHostException(ex) => Some(ex)
case _ =>
cause.exception match {
case error: js.Error if error.message.contains("EPIPE") =>
Some(new JavaScriptIOException("Broken pipe", cause))
case _ => None
}
}
}

class InterruptedIOException(message: String = null, cause: Throwable = null)
Expand Down
31 changes: 31 additions & 0 deletions io/js/src/main/scala/fs2/io/internal/facade/AggregateError.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2013 Functional Streams for Scala
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package fs2.io.internal.facade

import scala.scalajs.js
import scala.scalajs.js.annotation.JSGlobal

@JSGlobal
@js.native
private[io] class AggregateError extends js.Error {
def errors: js.Array[js.Error] = js.native
}
13 changes: 8 additions & 5 deletions io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,15 @@ class SocketSuite extends Fs2IoSuite with SocketSuitePlatform {
.drain
}

test("errors - should be captured in the effect") {
test("errors - should be captured in the effect".only) {
(for {
bindAddress <- Network[IO].serverResource(Some(ip"127.0.0.1")).use(s => IO.pure(s._1))
_ <- Network[IO].client(bindAddress).use(_ => IO.unit).recover {
case ex: ConnectException => assertEquals(ex.getMessage, "Connection refused")
}
port <- Network[IO].serverResource(Some(ip"127.0.0.1")).use(s => IO.pure(s._1.port))
_ <- Network[IO]
.client(SocketAddress(host"localhost", port))
.use_
.recover { case ex: ConnectException =>
assertEquals(ex.getMessage, "Connection refused")
}
} yield ()) >> (for {
bindAddress <- Network[IO].serverResource(Some(ip"127.0.0.1")).map(_._1)
_ <- Network[IO]
Expand Down

0 comments on commit 97d9cdd

Please sign in to comment.