Skip to content

Commit

Permalink
267 - Expose constructor for unsafe ftp client (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
regis-leray authored Aug 8, 2023
1 parent f74ea98 commit a616f4c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
28 changes: 18 additions & 10 deletions src/main/scala/fs2/ftp/SecureFtp.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package fs2.ftp

import java.io._
import cats.effect.{ Resource, Sync }
import cats.effect.{ Async, Resource }
import cats.syntax.applicativeError._
import fs2.{ Pipe, Stream }
import fs2.Stream._
Expand All @@ -13,7 +13,7 @@ import fs2.ftp.FtpSettings.{ KeyFileSftpIdentity, RawKeySftpIdentity, SecureFtpS

import scala.jdk.CollectionConverters._

final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnconfirmedWrites: Int)
final private class SecureFtp[F[_]: Async](unsafeClient: SecureFtp.Client, maxUnconfirmedWrites: Int)
extends FtpClient[F, JSFTPClient] {

def ls(path: String): fs2.Stream[F, FtpResource] =
Expand All @@ -38,7 +38,7 @@ final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnc
execute(client => Option(client.statExistence(path)).map(FtpResource(path, _)))

def execute[T](f: JSFTPClient => T): F[T] =
Sync[F].blocking(f(unsafeClient))
Async[F].blocking(f(unsafeClient))

def readFile(
path: String,
Expand All @@ -57,7 +57,7 @@ final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnc
}
}

input <- fs2.io.readInputStream(Sync[F].pure(is), chunkSize)
input <- fs2.io.readInputStream(Async[F].pure(is), chunkSize)
} yield input

def rm(path: String): F[Unit] =
Expand All @@ -83,22 +83,30 @@ final private class SecureFtp[F[_]: Sync](unsafeClient: SecureFtp.Client, maxUnc
super.close()
}
}
_ <- source.through(fs2.io.writeOutputStream(Sync[F].pure(os)))
_ <- source.through(fs2.io.writeOutputStream(Async[F].pure(os)))
} yield ())
}

object SecureFtp {

type Client = JSFTPClient

def connect[F[_]: Sync](
def apply[F[_]: Async](
unsafeClient: Client,
maxUnconfirmedWrites: Int = 0
): Resource[F, FtpClient[F, SecureFtp.Client]] =
Resource.make(Async[F].pure(new SecureFtp[F](unsafeClient, maxUnconfirmedWrites))) { client =>
client.execute(_.close()).voidError
}

def connect[F[_]: Async](
settings: SecureFtpSettings
): Resource[F, FtpClient[F, SecureFtp.Client]] =
for {
ssh <- Resource.make(Sync[F].delay(new SSHClient(settings.sshConfig)))(ssh =>
Sync[F].delay(if (ssh.isConnected) ssh.disconnect() else {})
ssh <- Resource.make(Async[F].delay(new SSHClient(settings.sshConfig)))(ssh =>
Async[F].delay(if (ssh.isConnected) ssh.disconnect() else {}).voidError
)
r <- Resource.make[F, FtpClient[F, JSFTPClient]](Sync[F].delay {
r <- Resource.make[F, FtpClient[F, JSFTPClient]](Async[F].delay {
import settings._

if (!strictHostKeyChecking)
Expand All @@ -117,7 +125,7 @@ object SecureFtp {
)

new SecureFtp(ssh.newSFTPClient(), settings.maxUnconfirmedWrites)
})(client => client.execute(_.close()))
})(client => client.execute(_.close()).voidError)
} yield r

private[this] def setIdentity(identity: SftpIdentity, username: String)(ssh: SSHClient): Unit = {
Expand Down
28 changes: 20 additions & 8 deletions src/main/scala/fs2/ftp/UnsecureFtp.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package fs2.ftp

import java.io.{ FileNotFoundException, InputStream }
import cats.effect.{ Async, Resource }
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.monadError._
import cats.effect.{ Async, Resource, Sync }
import cats.syntax.all._
import fs2.ftp.FtpSettings.UnsecureFtpSettings
import fs2.{ Pipe, Stream }
import org.apache.commons.net.ftp.{ FTP, FTPClient => JFTPClient, FTPSClient => JFTPSClient }
import FtpSettings.UnsecureFtpSettings
import cats.effect.kernel.Sync

import java.io.{ FileNotFoundException, InputStream }

final private class UnsecureFtp[F[_]: Async](
unsafeClient: UnsecureFtp.Client
Expand Down Expand Up @@ -83,6 +80,20 @@ object UnsecureFtp {

type Client = JFTPClient

def apply[F[_]: Async](unsafeClient: Client): Resource[F, FtpClient[F, UnsecureFtp.Client]] =
Resource.make(Async[F].pure(new UnsecureFtp[F](unsafeClient))) { client =>
for {
connected <- client.execute(_.isConnected)
_ <- if (!connected) Async[F].unit
else
client
.execute(_.logout)
.attempt
.flatMap(_ => client.execute(_.disconnect))
.voidError
} yield ()
}

def connect[F[_]: Async](
settings: UnsecureFtpSettings
): Resource[F, FtpClient[F, UnsecureFtp.Client]] =
Expand Down Expand Up @@ -128,6 +139,7 @@ object UnsecureFtp {
.execute(_.logout)
.attempt
.flatMap(_ => client.execute(_.disconnect))
.voidError
} yield ()
}
} yield r
Expand Down

0 comments on commit a616f4c

Please sign in to comment.