refactor: request forwarding logic adjustment

This commit is contained in:
chuntaojun 2020-04-19 09:25:28 +08:00
parent 0af4d4f510
commit a9918f2332
8 changed files with 72 additions and 67 deletions

View File

@ -135,7 +135,9 @@ public class TransferToLeaderFilter implements Filter {
String val = req.getHeader(Constants.FORWARD_LEADER);
final int transferCnt = Integer.parseInt(StringUtils.isEmpty(val) ? "0" : val) + 1;
if (transferCnt > MAX_TRANSFER_CNT) {
// Requests can only be forwarded once if a downgrade is not triggered
if (transferCnt > MAX_TRANSFER_CNT && !downgrading) {
resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Exceeded forwarding times:" + req.getMethod() + ":" + req.getRequestURI());
return;

View File

@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
/**
* Has nothing to do with the specific implementation of the consistency protocol
* Initialization sequence init(Config) => loadLogProcessor(List)
* Initialization sequence init(Config)
*
* <ul>
* <li>{@link Config} : Relevant configuration information required by the consistency protocol,

View File

@ -34,15 +34,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Consistent protocol metadata information, <Key, <Key, Value >> structure
* Listeners that can register to listen to changes in value
*
* <ul>
* <li>
* <global, <cluster, List <String >> metadata information that exists by default, that is, all node information of the entire cluster
* </li>
* <li>
* <global, <self, String> The metadata information existing by default, that is, the IP: PORT information of the current node
* </li>
* </ul>
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("PMD.Rule:CollectionInitShouldAssignCapacityRule")

View File

@ -13,18 +13,16 @@ server.port=8848
#*************** Core Related Configurations ***************#
### Which nacos embedded distributed ID is turned on,
### If an external implementation is provided, the external implementation is automatically selected
nacos.core.id-generator.type=default
### The step size for each fetch of the embedded distributed ID
nacos.core.id-generator.default.acquire.step=100
### If nacos.core.idGenerator.type=snakeflower, You need to set the dataCenterID manually
### set the dataCenterID manually
# nacos.core.snowflake.data-center=
### If nacos.core.idGenerator.type=snakeflower, You need to set the WorkerID manually
### set the WorkerID manually
# nacos.core.snowflake.worker-id=
### MemberLookup
# for AddressServerMemberLookup
# Maximum number of retries to query the address server upon initialization
# nacos.core.address-server.retry=5
#*************** Config Module Related Configurations ***************#
### If user MySQL as datasource:
# spring.datasource.platform=mysql
@ -33,9 +31,10 @@ nacos.core.id-generator.default.acquire.step=100
# db.num=1
### Connect URL of DB:
# db.url.0=jdbc:mysql://1.1.1.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true
# db.user=user
# db.password=password
#db.num=1
#db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
#db.user=root
#db.password=1017
#*************** Naming Module Related Configurations ***************#
@ -191,10 +190,3 @@ nacos.core.protocol.distro.data.sync_retry_delay_ms=5000
nacos.core.protocol.distro.data.distro_enabled=true
### Data synchronization retry strategy
nacos.core.protocol.distro.data.retry_policy=simple
# DB Configuration
#spring.datasource.platform=mysql
#db.num=1
#db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
#db.user=root
#db.password=1017

View File

@ -57,7 +57,6 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
private NSyncHttpClient syncHttpClient = HttpClientManager
.newSyncHttpClient(AddressServerMemberLookup.class.getCanonicalName());
private AddressServerSyncTask task;
private int maxRetry = Integer.getInteger("nacos.address-server.retry", 5);
private volatile boolean shutdown = false;
@Override
@ -97,6 +96,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
// Repeat three times, successfully jump out
boolean success = false;
Throwable ex = null;
int maxRetry = ApplicationUtils.getProperty("nacos.core.address-server.retry", Integer.class, 5);
for (int i = 0; i < maxRetry; i ++) {
try {
syncFromAddressUrl();
@ -108,7 +108,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
}
}
if (!success) {
throw new RuntimeException(ex);
throw new NacosException(NacosException.SERVER_ERROR, ex);
}
task = new AddressServerSyncTask();

View File

@ -56,7 +56,7 @@ import java.util.Set;
* Member A
* [ip1.port,ip2.port]
*
* GossipMemberLookup
* DiscoveryMemberLookup
*
*
* [ip1:port,ip2:port,ip3:port]

View File

@ -17,6 +17,7 @@ package com.alibaba.nacos.core.distributed.id;
import com.alibaba.nacos.consistency.IdGenerator;
import com.alibaba.nacos.core.exception.SnakflowerException;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
@ -30,7 +31,7 @@ import java.util.Map;
* on the Raft Term and the maximum DataCenterId information
*
* <strong>WorkerId</strong> generation policy: Calculate the InetAddress hashcode
*
* <p>
* The repeat rate of the dataCenterId, the value of the maximum dataCenterId times the
* time of each Raft election. The time for raft to select the master is generally measured
* in seconds. If the interval of an election is 5 seconds, it will take 150 seconds for
@ -48,7 +49,7 @@ public class SnakeFlowerIdGenerator implements IdGenerator {
private static final long WORKER_ID_BITS = 5L;
private static final long DATA_CENTER_ID_BITS = 5L;
public static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
public static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);
public static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);
private static final long SEQUENCE_BITS = 12L;
private static final long SEQUENCE_BITS1 = SEQUENCE_BITS;
@ -56,47 +57,62 @@ public class SnakeFlowerIdGenerator implements IdGenerator {
private static final long TIMESTAMP_LEFT_SHIFT =
SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;
private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);
private static final long MAX_OFFSET = 5L;
private static long workerId;
private static volatile long dataCenterId = 1L;
private static long workerId = -1L;
private static volatile long dataCenterId = -1L;
private volatile long currentId;
private long sequence = 0L;
private long lastTimestamp = -1L;
private final long MAX_OFFSET = 5L;
public static void setDataCenterId(long dataCenterId) {
SnakeFlowerIdGenerator.dataCenterId = dataCenterId;
}
public static void setDataCenterId(long dataCenterId) {
SnakeFlowerIdGenerator.dataCenterId = dataCenterId;
}
static {
InetAddress address;
try {
address = InetAddress.getLocalHost();
}
catch (final UnknownHostException e) {
throw new IllegalStateException(
"Cannot get LocalHost InetAddress, please check your network!");
}
byte[] ipAddressByteArray = address.getAddress();
workerId = (
((ipAddressByteArray[ipAddressByteArray.length - 2] & 0B11) << Byte.SIZE)
+ (ipAddressByteArray[ipAddressByteArray.length - 1] & 0xFF));
}
{
long dataCenterId = ApplicationUtils
.getProperty("nacos.core.snowflake.data-center", Integer.class, -1);
long workerId = ApplicationUtils
.getProperty("nacos.core.snowflake.worker-id", Integer.class, -1);
if (dataCenterId != -1) {
SnakeFlowerIdGenerator.dataCenterId = dataCenterId;
} else {
SnakeFlowerIdGenerator.dataCenterId = 1L;
}
if (workerId != -1) {
SnakeFlowerIdGenerator.workerId = workerId;
}
else {
InetAddress address;
try {
address = InetAddress.getLocalHost();
}
catch (final UnknownHostException e) {
throw new IllegalStateException(
"Cannot get LocalHost InetAddress, please check your network!");
}
byte[] ipAddressByteArray = address.getAddress();
SnakeFlowerIdGenerator.workerId = (((ipAddressByteArray[ipAddressByteArray.length - 2] & 0B11)
<< Byte.SIZE) + (ipAddressByteArray[ipAddressByteArray.length - 1]
& 0xFF));
}
}
@Override
public void init() {
initialize(workerId, dataCenterId);
}
public void init() {
initialize(workerId, dataCenterId);
}
@Override
public long currentId() {
public long currentId() {
return currentId;
}
@Override
public synchronized long nextId() {
public synchronized long nextId() {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
@ -109,14 +125,16 @@ public class SnakeFlowerIdGenerator implements IdGenerator {
if (offset <= MAX_OFFSET) {
try {
wait(offset << 1);
} catch (InterruptedException ignore) {
}
catch (InterruptedException ignore) {
Thread.interrupted();
}
timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw exception;
}
} else {
}
else {
throw exception;
}
}
@ -138,7 +156,7 @@ public class SnakeFlowerIdGenerator implements IdGenerator {
}
@Override
public Map<Object, Object> info() {
public Map<Object, Object> info() {
Map<Object, Object> info = new HashMap<>(4);
info.put("currentId", currentId);
info.put("dataCenterId", dataCenterId);
@ -158,15 +176,15 @@ public class SnakeFlowerIdGenerator implements IdGenerator {
if (workerId > MAX_WORKER_ID || workerId < 0) {
throw new IllegalArgumentException(
String.format("worker Id can't be greater than %d or less than 0",
MAX_WORKER_ID));
MAX_WORKER_ID));
}
if (dataCenterId > MAX_DATA_CENTER_ID || dataCenterId < 0) {
throw new IllegalArgumentException(
String.format("dataCenter Id can't be greater than %d or less than 0",
MAX_DATA_CENTER_ID));
MAX_DATA_CENTER_ID));
}
SnakeFlowerIdGenerator.workerId = workerId;
SnakeFlowerIdGenerator.dataCenterId = dataCenterId;
SnakeFlowerIdGenerator.workerId = workerId;
SnakeFlowerIdGenerator.dataCenterId = dataCenterId;
}
/**

View File

@ -255,6 +255,8 @@ public class JRaftServer {
// Ensure that each Raft Group has its own configuration and NodeOptions
Configuration configuration = conf.copy();
NodeOptions copy = nodeOptions.copy();
// Here, the LogProcessor is passed into StateMachine, and when the StateMachine
// triggers onApply, the onApply of the LogProcessor is actually called
NacosStateMachine machine = new NacosStateMachine(this, processor);
copy.setLogUri(logUri);