Skip to content

Commit

Permalink
Update grpc-netty & Remove jetcd-core
Browse files Browse the repository at this point in the history
  • Loading branch information
Danil Bykov committed Mar 21, 2022
1 parent 04a46c8 commit 99b85be
Show file tree
Hide file tree
Showing 109 changed files with 4,360 additions and 3,898 deletions.
20 changes: 4 additions & 16 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,17 @@ lazy val etcd = project
.settings(
name := "reactive-config-etcd",
libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty" % "1.22.3",
"io.grpc" % "grpc-netty" % "1.41.0",
"io.netty" % "netty-tcnative-boringssl-static" % "2.0.25.Final",
"io.netty" % "netty-codec-http2" % "4.1.53.Final",
"io.netty" % "netty-handler-proxy" % "4.1.53.Final",
"com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion,
"com.coreos" % "jetcd-core" % "0.0.2" excludeAll (ExclusionRule(organization = "io.grpc")),
"com.coreos" % "jetcd-common" % "0.0.2" excludeAll (ExclusionRule(organization = "io.grpc")),
"com.pauldijou" %% "jwt-core" % "4.2.0"
),
Compile / PB.targets := Seq(
scalapb.gen() -> (Compile / sourceManaged).value / "scalapb"
)
)
.settings {
val generateSources = TaskKey.apply[Unit]("generateSources")
def genPackage(f: File): File = f / "com" / "github" / "fit51" / "reactiveconfig" / "etcd" / "gen"
generateSources := {
(PB.generate in Compile).value
val genDir = genPackage((sourceManaged in Compile).value)
val targetDir = genPackage((sourceDirectory in Compile).value / "scala")
println(s"Generated in: $genDir")
println(s"Moved to: $targetDir")
IO.copyDirectory(genDir, targetDir, true, true)
IO.delete(genDir)
}
}

