Revert "[issue #10148]Replace sync forward request with async request (#10158)" (#10358)

This reverts commit a273705b8d.
This commit is contained in:
杨翊 SionYang 2023-04-20 20:12:33 +08:00 committed by GitHub
parent a273705b8d
commit 0e0a73e025
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 27 additions and 362 deletions

View File

@ -352,29 +352,15 @@ public class NacosAsyncRestTemplate extends AbstractNacosRestTemplate {
responseType, callback);
}
/**
* async general http request.
*
* <p>{@code responseType} can be an RestResult or RestResult data {@code T} type.
*
* <p>{@code callback} Result callback execution,
* if you need response headers, you can convert the received RestResult to HttpRestResult.
*
* @param url url
* @param httpMethod http header param
* @param requestEntity http body param
* @param responseType return type
* @param callback callback {@link Callback#onReceive(com.alibaba.nacos.common.model.RestResult)}
*/
@SuppressWarnings("unchecked")
public <T> void execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType,
private <T> void execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type type,
Callback<T> callback) {
try {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
if (logger.isDebugEnabled()) {
logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
}
ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
ResponseHandler<T> responseHandler = super.selectResponseHandler(type);
clientRequest.execute(uri, httpMethod, requestEntity, responseHandler, callback);
} catch (Exception e) {
// When an exception occurs, use Callback to pass it instead of throw it directly.

View File

@ -44,8 +44,6 @@ public class DistroConfig extends AbstractDynamicConfig {
private long loadDataTimeoutMillis = DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS;
private boolean asyncDistroForward = DistroConstants.DEFAULT_ASYNC_DISTRO_FORWARD_VALUE;
private DistroConfig() {
super(DISTRO);
resetConfig();
@ -67,9 +65,6 @@ public class DistroConfig extends AbstractDynamicConfig {
DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS);
loadDataTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_LOAD_TIMEOUT_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS);
asyncDistroForward = EnvUtil.getProperty(DistroConstants.NACOS_ASYNC_DISTRO_FORWARD_NAME, Boolean.class,
DistroConstants.DEFAULT_ASYNC_DISTRO_FORWARD_VALUE);
}
public static DistroConfig getInstance() {
@ -132,14 +127,6 @@ public class DistroConfig extends AbstractDynamicConfig {
this.loadDataTimeoutMillis = loadDataTimeoutMillis;
}
public boolean isAsyncDistroForward() {
return asyncDistroForward;
}
public void setAsyncDistroForward(boolean asyncDistroForward) {
this.asyncDistroForward = asyncDistroForward;
}
@Override
protected String printConfig() {
return "DistroConfig{" + "syncDelayMillis=" + syncDelayMillis + ", syncTimeoutMillis=" + syncTimeoutMillis

View File

@ -51,8 +51,4 @@ public class DistroConstants {
public static final long DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS = 30000L;
public static final String NACOS_ASYNC_DISTRO_FORWARD_NAME = "nacos.async.distro.forward";
public static final boolean DEFAULT_ASYNC_DISTRO_FORWARD_VALUE = false;
}

View File

@ -138,14 +138,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-client-java</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -109,4 +109,5 @@ public final class Constants {
* Min value of instance weight.
*/
public static final double MIN_WEIGHT_VALUE = 0.00D;
}

View File

@ -25,7 +25,6 @@ import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.MediaType;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RequestHttpEntity;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.HttpMethod;
import com.alibaba.nacos.common.utils.VersionUtils;
@ -103,7 +102,11 @@ public class HttpClient {
if (CollectionUtils.isNotEmpty(headers)) {
header.addAll(headers);
}
configDefaultHeaders(header, encoding);
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
header.addParam(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
header.addParam(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
header.addParam(HttpHeaderConsts.REQUEST_SOURCE_HEADER, EnvUtil.getLocalAddress());
header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, encoding);
AuthHeaderUtil.addIdentityToHeader(header);
HttpClientConfig httpClientConfig = HttpClientConfig.builder().setConTimeOutMillis(connectTimeout)
@ -119,14 +122,6 @@ public class HttpClient {
return RestResult.<String>builder().withCode(500).withMsg(e.toString()).build();
}
}
private static void configDefaultHeaders(Header header, String encoding) {
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
header.addParam(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
header.addParam(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
header.addParam(HttpHeaderConsts.REQUEST_SOURCE_HEADER, EnvUtil.getLocalAddress());
header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, encoding);
}
/**
* Request http get method by async.
@ -207,33 +202,6 @@ public class HttpClient {
}
}
/**
* Do http request by async with request body.
*
* @param url request url
* @param headers request headers
* @param paramValues request params
* @param body request body
* @param callback async callback func
* @param method http method
* @throws Exception exception when request
*/
public static void asyncHttpRequest(String url, List<String> headers, Map<String, String> paramValues, String body,
Callback<String> callback, String method) throws Exception {
Query query = Query.newInstance().initParams(paramValues);
query.addParam(FieldsConstants.ENCODING, ENCODING);
query.addParam(FieldsConstants.NOFIX, NOFIX);
Header header = Header.newInstance();
if (CollectionUtils.isNotEmpty(headers)) {
header.addAll(headers);
}
configDefaultHeaders(header, "UTF-8");
AuthHeaderUtil.addIdentityToHeader(header);
ASYNC_REST_TEMPLATE.execute(url, method, new RequestHttpEntity(header, query, body), String.class, callback);
}
/**
* Request http post method by async with large body.
*

View File

@ -17,12 +17,10 @@
package com.alibaba.nacos.naming.web;
import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.core.code.ControllerMethodsCache;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import com.alibaba.nacos.core.utils.ReuseHttpServletRequest;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.DistroMapper;
@ -32,7 +30,6 @@ import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import javax.servlet.AsyncContext;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@ -120,6 +117,7 @@ public class DistroFilter implements Filter {
}
final String targetServer = distroMapper.mapSrv(distroTag);
List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
while (headers.hasMoreElements()) {
@ -131,10 +129,14 @@ public class DistroFilter implements Filter {
final String body = IoUtils.toString(req.getInputStream(), StandardCharsets.UTF_8.name());
final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
if (!DistroConfig.getInstance().isAsyncDistroForward()) {
syncForward(req, resp, urlString, targetServer, headerList, paramsValue, body);
} else {
asyncForward(req, resp, urlString, targetServer, headerList, paramsValue, body);
RestResult<String> result = HttpClient
.request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(), req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
}
} catch (AccessControlException e) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
@ -142,60 +144,11 @@ public class DistroFilter implements Filter {
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
"no such api:" + req.getMethod() + ":" + req.getRequestURI());
} catch (Exception e) {
onError(resp, e);
}
}
private void syncForward(ReuseHttpServletRequest req, HttpServletResponse resp, String urlString,
String targetServer, List<String> headerList, Map<String, String> paramsValue, String body) {
RestResult<String> result = HttpClient.request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList,
paramsValue, body, PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(),
req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + targetServer + urlString);
}
}
private void asyncForward(HttpServletRequest req, HttpServletResponse resp, String urlString, String targetServer,
List<String> headerList, Map<String, String> paramsValue, String body) throws Exception {
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(PROXY_READ_TIMEOUT);
HttpClient.asyncHttpRequest(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + targetServer + urlString);
}
asyncContext.complete();
}
@Override
public void onError(Throwable e) {
DistroFilter.this.onError(resp, e);
asyncContext.complete();
}
@Override
public void onCancel() {
}
}, req.getMethod());
}
private void onError(HttpServletResponse response, Throwable e) {
try {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ", e);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Server failed, " + ExceptionUtil.getAllExceptionMsg(e));
} catch (Exception ignore) {
}
}
@Override

View File

@ -44,7 +44,6 @@ public class NamingConfig {
registration.setFilter(distroFilter());
registration.addUrlPatterns(UTL_PATTERNS);
registration.setName(DISTRO_FILTER);
registration.setAsyncSupported(true);
registration.setOrder(7);
return registration;
}

View File

@ -1,203 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.web;
import com.alibaba.nacos.auth.config.AuthConfigs;
import com.alibaba.nacos.core.code.ControllerMethodsCache;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import com.alibaba.nacos.naming.controllers.InstanceController;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockserver.client.server.MockServerClient;
import org.mockserver.integration.ClientAndServer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.mock.web.MockFilterChain;
import org.springframework.mock.web.MockHttpServletRequest;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.util.ReflectionUtils;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
public class DistroFilterTest {
@Mock
private DistroMapper distroMapper;
@Mock
private ControllerMethodsCache controllerMethodsCache;
@Mock
private DistroTagGenerator distroTagGenerator;
@Mock
private ConfigurableApplicationContext context;
@Mock
private AuthConfigs authConfigs;
@InjectMocks
private DistroFilter distroFilter;
private static ClientAndServer mockServer;
@BeforeClass
public static void beforeClass() {
MockEnvironment environment = new MockEnvironment();
EnvUtil.setEnvironment(environment);
EnvUtil.setContextPath("/nacos");
mockServer = ClientAndServer.startClientAndServer(8080);
//mock nacos naming server, and delay 1 seconds
new MockServerClient("127.0.0.1", 8080).when(request().withMethod("POST").withPath("/nacos/v1/ns/instance"))
.respond(response().withStatusCode(200).withBody("ok").withDelay(TimeUnit.SECONDS, 1));
}
@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
ApplicationUtils.injectContext(context);
when(context.getBean(AuthConfigs.class)).thenReturn(authConfigs);
final Method register = ReflectionUtils.findMethod(InstanceController.class, "register",
HttpServletRequest.class);
when(controllerMethodsCache.getMethod(any())).thenReturn(register);
when(distroTagGenerator.getResponsibleTag(any())).thenReturn("tag");
when(distroMapper.responsible(anyString())).thenReturn(false);
when(distroMapper.mapSrv(anyString())).thenReturn("127.0.0.1:8080");
}
@Test
public void givenAsyncCanForwardRequestToTarget() throws ServletException, IOException, InterruptedException {
MockHttpServletRequest request = new MockHttpServletRequest();
request.setMethod("POST");
request.setRequestURI("/nacos/v1/ns/instance");
request.setAsyncSupported(true);
MockHttpServletResponse response = new MockHttpServletResponse();
final MockFilterChain filterChain = new MockFilterChain();
DistroConfig.getInstance().setAsyncDistroForward(true);
distroFilter.doFilter(request, response, filterChain);
final AsyncContext asyncContext = request.getAsyncContext();
Assert.assertNotNull(asyncContext);
CountDownLatch latch = new CountDownLatch(1);
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
latch.countDown();
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
latch.countDown();
}
@Override
public void onError(AsyncEvent event) throws IOException {
latch.countDown();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
}
});
latch.await();
Assert.assertEquals("ok", response.getContentAsString());
}
@Test
public void asyncForwardRequestNotBlockTomcatThread() throws InterruptedException {
DistroConfig.getInstance().setAsyncDistroForward(true);
Executor mockTomcatThread = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
for (int i = 0; i < 3; ++i) {
//nacos naming server will block 1 second and then return response
//if use sync forward request, the mockTomcatThread will throw RejectedExecutionException
mockTomcatThread.execute(() -> {
try {
asyncForwardRequest();
} catch (Exception e) {
Assert.assertNull(e);
}
});
Thread.sleep(500);
}
}
private void asyncForwardRequest() throws ServletException, IOException {
MockHttpServletRequest request = new MockHttpServletRequest();
request.setMethod("POST");
request.setRequestURI("/nacos/v1/ns/instance");
request.setAsyncSupported(true);
MockHttpServletResponse response = new MockHttpServletResponse();
final MockFilterChain filterChain = new MockFilterChain();
distroFilter.doFilter(request, response, filterChain);
}
@Test(expected = RejectedExecutionException.class)
public void syncForwardRequestWillBlockTomcatThread() throws InterruptedException {
DistroConfig.getInstance().setAsyncDistroForward(false);
Executor mockTomcatThread = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<>());
for (int i = 0; i < 3; ++i) {
//nacos naming server will block 1 second and then return response
//if use sync forward request, the mockTomcatThread will throw RejectedExecutionException
mockTomcatThread.execute(() -> {
try {
asyncForwardRequest();
} catch (Exception e) {
Assert.assertNull(e);
}
});
Thread.sleep(500);
}
}
@AfterClass
public static void afterClass() {
mockServer.stop();
}
}

13
pom.xml
View File

@ -155,7 +155,6 @@
<jraft-core.version>1.3.12</jraft-core.version>
<rpc-grpc-impl.version>${jraft-core.version}</rpc-grpc-impl.version>
<SnakeYaml.version>2.0</SnakeYaml.version>
<mock-server.version>3.10.8</mock-server.version>
</properties>
<!-- == -->
<!-- =========================================================Build plugins================================================ -->
@ -1004,18 +1003,6 @@
<artifactId>snakeyaml</artifactId>
<version>${SnakeYaml.version}</version>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
<version>${mock-server.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-client-java</artifactId>
<version>${mock-server.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -103,11 +103,11 @@ public class EnvUtil {
private static final String NACOS_TEMP_DIR_1 = "data";
private static final String NACOS_TEMP_DIR_2 = "tmp";
private static final String NACOS_CUSTOM_ENVIRONMENT_ENABLED = "nacos.custom.environment.enabled";
private static final String NACOS_CUSTOM_CONFIG_NAME = "customFirstNacosConfig";
@JustForTest
private static String confPath = "";
@ -115,7 +115,7 @@ public class EnvUtil {
private static String nacosHomePath = null;
private static ConfigurableEnvironment environment;
/**
* customEnvironment.
*/
@ -127,13 +127,12 @@ public class EnvUtil {
for (String key : propertyKeys) {
sourcePropertyMap.put(key, getProperty(key, Object.class));
}
Map<String, Object> targetMap = CustomEnvironmentPluginManager.getInstance()
.getCustomValues(sourcePropertyMap);
Map<String, Object> targetMap = CustomEnvironmentPluginManager.getInstance().getCustomValues(sourcePropertyMap);
MutablePropertySources propertySources = environment.getPropertySources();
propertySources.addFirst(new MapPropertySource(NACOS_CUSTOM_CONFIG_NAME, targetMap));
}
}
public static ConfigurableEnvironment getEnvironment() {
return environment;
}
@ -303,8 +302,8 @@ public class EnvUtil {
}
public static float getMem() {
return (float) (1
- OperatingSystemBeanManager.getFreePhysicalMem() / OperatingSystemBeanManager.getTotalPhysicalMem());
return (float) (1 - OperatingSystemBeanManager.getFreePhysicalMem() / OperatingSystemBeanManager
.getTotalPhysicalMem());
}
public static String getConfPath() {