Fix sync connection client not removed.

This commit is contained in:
KomachiSion 2022-10-27 12:11:06 +08:00
parent 0428fe98a3
commit f56dedc7fe
4 changed files with 40 additions and 5 deletions

View File

@ -24,7 +24,10 @@ import com.alibaba.nacos.common.notify.Event;
* @author yanda
*/
public class TraceEvent extends Event {
private String type;
private static final long serialVersionUID = -3065900892505697062L;
private final String type;
private final long eventTime;

View File

@ -74,14 +74,23 @@ public class NacosCombinedTraceSubscriber extends SmartSubscriber {
if (null == subscribers) {
return;
}
TraceEvent traceEvent = (TraceEvent) event;
for (NacosTraceSubscriber each : subscribers) {
try {
each.onEvent((TraceEvent) event);
} catch (Exception ignored) {
if (null != each.executor()) {
each.executor().execute(() -> onEvent0(each, traceEvent));
} else {
onEvent0(each, traceEvent);
}
}
}
private void onEvent0(NacosTraceSubscriber subscriber, TraceEvent event) {
try {
subscriber.onEvent(event);
} catch (Exception ignored) {
}
}
public void shutdown() {
NotifyCenter.deregisterSubscriber(this);
}

View File

@ -29,17 +29,23 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ -105,4 +111,21 @@ public class NacosCombinedTraceSubscriberTest {
verify(mockSubscriber, never()).onEvent(event2);
verify(mockSubscriber2, never()).onEvent(event2);
}
@Test
public void testOnEventWithExecutor() {
Executor executor = mock(Executor.class);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
invocationOnMock.getArgument(0, Runnable.class).run();
return null;
}
}).when(executor).execute(any(Runnable.class));
when(mockSubscriber.executor()).thenReturn(executor);
RegisterInstanceTraceEvent event = new RegisterInstanceTraceEvent(1L, "", true, "", "", "", "", 1);
combinedTraceSubscriber.onEvent(event);
verify(mockSubscriber).onEvent(event);
verify(mockSubscriber2).onEvent(event);
}
}

View File

@ -46,7 +46,7 @@ public class ConnectionBasedClientFactory implements ClientFactory<ConnectionBas
@Override
public ConnectionBasedClient newSyncedClient(String clientId, ClientAttributes attributes) {
long revision = attributes.getClientAttribute(REVISION, 0);
ConnectionBasedClient connectionBasedClient = new ConnectionBasedClient(clientId, true, revision);
ConnectionBasedClient connectionBasedClient = new ConnectionBasedClient(clientId, false, revision);
connectionBasedClient.setAttributes(attributes);
return connectionBasedClient;
}