enhancement: CloseableHttpAsyncClient does not clean up expired and i… (#9727)

* enhancement: CloseableHttpAsyncClient does not clean up expired and invalid connections (#9708)

* enhancement: CloseableHttpAsyncClient does not clean up expired and invalid connections (#9708)

* style:satisfy style/NacosCheckStyle.xml

* add unit test(#9708)

* Revert "add unit test(#9708)"

This reverts commit 939905cb56f30af4d0ec35543d6d630f0777eb98.

* add unit test
This commit is contained in:
zz630 2023-01-05 10:04:27 +08:00 committed by GitHub
parent 34ba3818ce
commit 20145fd256
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 7 deletions

View File

@ -84,6 +84,8 @@ public abstract class AbstractHttpClientFactory implements HttpClientFactory {
final HttpClientConfig originalRequestConfig = buildHttpClientConfig();
final DefaultConnectingIOReactor ioreactor = getIoReactor(AYNC_IO_REACTOR_NAME);
final RequestConfig defaultConfig = getRequestConfig();
final NHttpClientConnectionManager connectionManager = getConnectionManager(originalRequestConfig, ioreactor);
monitorAndExtension(connectionManager);
return new NacosAsyncRestTemplate(assignLogger(), new DefaultAsyncHttpClientRequest(
HttpAsyncClients.custom().addInterceptorLast(new RequestContent(true))
.setThreadFactory(new NameThreadFactory(ASYNC_THREAD_NAME))
@ -91,7 +93,7 @@ public abstract class AbstractHttpClientFactory implements HttpClientFactory {
.setMaxConnTotal(originalRequestConfig.getMaxConnTotal())
.setMaxConnPerRoute(originalRequestConfig.getMaxConnPerRoute())
.setUserAgent(originalRequestConfig.getUserAgent())
.setConnectionManager(getConnectionManager(originalRequestConfig, ioreactor)).build(),
.setConnectionManager(connectionManager).build(),
ioreactor, defaultConfig));
}
@ -218,4 +220,10 @@ public abstract class AbstractHttpClientFactory implements HttpClientFactory {
* @return Logger
*/
protected abstract Logger assignLogger();
/**
* add some monitor and do some extension. default empty implementation, implemented by subclass
*/
protected void monitorAndExtension(NHttpClientConnectionManager connectionManager) {
}
}

View File

@ -83,6 +83,10 @@ public class GlobalExecutor {
private static final ExecutorService PUSH_CALLBACK_EXECUTOR = ExecutorFactory.Managed
.newSingleExecutorService("Push", new NameThreadFactory("com.alibaba.nacos.naming.push.callback"));
private static final ScheduledExecutorService MONITOR_HEALTH_CHECK_POOL_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
1, new NameThreadFactory("com.alibaba.nacos.naming.health-check-pool"));
public static void registerServerStatusUpdater(Runnable runnable) {
NAMING_TIMER_EXECUTOR.scheduleAtFixedRate(runnable, 0, SERVER_STATUS_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
}
@ -140,4 +144,9 @@ public class GlobalExecutor {
public static ExecutorService getCallbackExecutor() {
return PUSH_CALLBACK_EXECUTOR;
}
public static ScheduledFuture<?> scheduleMonitorHealthCheckPool(Runnable runnable, long initialDelay, long delay,
TimeUnit unit) {
return MONITOR_HEALTH_CHECK_POOL_EXECUTOR.scheduleWithFixedDelay(runnable, initialDelay, delay, unit);
}
}

View File

@ -16,6 +16,17 @@
package com.alibaba.nacos.naming.misc;
import static com.alibaba.nacos.naming.misc.Loggers.SRV_LOG;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.pool.PoolStats;
import org.slf4j.Logger;
import com.alibaba.nacos.common.http.AbstractApacheHttpClientFactory;
import com.alibaba.nacos.common.http.AbstractHttpClientFactory;
import com.alibaba.nacos.common.http.HttpClientBeanHolder;
@ -26,11 +37,6 @@ import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
import static com.alibaba.nacos.naming.misc.Loggers.SRV_LOG;
/**
* http Manager.
@ -155,7 +161,7 @@ public class HttpClientManager {
}
}
private static class ProcessorHttpClientFactory extends AbstractHttpClientFactory {
public static class ProcessorHttpClientFactory extends AbstractHttpClientFactory {
@Override
protected HttpClientConfig buildHttpClientConfig() {
@ -168,5 +174,52 @@ public class HttpClientManager {
protected Logger assignLogger() {
return SRV_LOG;
}
@Override
protected void monitorAndExtension(NHttpClientConnectionManager connectionManager) {
GlobalExecutor.scheduleMonitorHealthCheckPool(new MonitorHealthCheckPool(connectionManager), 60, 60, TimeUnit.SECONDS);
}
}
private static class MonitorHealthCheckPool implements Runnable {
private NHttpClientConnectionManager connectionManager;
public MonitorHealthCheckPool(NHttpClientConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
@Override
public void run() {
closeExpiredAndIdleConnections();
monitor();
}
private void monitor() {
try {
PoolingNHttpClientConnectionManager manager = (PoolingNHttpClientConnectionManager) connectionManager;
// Get the status of each route
Set<HttpRoute> routes = manager.getRoutes();
if (routes != null && !routes.isEmpty()) {
for (HttpRoute httpRoute : routes) {
PoolStats stats = manager.getStats(httpRoute);
SRV_LOG.debug("connectionManager every route: {}", stats);
}
}
// Get the connection pool status of all routes
PoolStats totalStats = manager.getTotalStats();
SRV_LOG.debug("connectionManager total status: {}", totalStats);
} catch (Exception e) {
SRV_LOG.warn("MonitorHealthCheckPool monitor warn", e);
}
}
private void closeExpiredAndIdleConnections() {
try {
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(CON_TIME_OUT_MILLIS * 10, TimeUnit.SECONDS);
} catch (Exception e) {
SRV_LOG.warn("MonitorHealthCheckPool clean warn", e);
}
}
}
}

View File

@ -20,11 +20,15 @@ import com.alibaba.nacos.Nacos;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.http.HttpClientBeanHolder;
import com.alibaba.nacos.common.http.HttpClientFactory;
import com.alibaba.nacos.common.http.HttpRestResult;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.naming.misc.HttpClientManager.ProcessorHttpClientFactory;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
@ -32,8 +36,11 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
@ -57,13 +64,23 @@ public class NacosAsyncRestTemplate_ITCase {
private NacosAsyncRestTemplate nacosRestTemplate = HttpClientBeanHolder
.getNacosAsyncRestTemplate(LoggerFactory.getLogger(NacosAsyncRestTemplate_ITCase.class));
private static final HttpClientFactory PROCESSOR_ASYNC_HTTP_CLIENT_FACTORY = new ProcessorHttpClientFactory();
private NacosAsyncRestTemplate processorRestTemplate = null;
private final String CONFIG_INSTANCE_PATH = "/nacos/v1/ns";
private String IP = null;
@Autowired
private Environment environment;
@Before
public void init() throws NacosException {
IP = String.format("http://localhost:%d", port);
EnvUtil.setEnvironment((ConfigurableEnvironment) environment);
processorRestTemplate = HttpClientBeanHolder
.getNacosAsyncRestTemplate(PROCESSOR_ASYNC_HTTP_CLIENT_FACTORY);
}
private class CallbackMap<T> implements Callback<T> {
@ -112,6 +129,22 @@ public class NacosAsyncRestTemplate_ITCase {
Assert.assertTrue(restResult.ok());
}
@Test
public void test_url_post_form_by_processor() throws Exception {
String url = IP + CONFIG_INSTANCE_PATH + "/instance";
Map<String, String> param = new HashMap<>();
param.put("serviceName", "app-test2");
param.put("port", "8080");
param.put("ip", "11.11.11.11");
CallbackMap<String> callbackMap = new CallbackMap<>();
processorRestTemplate.postForm(url, Header.newInstance(), Query.newInstance(), param, String.class, callbackMap);
Thread.sleep(2000);
HttpRestResult<String> restResult = callbackMap.getRestResult();
System.out.println(restResult.getData());
System.out.println(restResult.getHeader());
Assert.assertTrue(restResult.ok());
}
@Test
public void test_url_put_form() throws Exception {
String url = IP + CONFIG_INSTANCE_PATH + "/instance";