feat: merge upstream develop

This commit is contained in:
chuntaojun 2020-09-04 01:12:51 +08:00
commit acbd5bfa36
59 changed files with 2720 additions and 727 deletions

View File

@ -56,7 +56,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
protected volatile Long lastEventSequence = -1L;
private final AtomicReferenceFieldUpdater<DefaultPublisher, Long> updater = AtomicReferenceFieldUpdater
private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
@Override
@ -115,7 +115,7 @@ public class DefaultPublisher extends Thread implements EventPublisher {
}
final Event event = queue.take();
receiveEvent(event);
updater.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : {}", ex);

View File

@ -29,7 +29,6 @@ import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -45,8 +44,6 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();
private final AtomicBoolean closed = new AtomicBoolean(false);
protected final ConcurrentHashMap<Object, T> tasks;
protected final ReentrantLock lock = new ReentrantLock();
@ -150,7 +147,6 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem
@Override
public void shutdown() throws NacosException {
closed.compareAndSet(false, true);
processingExecutor.shutdown();
}
@ -167,12 +163,10 @@ public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implem
@Override
public void run() {
while (!closed.get()) {
try {
AbstractNacosTaskExecuteEngine.this.processTasks();
} catch (Throwable e) {
log.error(e.toString(), e);
}
try {
AbstractNacosTaskExecuteEngine.this.processTasks();
} catch (Throwable e) {
log.error(e.toString(), e);
}
}
}

View File

