Skip to content

Commit

Permalink
init connector runtime v2
Browse files Browse the repository at this point in the history
  • Loading branch information
xwm1992 committed Apr 11, 2024
1 parent a930f6d commit d4eda1e
Show file tree
Hide file tree
Showing 23 changed files with 747 additions and 0 deletions.
19 changes: 19 additions & 0 deletions eventmesh-runtime-v2/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'java'
}

group 'org.apache.eventmesh'
version '1.10.0-release'

repositories {
mavenCentral()
}

dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.apache.eventmesh.runtime;

public interface Runtime {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.apache.eventmesh.runtime;

public interface RuntimeFactory extends AutoCloseable {

void init() throws Exception;

Runtime createRuntime(RuntimeInstanceConfig runtimeInstanceConfig);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.apache.eventmesh.runtime;

public class RuntimeInstanceConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package org.apache.eventmesh.runtime;

public class RuntimeUtils {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.EventExecutorGroup;

import lombok.extern.slf4j.Slf4j;

/**
* The most basic server
*/
@Slf4j
public abstract class AbstractRemotingServer implements RemotingServer {

private static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
private static final int DEFAULT_SLEEP_SECONDS = 30;

private EventLoopGroup bossGroup;
private EventLoopGroup ioGroup;
private EventExecutorGroup workerGroup;
protected ProducerManager producerManager;

private int port;

protected void buildBossGroup(final String threadPrefix) {
if (useEpoll()) {
bossGroup = new EpollEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyEpoll-Boss", true));
} else {
bossGroup = new NioEventLoopGroup(1, new EventMeshThreadFactory(threadPrefix + "NettyNio-Boss", true));
}

}

private void buildIOGroup(final String threadPrefix) {
if (useEpoll()) {
ioGroup = new EpollEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-NettyEpoll-IO"));
} else {
ioGroup = new NioEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-NettyNio-IO"));
}
}

private void buildWorkerGroup(final String threadPrefix) {
workerGroup = new NioEventLoopGroup(MAX_THREADS, new EventMeshThreadFactory(threadPrefix + "-worker"));
}

protected void initProducerManager() throws Exception {
producerManager = new ProducerManager(this);
producerManager.init();
}

public ProducerManager getProducerManager() {
return producerManager;
}

public void init(final String threadPrefix) throws Exception {
buildBossGroup(threadPrefix);
buildIOGroup(threadPrefix);
buildWorkerGroup(threadPrefix);
}

public void start() throws Exception {
producerManager.start();
}

public void shutdown() throws Exception {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
log.info("shutdown bossGroup");
}
if (Objects.isNull(producerManager)) {
producerManager.shutdown();
}
ThreadUtils.randomPause(TimeUnit.SECONDS.toMillis(DEFAULT_SLEEP_SECONDS));

if (ioGroup != null) {
ioGroup.shutdownGracefully();
log.info("shutdown ioGroup");
}

if (workerGroup != null) {
workerGroup.shutdownGracefully();

log.info("shutdown workerGroup");
}
}

protected boolean useEpoll() {
return SystemUtils.isLinuxPlatform() && Epoll.isAvailable();
}

public EventLoopGroup getBossGroup() {
return bossGroup;
}

public void setBossGroup(final EventLoopGroup bossGroup) {
this.bossGroup = bossGroup;
}

public EventLoopGroup getIoGroup() {
return ioGroup;
}

public void setIoGroup(final EventLoopGroup ioGroup) {
this.ioGroup = ioGroup;
}

public EventExecutorGroup getWorkerGroup() {
return workerGroup;
}

public void setWorkerGroup(final EventExecutorGroup workerGroup) {
this.workerGroup = workerGroup;
}

public int getPort() {
return port;
}

public void setPort(final int port) {
this.port = port;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

/**
* concrete server bootstrap
*/
public interface EventMeshBootstrap {

void init() throws Exception;

void start() throws Exception;

void shutdown() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import static org.apache.eventmesh.common.Constants.GRPC;

import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;
import org.apache.eventmesh.runtime.configuration.EventMeshGrpcConfiguration;

public class EventMeshGrpcBootstrap implements EventMeshBootstrap {

private final EventMeshGrpcConfiguration eventMeshGrpcConfiguration;

private EventMeshGrpcServer eventMeshGrpcServer;

private final EventMeshServer eventMeshServer;

public EventMeshGrpcBootstrap(final EventMeshServer eventMeshServer) {
this.eventMeshServer = eventMeshServer;
ConfigService configService = ConfigService.getInstance();
this.eventMeshGrpcConfiguration = configService.buildConfigInstance(EventMeshGrpcConfiguration.class);

ConfigurationContextUtil.putIfAbsent(GRPC, eventMeshGrpcConfiguration);
}

@Override
public void init() throws Exception {
// server init
if (eventMeshGrpcConfiguration != null) {
eventMeshGrpcServer = new EventMeshGrpcServer(this.eventMeshServer, this.eventMeshGrpcConfiguration);
eventMeshGrpcServer.init();
}
}

@Override
public void start() throws Exception {
// server start
if (eventMeshGrpcConfiguration != null) {
eventMeshGrpcServer.start();
}
}

@Override
public void shutdown() throws Exception {
if (eventMeshGrpcConfiguration != null) {
eventMeshGrpcServer.shutdown();
}
}

public EventMeshGrpcServer getEventMeshGrpcServer() {
return eventMeshGrpcServer;
}

public void setEventMeshGrpcServer(EventMeshGrpcServer eventMeshGrpcServer) {
this.eventMeshGrpcServer = eventMeshGrpcServer;
}
}
Loading

0 comments on commit d4eda1e

Please sign in to comment.