diff --git a/src/main/scala/fs2/ftp/SecureFtp.scala b/src/main/scala/fs2/ftp/SecureFtp.scala index 1e352aa..7cc5f83 100644 --- a/src/main/scala/fs2/ftp/SecureFtp.scala +++ b/src/main/scala/fs2/ftp/SecureFtp.scala @@ -1,7 +1,7 @@ package fs2.ftp import java.io._ -import cats.effect.{ Resource, Sync } +import cats.effect.{ Async, Resource } import cats.syntax.applicativeError._ import fs2.{ Pipe, Stream } import fs2.Stream._ @@ -13,7 +13,7 @@ import fs2.ftp.FtpSettings.{ KeyFileSftpIdentity, RawKeySftpIdentity, SecureFtpS import scala.jdk.CollectionConverters._ -final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnconfirmedWrites: Int) +final private class SecureFtp[F[_]: Async](unsafeClient: SecureFtp.Client, maxUnconfirmedWrites: Int) extends FtpClient[F, JSFTPClient] { def ls(path: String): fs2.Stream[F, FtpResource] = @@ -38,7 +38,7 @@ final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnc execute(client => Option(client.statExistence(path)).map(FtpResource(path, _))) def execute[T](f: JSFTPClient => T): F[T] = - Sync[F].blocking(f(unsafeClient)) + Async[F].blocking(f(unsafeClient)) def readFile( path: String, @@ -57,7 +57,7 @@ final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnc } } - input <- fs2.io.readInputStream(Sync[F].pure(is), chunkSize) + input <- fs2.io.readInputStream(Async[F].pure(is), chunkSize) } yield input def rm(path: String): F[Unit] = @@ -83,7 +83,7 @@ final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnc super.close() } } - _ <- source.through(fs2.io.writeOutputStream(Sync[F].pure(os))) + _ <- source.through(fs2.io.writeOutputStream(Async[F].pure(os))) } yield ()) } @@ -91,14 +91,22 @@ object SecureFtp { type Client = JSFTPClient - def connect[F[_]: Sync]( + def apply[F[_]: Async]( + unsafeClient: Client, + maxUnconfirmedWrites: Int = 0 + ): Resource[F, FtpClient[F, SecureFtp.Client]] = + Resource.make(Async[F].pure(new SecureFtp[F](unsafeClient, maxUnconfirmedWrites))) { client => + client.execute(_.close()).voidError + } + + def connect[F[_]: Async]( settings: SecureFtpSettings ): Resource[F, FtpClient[F, SecureFtp.Client]] = for { - ssh <- Resource.make(Sync[F].delay(new SSHClient(settings.sshConfig)))(ssh => - Sync[F].delay(if (ssh.isConnected) ssh.disconnect() else {}) + ssh <- Resource.make(Async[F].delay(new SSHClient(settings.sshConfig)))(ssh => + Async[F].delay(if (ssh.isConnected) ssh.disconnect() else {}).voidError ) - r <- Resource.make[F, FtpClient[F, JSFTPClient]](Sync[F].delay { + r <- Resource.make[F, FtpClient[F, JSFTPClient]](Async[F].delay { import settings._ if (!strictHostKeyChecking) @@ -117,7 +125,7 @@ object SecureFtp { ) new SecureFtp(ssh.newSFTPClient(), settings.maxUnconfirmedWrites) - })(client => client.execute(_.close())) + })(client => client.execute(_.close()).voidError) } yield r private[this] def setIdentity(identity: SftpIdentity, username: String)(ssh: SSHClient): Unit = { diff --git a/src/main/scala/fs2/ftp/UnsecureFtp.scala b/src/main/scala/fs2/ftp/UnsecureFtp.scala index 77f65c1..73e88f5 100644 --- a/src/main/scala/fs2/ftp/UnsecureFtp.scala +++ b/src/main/scala/fs2/ftp/UnsecureFtp.scala @@ -1,15 +1,12 @@ package fs2.ftp -import java.io.{ FileNotFoundException, InputStream } -import cats.effect.{ Async, Resource } -import cats.syntax.applicativeError._ -import cats.syntax.flatMap._ -import cats.syntax.functor._ -import cats.syntax.monadError._ +import cats.effect.{ Async, Resource, Sync } +import cats.syntax.all._ +import fs2.ftp.FtpSettings.UnsecureFtpSettings import fs2.{ Pipe, Stream } import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient } -import FtpSettings.UnsecureFtpSettings -import cats.effect.kernel.Sync + +import java.io.{ FileNotFoundException, InputStream } final private class UnsecureFtp[F[_]: Async]( unsafeClient: UnsecureFtp.Client @@ -83,6 +80,20 @@ object UnsecureFtp { type Client = JFTPClient + def apply[F[_]: Async](unsafeClient: Client): Resource[F, FtpClient[F, UnsecureFtp.Client]] = + Resource.make(Async[F].pure(new UnsecureFtp[F](unsafeClient))) { client => + for { + connected <- client.execute(_.isConnected) + _ <- if (!connected) Async[F].unit + else + client + .execute(_.logout) + .attempt + .flatMap(_ => client.execute(_.disconnect)) + .voidError + } yield () + } + def connect[F[_]: Async]( settings: UnsecureFtpSettings ): Resource[F, FtpClient[F, UnsecureFtp.Client]] = @@ -128,6 +139,7 @@ object UnsecureFtp { .execute(_.logout) .attempt .flatMap(_ => client.execute(_.disconnect)) + .voidError } yield () } } yield r