#502 Fix bug
This commit is contained in:
parent
71af535efd
commit
918dcf9905
@ -173,6 +173,7 @@ public class NamingProxy {
|
|||||||
params.put("weight", String.valueOf(instance.getWeight()));
|
params.put("weight", String.valueOf(instance.getWeight()));
|
||||||
params.put("enable", String.valueOf(instance.isEnabled()));
|
params.put("enable", String.valueOf(instance.isEnabled()));
|
||||||
params.put("healthy", String.valueOf(instance.isHealthy()));
|
params.put("healthy", String.valueOf(instance.isHealthy()));
|
||||||
|
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
|
||||||
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
|
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
|
||||||
|
|
||||||
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
|
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
|
||||||
|
@ -39,6 +39,9 @@ public class PartitionConfig {
|
|||||||
@Value("${nacos.naming.partition.syncRetryDelay}")
|
@Value("${nacos.naming.partition.syncRetryDelay}")
|
||||||
private long syncRetryDelay = 5000L;
|
private long syncRetryDelay = 5000L;
|
||||||
|
|
||||||
|
@Value("${nacos.naming.partition.taskDispatchThreadCount}")
|
||||||
|
private int taskDispatchThreadCount = Runtime.getRuntime().availableProcessors();
|
||||||
|
|
||||||
public int getTaskDispatchPeriod() {
|
public int getTaskDispatchPeriod() {
|
||||||
return taskDispatchPeriod;
|
return taskDispatchPeriod;
|
||||||
}
|
}
|
||||||
@ -54,4 +57,8 @@ public class PartitionConfig {
|
|||||||
public long getSyncRetryDelay() {
|
public long getSyncRetryDelay() {
|
||||||
return syncRetryDelay;
|
return syncRetryDelay;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getTaskDispatchThreadCount() {
|
||||||
|
return taskDispatchThreadCount;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -130,7 +130,8 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
|
|||||||
if (isResponsible(entry.getKey())) {
|
if (isResponsible(entry.getKey())) {
|
||||||
// this key should not be sent from remote server:
|
// this key should not be sent from remote server:
|
||||||
Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
|
Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
|
||||||
continue;
|
// abort the procedure:
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
if (!dataStore.contains(entry.getKey()) ||
|
if (!dataStore.contains(entry.getKey()) ||
|
||||||
dataStore.get(entry.getKey()).value == null ||
|
dataStore.get(entry.getKey()).value == null ||
|
||||||
|
@ -25,8 +25,7 @@ import org.springframework.stereotype.Component;
|
|||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Data sync task dispatcher
|
* Data sync task dispatcher
|
||||||
@ -37,28 +36,29 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||||||
@Component
|
@Component
|
||||||
public class TaskDispatcher {
|
public class TaskDispatcher {
|
||||||
|
|
||||||
private List<BlockingQueue<String>> taskList = new ArrayList<>();
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private PartitionConfig partitionConfig;
|
private PartitionConfig partitionConfig;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private DataSyncer dataSyncer;
|
private DataSyncer dataSyncer;
|
||||||
|
|
||||||
|
private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
|
||||||
|
|
||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
|
for (int i = 0; i < partitionConfig.getTaskDispatchThreadCount(); i++) {
|
||||||
taskList.add(new LinkedBlockingQueue<>(128 * 1024));
|
TaskScheduler taskScheduler = new TaskScheduler(i);
|
||||||
GlobalExecutor.submitTaskDispatch(new TaskScheduler(i));
|
taskSchedulerList.add(taskScheduler);
|
||||||
|
GlobalExecutor.submitTaskDispatch(taskScheduler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int mapTask(String key) {
|
public int mapTask(String key) {
|
||||||
return Math.abs(key.hashCode() % Integer.MAX_VALUE) % Runtime.getRuntime().availableProcessors();
|
return Math.abs(key.hashCode()) % partitionConfig.getTaskDispatchThreadCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addTask(String key) {
|
public void addTask(String key) {
|
||||||
taskList.get(mapTask(key)).add(key);
|
taskSchedulerList.get(mapTask(key)).addTask(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
public class TaskScheduler implements Runnable {
|
public class TaskScheduler implements Runnable {
|
||||||
@ -69,10 +69,16 @@ public class TaskDispatcher {
|
|||||||
|
|
||||||
private long lastDispatchTime = 0L;
|
private long lastDispatchTime = 0L;
|
||||||
|
|
||||||
|
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
|
||||||
|
|
||||||
public TaskScheduler(int index) {
|
public TaskScheduler(int index) {
|
||||||
this.index = index;
|
this.index = index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addTask(String key) {
|
||||||
|
queue.offer(key);
|
||||||
|
}
|
||||||
|
|
||||||
public int getIndex() {
|
public int getIndex() {
|
||||||
return index;
|
return index;
|
||||||
}
|
}
|
||||||
@ -85,7 +91,8 @@ public class TaskDispatcher {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
String key = taskList.get(index).take();
|
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
|
||||||
|
TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
if (dataSize == 0) {
|
if (dataSize == 0) {
|
||||||
keys = new ArrayList<>();
|
keys = new ArrayList<>();
|
||||||
|
@ -24,12 +24,12 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
|||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.http.HttpMethod;
|
import org.springframework.http.HttpMethod;
|
||||||
import org.springframework.jmx.export.UnableToRegisterMBeanException;
|
|
||||||
|
|
||||||
import javax.servlet.*;
|
import javax.servlet.*;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -67,32 +67,44 @@ public class DistroFilter implements Filter {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
|
try {
|
||||||
|
|
||||||
if (StringUtils.isNoneBlank(serviceName) && !HttpMethod.GET.name().equals(req.getMethod())
|
String path = new URI(req.getRequestURI()).getPath();
|
||||||
&& !distroMapper.responsible(serviceName)) {
|
|
||||||
|
|
||||||
String url = "http://" + distroMapper.mapSrv(serviceName) +
|
String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
|
||||||
req.getRequestURI() + "?" + req.getQueryString();
|
|
||||||
try {
|
boolean isMethodWrite =
|
||||||
resp.setCharacterEncoding("utf-8");
|
(HttpMethod.POST.name().equals(req.getMethod()) || HttpMethod.DELETE.name().equals(req.getMethod()));
|
||||||
resp.getWriter().write(distroMapper.mapSrv(serviceName));
|
|
||||||
resp.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
boolean isUrlInstance =
|
||||||
} catch (Exception ignore) {
|
path.startsWith(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT);
|
||||||
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + url);
|
|
||||||
|
if (isUrlInstance && isMethodWrite && !distroMapper.responsible(serviceName)) {
|
||||||
|
|
||||||
|
String url = "http://" + distroMapper.mapSrv(serviceName) +
|
||||||
|
req.getRequestURI() + "?" + req.getQueryString();
|
||||||
|
try {
|
||||||
|
resp.setCharacterEncoding("utf-8");
|
||||||
|
resp.getWriter().write(distroMapper.mapSrv(serviceName));
|
||||||
|
resp.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + url);
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
return;
|
|
||||||
|
String groupName = req.getParameter(CommonParams.GROUP_NAME);
|
||||||
|
if (StringUtils.isBlank(groupName)) {
|
||||||
|
groupName = UtilsAndCommons.DEFAULT_GROUP_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
|
||||||
|
requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupName + UtilsAndCommons.GROUP_SERVICE_CONNECTOR + serviceName);
|
||||||
|
filterChain.doFilter(requestWrapper, resp);
|
||||||
|
} catch (Exception e) {
|
||||||
|
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
|
||||||
|
"Server failed," + UtilsAndCommons.getAllExceptionMsg(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
String groupName = req.getParameter(CommonParams.GROUP_NAME);
|
|
||||||
if (StringUtils.isBlank(groupName)) {
|
|
||||||
groupName = UtilsAndCommons.DEFAULT_GROUP_NAME;
|
|
||||||
}
|
|
||||||
|
|
||||||
OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
|
|
||||||
requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupName + UtilsAndCommons.GROUP_SERVICE_CONNECTOR + serviceName);
|
|
||||||
|
|
||||||
filterChain.doFilter(requestWrapper, resp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user