This reverts commit 95c8bf242b
.
This commit is contained in:
parent
d84d1a86cc
commit
f423d39d10
@ -32,7 +32,6 @@ import com.alibaba.nacos.client.utils.ParamUtil;
|
||||
import com.alibaba.nacos.client.utils.TenantUtil;
|
||||
import com.alibaba.nacos.common.http.HttpRestResult;
|
||||
import com.alibaba.nacos.common.lifecycle.Closeable;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
import com.alibaba.nacos.common.utils.ConvertUtils;
|
||||
import com.alibaba.nacos.common.utils.MD5Utils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
@ -166,7 +165,6 @@ public class ClientWorker implements Closeable {
|
||||
copy.remove(groupKey);
|
||||
cacheMap.set(copy);
|
||||
}
|
||||
reMakeCacheDataTaskId();
|
||||
LOGGER.info("[{}] [unsubscribe] {}", this.agent.getName(), groupKey);
|
||||
|
||||
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
|
||||
@ -179,38 +177,11 @@ public class ClientWorker implements Closeable {
|
||||
copy.remove(groupKey);
|
||||
cacheMap.set(copy);
|
||||
}
|
||||
reMakeCacheDataTaskId();
|
||||
LOGGER.info("[{}] [unsubscribe] {}", agent.getName(), groupKey);
|
||||
|
||||
MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Remake cacheData taskId.
|
||||
*/
|
||||
private void reMakeCacheDataTaskId() {
|
||||
int listenerSize = cacheMap.get().size();
|
||||
int remakeTaskId = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
|
||||
if (remakeTaskId < (int) currentLongingTaskCount) {
|
||||
for (int i = 0; i < remakeTaskId; i++) {
|
||||
int count = 0;
|
||||
for (String key : cacheMap.get().keySet()) {
|
||||
if (count == ParamUtil.getPerTaskConfigSize()) {
|
||||
break;
|
||||
}
|
||||
CacheData cacheData = cacheMap.get().get(key);
|
||||
cacheData.setTaskId(i);
|
||||
synchronized (cacheMap) {
|
||||
Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
|
||||
copy.put(key, cacheData);
|
||||
cacheMap.set(copy);
|
||||
}
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add cache data if absent.
|
||||
*
|
||||
@ -277,8 +248,6 @@ public class ClientWorker implements Closeable {
|
||||
cache.setInitializing(true);
|
||||
} else {
|
||||
cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
|
||||
int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
|
||||
cache.setTaskId(taskId);
|
||||
// fix issue # 1317
|
||||
if (enableRemoteSyncConfig) {
|
||||
String[] ct = getServerConfig(dataId, group, tenant, 3000L);
|
||||
@ -425,17 +394,12 @@ public class ClientWorker implements Closeable {
|
||||
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
|
||||
if (longingTaskCount > currentLongingTaskCount) {
|
||||
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
|
||||
taskIdSet.add(i);
|
||||
// The task list is no order.So it maybe has issues when changing.
|
||||
executorService.execute(new LongPollingRunnable(i));
|
||||
}
|
||||
} else if (longingTaskCount < currentLongingTaskCount) {
|
||||
for (int i = longingTaskCount; i < (int) currentLongingTaskCount; i++) {
|
||||
taskIdSet.remove(i);
|
||||
}
|
||||
}
|
||||
currentLongingTaskCount = longingTaskCount;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the dataId list from server.
|
||||
@ -692,9 +656,7 @@ public class ClientWorker implements Closeable {
|
||||
}
|
||||
inInitializingCacheList.clear();
|
||||
|
||||
if (taskIdSet.contains(taskId)) {
|
||||
executorService.execute(this);
|
||||
}
|
||||
|
||||
} catch (Throwable e) {
|
||||
|
||||
@ -723,11 +685,6 @@ public class ClientWorker implements Closeable {
|
||||
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
|
||||
new HashMap<String, CacheData>());
|
||||
|
||||
/**
|
||||
* Store the running taskId.
|
||||
*/
|
||||
private final ConcurrentHashSet<Integer> taskIdSet = new ConcurrentHashSet<Integer>();
|
||||
|
||||
private final HttpAgent agent;
|
||||
|
||||
private final ConfigFilterChainManager configFilterChainManager;
|
||||
|
@ -1,132 +0,0 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.client.config.listener.impl;
|
||||
|
||||
import com.alibaba.nacos.api.config.listener.Listener;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
|
||||
import com.alibaba.nacos.client.config.http.MetricsHttpAgent;
|
||||
import com.alibaba.nacos.client.config.impl.ClientWorker;
|
||||
import com.alibaba.nacos.client.utils.ParamUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class ClientWorkerTest {
|
||||
|
||||
@Mock
|
||||
ScheduledExecutorService scheduledExecutorService;
|
||||
|
||||
private ClientWorker clientWorker;
|
||||
|
||||
private List<Listener> listeners;
|
||||
|
||||
private final String dataId = "data";
|
||||
|
||||
private final String group = "group";
|
||||
|
||||
private final String currentLongingTaskCount = "currentLongingTaskCount";
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
clientWorker = new ClientWorker(mock(MetricsHttpAgent.class), mock(ConfigFilterChainManager.class),
|
||||
mock(Properties.class));
|
||||
try {
|
||||
Field executorServiceField = clientWorker.getClass().getDeclaredField("executorService");
|
||||
executorServiceField.setAccessible(true);
|
||||
Field modifiersField = Field.class.getDeclaredField("modifiers");
|
||||
modifiersField.setAccessible(true);
|
||||
modifiersField.setInt(executorServiceField, executorServiceField.getModifiers() & ~Modifier.FINAL);
|
||||
executorServiceField.set(clientWorker, scheduledExecutorService);
|
||||
Listener listener = new Listener() {
|
||||
@Override
|
||||
public Executor getExecutor() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receiveConfigInfo(String configInfo) {
|
||||
|
||||
}
|
||||
};
|
||||
listeners = Arrays.asList(listener);
|
||||
} catch (NoSuchFieldException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IllegalAccessException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddLongPollNumberThreads() {
|
||||
try {
|
||||
for (int i = 0; i < ParamUtil.getPerTaskConfigSize(); i++) {
|
||||
clientWorker.addTenantListeners(dataId + i, group, listeners);
|
||||
}
|
||||
Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount);
|
||||
currentLongingTaskCountField.setAccessible(true);
|
||||
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 1);
|
||||
for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) {
|
||||
clientWorker.addTenantListeners(dataId + i, group, listeners);
|
||||
}
|
||||
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2);
|
||||
} catch (NacosException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IllegalAccessException e) {
|
||||
e.printStackTrace();
|
||||
} catch (NoSuchFieldException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReduceLongPollNumberThreads() {
|
||||
try {
|
||||
for (int i = 0; i < ParamUtil.getPerTaskConfigSize() * 3; i++) {
|
||||
clientWorker.addTenantListeners(dataId + i, group, listeners);
|
||||
}
|
||||
Field currentLongingTaskCountField = clientWorker.getClass().getDeclaredField(currentLongingTaskCount);
|
||||
currentLongingTaskCountField.setAccessible(true);
|
||||
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 3);
|
||||
|
||||
for (int i = (int) ParamUtil.getPerTaskConfigSize(); i < ParamUtil.getPerTaskConfigSize() * 2; i++) {
|
||||
clientWorker.removeTenantListener(dataId + i, group, listeners.get(0));
|
||||
}
|
||||
Assert.assertEquals(currentLongingTaskCount, (int) currentLongingTaskCountField.getDouble(clientWorker), 2);
|
||||
} catch (NacosException e) {
|
||||
e.printStackTrace();
|
||||
} catch (IllegalAccessException e) {
|
||||
e.printStackTrace();
|
||||
} catch (NoSuchFieldException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user