Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor sesion data store module #314

Merged
merged 24 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

import com.alipay.sofa.registry.common.model.store.Publisher;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class ClientOffPublishers {
private final ConnectId connectId;
private final List<Publisher> publishers;

public ClientOffPublishers(ConnectId connectId, List<Publisher> publishers) {
public ClientOffPublishers(ConnectId connectId, Collection<Publisher> publishers) {
this.connectId = connectId;
this.publishers = Collections.unmodifiableList(Lists.newArrayList(publishers));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package com.alipay.sofa.registry.common.model.store;

import com.alipay.sofa.registry.common.model.ConnectId;
import com.alipay.sofa.registry.common.model.RegisterVersion;

/**
* @author shangyu.wh
* @version $Id: StoreData.java, v 0.1 2017-11-30 19:48 shangyu.wh Exp $
Expand Down Expand Up @@ -43,4 +46,12 @@ enum DataType {
* @return
*/
ID getId();

String getDataInfoId();

RegisterVersion registerVersion();

long getRegisterTimestamp();

ConnectId connectId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ElementType getElementType() {
}

private PushContext getPushContext(String dataCenter) {
PushContext ctx = null;
PushContext ctx;
if (lastPushContexts == null) {
ctx = new PushContext();
this.lastPushContexts = Collections.singletonMap(dataCenter, ctx);
Expand Down Expand Up @@ -223,7 +223,6 @@ public synchronized long markPushEmpty(String dataCenter, long emptyVersion) {
return emptyVersion;
}

/** @return */
public synchronized CircuitBreakerStatistic getStatistic(String dataCenter) {
final PushContext ctx = getPushContext(dataCenter);
return new CircuitBreakerStatistic(
Expand All @@ -234,28 +233,6 @@ public synchronized CircuitBreakerStatistic getStatistic(String dataCenter) {
ctx.lastPushedFailTimeStamp);
}

/**
* change subscriber word cache
*
* @param subscriber
* @return
*/
public static Subscriber internSubscriber(Subscriber subscriber) {
subscriber.setDataInfoId(subscriber.getDataInfoId());
subscriber.setInstanceId(subscriber.getInstanceId());
subscriber.setGroup(subscriber.getGroup());
subscriber.setDataId(subscriber.getDataId());
subscriber.setCell(subscriber.getCell());
subscriber.setProcessId(subscriber.getProcessId());
subscriber.setAppName(subscriber.getAppName());

subscriber.setSourceAddress(URL.internURL(subscriber.getSourceAddress()));
subscriber.setTargetAddress(URL.internURL(subscriber.getTargetAddress()));
subscriber.setAttributes(subscriber.getAttributes());

return subscriber;
}

protected Map<String, String> internAttributes(Map<String, String> attributes) {
Map<String, String> intern = super.internAttributes(attributes);
return com.alipay.sofa.registry.collections.Maps.trimMap(intern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
*/
public class Watcher extends BaseInfo {

private static final long serialVersionUID = -2122505760402915804L;
private volatile long pushedVersion;

@Override
Expand Down Expand Up @@ -53,27 +54,4 @@ public String shortDesc() {
sb.append("sourceAddress=").append(getSourceAddress().buildAddressString());
return sb.toString();
}

/**
* change watcher word cache
*
* @param watcher
* @return
*/
public static Watcher internWatcher(Watcher watcher) {
watcher.setRegisterId(watcher.getRegisterId());
watcher.setDataInfoId(watcher.getDataInfoId());
watcher.setInstanceId(watcher.getInstanceId());
watcher.setGroup(watcher.getGroup());
watcher.setDataId(watcher.getDataId());
watcher.setClientId(watcher.getClientId());
watcher.setCell(watcher.getCell());
watcher.setProcessId(watcher.getProcessId());
watcher.setAppName(watcher.getAppName());
watcher.setSourceAddress(URL.internURL(watcher.getSourceAddress()));
watcher.setTargetAddress(URL.internURL(watcher.getTargetAddress()));
watcher.setAttributes(watcher.getAttributes());

return watcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.alipay.sofa.registry.common.model.ClientOffPublishers;
import com.alipay.sofa.registry.common.model.ConnectId;
import com.alipay.sofa.registry.common.model.store.Publisher;
import java.util.List;
import java.util.Collection;

/**
* @author yuzhi.lyz
Expand All @@ -30,7 +30,7 @@ public final class ClientOffWriteDataRequest implements WriteDataRequest<ClientO
private final ConnectId connectId;
private final ClientOffPublishers requestBody;

public ClientOffWriteDataRequest(ConnectId connectId, List<Publisher> publishers) {
public ClientOffWriteDataRequest(ConnectId connectId, Collection<Publisher> publishers) {
this.connectId = connectId;
this.requestBody = new ClientOffPublishers(connectId, publishers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@
import com.alipay.sofa.registry.server.session.slot.SlotTableCache;
import com.alipay.sofa.registry.server.session.slot.SlotTableCacheImpl;
import com.alipay.sofa.registry.server.session.store.*;
import com.alipay.sofa.registry.server.session.store.PublisherStore;
import com.alipay.sofa.registry.server.session.store.PublisherStoreImpl;
import com.alipay.sofa.registry.server.session.store.SubscriberStore;
import com.alipay.sofa.registry.server.session.store.SubscriberStoreImpl;
import com.alipay.sofa.registry.server.session.store.WatcherStore;
import com.alipay.sofa.registry.server.session.store.WatcherStoreImpl;
import com.alipay.sofa.registry.server.session.strategy.*;
import com.alipay.sofa.registry.server.shared.client.manager.BaseClientManagerService;
import com.alipay.sofa.registry.server.shared.client.manager.ClientManagerService;
Expand Down Expand Up @@ -476,20 +482,20 @@ public Registry sessionRegistry() {

@Bean
@ConditionalOnMissingBean
public Interests sessionInterests() {
return new SessionInterests();
public PublisherStore publisherStore(SlotTableCache slotTableCache) {
return new PublisherStoreImpl(slotTableCache);
}

@Bean
@ConditionalOnMissingBean
public Watchers sessionWatchers() {
return new SessionWatchers();
public SubscriberStore subscriberStore(SessionServerConfig sessionServerConfig) {
return new SubscriberStoreImpl(sessionServerConfig);
}

@Bean
@ConditionalOnMissingBean
public DataStore sessionDataStore() {
return new SessionDataStore();
public WatcherStore watcherStore() {
return new WatcherStoreImpl();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ public void submit(DataServerReq dataServerReq) {
if (failed) {
throw new FastRejectedExecutionException(
String.format(
"BlockingQueues.put overflow, idx=%d, totalSize=%d, queueSize=%d",
idx, totalCachedRequests, singleWorkerCachedRequest));
"BlockingQueues.put overflow, idx=%d, totalCachedRequests=%d, singleWorkerCachedRequests=%d, halfMaximumBufferSize=%d, avgSingleQueueBufferSize=%d",
idx,
totalCachedRequests,
singleWorkerCachedRequest,
halfMaximumBufferSize,
avgSingleQueueBufferSize));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.alipay.sofa.registry.server.session.cache.Key;
import com.alipay.sofa.registry.server.session.cache.Value;
import com.alipay.sofa.registry.server.session.circuit.breaker.CircuitBreakerService;
import com.alipay.sofa.registry.server.session.store.Interests;
import com.alipay.sofa.registry.server.session.store.SubscriberStore;
import com.alipay.sofa.registry.server.shared.util.DatumUtils;
import com.alipay.sofa.registry.task.FastRejectedExecutionException;
import com.alipay.sofa.registry.task.KeyedThreadPoolExecutor;
Expand All @@ -54,7 +54,7 @@ public class FirePushService {

@Autowired CacheService sessionCacheService;

@Autowired Interests sessionInterests;
@Autowired SubscriberStore subscriberStore;

@Autowired CircuitBreakerService circuitBreakerService;

Expand Down Expand Up @@ -151,7 +151,8 @@ static void handleFireOnWatchException(Watcher watcher, Throwable e) {
public boolean fireOnDatum(SubDatum datum, String dataNode) {
try {
DataInfo dataInfo = DataInfo.valueOf(datum.getDataInfoId());
Collection<Subscriber> subscribers = sessionInterests.getInterests(dataInfo.getDataInfoId());
Collection<Subscriber> subscribers =
subscriberStore.getByDataInfoId(dataInfo.getDataInfoId());
final long now = System.currentTimeMillis();
TriggerPushContext pushCtx =
new TriggerPushContext(datum.getDataCenter(), datum.getVersion(), dataNode, now);
Expand Down Expand Up @@ -189,7 +190,7 @@ boolean doExecuteOnChange(String changeDataInfoId, TriggerPushContext changeCtx)

private void onDatumChange(TriggerPushContext changeCtx, SubDatum datum) {
Map<ScopeEnum, List<Subscriber>> scopes =
SubscriberUtils.groupByScope(sessionInterests.getDatas(datum.getDataInfoId()));
SubscriberUtils.groupByScope(subscriberStore.getByDataInfoId(datum.getDataInfoId()));
final long datumTimestamp = PushTrace.getTriggerPushTimestamp(datum);
final PushCause cause = new PushCause(changeCtx, PushType.Sub, datumTimestamp);
for (Map.Entry<ScopeEnum, List<Subscriber>> scope : scopes.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@
import com.alipay.sofa.registry.server.session.providedata.ConfigProvideDataWatcher;
import com.alipay.sofa.registry.server.session.push.FirePushService;
import com.alipay.sofa.registry.server.session.push.PushSwitchService;
import com.alipay.sofa.registry.server.session.store.Watchers;
import com.alipay.sofa.registry.server.session.store.WatcherStore;
import com.alipay.sofa.registry.util.ConcurrentUtils;
import com.alipay.sofa.registry.util.LoopRunnable;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
Expand All @@ -52,7 +51,7 @@ public class DefaultClientRegistrationHook implements ClientRegistrationHook {
@Autowired protected FirePushService firePushService;
@Autowired protected PushSwitchService pushSwitchService;
@Autowired protected ConfigProvideDataWatcher configProvideDataWatcher;
@Autowired protected Watchers sessionWatchers;
@Autowired protected WatcherStore watcherStore;

@PostConstruct
public void init() {
Expand Down Expand Up @@ -175,8 +174,8 @@ public boolean processWatch(Watcher w, boolean watchEnable) {
: processWatchWhenWatchConfigDisable(w);
}

public Tuple<Set<String>, List<Watcher>> filter() {
List<Watcher> watchers = Lists.newLinkedList(sessionWatchers.getDataList());
public Tuple<Set<String>, Collection<Watcher>> filter() {
Collection<Watcher> watchers = watcherStore.getAll();
if (CollectionUtils.isEmpty(watchers)) {
return null;
}
Expand All @@ -194,7 +193,7 @@ public Tuple<Set<String>, List<Watcher>> filter() {
}

public void processWatch() {
Tuple<Set<String>, List<Watcher>> filtered = filter();
Tuple<Set<String>, Collection<Watcher>> filtered = filter();
if (filtered == null) {
return;
}
Expand Down
Loading