Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Sequencer for spring boot #41

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.mosn.layotto.examples.pubsub.springboot;

import io.mosn.layotto.v1.Sequencer;
import org.springframework.stereotype.Component;
import spec.sdk.runtime.v1.domain.sequencer.GetNextIdRequest;
@Component
public class SequencerMethod {
@Sequencer(store_name="sequencer_demo", key = "examples", options ="STRONG" )
public void method1(long nextId) {
System.out.println("NextID: "+ nextId);
}

@Sequencer(store_name="sequencer_demo", key = "examples", options ="WEAK" )
public void method2(long nextId) {
System.out.println("NextID: "+ nextId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public LayottoBeanPostProcessor layottoBeanPostProcessor(ConfigurableBeanFactory
public LayottoSubscriberStarter LayottoSubscriberStarter() {
return new LayottoSubscriberStarter();
}

@Bean
@ConditionalOnMissingBean
public LayottoBeanPostProcessorSequencer LayottoBeanPostProcessorSequencer(ConfigurableBeanFactory beanFactory){return new LayottoBeanPostProcessorSequencer(beanFactory); }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.mosn.layotto.springboot;

import io.mosn.layotto.v1.RuntimeClientBuilder;
import io.mosn.layotto.v1.Sequencer;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.EmbeddedValueResolver;
import spec.sdk.runtime.v1.client.RuntimeClient;
import spec.sdk.runtime.v1.domain.sequencer.GetNextIdRequest;
import spec.sdk.runtime.v1.domain.sequencer.SequencerOptions;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;


public class LayottoBeanPostProcessorSequencer implements BeanPostProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LayottoBeanPostProcessorSequencer.class.getName());

private final EmbeddedValueResolver embeddedValueResolver;
@Autowired
private static LayottoProperties layottoConfig;


LayottoBeanPostProcessorSequencer(ConfigurableBeanFactory beanFactory) {
embeddedValueResolver = new EmbeddedValueResolver(beanFactory);
}

/**
* {@inheritDoc}
*/
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

try {
sequencer(bean.getClass(), bean, embeddedValueResolver);
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}

return bean;
}


/**
* {@inheritDoc}
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

return bean;
}

private static void sequencer(Class clazz, Object bean, EmbeddedValueResolver embeddedValueResolver) throws InvocationTargetException, IllegalAccessException {
if (clazz == null) {
return;
}
sequencer(clazz.getSuperclass(), bean, embeddedValueResolver);
for (Method method : clazz.getDeclaredMethods()) {
Sequencer getsequencer = method.getAnnotation(Sequencer.class);
if (getsequencer == null) {
continue;
}
String storeName = embeddedValueResolver.resolveStringValue(getsequencer.store_name());
String key = embeddedValueResolver.resolveStringValue(getsequencer.key());
String options = embeddedValueResolver.resolveStringValue(getsequencer.options());
if (StringUtils.isNotEmpty(storeName) && StringUtils.isNotEmpty(key)&& StringUtils.isNotEmpty(options)) {

RuntimeClient layottoRuntime = new RuntimeClientBuilder().withIp(layottoConfig.DEFAULT_IP).
withPort(layottoConfig.DEFAULT_PORT)
.build();
GetNextIdRequest getNextIdRequest = new GetNextIdRequest();
getNextIdRequest.setStoreName(storeName);
getNextIdRequest.setKey(key);

SequencerOptions anoptions = new SequencerOptions();
if(options.equals("STRONG") )
anoptions.setOption(SequencerOptions.AutoIncrement.STRONG);
else if(options.equals("WEAK"))
anoptions.setOption(SequencerOptions.AutoIncrement.WEAK);
getNextIdRequest.setOptions(anoptions);

long nextId = layottoRuntime.getNextId(getNextIdRequest).getNextId();
try {
method.invoke(bean, nextId);
} catch (Exception e) {
LOGGER.error("layotto sequencer method [{}] err:{ }", method.getName(), e.getMessage());
throw e;
}
LOGGER.info("NextID:{},options:{}", nextId,options);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,27 @@ public class LayottoProperties {

public static final int DEFAULT_SUBSCRIBER_PORT = 9999;

public static final int DEFAULT_PORT = 34904;

public static final String DEFAULT_IP = "127.0.0.1";


public Integer subscriberPort;

public Integer Port;

public String Ip;

public String getIp() { return Ip == null ? DEFAULT_IP : this.Ip;}

public void setIp(String ip) { Ip = ip; }


public Integer getPort() { return Port == null ? DEFAULT_PORT : this.Port; }

public void setPort(Integer port) { this.Port = port; }


public void setSubscriberPort(Integer subscriberPort) {
this.subscriberPort = subscriberPort;
}
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/main/java/io/mosn/layotto/v1/Sequencer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

package io.mosn.layotto.v1;

import java.lang.annotation.*;

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Sequencer {

String store_name();
fft0518 marked this conversation as resolved.
Show resolved Hide resolved
String key();
String options();
}