@ -32,6 +32,7 @@ import org.springframework.transaction.support.TransactionTemplate;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;
import static com.alibaba.nacos.config.server.utils.LogUtil.FATAL_LOG;
@ -196,6 +197,19 @@ public interface BaseDatabaseOperate extends DatabaseOperate {
*/
default Boolean update(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate,
List<ModifyRequest> contexts) {
return update(transactionTemplate, jdbcTemplate, contexts, null);
}
/**
* execute update operation, to fix #3617.
*
* @param transactionTemplate {@link TransactionTemplate}
* @param jdbcTemplate {@link JdbcTemplate}
* @param contexts {@link List} ModifyRequest list
* @return {@link Boolean}
*/
default Boolean update(TransactionTemplate transactionTemplate, JdbcTemplate jdbcTemplate,
List<ModifyRequest> contexts, BiConsumer<Boolean, Throwable> consumer) {
return transactionTemplate.execute(status -> {
String[] errSql = new String[] {null};
Object[][] args = new Object[][] {null};
@ -207,10 +221,16 @@ public interface BaseDatabaseOperate extends DatabaseOperate {
LoggerUtils.printIfDebugEnabled(LogUtil.DEFAULT_LOG, "current args : {}", args[0]);
jdbcTemplate.update(pair.getSql(), pair.getArgs());
});
if (consumer != null) {
consumer.accept(Boolean.TRUE, null);
}
return Boolean.TRUE;
} catch (BadSqlGrammarException | DataIntegrityViolationException e) {
FATAL_LOG.error("[db-error] sql : {}, args : {}, error : {}", errSql[0], args[0], e.toString());
return false;
if (consumer != null) {
consumer.accept(Boolean.FALSE, e);
}
return Boolean.FALSE;
} catch (CannotGetJdbcConnectionException e) {
FATAL_LOG.error("[db-error] sql : {}, args : {}, error : {}", errSql[0], args[0], e.toString());
throw e;

View File

@ -133,8 +133,20 @@ public interface DatabaseOperate {
* @return is success
*/
default Boolean blockUpdate() {
return blockUpdate(null);
}
/**
* data modify transaction The SqlContext to be executed in the current thread will be executed and automatically
* cleared.
* @author klw(213539@qq.com)
* 2020/8/24 18:16
* @param consumer the consumer
* @return java.lang.Boolean
*/
default Boolean blockUpdate(BiConsumer<Boolean, Throwable> consumer) {
try {
return update(EmbeddedStorageContextUtils.getCurrentSqlContext(), null);
return update(EmbeddedStorageContextUtils.getCurrentSqlContext(), consumer);
} finally {
EmbeddedStorageContextUtils.cleanAllContext();
}

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.config.server.service.repository.embedded;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.config.server.configuration.ConditionOnEmbeddedStorage;
@ -70,6 +71,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import static com.alibaba.nacos.config.server.service.repository.RowMapperManager.CONFIG_ADVANCE_INFO_ROW_MAPPER;
import static com.alibaba.nacos.config.server.service.repository.RowMapperManager.CONFIG_ALL_INFO_ROW_MAPPER;
@ -189,6 +191,12 @@ public class EmbeddedStoragePersistServiceImpl implements PersistService {
@Override
public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify, null);
}
private void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify,
BiConsumer<Boolean, Throwable> consumer) {
try {
final String tenantTmp =
@ -205,7 +213,7 @@ public class EmbeddedStoragePersistServiceImpl implements PersistService {
configInfo.getTenant());
insertConfigHistoryAtomic(hisId, configInfo, srcIp, srcUser, time, "I");
EmbeddedStorageContextUtils.onModifyConfigInfo(configInfo, srcIp, time);
databaseOperate.blockUpdate();
databaseOperate.blockUpdate(consumer);
} finally {
EmbeddedStorageContextUtils.cleanAllContext();
}
@ -2291,6 +2299,12 @@ public class EmbeddedStoragePersistServiceImpl implements PersistService {
int skipCount = 0;
List<Map<String, String>> failData = null;
List<Map<String, String>> skipData = null;
final BiConsumer<Boolean, Throwable> callFinally = (result, t) -> {
if (t != null) {
throw new NacosRuntimeException(0, t);
}
};
for (int i = 0; i < configInfoList.size(); i++) {
ConfigAllInfo configInfo = configInfoList.get(i);
@ -2322,10 +2336,10 @@ public class EmbeddedStoragePersistServiceImpl implements PersistService {
}
configAdvanceInfo.put("type", type);
try {
addConfigInfo(srcIp, srcUser, configInfo2Save, time, configAdvanceInfo, notify);
addConfigInfo(srcIp, srcUser, configInfo2Save, time, configAdvanceInfo, notify, callFinally);
succCount++;
} catch (Throwable e) {
if (!StringUtils.contains("DuplicateKeyException", e.toString())) {
if (!StringUtils.contains(e.toString(), "DuplicateKeyException")) {
throw e;
}
// uniqueness constraint conflict

View File

@ -128,7 +128,7 @@ public class StandaloneDatabaseOperateImpl implements BaseDatabaseOperate {
@Override
public Boolean update(List<ModifyRequest> modifyRequests, BiConsumer<Boolean, Throwable> consumer) {
return update(modifyRequests);
return update(transactionTemplate, jdbcTemplate, modifyRequests, consumer);
}
@Override

View File

@ -14,14 +14,18 @@
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency;
package com.alibaba.nacos.consistency;
/**
* Apply action.
*
* @author nkorange
*/
public enum ApplyAction {
public enum DataOperation {
/**
* Data add.
*/
ADD,
/**
* Data changed.
*/
@ -29,5 +33,9 @@ public enum ApplyAction {
/**
* Data deleted.
*/
DELETE
DELETE,
/**
* Data verify.
*/
VERIFY;
}

View File

@ -33,8 +33,7 @@ public class SerializeFactory {
private static final Map<String, Serializer> SERIALIZER_MAP = new HashMap<String, Serializer>(4);
@SuppressWarnings("checkstyle:StaticVariableName")
public static String DEFAULT_SERIALIZER = HESSIAN_INDEX;
public static String defaultSerializer = HESSIAN_INDEX;
static {
Serializer serializer = new HessianSerializer();
@ -46,7 +45,7 @@ public class SerializeFactory {
}
public static Serializer getDefault() {
return SERIALIZER_MAP.get(DEFAULT_SERIALIZER);
return SERIALIZER_MAP.get(defaultSerializer);
}
}

View File

@ -33,10 +33,10 @@ server.port=8848
# spring.datasource.platform=mysql
### Count of DB:
# db.num=1
db.num=1
### Connect URL of DB:
# db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.url.0=jdbc:mysql://127.0.0.1:3307/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
# db.user=nacos
# db.password=nacos

View File

@ -0,0 +1,73 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Distro configuration.
*
* @author xiweng.yy
*/
@Component
public class DistroConfig {
@Value("${nacos.core.protocol.distro.data.sync_delay_ms:1000}")
private long syncDelayMillis = 1000;
@Value("${nacos.core.protocol.distro.data.sync_retry_delay_ms:3000}")
private long syncRetryDelayMillis = 3000;
@Value("${nacos.core.protocol.distro.data.verify_interval_ms:5000}")
private long verifyIntervalMillis = 5000;
@Value("${nacos.core.protocol.distro.data.load_retry_delay_ms:30000}")
private long loadDataRetryDelayMillis = 30000;
public long getSyncDelayMillis() {
return syncDelayMillis;
}
public void setSyncDelayMillis(long syncDelayMillis) {
this.syncDelayMillis = syncDelayMillis;
}
public long getSyncRetryDelayMillis() {
return syncRetryDelayMillis;
}
public void setSyncRetryDelayMillis(long syncRetryDelayMillis) {
this.syncRetryDelayMillis = syncRetryDelayMillis;
}
public long getVerifyIntervalMillis() {
return verifyIntervalMillis;
}
public void setVerifyIntervalMillis(long verifyIntervalMillis) {
this.verifyIntervalMillis = verifyIntervalMillis;
}
public long getLoadDataRetryDelayMillis() {
return loadDataRetryDelayMillis;
}
public void setLoadDataRetryDelayMillis(long loadDataRetryDelayMillis) {
this.loadDataRetryDelayMillis = loadDataRetryDelayMillis;
}
}

View File

@ -0,0 +1,196 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.distro.component.DistroCallback;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataStorage;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask;
import com.alibaba.nacos.core.distributed.distro.task.verify.DistroVerifyTask;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.stereotype.Component;
/**
* Distro protocol.
*
* @author xiweng.yy
*/
@Component
public class DistroProtocol {
private final ServerMemberManager memberManager;
private final DistroComponentHolder distroComponentHolder;
private final DistroTaskEngineHolder distroTaskEngineHolder;
private final DistroConfig distroConfig;
private volatile boolean loadCompleted = false;
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startVerifyTask();
}
private void startVerifyTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
loadCompleted = true;
}
@Override
public void onFailed(Throwable throwable) {
loadCompleted = false;
}
};
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
public boolean isLoadCompleted() {
return loadCompleted;
}
/**
* Start to sync by configured delay.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, distroConfig.getSyncDelayMillis());
}
/**
* Start to sync data to all remote server.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
/**
* Query data from specified server.
*
* @param distroKey data key
* @return data
*/
public DistroData queryFromRemote(DistroKey distroKey) {
if (null == distroKey.getTargetServer()) {
Loggers.DISTRO.warn("[DISTRO] Can't query data from empty server");
return null;
}
String resourceType = distroKey.getResourceType();
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
if (null == transportAgent) {
Loggers.DISTRO.warn("[DISTRO] Can't find transport agent for key {}", resourceType);
return null;
}
return transportAgent.getData(distroKey, distroKey.getTargetServer());
}
/**
* Receive synced distro data, find processor to process.
*
* @param distroData Received data
* @return true if handle receive data successfully, otherwise false
*/
public boolean onReceive(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);
return false;
}
return dataProcessor.processData(distroData);
}
/**
* Receive verify data, find processor to process.
*
* @param distroData verify data
* @return true if verify data successfully, otherwise false
*/
public boolean onVerify(DistroData distroData) {
String resourceType = distroData.getDistroKey().getResourceType();
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
return false;
}
return dataProcessor.processVerifyData(distroData);
}
/**
* Query data of input distro key.
*
* @param distroKey key of data
* @return data
*/
public DistroData onQuery(DistroKey distroKey) {
String resourceType = distroKey.getResourceType();
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(resourceType);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", resourceType);
return new DistroData(distroKey, new byte[0]);
}
return distroDataStorage.getDistroData(distroKey);
}
/**
* Query all datum snapshot.
*
* @param type datum type
* @return all datum snapshot
*/
public DistroData onSnapshot(String type) {
DistroDataStorage distroDataStorage = distroComponentHolder.findDataStorage(type);
if (null == distroDataStorage) {
Loggers.DISTRO.warn("[DISTRO] Can't find data storage for received key {}", type);
return new DistroData(new DistroKey("snapshot", type), new byte[0]);
}
return distroDataStorage.getDatumSnapshot();
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.component;
/**
* Distro callback.
*
* @author xiweng.yy
*/
public interface DistroCallback {
/**
* Callback when distro task execute successfully.
*/
void onSuccess();
/**
* Callback when distro task execute failed.
*
* @param throwable throwable if execute failed caused by exception
*/
void onFailed(Throwable throwable);
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.component;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Distro component holder.
*
* @author xiweng.yy
*/
@Component
public class DistroComponentHolder {
private final Map<String, DistroTransportAgent> transportAgentMap = new HashMap<>();
private final Map<String, DistroDataStorage> dataStorageMap = new HashMap<>();
private final Map<String, DistroFailedTaskHandler> failedTaskHandlerMap = new HashMap<>();
private final Map<String, DistroDataProcessor> dataProcessorMap = new HashMap<>();
public DistroTransportAgent findTransportAgent(String type) {
return transportAgentMap.get(type);
}
public void registerTransportAgent(String type, DistroTransportAgent transportAgent) {
transportAgentMap.put(type, transportAgent);
}
public DistroDataStorage findDataStorage(String type) {
return dataStorageMap.get(type);
}
public void registerDataStorage(String type, DistroDataStorage dataStorage) {
dataStorageMap.put(type, dataStorage);
}
public Set<String> getDataStorageTypes() {
return dataStorageMap.keySet();
}
public DistroFailedTaskHandler findFailedTaskHandler(String type) {
return failedTaskHandlerMap.get(type);
}
public void registerFailedTaskHandler(String type, DistroFailedTaskHandler failedTaskHandler) {
failedTaskHandlerMap.put(type, failedTaskHandler);
}
public void registerDataProcessor(DistroDataProcessor dataProcessor) {
dataProcessorMap.putIfAbsent(dataProcessor.processType(), dataProcessor);
}
public DistroDataProcessor findDataProcessor(String processType) {
return dataProcessorMap.get(processType);
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.component;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
/**
* Distro data processor.
*
* @author xiweng.yy
*/
public interface DistroDataProcessor {
/**
* Process type of this processor.
*
* @return type of this processor
*/
String processType();
/**
* Process received data.
*
* @param distroData received data
* @return true if process data successfully, otherwise false
*/
boolean processData(DistroData distroData);
/**
* Process received verify data.
*
* @param distroData verify data
* @return true if the data is available, otherwise false
*/
boolean processVerifyData(DistroData distroData);
/**
* Process snapshot data.
*
* @param distroData snapshot data
* @return true if process data successfully, otherwise false
*/
boolean processSnapshot(DistroData distroData);
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.component;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
/**
* Distro data storage.
*
* @author xiweng.yy
*/
public interface DistroDataStorage {
/**
* Get distro datum.
*
* @param distroKey key of distro datum
* @return need to sync datum
*/
DistroData getDistroData(DistroKey distroKey);
/**
* Get all distro datum snapshot.
*
* @return all datum
*/
DistroData getDatumSnapshot();
/**
* Get verify datum.
*
* @return verify datum
*/
DistroData getVerifyData();
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.component;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
/**
* Distro failed task handler.
*
* @author xiweng.yy
*/
public interface DistroFailedTaskHandler {
/**
* Build retry task when distro task execute failed.
*
* @param distroKey distro key of failed task
* @param action action of task
*/
void retry(DistroKey distroKey, DataOperation action);
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.component;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
/**
* Distro transport agent.
*
* @author xiweng.yy
*/
public interface DistroTransportAgent {
/**
* Sync data.
*
* @param data data
* @param targetServer target server
* @return true is sync successfully, otherwise false
*/
boolean syncData(DistroData data, String targetServer);
/**
* Sync data with callback.
*
* @param data data
* @param targetServer target server
* @param callback callback
*/
void syncData(DistroData data, String targetServer, DistroCallback callback);
/**
* Sync verify data.
*
* @param verifyData verify data
* @param targetServer target server
* @return true is verify successfully, otherwise false
*/
boolean syncVerifyData(DistroData verifyData, String targetServer);
/**
* Sync verify data.
*
* @param verifyData verify data
* @param targetServer target server
* @param callback callback
*/
void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback);
/**
* get Data from target server.
*
* @param key key of data
* @param targetServer target server
* @return distro data
*/
DistroData getData(DistroKey key, String targetServer);
/**
* Get all datum snapshot from target server.
*
* @param targetServer target server.
* @return distro data
*/
DistroData getDatumSnapshot(String targetServer);
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.entity;
import com.alibaba.nacos.consistency.DataOperation;
/**
* Distro data.
*
* @author xiweng.yy
*/
public class DistroData {
private DistroKey distroKey;
private DataOperation type;
private byte[] content;
public DistroData() {
}
public DistroData(DistroKey distroKey, byte[] content) {
this.distroKey = distroKey;
this.content = content;
}
public DistroKey getDistroKey() {
return distroKey;
}
public void setDistroKey(DistroKey distroKey) {
this.distroKey = distroKey;
}
public DataOperation getType() {
return type;
}
public void setType(DataOperation type) {
this.type = type;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.entity;
import java.util.Objects;
/**
* Distro key.
*
* @author xiweng.yy
*/
public class DistroKey {
private String resourceKey;
private String resourceType;
private String targetServer;
public DistroKey() {
}
public DistroKey(String resourceKey, String resourceType) {
this.resourceKey = resourceKey;
this.resourceType = resourceType;
}
public DistroKey(String resourceKey, String resourceType, String targetServer) {
this.resourceKey = resourceKey;
this.resourceType = resourceType;
this.targetServer = targetServer;
}
public String getResourceKey() {
return resourceKey;
}
public void setResourceKey(String resourceKey) {
this.resourceKey = resourceKey;
}
public String getResourceType() {
return resourceType;
}
public void setResourceType(String resourceType) {
this.resourceType = resourceType;
}
public String getTargetServer() {
return targetServer;
}
public void setTargetServer(String targetServer) {
this.targetServer = targetServer;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DistroKey distroKey = (DistroKey) o;
return Objects.equals(resourceKey, distroKey.resourceKey) && Objects
.equals(resourceType, distroKey.resourceType) && Objects.equals(targetServer, distroKey.targetServer);
}
@Override
public int hashCode() {
return Objects.hash(resourceKey, resourceType, targetServer);
}
@Override
public String toString() {
return "DistroKey{" + "resourceKey='" + resourceKey + '\'' + ", resourceType='" + resourceType + '\'' + '}';
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.exception;
/**
* Distro exception.
*
* @author xiweng.yy
*/
public class DistroException extends RuntimeException {
private static final long serialVersionUID = 1711141952413139786L;
public DistroException(String message, Throwable cause) {
super(message, cause);
}
@Override
public String getMessage() {
return "[DISTRO-EXCEPTION]" + super.getMessage();
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager;
import org.springframework.stereotype.Component;
/**
* Distro task engine holder.
*
* @author xiweng.yy
*/
@Component
public class DistroTaskEngineHolder {
private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();
private final DistroExecuteWorkersManager executeWorkersManager = new DistroExecuteWorkersManager();
public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {
DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);
delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);
}
public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() {
return delayTaskExecuteEngine;
}
public DistroExecuteWorkersManager getExecuteWorkersManager() {
return executeWorkersManager;
}
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.delay;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
/**
* Distro delay task.
*
* @author xiweng.yy
*/
public class DistroDelayTask extends AbstractDelayTask {
private final DistroKey distroKey;
private DataOperation action;
private long createTime;
public DistroDelayTask(DistroKey distroKey, long delayTime) {
this(distroKey, DataOperation.CHANGE, delayTime);
}
public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
this.distroKey = distroKey;
this.action = action;
this.createTime = System.currentTimeMillis();
setLastProcessTime(createTime);
setTaskInterval(delayTime);
}
public DistroKey getDistroKey() {
return distroKey;
}
public DataOperation getAction() {
return action;
}
public long getCreateTime() {
return createTime;
}
@Override
public void merge(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return;
}
DistroDelayTask newTask = (DistroDelayTask) task;
if (!action.equals(newTask.getAction()) && createTime < newTask.getCreateTime()) {
action = newTask.getAction();
createTime = newTask.getCreateTime();
}
setLastProcessTime(newTask.getLastProcessTime());
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.delay;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.utils.Loggers;
/**
* Distro delay task execute engine.
*
* @author xiweng.yy
*/
public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
public DistroDelayTaskExecuteEngine() {
super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO);
}
@Override
public void addProcessor(Object key, NacosTaskProcessor taskProcessor) {
Object actualKey = getActualKey(key);
super.addProcessor(actualKey, taskProcessor);
}
@Override
public NacosTaskProcessor getProcessor(Object key) {
Object actualKey = getActualKey(key);
return super.getProcessor(actualKey);
}
private Object getActualKey(Object key) {
return key instanceof DistroKey ? ((DistroKey) key).getResourceType() : key;
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.delay;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroSyncChangeTask;
/**
* Distro delay task processor.
*
* @author xiweng.yy
*/
public class DistroDelayTaskProcessor implements NacosTaskProcessor {
private final DistroTaskEngineHolder distroTaskEngineHolder;
private final DistroComponentHolder distroComponentHolder;
public DistroDelayTaskProcessor(DistroTaskEngineHolder distroTaskEngineHolder,
DistroComponentHolder distroComponentHolder) {
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroComponentHolder = distroComponentHolder;
}
@Override
public boolean process(AbstractDelayTask task) {
if (!(task instanceof DistroDelayTask)) {
return true;
}
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, syncChangeTask);
return true;
}
return false;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.execute;
import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
/**
* Abstract distro execute task.
*
* @author xiweng.yy
*/
public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask implements Runnable {
private final DistroKey distroKey;
protected AbstractDistroExecuteTask(DistroKey distroKey) {
this.distroKey = distroKey;
}
protected DistroKey getDistroKey() {
return distroKey;
}
}

View File

@ -0,0 +1,127 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.execute;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.lifecycle.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Distro execute worker.
*
* @author xiweng.yy
*/
public final class DistroExecuteWorker implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(DistroExecuteWorker.class);
private static final int QUEUE_CAPACITY = 50000;
private final BlockingQueue<Runnable> queue;
private final String name;
private final AtomicBoolean closed;
public DistroExecuteWorker(final int mod, final int total) {
name = getClass().getName() + "_" + mod + "%" + total;
queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
closed = new AtomicBoolean(false);
new InnerWorker(name).start();
}
public String getName() {
return name;
}
/**
* Execute task without result.
*/
public void execute(Runnable task) {
putTask(task);
}
/**
* Execute task with a result.
*/
public <V> Future<V> execute(Callable<V> task) {
FutureTask<V> future = new FutureTask(task);
putTask(future);
return future;
}
private void putTask(Runnable task) {
try {
queue.put(task);
} catch (InterruptedException ire) {
LOGGER.error(ire.toString(), ire);
}
}
public int pendingTaskCount() {
return queue.size();
}
/**
* Worker status.
*/
public String status() {
return name + ", pending tasks: " + pendingTaskCount();
}
@Override
public void shutdown() throws NacosException {
queue.clear();
closed.compareAndSet(false, true);
}
/**
* Inner execute worker.
*/
private class InnerWorker extends Thread {
InnerWorker(String name) {
setDaemon(false);
setName(name);
}
@Override
public void run() {
while (!closed.get()) {
try {
Runnable task = queue.take();
long begin = System.currentTimeMillis();
task.run();
long duration = System.currentTimeMillis() - begin;
if (duration > 1000L) {
LOGGER.warn("distro task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
LOGGER.error("[DISTRO-FAILED] " + e.toString(), e);
}
}
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.execute;
/**
* Distro execute workers manager.
*
* @author xiweng.yy
*/
public final class DistroExecuteWorkersManager {
private final DistroExecuteWorker[] connectionWorkers;
public DistroExecuteWorkersManager() {
int workerCount = findWorkerCount();
connectionWorkers = new DistroExecuteWorker[workerCount];
for (int mod = 0; mod < workerCount; ++mod) {
connectionWorkers[mod] = new DistroExecuteWorker(mod, workerCount);
}
}
private int findWorkerCount() {
final int coreCount = Runtime.getRuntime().availableProcessors();
int result = 1;
while (result < coreCount) {
result <<= 1;
}
return result;
}
/**
* Dispatch task to worker by tag.
*/
public void dispatch(Object tag, Runnable task) {
DistroExecuteWorker worker = getWorker(tag);
worker.execute(task);
}
private DistroExecuteWorker getWorker(Object tag) {
int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
return connectionWorkers[idx];
}
/**
* Get workers status.
*
* @return workers status string
*/
public String workersStatus() {
StringBuilder sb = new StringBuilder();
for (DistroExecuteWorker worker : connectionWorkers) {
sb.append(worker.status()).append("\n");
}
return sb.toString();
}
public int workersCount() {
return connectionWorkers.length;
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.execute;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroFailedTaskHandler;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.utils.Loggers;
/**
* Distro sync change task.
*
* @author xiweng.yy
*/
public class DistroSyncChangeTask extends AbstractDistroExecuteTask {
private final DistroComponentHolder distroComponentHolder;
public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) {
super(distroKey);
this.distroComponentHolder = distroComponentHolder;
}
@Override
public void run() {
Loggers.DISTRO.info("[DISTRO-START] {}", toString());
try {
String type = getDistroKey().getResourceType();
DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
distroData.setType(DataOperation.CHANGE);
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
if (!result) {
handleFailedTask();
}
Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
} catch (Exception e) {
Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
handleFailedTask();
}
}
private void handleFailedTask() {
String type = getDistroKey().getResourceType();
DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type);
if (null == failedTaskHandler) {
Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type);
return;
}
failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE);
}
@Override
public String toString() {
return "DistroSyncChangeTask for " + getDistroKey().toString();
}
}

View File

@ -0,0 +1,129 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.load;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import com.alibaba.nacos.core.distributed.distro.component.DistroCallback;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Distro load data task.
*
* @author xiweng.yy
*/
public class DistroLoadDataTask implements Runnable {
private final ServerMemberManager memberManager;
private final DistroComponentHolder distroComponentHolder;
private final DistroConfig distroConfig;
private final DistroCallback loadCallback;
private final Map<String, Boolean> loadCompletedMap;
public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroConfig distroConfig, DistroCallback loadCallback) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroConfig = distroConfig;
this.loadCallback = loadCallback;
loadCompletedMap = new HashMap<>(1);
}
@Override
public void run() {
try {
load();
if (!checkCompleted()) {
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress());
}
}
return false;
}
private boolean checkCompleted() {
if (distroComponentHolder.getDataStorageTypes().size() != loadCompletedMap.size()) {
return false;
}
for (Boolean each : loadCompletedMap.values()) {
if (!each) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,74 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.distributed.distro.task.verify;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.utils.Loggers;
import java.util.List;
/**
* Distro verify task.
*
* @author xiweng.yy
*/
public class DistroVerifyTask implements Runnable {
private final ServerMemberManager serverMemberManager;
private final DistroComponentHolder distroComponentHolder;
public DistroVerifyTask(ServerMemberManager serverMemberManager, DistroComponentHolder distroComponentHolder) {
this.serverMemberManager = serverMemberManager;
this.distroComponentHolder = distroComponentHolder;
}
@Override
public void run() {
try {
List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", targetServer);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
verifyForDataStorage(each, targetServer);
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
}
}
private void verifyForDataStorage(String type, List<Member> targetServer) {
DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
if (null == distroData) {
return;
}
distroData.setType(DataOperation.VERIFY);
for (Member member : targetServer) {
try {
distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
} catch (Exception e) {
Loggers.DISTRO.error(String
.format("[DISTRO-FAILED] verify data for type %s to %s failed.", type, member.getAddress()), e);
}
}
}
}

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.core.utils;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -29,11 +30,14 @@ import java.util.concurrent.TimeUnit;
@SuppressWarnings("all")
public class GlobalExecutor {
private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(GlobalExecutor.class),
4,
new NameThreadFactory("com.alibaba.nacos.core.common")
);
private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4,
new NameThreadFactory("com.alibaba.nacos.core.common"));
private static final ScheduledExecutorService DISTRO_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class),
Runtime.getRuntime().availableProcessors() * 2,
new NameThreadFactory("com.alibaba.nacos.core.protocal.distro"));
public static void runWithoutThread(Runnable runnable) {
runnable.run();
@ -53,4 +57,16 @@ public class GlobalExecutor {
COMMON_EXECUTOR.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
}
}
public static void submitLoadDataTask(Runnable runnable) {
DISTRO_EXECUTOR.submit(runnable);
}
public static void submitLoadDataTask(Runnable runnable, long delay) {
DISTRO_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void schedulePartitionDataTimedSync(Runnable runnable, long interval) {
DISTRO_EXECUTOR.scheduleWithFixedDelay(runnable, interval, interval, TimeUnit.MILLISECONDS);
}
}

View File

@ -33,6 +33,8 @@ public class Loggers {
public static final Logger RAFT = LoggerFactory.getLogger("com.alibaba.nacos.core.protocol.raft");
public static final Logger DISTRO = LoggerFactory.getLogger("com.alibaba.nacos.core.protocol.distro");
public static final Logger CLUSTER = LoggerFactory.getLogger("com.alibaba.nacos.core.cluster");
}

View File

@ -95,7 +95,7 @@ public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {
for (String namespace : namespaces) {
Map<String, Service> services = serviceManager.getServiceMap(namespace);
Map<String, Service> services = serviceManager.chooseServiceMap(namespace);
if (services.isEmpty()) {
continue;

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.pojo.Record;
/**
@ -30,9 +31,9 @@ public class ValueChangeEvent extends Event {
private final Record value;
private final ApplyAction action;
private final DataOperation action;
public ValueChangeEvent(String key, Record value, ApplyAction action) {
public ValueChangeEvent(String key, Record value, DataOperation action) {
this.key = key;
this.value = value;
this.action = action;
@ -46,7 +47,7 @@ public class ValueChangeEvent extends Event {
return value;
}
public ApplyAction getAction() {
public DataOperation getAction() {
return action;
}
@ -60,7 +61,7 @@ public class ValueChangeEvent extends Event {
private Record value;
private ApplyAction action;
private DataOperation action;
private ValueChangeEventBuilder() {
}
@ -75,7 +76,7 @@ public class ValueChangeEvent extends Event {
return this;
}
public ValueChangeEventBuilder action(ApplyAction action) {
public ValueChangeEventBuilder action(DataOperation action) {
this.action = action;
return this;
}

View File

@ -1,225 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Data replicator.
*
* @author nkorange
* @since 1.0.0
*/
@Component
@DependsOn("ProtocolManager")
public class DataSyncer {
private final DataStore dataStore;
private final GlobalConfig partitionConfig;
private final Serializer serializer;
private final DistroMapper distroMapper;
private final ServerMemberManager memberManager;
private Map<String, String> taskMap = new ConcurrentHashMap<>(16);
public DataSyncer(DataStore dataStore, GlobalConfig partitionConfig, Serializer serializer,
DistroMapper distroMapper, ServerMemberManager memberManager) {
this.dataStore = dataStore;
this.partitionConfig = partitionConfig;
this.serializer = serializer;
this.distroMapper = distroMapper;
this.memberManager = memberManager;
}
@PostConstruct
public void init() {
startTimedSync();
}
/**
* Submit a {@link SyncTask} with a delay to execute.
*
* @param task synchronize data task
* @param delay delay to execute
*/
public void submit(SyncTask task, long delay) {
// If it's a new task:
if (task.getRetryCount() == 0) {
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
// associated key already exist:
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process, key: {}", key);
}
iterator.remove();
}
}
}
if (task.getKeys().isEmpty()) {
// all keys are removed:
return;
}
GlobalExecutor.submitDataSync(() -> {
// 1. check the server
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
List<String> keys = task.getKeys();
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
}
// 2. get the datums by keys and check the datum is empty or not
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
// clear all flags of this task:
for (String key : keys) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
retrySync(syncTask);
} else {
// clear all flags of this task:
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
}
}, delay);
}
private void retrySync(SyncTask syncTask) {
Member member = new Member();
member.setIp(syncTask.getTargetServer().split(":")[0]);
member.setPort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
if (!getServers().contains(member)) {
// if server is no longer in healthy server list, ignore this task:
//fix #1665 remove existing tasks
if (syncTask.getKeys() != null) {
for (String key : syncTask.getKeys()) {
taskMap.remove(buildKey(key, syncTask.getTargetServer()));
}
}
return;
}
// TODO may choose other retry policy.
submit(syncTask, partitionConfig.getSyncRetryDelay());
}
public void startTimedSync() {
GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
}
public class TimedSync implements Runnable {
@Override
public void run() {
try {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", getServers());
}
// send local timestamps to other servers:
Map<String, String> keyChecksums = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
Datum datum = dataStore.get(key);
if (datum == null) {
continue;
}
keyChecksums.put(key, datum.value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
}
for (Member member : getServers()) {
if (NetUtils.localServer().equals(member.getAddress())) {
continue;
}
NamingProxy.syncCheckSums(keyChecksums, member.getAddress());
}
} catch (Exception e) {
Loggers.DISTRO.error("timed sync task failed.", e);
}
}
}
public Collection<Member> getServers() {
return memberManager.allMembers();
}
public String buildKey(String key, String targetServer) {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
}

View File

@ -19,23 +19,31 @@ package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.Objects;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKeyTaskFailedHandler;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpDelayTaskProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.component.DistroDataStorageImpl;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.component.DistroHttpAgent;
import com.alibaba.nacos.core.distributed.distro.DistroProtocol;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import org.apache.commons.lang3.StringUtils;
@ -66,98 +74,60 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
private final DistroMapper distroMapper;
private final DataStore dataStore;
private final TaskDispatcher taskDispatcher;
private final Serializer serializer;
private final ServerMemberManager memberManager;
private final SwitchDomain switchDomain;
private final GlobalConfig globalConfig;
private boolean initialized = false;
private final DistroProtocol distroProtocol;
private volatile Notifier notifier = new Notifier();
private LoadDataTask loadDataTask = new LoadDataTask();
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore, TaskDispatcher taskDispatcher,
Serializer serializer, ServerMemberManager memberManager, SwitchDomain switchDomain,
GlobalConfig globalConfig) {
public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore, Serializer serializer,
SwitchDomain switchDomain, GlobalConfig globalConfig, DistroProtocol distroProtocol) {
this.distroMapper = distroMapper;
this.dataStore = dataStore;
this.taskDispatcher = taskDispatcher;
this.serializer = serializer;
this.memberManager = memberManager;
this.switchDomain = switchDomain;
this.globalConfig = globalConfig;
this.distroProtocol = distroProtocol;
registerDistroComponent();
}
private void registerDistroComponent() {
DistroComponentHolder componentHolder = ApplicationUtils.getBean(DistroComponentHolder.class);
DistroTaskEngineHolder taskEngineHolder = ApplicationUtils.getBean(DistroTaskEngineHolder.class);
componentHolder.registerDataStorage(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
new DistroDataStorageImpl(dataStore, distroMapper));
componentHolder.registerTransportAgent(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, new DistroHttpAgent());
componentHolder.registerFailedTaskHandler(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
new DistroHttpCombinedKeyTaskFailedHandler(globalConfig, taskEngineHolder));
taskEngineHolder.registerNacosTaskProcessor(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
new DistroHttpDelayTaskProcessor(globalConfig, taskEngineHolder));
componentHolder.registerDataProcessor(this);
}
@PostConstruct
public void init() {
GlobalExecutor.submitLoadDataTask(loadDataTask);
GlobalExecutor.submitDistroNotifyTask(notifier);
}
private class LoadDataTask implements Runnable {
@Override
public void run() {
try {
load();
if (!initialized) {
GlobalExecutor.submitLoadDataTask(this, globalConfig.getLoadDataRetryDelayMillis());
} else {
Loggers.DISTRO.info("load data success");
}
} catch (Exception e) {
Loggers.DISTRO.error("load data failed.", e);
}
}
}
private void load() throws Exception {
if (ApplicationUtils.getStandaloneMode()) {
initialized = true;
return;
}
// size = 1 means only myself in the list, we need at least one another server alive:
while (memberManager.getServerList().size() <= 1) {
Thread.sleep(1000L);
Loggers.DISTRO.info("waiting server list init...");
}
for (Map.Entry<String, Member> entry : memberManager.getServerList().entrySet()) {
final String address = entry.getValue().getAddress();
if (ApplicationUtils.getLocalAddress().equals(address)) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync from " + address);
}
// try sync data from remote server:
if (syncAllDataFromRemote(address)) {
initialized = true;
return;
}
}
}
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
@Override
@ -191,7 +161,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
return;
}
notifier.addTask(key, ApplyAction.CHANGE);
notifier.addTask(key, DataOperation.CHANGE);
}
/**
@ -207,7 +177,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
return;
}
notifier.addTask(key, ApplyAction.DELETE);
notifier.addTask(key, DataOperation.DELETE);
}
/**
@ -267,8 +237,13 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
processData(result);
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX,
server);
distroKey.getActualResourceTypes().addAll(toUpdateKeys);
DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
if (null != remoteData) {
processData(remoteData.getContent());
}
} catch (Exception e) {
Loggers.DISTRO.error("get data from " + server + " failed!", e);
}
@ -278,17 +253,6 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
}
private boolean syncAllDataFromRemote(String server) {
try {
byte[] data = NamingProxy.getAllData(server);
return processData(data);
} catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
return false;
}
}
private boolean processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
@ -345,6 +309,37 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
return true;
}
@Override
public boolean processData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
Datum<Instances> datum = (Datum<Instances>) distroHttpData.getDeserializedContent();
onPut(datum.key, datum.value);
return true;
}
@Override
public String processType() {
return KeyBuilder.INSTANCE_LIST_KEY_PREFIX;
}
@Override
public boolean processVerifyData(DistroData distroData) {
DistroHttpData distroHttpData = (DistroHttpData) distroData;
String sourceServer = distroData.getDistroKey().getResourceKey();
Map<String, String> verifyData = (Map<String, String>) distroHttpData.getDeserializedContent();
onReceiveChecksums(verifyData, sourceServer);
return true;
}
@Override
public boolean processSnapshot(DistroData distroData) {
try {
return processData(distroData.getContent());
} catch (Exception e) {
return false;
}
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
@ -377,14 +372,14 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
public boolean isInitialized() {
return initialized || !globalConfig.isDataWarmup();
return distroProtocol.isLoadCompleted() || !globalConfig.isDataWarmup();
}
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, ApplyAction>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
@ -392,12 +387,12 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
* @param datumKey data key
* @param action action for data
*/
public void addTask(String datumKey, ApplyAction action) {
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.offer(Pair.with(datumKey, action));
@ -413,7 +408,7 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
for (; ; ) {
try {
Pair<String, ApplyAction> pair = tasks.take();
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
@ -421,10 +416,10 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
}
private void handle(Pair<String, ApplyAction> pair) {
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
ApplyAction action = pair.getValue1();
DataOperation action = pair.getValue1();
services.remove(datumKey);
@ -439,12 +434,12 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
count++;
try {
if (action == ApplyAction.CHANGE) {
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
@ -463,4 +458,4 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService
}
}
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
/**
* Distro http received data.
*
* <p>
* Apply for old distro http api. The data content has been deserialize by spring mvc, so there is no need to
* deserialize again.
* </p>
*
* @author xiweng.yy
*/
public class DistroHttpData extends DistroData {
private Object deserializedContent;
public DistroHttpData(DistroKey distroKey, byte[] content, Object deserializedContent) {
super(distroKey, content);
this.deserializedContent = deserializedContent;
}
public Object getDeserializedContent() {
return deserializedContent;
}
public void setDeserializedContent(Object deserializedContent) {
this.deserializedContent = deserializedContent;
}
}

View File

@ -1,68 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import java.util.List;
/**
* Synchronize task.
*
* @author nkorange
* @since 1.0.0
*/
public class SyncTask {
private List<String> keys;
private int retryCount;
private long lastExecuteTime;
private String targetServer;
public List<String> getKeys() {
return keys;
}
public void setKeys(List<String> keys) {
this.keys = keys;
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
public long getLastExecuteTime() {
return lastExecuteTime;
}
public void setLastExecuteTime(long lastExecuteTime) {
this.lastExecuteTime = lastExecuteTime;
}
public String getTargetServer() {
return targetServer;
}
public void setTargetServer(String targetServer) {
this.targetServer = targetServer;
}
}

View File

@ -1,151 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Data sync task dispatcher.
*
* @author nkorange
* @since 1.0.0
*/
@Component
public class TaskDispatcher {
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private DataSyncer dataSyncer;
private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
/**
* Init task dispatcher.
*/
@PostConstruct
public void init() {
for (int i = 0; i < cpuCoreCount; i++) {
TaskScheduler taskScheduler = new TaskScheduler(i);
taskSchedulerList.add(taskScheduler);
GlobalExecutor.submitTaskDispatch(taskScheduler);
}
}
public void addTask(String key) {
taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}
public class TaskScheduler implements Runnable {
private int index;
private int dataSize = 0;
private long lastDispatchTime = 0L;
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
public TaskScheduler(int index) {
this.index = index;
}
public void addTask(String key) {
queue.offer(key);
}
public int getIndex() {
return index;
}
@Override
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
try {
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("got key: {}", key);
}
if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
continue;
}
if (StringUtils.isBlank(key)) {
continue;
}
if (dataSize == 0) {
keys = new ArrayList<>();
}
keys.add(key);
dataSize++;
if (dataSize == partitionConfig.getBatchSyncKeyCount()
|| (System.currentTimeMillis() - lastDispatchTime) > partitionConfig
.getTaskDispatchPeriod()) {
for (Member member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getAddress())) {
continue;
}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getAddress());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JacksonUtils.toJson(syncTask));
}
dataSyncer.submit(syncTask, 0);
}
lastDispatchTime = System.currentTimeMillis();
dataSize = 0;
}
} catch (Exception e) {
Loggers.DISTRO.error("dispatch sync task failed.", e);
}
}
}
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
/**
* Distro http key.
*
* @author xiweng.yy
*/
public class DistroHttpCombinedKey extends DistroKey {
private static final AtomicLong SEQUENCE = new AtomicLong(0);
private final List<String> actualResourceTypes = new LinkedList<>();
public DistroHttpCombinedKey(String resourceType, String targetServer) {
super(DistroHttpCombinedKey.getSequenceKey(), resourceType, targetServer);
}
public List<String> getActualResourceTypes() {
return actualResourceTypes;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof DistroHttpCombinedKey)) {
return false;
}
if (!super.equals(o)) {
return false;
}
DistroHttpCombinedKey that = (DistroHttpCombinedKey) o;
return Objects.equals(getResourceKey(), that.getResourceKey())
&& Objects.equals(getResourceType(), that.getResourceType())
&& Objects.equals(getTargetServer(), that.getTargetServer())
&& Objects.equals(actualResourceTypes, that.actualResourceTypes);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), actualResourceTypes);
}
@Override
public String toString() {
return getResourceKey() + "{" + "actualResourceTypes=" + actualResourceTypes + "} to " + getTargetServer();
}
public static String getSequenceKey() {
return DistroHttpCombinedKey.class.getSimpleName() + "-" + SEQUENCE.get();
}
public static void incrementSequence() {
SEQUENCE.incrementAndGet();
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import java.util.HashSet;
import java.util.Set;
/**
* Distro combined multi keys delay task for http.
*
* @author xiweng.yy
*/
public class DistroHttpCombinedKeyDelayTask extends DistroDelayTask {
private final int batchSize;
private final Set<String> actualResourceKeys = new HashSet<>();
public DistroHttpCombinedKeyDelayTask(DistroKey distroKey, DataOperation action, long delayTime, int batchSize) {
super(distroKey, action, delayTime);
this.batchSize = batchSize;
}
public Set<String> getActualResourceKeys() {
return actualResourceKeys;
}
@Override
public void merge(AbstractDelayTask task) {
actualResourceKeys.addAll(((DistroHttpCombinedKeyDelayTask) task).getActualResourceKeys());
if (actualResourceKeys.size() >= batchSize) {
this.setLastProcessTime(0);
DistroHttpCombinedKey.incrementSequence();
}
}
@Override
public DistroKey getDistroKey() {
DistroKey taskKey = super.getDistroKey();
DistroHttpCombinedKey result = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, taskKey.getTargetServer());
result.setResourceKey(taskKey.getResourceKey());
result.getActualResourceTypes().addAll(actualResourceKeys);
return result;
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.Loggers;
/**
* Distro http combined key execute task.
*
* <p>
* In this task, it will generate combined key delay task and add back to delay engine.
* </p>
*
* @author xiweng.yy
*/
public class DistroHttpCombinedKeyExecuteTask implements Runnable {
private final GlobalConfig globalConfig;
private final DistroDelayTaskExecuteEngine distroDelayTaskExecuteEngine;
private final DistroKey singleDistroKey;
private final DataOperation taskAction;
public DistroHttpCombinedKeyExecuteTask(GlobalConfig globalConfig,
DistroDelayTaskExecuteEngine distroDelayTaskExecuteEngine, DistroKey singleDistroKey,
DataOperation taskAction) {
this.globalConfig = globalConfig;
this.distroDelayTaskExecuteEngine = distroDelayTaskExecuteEngine;
this.singleDistroKey = singleDistroKey;
this.taskAction = taskAction;
}
@Override
public void run() {
try {
DistroKey newKey = new DistroKey(DistroHttpCombinedKey.getSequenceKey(),
DistroHttpCombinedKeyDelayTask.class.getSimpleName(), singleDistroKey.getTargetServer());
DistroHttpCombinedKeyDelayTask combinedTask = new DistroHttpCombinedKeyDelayTask(newKey, taskAction,
globalConfig.getTaskDispatchPeriod() / 2, globalConfig.getBatchSyncKeyCount());
combinedTask.getActualResourceKeys().add(singleDistroKey.getResourceKey());
distroDelayTaskExecuteEngine.addTask(newKey, combinedTask);
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-FAILED] Combined key for http failed. ", e);
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.core.distributed.distro.component.DistroFailedTaskHandler;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import com.alibaba.nacos.naming.misc.GlobalConfig;
/**
* Distro combined key task failed handler.
*
* @author xiweng.yy
*/
public class DistroHttpCombinedKeyTaskFailedHandler implements DistroFailedTaskHandler {
private final GlobalConfig globalConfig;
private final DistroTaskEngineHolder distroTaskEngineHolder;
public DistroHttpCombinedKeyTaskFailedHandler(GlobalConfig globalConfig,
DistroTaskEngineHolder distroTaskEngineHolder) {
this.globalConfig = globalConfig;
this.distroTaskEngineHolder = distroTaskEngineHolder;
}
@Override
public void retry(DistroKey distroKey, DataOperation action) {
DistroHttpCombinedKey combinedKey = (DistroHttpCombinedKey) distroKey;
for (String each : combinedKey.getActualResourceTypes()) {
DistroKey newKey = new DistroKey(each, KeyBuilder.INSTANCE_LIST_KEY_PREFIX, distroKey.getTargetServer());
DistroDelayTask newTask = new DistroDelayTask(newKey, action, globalConfig.getSyncRetryDelay());
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(newKey, newTask);
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.combined;
import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import com.alibaba.nacos.naming.misc.GlobalConfig;
/**
* Distro http task processor.
*
* <p>
* The reason of create this delay task execute engine is that HTTP will establish and close tcp connection frequently,
* so that cost more memory and cpu. What's more, there may be much 'TIME_WAIT' status tcp connection when service
* change frequently.
* </p>
*
* <p>
* For naming usage, the only task is the ephemeral instances change.
* </p>
*
* @author xiweng.yy
*/
public class DistroHttpDelayTaskProcessor implements NacosTaskProcessor {
private final GlobalConfig globalConfig;
private final DistroTaskEngineHolder distroTaskEngineHolder;
public DistroHttpDelayTaskProcessor(GlobalConfig globalConfig, DistroTaskEngineHolder distroTaskEngineHolder) {
this.globalConfig = globalConfig;
this.distroTaskEngineHolder = distroTaskEngineHolder;
}
@Override
public boolean process(AbstractDelayTask task) {
DistroDelayTask distroDelayTask = (DistroDelayTask) task;
DistroKey distroKey = distroDelayTask.getDistroKey();
DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig,
distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction());
distroTaskEngineHolder.getExecuteWorkersManager().dispatch(distroKey, executeTask);
return true;
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.component;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataStorage;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.naming.core.DistroMapper;
import java.util.HashMap;
import java.util.Map;
/**
* Distro data storage impl.
*
* @author xiweng.yy
*/
public class DistroDataStorageImpl implements DistroDataStorage {
private final DataStore dataStore;
private final DistroMapper distroMapper;
public DistroDataStorageImpl(DataStore dataStore, DistroMapper distroMapper) {
this.dataStore = dataStore;
this.distroMapper = distroMapper;
}
@Override
public DistroData getDistroData(DistroKey distroKey) {
Map<String, Datum> result = new HashMap<>(1);
if (distroKey instanceof DistroHttpCombinedKey) {
result = dataStore.batchGet(((DistroHttpCombinedKey) distroKey).getActualResourceTypes());
} else {
Datum datum = dataStore.get(distroKey.getResourceKey());
result.put(distroKey.getResourceKey(), datum);
}
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
return new DistroData(distroKey, dataContent);
}
@Override
public DistroData getDatumSnapshot() {
Map<String, Datum> result = dataStore.getDataMap();
byte[] dataContent = ApplicationUtils.getBean(Serializer.class).serialize(result);
DistroKey distroKey = new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, dataContent);
}
@Override
public DistroData getVerifyData() {
Map<String, String> keyChecksums = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
Datum datum = dataStore.get(key);
if (datum == null) {
continue;
}
keyChecksums.put(key, datum.value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return null;
}
DistroKey distroKey = new DistroKey("checksum", KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return new DistroData(distroKey, ApplicationUtils.getBean(Serializer.class).serialize(keyChecksums));
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro.component;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
import com.alibaba.nacos.core.distributed.distro.component.DistroCallback;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.exception.DistroException;
import com.alibaba.nacos.naming.misc.NamingProxy;
import java.util.ArrayList;
import java.util.List;
/**
* Distro http agent.
*
* @author xiweng.yy
*/
public class DistroHttpAgent implements DistroTransportAgent {
@Override
public boolean syncData(DistroData data, String targetServer) {
byte[] dataContent = data.getContent();
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
@Override
public void syncData(DistroData data, String targetServer, DistroCallback callback) {
}
@Override
public boolean syncVerifyData(DistroData verifyData, String targetServer) {
NamingProxy.syncCheckSums(verifyData.getContent(), targetServer);
return true;
}
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
}
@Override
public DistroData getData(DistroKey key, String targetServer) {
try {
List<String> toUpdateKeys = null;
if (key instanceof DistroHttpCombinedKey) {
toUpdateKeys = ((DistroHttpCombinedKey) key).getActualResourceTypes();
} else {
toUpdateKeys = new ArrayList<>(1);
toUpdateKeys.add(key.getResourceKey());
}
byte[] queriedData = NamingProxy.getData(toUpdateKeys, key.getTargetServer());
return new DistroData(key, queriedData);
} catch (Exception e) {
throw new DistroException(String.format("Get data from %s failed.", key.getTargetServer()), e);
}
}
@Override
public DistroData getDatumSnapshot(String targetServer) {
try {
byte[] allDatum = NamingProxy.getAllData(targetServer);
return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
} catch (Exception e) {
throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);
}
}
}

View File

@ -81,8 +81,9 @@ public class ClusterVersionJudgement {
allMemberIsNewVersion = false;
}
}
this.allMemberIsNewVersion = allMemberIsNewVersion;
if (allMemberIsNewVersion) {
// can only trigger once
if (allMemberIsNewVersion && !this.allMemberIsNewVersion) {
this.allMemberIsNewVersion = true;
Collections.sort(observers);
for (consumerWithPriority consumer : observers) {
consumer.consumer.accept(true);

View File

@ -31,7 +31,7 @@ import com.alibaba.nacos.consistency.entity.GetRequest;
import com.alibaba.nacos.consistency.entity.Log;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.distributed.raft.RaftConfig;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.exception.ErrorCode;
import com.alibaba.nacos.core.exception.KVStorageException;
import com.alibaba.nacos.core.storage.StorageFactory;
@ -92,7 +92,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
}
}
private final CPProtocol<RaftConfig, LogProcessor4CP> protocol;
private final CPProtocol protocol;
private final KvStorage kvStorage;
@ -121,9 +121,9 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
*/
private volatile boolean hasError = false;
public PersistentServiceProcessor(final CPProtocol<RaftConfig, LogProcessor4CP> protocol,
public PersistentServiceProcessor(final ProtocolManager protocolManager,
final ClusterVersionJudgement versionJudgement, final RaftStore oldStore) {
this.protocol = protocol;
this.protocol = protocolManager.getCpProtocol();
this.oldStore = oldStore;
this.versionJudgement = versionJudgement;
this.kvStorage = StorageFactory.createKVStorage(KvStorage.KVType.File, "naming-persistent",
@ -139,6 +139,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
init();
}
@SuppressWarnings("unchecked")
private void init() {
NotifyCenter.registerToPublisher(ValueChangeEvent.class, 16384);
this.protocol.addLogProcessors(Collections.singletonList(this));
@ -210,9 +211,11 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
* Pull old data into the new data store. When loading old data information, write locks must be added, and new
* requests can be processed only after the old data has been loaded
*/
@SuppressWarnings("unchecked")
public void loadFromOldData() {
final Lock lock = this.lock.writeLock();
lock.lock();
Loggers.RAFT.warn("start to load data to new raft protocol!!!");
try {
if (protocol.isLeader(Constants.NAMING_PERSISTENT_SERVICE_GROUP)) {
Map<String, Datum> datumMap = new HashMap<>(64);
@ -230,7 +233,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
BatchWriteRequest request = new BatchWriteRequest();
request.setKeys(keys);
request.setValues(values);
CompletableFuture<Response> future = protocol.submitAsync(
CompletableFuture future = protocol.submitAsync(
Log.newBuilder().setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP)
.setData(ByteString.copyFrom(serializer.serialize(request))).build())
.whenComplete(((response, throwable) -> {

View File

@ -21,7 +21,7 @@ import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
@ -178,8 +178,12 @@ public class RaftCore {
versionJudgement.registerObserver(isAllNewVersion -> {
stopWork = isAllNewVersion;
if (stopWork) {
Loggers.RAFT.warn("start to close old raft protocol!!!");
Loggers.RAFT.warn("stop old raft protocol task for notifier");
NotifyCenter.deregisterSubscriber(notifier);
Loggers.RAFT.warn("stop old raft protocol task for master task");
masterTask.cancel(true);
Loggers.RAFT.warn("stop old raft protocol task for heartbeat task");
heartbeatTask.cancel(true);
}
}, 100);
@ -386,9 +390,7 @@ public class RaftCore {
}
}
raftStore.updateTerm(local.term.get());
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(ApplyAction.CHANGE).build());
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
@ -838,7 +840,8 @@ public class RaftCore {
raftStore.write(newDatum);
datums.put(newDatum.key, newDatum);
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(newDatum.key).action(ApplyAction.CHANGE).build());
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(newDatum.key).action(DataOperation.CHANGE).build());
local.resetLeaderDue();
@ -1008,7 +1011,7 @@ public class RaftCore {
public void addDatum(Datum datum) {
datums.put(datum.key, datum);
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(ApplyAction.CHANGE).build());
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
}
/**
@ -1037,7 +1040,7 @@ public class RaftCore {
raftStore.delete(deleted);
Loggers.RAFT.info("datum deleted, key: {}", key);
}
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(ApplyAction.DELETE).build());
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(URLDecoder.decode(key, "UTF-8")).action(DataOperation.DELETE).build());
} catch (UnsupportedEncodingException e) {
Loggers.RAFT.warn("datum key decode failed: {}", key);
}
@ -1051,4 +1054,5 @@ public class RaftCore {
public int getNotifyTaskCount() {
return (int) publisher.currentEventSize();
}
}
}

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.misc.Loggers;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component;
@ -55,6 +56,7 @@ public class RaftListener implements SmartApplicationListener {
this.versionJudgement.registerObserver(isAllNewVersion -> {
stopUpdate = isAllNewVersion;
if (stopUpdate) {
Loggers.RAFT.warn("start to move old raft protocol metadata");
Member self = memberManager.getSelf();
self.delExtendVal(GROUP);
memberManager.update(self);

View File

@ -19,7 +19,7 @@ package com.alibaba.nacos.naming.consistency.persistent.raft;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
@ -86,7 +86,7 @@ public class RaftStore {
datums.put(datum.key, datum);
if (notifier != null) {
NotifyCenter
.publishEvent(ValueChangeEvent.builder().key(datum.key).action(ApplyAction.CHANGE).build());
.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
}
}
}

View File

@ -18,18 +18,19 @@ package com.alibaba.nacos.naming.controllers;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroHttpData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
import com.alibaba.nacos.core.distributed.distro.DistroProtocol;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.fasterxml.jackson.databind.JsonNode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
@ -39,7 +40,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
@ -53,13 +53,7 @@ import java.util.Map;
public class DistroController {
@Autowired
private Serializer serializer;
@Autowired
private DistroConsistencyServiceImpl consistencyService;
@Autowired
private DataStore dataStore;
private DistroProtocol distroProtocol;
@Autowired
private ServiceManager serviceManager;
@ -90,7 +84,9 @@ public class DistroController {
.isDefaultInstanceEphemeral()) {
serviceManager.createEmptyService(namespaceId, serviceName, true);
}
consistencyService.onPut(entry.getKey(), entry.getValue().value);
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(entry.getKey()), null,
entry.getValue());
distroProtocol.onReceive(distroHttpData);
}
}
return ResponseEntity.ok("ok");
@ -105,8 +101,8 @@ public class DistroController {
*/
@PutMapping("/checksum")
public ResponseEntity syncChecksum(@RequestParam String source, @RequestBody Map<String, String> dataMap) {
consistencyService.onReceiveChecksums(dataMap, source);
DistroHttpData distroHttpData = new DistroHttpData(createDistroKey(source), null, dataMap);
distroProtocol.onVerify(distroHttpData);
return ResponseEntity.ok("ok");
}
@ -123,17 +119,12 @@ public class DistroController {
JsonNode bodyNode = JacksonUtils.toObj(body);
String keys = bodyNode.get("keys").asText();
String keySplitter = ",";
Map<String, Datum> datumMap = new HashMap<>(64);
DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, "");
for (String key : keys.split(keySplitter)) {
Datum datum = consistencyService.get(key);
if (datum == null) {
continue;
}
datumMap.put(key, datum);
distroKey.getActualResourceTypes().add(key);
}
byte[] content = serializer.serialize(datumMap);
return ResponseEntity.ok(content);
DistroData distroData = distroProtocol.onQuery(distroKey);
return ResponseEntity.ok(distroData.getContent());
}
/**
@ -143,7 +134,11 @@ public class DistroController {
*/
@GetMapping("/datums")
public ResponseEntity getAllDatums() {
byte[] content = serializer.serialize(dataStore.getDataMap());
return ResponseEntity.ok(content);
DistroData distroData = distroProtocol.onSnapshot(KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
return ResponseEntity.ok(distroData.getContent());
}
private DistroKey createDistroKey(String resourceKey) {
return new DistroKey(resourceKey, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
}
}

View File

@ -763,10 +763,6 @@ public class ServiceManager implements RecordListener<Service> {
return total;
}
public Map<String, Service> getServiceMap(String namespaceId) {
return serviceMap.get(namespaceId);
}
public int getPagedService(String namespaceId, int startPage, int pageSize, String param, String containedInstance,
List<Service> serviceList, boolean hasIpCount) {
@ -1018,4 +1014,4 @@ public class ServiceManager implements RecordListener<Service> {
return JacksonUtils.toJson(this);
}
}
}
}

View File

@ -16,9 +16,12 @@
package com.alibaba.nacos.naming.misc;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* Stores some configurations for Distro protocol.
*
@ -28,6 +31,8 @@ import org.springframework.stereotype.Component;
@Component
public class GlobalConfig {
private final DistroConfig distroConfig;
@Value("${nacos.naming.distro.taskDispatchPeriod:2000}")
private int taskDispatchPeriod = 2000;
@ -46,6 +51,22 @@ public class GlobalConfig {
@Value("${nacos.naming.distro.loadDataRetryDelayMillis:30000}")
private long loadDataRetryDelayMillis = 30000;
public GlobalConfig(DistroConfig distroConfig) {
this.distroConfig = distroConfig;
}
@PostConstruct
public void printGlobalConfig() {
Loggers.SRV_LOG.info(toString());
overrideDistroConfiguration();
}
private void overrideDistroConfiguration() {
distroConfig.setSyncDelayMillis(taskDispatchPeriod);
distroConfig.setSyncRetryDelayMillis(syncRetryDelay);
distroConfig.setLoadDataRetryDelayMillis(loadDataRetryDelayMillis);
}
public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}
@ -69,4 +90,11 @@ public class GlobalConfig {
public long getLoadDataRetryDelayMillis() {
return loadDataRetryDelayMillis;
}
@Override
public String toString() {
return "GlobalConfig{" + "taskDispatchPeriod=" + taskDispatchPeriod + ", batchSyncKeyCount=" + batchSyncKeyCount
+ ", syncRetryDelay=" + syncRetryDelay + ", dataWarmup=" + dataWarmup + ", expireInstance="
+ expireInstance + ", loadDataRetryDelayMillis=" + loadDataRetryDelayMillis + '}';
}
}

View File

@ -46,8 +46,6 @@ public class GlobalExecutor {
public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);
private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5);
private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);
public static final int DEFAULT_THREAD_COUNT =
@ -58,16 +56,6 @@ public class GlobalExecutor {
Runtime.getRuntime().availableProcessors() * 2,
new NameThreadFactory("com.alibaba.nacos.naming.timer"));
private static final ScheduledExecutorService TASK_DISPATCHER_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
Runtime.getRuntime().availableProcessors(),
new NameThreadFactory("com.alibaba.nacos.naming.distro.task.dispatcher"));
private static final ScheduledExecutorService DATA_SYNC_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
Runtime.getRuntime().availableProcessors(),
new NameThreadFactory("com.alibaba.nacos.naming.distro.data.syncer"));
private static final ScheduledExecutorService SERVER_STATUS_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.status.worker"));
@ -127,15 +115,6 @@ public class GlobalExecutor {
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.nacos-server-performance"));
public static void submitDataSync(Runnable runnable, long delay) {
DATA_SYNC_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void schedulePartitionDataTimedSync(Runnable runnable) {
DATA_SYNC_EXECUTOR.scheduleWithFixedDelay(runnable, PARTITION_DATA_TIMED_SYNC_INTERVAL,
PARTITION_DATA_TIMED_SYNC_INTERVAL, TimeUnit.MILLISECONDS);
}
public static ScheduledFuture registerMasterElection(Runnable runnable) {
return NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS);
}
@ -160,26 +139,10 @@ public class GlobalExecutor {
NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.MILLISECONDS);
}
public static void submitTaskDispatch(Runnable runnable) {
TASK_DISPATCHER_EXECUTOR.submit(runnable);
}
public static void submitLoadDataTask(Runnable runnable) {
NAMING_TIMER_EXECUTOR.submit(runnable);
}
public static void submitLoadDataTask(Runnable runnable, long delay) {
NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void submitClusterVersionJudge(Runnable runnable, long delay) {
NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void submitLoadOldData(Runnable runnable, long delay) {
NAMING_TIMER_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
public static void submitDistroNotifyTask(Runnable runnable) {
DISTRO_NOTIFY_EXECUTOR.submit(runnable);
}

View File

@ -54,7 +54,16 @@ public class NamingProxy {
* @param server server address
*/
public static void syncCheckSums(Map<String, String> checksumMap, String server) {
syncCheckSums(JacksonUtils.toJsonBytes(checksumMap), server);
}
/**
* Synchronize check sums.
*
* @param checksums checksum map bytes
* @param server server address
*/
public static void syncCheckSums(byte[] checksums, String server) {
try {
Map<String, String> headers = new HashMap<>(128);
@ -64,8 +73,8 @@ public class NamingProxy {
HttpClient.asyncHttpPutLarge(
"http://" + server + ApplicationUtils.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
+ TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(), headers,
JacksonUtils.toJsonBytes(checksumMap), new AsyncCompletionHandler() {
+ TIMESTAMP_SYNC_URL + "?source=" + NetUtils.localServer(), headers, checksums,
new AsyncCompletionHandler() {
@Override
public Object onCompleted(Response response) throws Exception {
if (HttpURLConnection.HTTP_OK != response.getStatusCode()) {

View File

@ -0,0 +1,137 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.core.distributed.distro.DistroProtocol;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class DistroConsistencyServiceImplTest extends BaseTest {
private DistroConsistencyServiceImpl distroConsistencyService;
@Mock
private DataStore dataStore;
@Mock
private Serializer serializer;
@Mock
private DistroProtocol distroProtocol;
@Mock
private GlobalConfig globalConfig;
@Mock
private DistroConsistencyServiceImpl.Notifier notifier;
@Mock
private RecordListener<Instances> recordListener;
@Mock
private DistroComponentHolder distroComponentHolder;
@Mock
private DistroTaskEngineHolder distroTaskEngineHolder;
private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners;
private Instances instances;
@Before
public void setUp() throws Exception {
doReturn(distroComponentHolder).when(context).getBean(DistroComponentHolder.class);
doReturn(distroTaskEngineHolder).when(context).getBean(DistroTaskEngineHolder.class);
when(globalConfig.getTaskDispatchPeriod()).thenReturn(2000);
distroConsistencyService = new DistroConsistencyServiceImpl(distroMapper, dataStore, serializer, switchDomain,
globalConfig, distroProtocol);
ReflectionTestUtils.setField(distroConsistencyService, "notifier", notifier);
ReflectionTestUtils.setField(distroConsistencyService, "distroProtocol", distroProtocol);
listeners = (Map<String, ConcurrentLinkedQueue<RecordListener>>) ReflectionTestUtils
.getField(distroConsistencyService, "listeners");
instances = new Instances();
}
@After
public void tearDown() throws Exception {
}
@Test
public void testPutWithListener() throws NacosException {
String key = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
distroConsistencyService.listen(key, recordListener);
distroConsistencyService.put(key, instances);
verify(distroProtocol).sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, 1000L);
verify(notifier).addTask(key, DataOperation.CHANGE);
verify(dataStore).put(eq(key), any(Datum.class));
}
@Test
public void testPutWithoutListener() throws NacosException {
String key = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
distroConsistencyService.put(key, instances);
verify(distroProtocol).sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, 1000L);
verify(notifier, never()).addTask(key, DataOperation.CHANGE);
verify(dataStore).put(eq(key), any(Datum.class));
}
@Test
public void testRemoveWithListener() throws NacosException {
String key = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
distroConsistencyService.listen(key, recordListener);
distroConsistencyService.remove(key);
verify(dataStore).remove(key);
verify(notifier).addTask(key, DataOperation.DELETE);
assertTrue(listeners.isEmpty());
}
@Test
public void testRemoveWithoutListener() throws NacosException {
String key = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
distroConsistencyService.remove(key);
verify(dataStore).remove(key);
verify(notifier, never()).addTask(key, DataOperation.DELETE);
assertTrue(listeners.isEmpty());
}
}

View File

@ -1,42 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.distro;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.util.ReflectionTestUtils;
public class TaskDispatcherTest {
private TaskDispatcher taskDispatcher;
@Before
public void init() {
taskDispatcher = new TaskDispatcher();
GlobalConfig conf = new GlobalConfig();
ReflectionTestUtils.setField(conf, "taskDispatchThreadCount", 3);
ReflectionTestUtils.setField(taskDispatcher, "partitionConfig", conf);
taskDispatcher.init();
}
@Test
public void testAddTask() {
char[] chars = new char[] {2325, 9, 30, 12, 2};
taskDispatcher.addTask(new String(chars));
}
}

View File

@ -17,8 +17,11 @@
package com.alibaba.nacos.naming.core;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.BaseTest;
import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.consistency.Datum;
@ -32,18 +35,25 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Spy;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ServiceManagerTest extends BaseTest {
@Spy
private ServiceManager serviceManager;
@Mock
@ -52,21 +62,181 @@ public class ServiceManagerTest extends BaseTest {
@Mock
private Synchronizer synchronizer;
@Mock
private ServerMemberManager serverMemberManager;
private Service service;
private Cluster cluster;
private Instance instance;
private Instance instance2;
@Before
public void before() {
super.before();
mockInjectHealthCheckProcessor();
mockInjectDistroMapper();
serviceManager = new ServiceManager(switchDomain, distroMapper, serverMemberManager, pushService, peerSet);
ReflectionTestUtils.setField(serviceManager, "consistencyService", consistencyService);
ReflectionTestUtils.setField(serviceManager, "synchronizer", synchronizer);
mockInjectSwitchDomain();
mockInjectDistroMapper();
mockService();
mockCluster();
mockInstance();
}
private void mockService() {
service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId(TEST_NAMESPACE);
}
private void mockCluster() {
cluster = new Cluster(TEST_CLUSTER_NAME, service);
}
private void mockInstance() {
instance = new Instance("1.1.1.1", 1, TEST_CLUSTER_NAME);
instance2 = new Instance("2.2.2.2", 2);
}
@Test
public void testGetAllNamespaces() throws NacosException {
assertTrue(serviceManager.getAllNamespaces().isEmpty());
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
assertFalse(serviceManager.getAllNamespaces().isEmpty());
assertEquals(1, serviceManager.getAllNamespaces().size());
assertEquals(TEST_NAMESPACE, serviceManager.getAllNamespaces().iterator().next());
}
@Test
public void testGetAllServiceNames() throws NacosException {
assertTrue(serviceManager.getAllServiceNames().isEmpty());
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
assertFalse(serviceManager.getAllServiceNames().isEmpty());
assertEquals(1, serviceManager.getAllServiceNames().size());
assertEquals(1, serviceManager.getAllServiceNames(TEST_NAMESPACE).size());
assertEquals(TEST_SERVICE_NAME, serviceManager.getAllServiceNames(TEST_NAMESPACE).iterator().next());
}
@Test
public void testGetAllServiceNameList() throws NacosException {
assertTrue(serviceManager.getAllServiceNameList(TEST_NAMESPACE).isEmpty());
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
assertFalse(serviceManager.getAllServiceNameList(TEST_NAMESPACE).isEmpty());
assertEquals(1, serviceManager.getAllServiceNameList(TEST_NAMESPACE).size());
assertEquals(TEST_SERVICE_NAME, serviceManager.getAllServiceNameList(TEST_NAMESPACE).get(0));
}
@Test
public void testGetResponsibleServices() throws NacosException {
when(distroMapper.responsible(TEST_SERVICE_NAME)).thenReturn(true);
assertEquals(0, serviceManager.getResponsibleServiceCount());
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
assertEquals(1, serviceManager.getResponsibleServiceCount());
assertEquals(TEST_SERVICE_NAME,
serviceManager.getResponsibleServices().get(TEST_NAMESPACE).iterator().next().getName());
}
@Test
public void getResponsibleInstanceCount() throws NacosException {
when(distroMapper.responsible(TEST_SERVICE_NAME)).thenReturn(true);
assertEquals(0, serviceManager.getResponsibleInstanceCount());
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
Service service = serviceManager.getService(TEST_NAMESPACE, TEST_SERVICE_NAME);
service.addCluster(cluster);
((Set<Instance>) ReflectionTestUtils.getField(cluster, "ephemeralInstances")).add(instance);
assertEquals(1, serviceManager.getResponsibleInstanceCount());
}
@Test
public void testCreateEmptyServiceForEphemeral() throws NacosException {
assertFalse(serviceManager.containService(TEST_NAMESPACE, TEST_SERVICE_NAME));
assertEquals(0, serviceManager.getServiceCount());
serviceManager.createServiceIfAbsent(TEST_NAMESPACE, TEST_SERVICE_NAME, true,
new Cluster(TEST_CLUSTER_NAME, service));
assertTrue(serviceManager.containService(TEST_NAMESPACE, TEST_SERVICE_NAME));
assertEquals(1, serviceManager.getServiceCount());
verify(consistencyService).listen(eq(KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true)),
any(Service.class));
verify(consistencyService).listen(eq(KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, false)),
any(Service.class));
verify(consistencyService, never())
.put(eq(KeyBuilder.buildServiceMetaKey(TEST_NAMESPACE, TEST_SERVICE_NAME)), any(Service.class));
}
@Test
public void testCreateEmptyServiceForPersistent() throws NacosException {
assertFalse(serviceManager.containService(TEST_NAMESPACE, TEST_SERVICE_NAME));
assertEquals(0, serviceManager.getServiceCount());
serviceManager.createServiceIfAbsent(TEST_NAMESPACE, TEST_SERVICE_NAME, false,
new Cluster(TEST_CLUSTER_NAME, service));
assertTrue(serviceManager.containService(TEST_NAMESPACE, TEST_SERVICE_NAME));
assertEquals(1, serviceManager.getServiceCount());
verify(consistencyService).listen(eq(KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true)),
any(Service.class));
verify(consistencyService).listen(eq(KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, false)),
any(Service.class));
verify(consistencyService)
.put(eq(KeyBuilder.buildServiceMetaKey(TEST_NAMESPACE, TEST_SERVICE_NAME)), any(Service.class));
}
@Test
public void testEasyRemoveServiceSuccessfully() throws Exception {
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
serviceManager.easyRemoveService(TEST_NAMESPACE, TEST_SERVICE_NAME);
verify(consistencyService).remove(KeyBuilder.buildServiceMetaKey(TEST_NAMESPACE, TEST_SERVICE_NAME));
}
@Test
public void testEasyRemoveServiceFailed() throws Exception {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("specified service not exist, serviceName : " + TEST_SERVICE_NAME);
serviceManager.easyRemoveService(TEST_NAMESPACE, TEST_SERVICE_NAME);
}
@Test
public void testRegisterInstance() throws NacosException {
assertEquals(0, serviceManager.getInstanceCount());
serviceManager.registerInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, instance);
String instanceListKey = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
verify(consistencyService).put(eq(instanceListKey), any(Instances.class));
}
@Test
public void testUpdateInstance() throws NacosException {
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
Service service = serviceManager.getService(TEST_NAMESPACE, TEST_SERVICE_NAME);
service.addCluster(cluster);
((Set<Instance>) ReflectionTestUtils.getField(cluster, "ephemeralInstances")).add(instance);
serviceManager.updateInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, instance);
String instanceListKey = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
verify(consistencyService).put(eq(instanceListKey), any(Instances.class));
}
@Test
public void testRemoveInstance() throws NacosException {
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
serviceManager.removeInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, true, instance);
String instanceListKey = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
verify(consistencyService).put(eq(instanceListKey), any(Instances.class));
}
@Test
public void testGetInstance() throws NacosException {
assertNull(serviceManager.getInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, TEST_CLUSTER_NAME, "1.1.1.1", 1));
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
assertNull(serviceManager.getInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, TEST_CLUSTER_NAME, "1.1.1.1", 1));
Service service = serviceManager.getService(TEST_NAMESPACE, TEST_SERVICE_NAME);
service.addCluster(cluster);
((Set<Instance>) ReflectionTestUtils.getField(cluster, "ephemeralInstances")).add(instance);
assertEquals(instance,
serviceManager.getInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, TEST_CLUSTER_NAME, "1.1.1.1", 1));
assertNull(serviceManager.getInstance(TEST_NAMESPACE, TEST_SERVICE_NAME, TEST_CLUSTER_NAME, "2.2.2.2", 2));
}
@Test
public void testUpdateIpAddresses() throws Exception {
ReflectionTestUtils.setField(serviceManager, "consistencyService", consistencyService);
Service service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId(TEST_NAMESPACE);
Instance instance = new Instance("1.1.1.1", 1);
instance.setClusterName(TEST_CLUSTER_NAME);
List<Instance> instanceList = serviceManager
.updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, true, instance);
Assert.assertEquals(1, instanceList.size());
@ -78,7 +248,7 @@ public class ServiceManagerTest extends BaseTest {
Datum datam = new Datum();
datam.key = KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
Instances instances = new Instances();
instanceList.add(new Instance("2.2.2.2", 2));
instanceList.add(instance2);
instances.setInstanceList(instanceList);
datam.value = instances;
when(consistencyService.get(KeyBuilder.buildInstanceListKey(TEST_NAMESPACE, TEST_SERVICE_NAME, true)))
@ -87,7 +257,7 @@ public class ServiceManagerTest extends BaseTest {
instanceList = serviceManager
.updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, true, instance);
Assert.assertEquals(1, instanceList.size());
Assert.assertEquals(new Instance("2.2.2.2", 2), instanceList.get(0));
Assert.assertEquals(instance2, instanceList.get(0));
Assert.assertEquals(1, service.getClusterMap().size());
Assert.assertEquals(new Cluster(instance.getClusterName(), service),
service.getClusterMap().get(TEST_CLUSTER_NAME));
@ -97,41 +267,52 @@ public class ServiceManagerTest extends BaseTest {
public void testUpdateIpAddressesNoInstance() throws Exception {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("ip list can not be empty, service: test-service, ip list: []");
ReflectionTestUtils.setField(serviceManager, "consistencyService", consistencyService);
Service service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId(TEST_NAMESPACE);
serviceManager.updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, true);
}
@Test
public void testSearchServices() throws NacosException {
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
List<Service> actual = serviceManager
.searchServices(TEST_NAMESPACE, Constants.ANY_PATTERN + TEST_SERVICE_NAME + Constants.ANY_PATTERN);
assertEquals(1, actual.size());
assertEquals(TEST_SERVICE_NAME, actual.get(0).getName());
}
@Test
public void testGetPagedService() throws NacosException {
serviceManager.createEmptyService(TEST_NAMESPACE, TEST_SERVICE_NAME, true);
Service service = serviceManager.getService(TEST_NAMESPACE, TEST_SERVICE_NAME);
service.addCluster(cluster);
((Set<Instance>) ReflectionTestUtils.getField(cluster, "ephemeralInstances")).add(instance);
List<Service> actualServices = new ArrayList<>(8);
int actualSize = serviceManager
.getPagedService(TEST_NAMESPACE, 0, 10, StringUtils.EMPTY, "1.1.1.1:1", actualServices, true);
assertEquals(1, actualSize);
assertEquals(TEST_SERVICE_NAME, actualServices.get(0).getName());
}
@Test
public void testSnowflakeInstanceId() throws Exception {
ReflectionTestUtils.setField(serviceManager, "consistencyService", consistencyService);
Service service = new Service(TEST_SERVICE_NAME);
service.setNamespaceId(TEST_NAMESPACE);
Map<String, String> metaData = Maps.newHashMap();
metaData.put(PreservedMetadataKeys.INSTANCE_ID_GENERATOR, Constants.SNOWFLAKE_INSTANCE_ID_GENERATOR);
Instance instance1 = new Instance("1.1.1.1", 1);
instance1.setClusterName(TEST_CLUSTER_NAME);
instance1.setMetadata(metaData);
instance.setMetadata(metaData);
Instance instance2 = new Instance("2.2.2.2", 2);
instance2.setClusterName(TEST_CLUSTER_NAME);
instance2.setMetadata(metaData);
List<Instance> instanceList = serviceManager
.updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, true, instance1, instance2);
.updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, true, instance, instance2);
Assert.assertNotNull(instanceList);
Assert.assertEquals(2, instanceList.size());
int instanceId1 = Integer.parseInt(instance1.getInstanceId());
int instanceId1 = Integer.parseInt(instance.getInstanceId());
int instanceId2 = Integer.parseInt(instance2.getInstanceId());
Assert.assertNotEquals(instanceId1, instanceId2);
}
@Test
public void testUpdatedHealthStatus() {
ReflectionTestUtils.setField(serviceManager, "synchronizer", synchronizer);
String namespaceId = "namespaceId";
String serviceName = "testService";
String serverIp = "127.0.0.1";