lazy val typesafe = project
.in(file("typesafe"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,59 +1,52 @@
package com.github.fit51.reactiveconfig.etcd;

import com.google.common.annotations.VisibleForTesting;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Executor;
import java.net.InetSocketAddress;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.coreos.jetcd.common.exception.ErrorCode;
import com.coreos.jetcd.common.exception.EtcdExceptionFactory;
import com.coreos.jetcd.resolver.URIResolver;
import com.coreos.jetcd.resolver.URIResolverLoader;
import com.google.common.base.Strings;
import com.google.common.net.HostAndPort;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.Attributes;
import io.grpc.NameResolver.Listener;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;

import java.net.URI;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import javax.annotation.concurrent.GuardedBy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//ToDo: Rewrite in Scala
public class SmartNameResolver extends NameResolver {
// Stolen from jetcd-core://io.etcd.jetcd.resolver
public class MultipleAddressesResolver extends NameResolver {
public static final int ETCD_CLIENT_PORT = 2379;

private static final Logger LOGGER = LoggerFactory.getLogger(SmartNameResolver.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MultipleAddressesResolver.class);

private final Object lock;
private final String authority;
private final Collection<URI> uris;
private final List<URIResolver> resolvers;
private final URI targetUri;

private volatile boolean shutdown;
private volatile boolean resolving;

@GuardedBy("lock")
private Executor executor;
@GuardedBy("lock")
private Listener listener;

public SmartNameResolver(String authority, Collection<URI> uris, URIResolverLoader loader) {
this.lock = new Object();
this.authority = authority;
this.uris = uris;
private final List<HostAndPort> addresses;

this.resolvers = new ArrayList<>();
this.resolvers.add(new DirectUriResolver());
this.resolvers.addAll(loader.load());
this.resolvers.sort(Comparator.comparingInt(r -> r.priority()));
public MultipleAddressesResolver(URI targetUri, List<HostAndPort> addresses) {
this.lock = new Object();
this.targetUri = targetUri;
this.authority = targetUri.getAuthority() != null ? targetUri.getAuthority() : "";
this.addresses = addresses;
}

@VisibleForTesting
public List<URIResolver> getResolvers() {
return Collections.unmodifiableList(resolvers);
public URI getTargetUri() {
return targetUri;
}

@Override
Expand Down Expand Up @@ -110,31 +103,38 @@ private void doResolve() {
}

try {
List<EquivalentAddressGroup> groups = new ArrayList<>();

for (URI uri : uris) {
resolvers.stream()
.filter(r -> r.supports(uri))
.limit(1)
.flatMap(r -> r.resolve(uri).stream())
.map(EquivalentAddressGroup::new)
.forEach(groups::add);
}

List<EquivalentAddressGroup> groups = computeAddressGroups();
if (groups.isEmpty()) {
throw EtcdExceptionFactory.newEtcdException(
ErrorCode.INVALID_ARGUMENT,
("Unable to resolve endpoints " + uris)
);
throw new RuntimeException("Unable to resolve endpoint " + targetUri);
}

savedListener.onAddresses(groups, Attributes.EMPTY);

} catch (Exception e) {
LOGGER.warn("Error wile getting list of servers", e);
savedListener.onError(Status.NOT_FOUND);
} finally {
resolving = false;
}
}
}

protected List<EquivalentAddressGroup> computeAddressGroups() {
if (addresses.isEmpty()) {
throw new RuntimeException("Unable to resolve endpoint " + targetUri);
}

return addresses.stream()
.map(address -> {
return new EquivalentAddressGroup(
new InetSocketAddress(
address.getHost(),
address.getPortOrDefault(ETCD_CLIENT_PORT)),
Strings.isNullOrEmpty(getServiceAuthority())
? Attributes.newBuilder()
.set(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE, address.toString())
.build()
: Attributes.EMPTY);
})
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.github.fit51.reactiveconfig.etcd

import com.coreos.jetcd.resolver.URIResolverLoader
import com.github.fit51.reactiveconfig.etcd.gen.rpc.{AuthGrpc, AuthenticateRequest, AuthenticateResponse}
import com.google.common.net.HostAndPort
import com.typesafe.scalalogging.StrictLogging
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener
import io.grpc.Status.Code
import io.grpc._
import io.grpc.NameResolver.Args
import io.grpc.netty.{GrpcSslContexts, NettyChannelBuilder}
import io.netty.channel.ChannelOption
import io.netty.handler.ssl.ClientAuth
Expand All @@ -16,6 +17,7 @@ import java.net.URI
import java.time.Clock
import java.util.concurrent.TimeUnit
import javax.net.ssl.TrustManagerFactory
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

Expand All @@ -29,10 +31,8 @@ object ChannelManager {
options: ChannelOptions = ChannelOptions(),
authority: Option[String] = None,
trustManagerFactory: Option[TrustManagerFactory] = None
)(implicit exec: ExecutionContext): ChannelManager = {
val uris = endpoints.split(',').map(new URI(_)).toList
new ChannelManager(uris, options, authority, trustManagerFactory)
}
)(implicit exec: ExecutionContext): ChannelManager =
new ChannelManager(endpoints, options, authority, trustManagerFactory)

def apply(
endpoints: String,
Expand All @@ -41,12 +41,41 @@ object ChannelManager {
authority: Option[String] = None,
trustManagerFactory: Option[TrustManagerFactory] = None
)(implicit exec: ExecutionContext, cl: Clock): ChannelManager with Authorization = {
val uris = endpoints.split(',').map(new URI(_)).toList
new ChannelManager(uris, options, authority, trustManagerFactory) with Authorization {
val uris = endpoints
.split(',')
.map(new URI(_))
.map(e => e.getHost() + (if (e.getPort() != -1) s":${e.getPort()}" else ""))
.mkString(",")
val target = s"etcd://${authority.getOrElse("etcd")}/$uris"
new ChannelManager(target, options, authority, trustManagerFactory) with Authorization {
override val credentials = credential
override implicit val clock = cl
}
}

NameResolverRegistry
.getDefaultRegistry().register(new NameResolverProvider() {
override def newNameResolver(targetUri: URI, args: Args): NameResolver = {
val addresses = targetUri
.getPath()
.split(",")
.map(_.trim())
.map(a => if (a.startsWith("/")) a.substring(1) else a)
.map(HostAndPort.fromString)
.toList
.asJava
new MultipleAddressesResolver(targetUri, addresses)
}

override def getDefaultScheme(): String =
"etcd"

override protected def isAvailable(): Boolean =
true

override protected def priority(): Int =
Int.MinValue
})
}

/** @param uris
Expand All @@ -56,7 +85,7 @@ object ChannelManager {
* @param exec
*/
class ChannelManager(
uris: List[URI],
target: String,
options: ChannelOptions,
authority: Option[String],
tmf: Option[TrustManagerFactory]
Expand All @@ -70,10 +99,7 @@ class ChannelManager(
* Stub Api would fail and we have to retry it ourselves.
*/
val builder = NettyChannelBuilder
.forTarget("etcd")
.nameResolverFactory(
new SmartNameResolverFactory(uris, authority.getOrElse("etcd"), URIResolverLoader.defaultLoader)
)
.forTarget(target)
.defaultLoadBalancingPolicy("pick_first")
.keepAliveTime(options.keepAliveTime.toSeconds, TimeUnit.SECONDS)
.keepAliveTimeout(options.keepAliveTimeout.toSeconds, TimeUnit.SECONDS)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package com.github.fit51.reactiveconfig.etcd

import cats.effect.{Async, ContextShift}
import com.coreos.jetcd.resolver.URIResolverLoader
import com.google.protobuf.ByteString
import io.grpc.internal.GrpcUtil
import io.grpc.{Attributes, NameResolver}
import pdi.jwt.JwtClaim

import java.net.URI
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._
Expand Down Expand Up @@ -53,14 +50,6 @@ object EtcdUtils {

class EtcdException(m: String) extends Exception(m)

class SmartNameResolverFactory(uris: List[URI], authority: String, loader: URIResolverLoader)
extends NameResolver.Factory {
override def getDefaultScheme: String = "etcd"

override def newNameResolver(targetUri: URI, params: Attributes): NameResolver =
new SmartNameResolver(authority, uris.asJava, loader)
}

sealed trait Token {
def value: String
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
package com.github.fit51.reactiveconfig.etcd.gen.auth

object AuthProto extends _root_.scalapb.GeneratedFileObject {
lazy val dependencies: Seq[_root_.scalapb.GeneratedFileObject] = Seq(
)
lazy val dependencies: Seq[_root_.scalapb.GeneratedFileObject] = Seq.empty
lazy val messagesCompanions: Seq[_root_.scalapb.GeneratedMessageCompanion[_ <: _root_.scalapb.GeneratedMessage]] =
Seq[_root_.scalapb.GeneratedMessageCompanion[_ <: _root_.scalapb.GeneratedMessage]](
com.github.fit51.reactiveconfig.etcd.gen.auth.User,
com.github.fit51.reactiveconfig.etcd.gen.auth.Permission,
com.github.fit51.reactiveconfig.etcd.gen.auth.Role
)
private lazy val ProtoBytes: Array[Byte] =
private lazy val ProtoBytes: _root_.scala.Array[Byte] =
scalapb.Encoding.fromBase64(scala.collection.immutable.Seq(
"""CgphdXRoLnByb3RvEgZhdXRocGIicgoEVXNlchIdCgRuYW1lGAEgASgMQgniPwYSBG5hbWVSBG5hbWUSKQoIcGFzc3dvcmQYA
iABKAxCDeI/ChIIcGFzc3dvcmRSCHBhc3N3b3JkEiAKBXJvbGVzGAMgAygJQgriPwcSBXJvbGVzUgVyb2xlcyLEAQoKUGVybWlzc
Expand All @@ -30,7 +29,7 @@ object AuthProto extends _root_.scalapb.GeneratedFileObject {
}
lazy val javaDescriptor: com.google.protobuf.Descriptors.FileDescriptor = {
val javaProto = com.google.protobuf.DescriptorProtos.FileDescriptorProto.parseFrom(ProtoBytes)
com.google.protobuf.Descriptors.FileDescriptor.buildFrom(javaProto, Array(
com.google.protobuf.Descriptors.FileDescriptor.buildFrom(javaProto, _root_.scala.Array(
))
}
@deprecated("Use javaDescriptor instead. In a future version this will refer to scalaDescriptor.", "ScalaPB 0.5.47")
Expand Down
Loading

0 comments on commit 99b85be

Please sign in to comment.