This commit is contained in:
nkorange 2019-01-09 10:08:05 +08:00
parent 0df5798147
commit c7800b42bd
5 changed files with 53 additions and 25 deletions

View File

@ -31,5 +31,7 @@ public class PropertyKeyConst {
public final static String CLUSTER_NAME = "clusterName";
public final static String ENCODE = "encode";
public final static String NAMING_LOAD_CACHE_AT_START = "namingLoadCacheAtStart";
public final static String NAMING_CLIENT_BEAT_THREAD_COUNT = "namingClientBeatThreadCount";
public final static String NAMING_POLLING_THREAD_COUNT = "namingPollingThreadCount";
}

View File

@ -35,6 +35,7 @@ import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.StringUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.Iterator;
@ -101,7 +102,7 @@ public class NacosNamingService implements NamingService {
eventDispatcher = new EventDispatcher();
serverProxy = new NamingProxy(namespace, endpoint, serverList);
beatReactor = new BeatReactor(serverProxy);
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, false);
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir);
}
public NacosNamingService(Properties properties) {
@ -131,10 +132,16 @@ public class NacosNamingService implements NamingService {
properties.getProperty(PropertyKeyConst.NAMING_LOAD_CACHE_AT_START));
}
int clientBeatThreadCount = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
int pollingThreadCount = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_POLLING_THREAD_COUNT),
UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
eventDispatcher = new EventDispatcher();
serverProxy = new NamingProxy(namespace, endpoint, serverList);
beatReactor = new BeatReactor(serverProxy);
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, loadCacheAtStart);
beatReactor = new BeatReactor(serverProxy, clientBeatThreadCount);
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, loadCacheAtStart, pollingThreadCount);
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.client.naming.beat;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import java.util.Map;
import java.util.concurrent.*;
@ -27,15 +28,7 @@ import java.util.concurrent.*;
*/
public class BeatReactor {
private ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
private ScheduledExecutorService executorService;
private long clientBeatInterval = 5 * 1000;
@ -44,7 +37,22 @@ public class BeatReactor {
public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();
public BeatReactor(NamingProxy serverProxy) {
this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
public BeatReactor(NamingProxy serverProxy, int threadCount) {
this.serverProxy = serverProxy;
executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
executorService.scheduleAtFixedRate(new BeatProcessor(), 0, clientBeatInterval, TimeUnit.MILLISECONDS);
}

View File

@ -16,14 +16,12 @@
package com.alibaba.nacos.client.naming.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.client.naming.backups.FailoverReactor;
import com.alibaba.nacos.client.naming.cache.DiskCache;
import com.alibaba.nacos.client.naming.net.NamingProxy;
import com.alibaba.nacos.client.naming.utils.LogUtils;
import com.alibaba.nacos.client.naming.utils.NetUtils;
import com.alibaba.nacos.client.naming.utils.StringUtils;
import com.alibaba.nacos.client.naming.utils.UtilAndComs;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@ -56,8 +54,25 @@ public class HostReactor {
private String cacheDir;
private ScheduledExecutorService executor;
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
boolean loadCacheAtStart) {
boolean loadCacheAtStart, int pollingThreadCount) {
executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.client.naming.updater");
return thread;
}
});
this.eventDispatcher = eventDispatcher;
this.serverProxy = serverProxy;
this.cacheDir = cacheDir;
@ -72,16 +87,6 @@ public class HostReactor {
this.pushRecver = new PushRecver(this);
}
private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "com.alibaba.nacos.client.naming.updater");
thread.setDaemon(true);
return thread;
}
});
public Map<String, ServiceInfo> getServiceInfoMap() {
return serviceInfoMap;
}

View File

@ -45,4 +45,10 @@ public class UtilAndComs {
public static final String NACOS_NAMING_LOG_LEVEL = "com.alibaba.nacos.naming.log.level";
public static final String SERVER_ADDR_IP_SPLITER = ":";
public static final int DEFAULT_CLIENT_BEAT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ?
Runtime.getRuntime().availableProcessors() / 2 : 1;
public static final int DEFAULT_POLLING_THREAD_COUNT = Runtime.getRuntime().availableProcessors() > 1 ?
Runtime.getRuntime().availableProcessors() / 2 : 1;
}