Merge branch 'feature_subscription_by_query_optional' into feature_multi_tenant

# Conflicts:
#	client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
#	client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java
#	client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java
#	naming/src/main/java/com/alibaba/nacos/naming/controllers/HealthController.java
#	naming/src/main/java/com/alibaba/nacos/naming/monitor/PerformanceLoggerThread.java
#	naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
#	naming/src/main/java/com/alibaba/nacos/naming/raft/RaftCore.java
#	naming/src/main/java/com/alibaba/nacos/naming/raft/RaftStore.java
This commit is contained in:
nkorange 2019-01-15 11:41:59 +08:00
parent d0bfc0e179
commit 65408a6194
10 changed files with 49 additions and 48 deletions

View File

@ -100,6 +100,9 @@ public class RaftCore {
@Autowired
private SwitchDomain switchDomain;
@Autowired
private RaftProxy raftProxy;
public volatile Notifier notifier = new Notifier();
@PostConstruct
@ -158,7 +161,7 @@ public class RaftCore {
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
RaftProxy.proxyPostLarge(API_PUB, params.toJSONString(), parameters);
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
@ -232,7 +235,7 @@ public class RaftCore {
Map<String, String> params = new HashMap<>(1);
params.put("key", URLEncoder.encode(key, "UTF-8"));
RaftProxy.proxyGET(API_DEL, params);
raftProxy.proxyGET(getLeader().ip, API_DEL, params);
return;
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.consistency.cp.simpleraft;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.springframework.stereotype.Component;
import java.net.HttpURLConnection;
import java.util.Map;
@ -25,18 +26,11 @@ import java.util.Map;
/**
* @author nacos
*/
@Component
public class RaftProxy {
public static void proxyGET(String api, Map<String, String> params) throws Exception {
if (RaftCore.isLeader()) {
throw new IllegalStateException("I'm leader, no need to do proxy");
}
if (RaftCore.getLeader() == null) {
throw new IllegalStateException("No leader at present");
}
public void proxyGET(String server, String api, Map<String, String> params) throws Exception {
// do proxy
String server = RaftCore.getLeader().ip;
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}
@ -48,17 +42,8 @@ public class RaftProxy {
}
}
public static void proxyPostLarge(String api, String content, Map<String, String> headers) throws Exception {
if (RaftCore.isLeader()) {
throw new IllegalStateException("I'm leader, no need to do proxy");
}
if (RaftCore.getLeader() == null) {
throw new IllegalStateException("No leader at present");
}
public static void proxyPostLarge(String server, String api, String content, Map<String, String> headers) throws Exception {
// do proxy
String server = RaftCore.getLeader().ip;
if (!server.contains(UtilsAndCommons.CLUSTER_CONF_IP_SPLITER)) {
server = server + UtilsAndCommons.CLUSTER_CONF_IP_SPLITER + RunningConfig.getServerPort();
}

View File

@ -19,8 +19,8 @@ import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.DomainsManager;
import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
@ -45,8 +45,15 @@ import java.util.Map;
@RestController("namingHealthController")
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/health")
public class HealthController {
@Autowired
private DomainsManager domainsManager;
private ServiceManager serviceManager;
@Autowired
private DistroMapper distroMapper;
@Autowired
private PushService pushService;
@RequestMapping(method = {RequestMethod.POST, RequestMethod.PUT})
public String update(HttpServletRequest request) throws Exception {
@ -59,8 +66,8 @@ public class HealthController {
boolean valid = Boolean.valueOf(WebUtils.required(request, "valid"));
String clusterName = WebUtils.optional(request, "clusterName", UtilsAndCommons.DEFAULT_CLUSTER_NAME);
if (!DistroMapper.responsible(dom)) {
String server = DistroMapper.mapSrv(dom);
if (!distroMapper.responsible(dom)) {
String server = distroMapper.mapSrv(dom);
Loggers.EVT_LOG.info("I'm not responsible for " + dom + ", proxy it to " + server);
Map<String, String> proxyParams = new HashMap<>(16);
for (Map.Entry<String, String[]> entry : request.getParameterMap().entrySet()) {
@ -81,7 +88,7 @@ public class HealthController {
throw new IllegalArgumentException("failed to proxy health update to " + server + ", dom: " + dom);
}
} else {
VirtualClusterDomain virtualClusterDomain = (VirtualClusterDomain) domainsManager.getDomain(namespaceId, dom);
VirtualClusterDomain virtualClusterDomain = serviceManager.getService(namespaceId, dom);
// Only health check "none" need update health status with api
if (!virtualClusterDomain.getEnableHealthCheck() && !virtualClusterDomain.getEnableClientBeat()) {
for (IpAddress ipAddress : virtualClusterDomain.allIPs(Lists.newArrayList(clusterName))) {
@ -90,7 +97,7 @@ public class HealthController {
Loggers.EVT_LOG.info((valid ? "[IP-ENABLED]" : "[IP-DISABLED]") + " ips: "
+ ipAddress.getIp() + ":" + ipAddress.getPort() + "@" + ipAddress.getClusterName()
+ ", dom: " + dom + ", msg: update thought HealthController api");
PushService.domChanged(namespaceId, virtualClusterDomain.getName());
pushService.domChanged(namespaceId, virtualClusterDomain.getName());
break;
}
}

View File

@ -214,9 +214,9 @@ public class IpAddress extends Instance implements Comparable {
@JSONField(serialize = false)
public String getDatumKey() {
if (getPort() > 0) {
return getIp() + ":" + getPort() + ":" + DistroMapper.LOCALHOST_SITE;
return getIp() + ":" + getPort() + ":" + UtilsAndCommons.LOCALHOST_SITE;
} else {
return getIp() + ":" + DistroMapper.LOCALHOST_SITE;
return getIp() + ":" + UtilsAndCommons.LOCALHOST_SITE;
}
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.mysql.jdbc.jdbc2.optional.MysqlDataSource;
import io.netty.channel.ConnectTimeoutException;
import org.apache.commons.collections.CollectionUtils;

View File

@ -19,7 +19,6 @@ import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.IpAddress;
import com.alibaba.nacos.naming.core.VirtualClusterDomain;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.Switch;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.collections.CollectionUtils;

View File

@ -78,6 +78,8 @@ public class UtilsAndCommons {
public static final String DEFAULT_CLUSTER_NAME = "DEFAULT";
public static final String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;
public static final int RAFT_PUBLISH_TIMEOUT = 5000;
static public final String RAFT_DOM_PRE = "meta";

View File

@ -15,6 +15,8 @@
*/
package com.alibaba.nacos.naming.monitor;
import com.alibaba.nacos.naming.consistency.cp.simpleraft.RaftCore;
import com.alibaba.nacos.naming.consistency.cp.simpleraft.RaftPeer;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
@ -33,8 +35,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.naming.raft.RaftPeer.State.FOLLOWER;
/**
* @author nacos
*/
@ -48,6 +48,12 @@ public class PerformanceLoggerThread {
@Autowired
private SwitchDomain switchDomain;
@Autowired
private PushService pushService;
@Autowired
private RaftCore raftCore;
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@ -91,8 +97,8 @@ public class PerformanceLoggerThread {
@Scheduled(cron = "0 0 0 * * ?")
public void refreshMetrics() {
PushService.setFailedPush(0);
PushService.setTotalPush(0);
pushService.setFailedPush(0);
pushService.setTotalPush(0);
MetricsMonitor.getHttpHealthCheckMonitor().set(0);
MetricsMonitor.getMysqlHealthCheckMonitor().set(0);
MetricsMonitor.getTcpHealthCheckMonitor().set(0);
@ -100,10 +106,10 @@ public class PerformanceLoggerThread {
@Scheduled(cron = "0/15 * * * * ?")
public void collectmetrics() {
int domCount = domainsManager.getDomCount();
int domCount = serviceManager.getDomCount();
MetricsMonitor.getDomCountMonitor().set(domCount);
int ipCount = domainsManager.getInstanceCount();
int ipCount = serviceManager.getInstanceCount();
MetricsMonitor.getIpCountMonitor().set(ipCount);
long maxPushCost = getMaxPushCost();
@ -112,12 +118,12 @@ public class PerformanceLoggerThread {
long avgPushCost = getAvgPushCost();
MetricsMonitor.getAvgPushCostMonitor().set(avgPushCost);
MetricsMonitor.getTotalPushMonitor().set(PushService.getTotalPush());
MetricsMonitor.getFailedPushMonitor().set(PushService.getFailedPushCount());
MetricsMonitor.getTotalPushMonitor().set(pushService.getTotalPush());
MetricsMonitor.getFailedPushMonitor().set(pushService.getFailedPushCount());
if (RaftCore.isLeader()) {
if (raftCore.isLeader()) {
MetricsMonitor.getLeaderStatusMonitor().set(1);
} else if (RaftCore.getPeerSet().local().state == FOLLOWER) {
} else if (raftCore.getPeerSet().local().state == RaftPeer.State.FOLLOWER) {
MetricsMonitor.getLeaderStatusMonitor().set(0);
} else {
MetricsMonitor.getLeaderStatusMonitor().set(2);
@ -132,8 +138,6 @@ public class PerformanceLoggerThread {
int domCount = serviceManager.getDomCount();
int ipCount = serviceManager.getInstanceCount();
long maxPushMaxCost = getMaxPushCost();
int domCount = domainsManager.getDomCount();
int ipCount = domainsManager.getInstanceCount();
long maxPushCost = getMaxPushCost();
long avgPushCost = getAvgPushCost();

View File

@ -121,16 +121,15 @@ public class PushService {
}
}
public static int getTotalPush() {
public int getTotalPush() {
return totalPush;
}
public void addClient(String namespaceId,
public static void setTotalPush(int totalPush) {
public void setTotalPush(int totalPush) {
PushService.totalPush = totalPush;
}
public static void addClient(String namespaceId,
public void addClient(String namespaceId,
String dom,
String clusters,
String agent,
@ -307,11 +306,11 @@ public class PushService {
return new ArrayList<Receiver.AckEntry>(ackMap.values());
}
public static int getFailedPushCount() {
public int getFailedPushCount() {
return ackMap.size() + failedPush;
}
public static void setFailedPush(int failedPush) {
public void setFailedPush(int failedPush) {
PushService.failedPush = failedPush;
}