[ISSUE#8481]Unified TRACE capacity building (#8521)
This commit is contained in:
parent
10e0490359
commit
a7d8066e41
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace;
|
||||
|
||||
/**
|
||||
* The reasons of deregister instance.
|
||||
*
|
||||
* @author yanda
|
||||
*/
|
||||
public enum DeregisterInstanceReason {
|
||||
/**
|
||||
* client initiates request.
|
||||
*/
|
||||
REQUEST,
|
||||
/**
|
||||
* Instance native disconnected.
|
||||
*/
|
||||
NATIVE_DISCONNECTED,
|
||||
/**
|
||||
* Instance synced disconnected.
|
||||
*/
|
||||
SYNCED_DISCONNECTED,
|
||||
/**
|
||||
* Instance heart beat timeout expire.
|
||||
*/
|
||||
HEARTBEAT_EXPIRE,
|
||||
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace;
|
||||
|
||||
/**
|
||||
* The reasons of health state change.
|
||||
*
|
||||
* @author yanda
|
||||
*/
|
||||
|
||||
public enum HealthStateChangeReason {
|
||||
/**
|
||||
* Instance heart beat timeout.
|
||||
*/
|
||||
HEARTBEAT_TIMEOUT,
|
||||
/**
|
||||
* Instance heart beat refresh.
|
||||
*/
|
||||
HEARTBEAT_REFRESH,
|
||||
/**
|
||||
* Instance health check fail.
|
||||
*/
|
||||
HEALTH_CHECK_FAIL,
|
||||
/**
|
||||
* Instance health check success.
|
||||
*/
|
||||
HEALTH_CHECK_SUCCESS;
|
||||
}
|
@ -0,0 +1,258 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.event;
|
||||
|
||||
import com.alibaba.nacos.common.trace.DeregisterInstanceReason;
|
||||
import com.alibaba.nacos.common.trace.HealthStateChangeReason;
|
||||
|
||||
/**
|
||||
* Naming trace event.
|
||||
*
|
||||
* @author yanda
|
||||
*/
|
||||
public class NamingTraceEvent extends TraceEvent {
|
||||
|
||||
private static final long serialVersionUID = 2923077640400851816L;
|
||||
|
||||
public NamingTraceEvent(long eventTime, String serviceNamespace, String serviceGroup, String name) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming register instance trace event.
|
||||
*/
|
||||
public static class RegisterInstanceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = -8283438151444483864L;
|
||||
|
||||
private final String clientIp;
|
||||
|
||||
private final boolean rpc;
|
||||
|
||||
private final String instanceIp;
|
||||
|
||||
public String getClientIp() {
|
||||
return clientIp;
|
||||
}
|
||||
|
||||
public boolean isRpc() {
|
||||
return rpc;
|
||||
}
|
||||
|
||||
public String getInstanceIp() {
|
||||
return instanceIp;
|
||||
}
|
||||
|
||||
public RegisterInstanceTraceEvent(long eventTime, String clientIp, boolean rpc, String serviceNamespace,
|
||||
String serviceGroup, String serviceName, String instanceIp) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
this.clientIp = clientIp;
|
||||
this.rpc = rpc;
|
||||
this.instanceIp = instanceIp;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming deregister instance trace event.
|
||||
*/
|
||||
public static class DeregisterInstanceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = 3850573686472190256L;
|
||||
|
||||
private final String clientIp;
|
||||
|
||||
private final boolean rpc;
|
||||
|
||||
private final String instanceIp;
|
||||
|
||||
public final DeregisterInstanceReason reason;
|
||||
|
||||
public String getClientIp() {
|
||||
return clientIp;
|
||||
}
|
||||
|
||||
public boolean isRpc() {
|
||||
return rpc;
|
||||
}
|
||||
|
||||
public String getInstanceIp() {
|
||||
return instanceIp;
|
||||
}
|
||||
|
||||
public DeregisterInstanceReason getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
public DeregisterInstanceTraceEvent(long eventTime, String clientIp, boolean rpc, DeregisterInstanceReason reason,
|
||||
String serviceNamespace, String serviceGroup, String serviceName, String instanceIp) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
this.clientIp = clientIp;
|
||||
this.reason = reason;
|
||||
this.rpc = rpc;
|
||||
this.instanceIp = instanceIp;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming deregister service trace event.
|
||||
*/
|
||||
public static class RegisterServiceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = -8568231862586636388L;
|
||||
|
||||
public RegisterServiceTraceEvent(long eventTime, String serviceNamespace,
|
||||
String serviceGroup, String serviceName) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming deregister service trace event.
|
||||
*/
|
||||
public static class DeregisterServiceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = 7358195336881398548L;
|
||||
|
||||
public DeregisterServiceTraceEvent(long eventTime, String serviceNamespace,
|
||||
String serviceGroup, String serviceName) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming subscribe service trace event.
|
||||
*/
|
||||
public static class SubscribeServiceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = -8856834879168816801L;
|
||||
|
||||
private final String clientIp;
|
||||
|
||||
public String getClientIp() {
|
||||
return clientIp;
|
||||
}
|
||||
|
||||
public SubscribeServiceTraceEvent(long eventTime, String clientIp, String serviceNamespace,
|
||||
String serviceGroup, String serviceName) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
this.clientIp = clientIp;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming unsubscribe service trace event.
|
||||
*/
|
||||
public static class UnsubscribeServiceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = -7461808613817897106L;
|
||||
|
||||
private final String clientIp;
|
||||
|
||||
public String getClientIp() {
|
||||
return clientIp;
|
||||
}
|
||||
|
||||
public UnsubscribeServiceTraceEvent(long eventTime, String clientIp, String serviceNamespace,
|
||||
String serviceGroup, String serviceName) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
this.clientIp = clientIp;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming push service trace event.
|
||||
*/
|
||||
public static class PushServiceTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = 787915741281241877L;
|
||||
|
||||
private final String clientIp;
|
||||
|
||||
private final int instanceSize;
|
||||
|
||||
private final long pushCostTimeForNetWork;
|
||||
|
||||
private final long pushCostTimeForAll;
|
||||
|
||||
private final long serviceLevelAgreementTime;
|
||||
|
||||
public String getClientIp() {
|
||||
return clientIp;
|
||||
}
|
||||
|
||||
public int getInstanceSize() {
|
||||
return instanceSize;
|
||||
}
|
||||
|
||||
public long getPushCostTimeForNetWork() {
|
||||
return pushCostTimeForNetWork;
|
||||
}
|
||||
|
||||
public long getPushCostTimeForAll() {
|
||||
return pushCostTimeForAll;
|
||||
}
|
||||
|
||||
public long getServiceLevelAgreementTime() {
|
||||
return serviceLevelAgreementTime;
|
||||
}
|
||||
|
||||
public PushServiceTraceEvent(long eventTime, long pushCostTimeForNetWork, long pushCostTimeForAll,
|
||||
long serviceLevelAgreementTime, String clientIp, String serviceNamespace,
|
||||
String serviceGroup, String serviceName, int instanceSize) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
this.clientIp = clientIp;
|
||||
this.instanceSize = instanceSize;
|
||||
this.pushCostTimeForAll = pushCostTimeForAll;
|
||||
this.pushCostTimeForNetWork = pushCostTimeForNetWork;
|
||||
this.serviceLevelAgreementTime = serviceLevelAgreementTime;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Naming instance http heartbeat timeout trace event.
|
||||
*/
|
||||
public static class HealthStateChangeTraceEvent extends NamingTraceEvent {
|
||||
|
||||
private static final long serialVersionUID = 6966396191118694597L;
|
||||
|
||||
private final String instanceIp;
|
||||
|
||||
private boolean isHealthy;
|
||||
|
||||
private HealthStateChangeReason reason;
|
||||
|
||||
public String getInstanceIp() {
|
||||
return instanceIp;
|
||||
}
|
||||
|
||||
public boolean isHealthy() {
|
||||
return isHealthy;
|
||||
}
|
||||
|
||||
public HealthStateChangeReason getReason() {
|
||||
return reason;
|
||||
}
|
||||
|
||||
public HealthStateChangeTraceEvent(long eventTime, String serviceNamespace, String serviceGroup,
|
||||
String serviceName, String instanceIp, boolean isHealthy, HealthStateChangeReason reason) {
|
||||
super(eventTime, serviceNamespace, serviceGroup, serviceName);
|
||||
this.instanceIp = instanceIp;
|
||||
this.isHealthy = isHealthy;
|
||||
this.reason = reason;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.event;
|
||||
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
|
||||
/**
|
||||
* Trace event.
|
||||
*
|
||||
* @author yanda
|
||||
*/
|
||||
public class TraceEvent extends Event {
|
||||
private final long eventTime;
|
||||
|
||||
private final String namespace;
|
||||
|
||||
private final String group;
|
||||
|
||||
private final String name;
|
||||
|
||||
public long getEventTime() {
|
||||
return eventTime;
|
||||
}
|
||||
|
||||
public String getNamespace() {
|
||||
return namespace;
|
||||
}
|
||||
|
||||
public String getGroup() {
|
||||
return group;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public TraceEvent(long eventTime, String namespace, String group, String name) {
|
||||
this.eventTime = eventTime;
|
||||
this.namespace = namespace;
|
||||
this.group = group;
|
||||
this.name = name;
|
||||
}
|
||||
}
|
@ -0,0 +1,195 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.publisher;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.ShardedEventPublisher;
|
||||
import com.alibaba.nacos.common.notify.listener.Subscriber;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
* Event publisher for trace event.
|
||||
*
|
||||
* @author yanda
|
||||
*/
|
||||
public class TraceEventPublisher extends Thread implements ShardedEventPublisher {
|
||||
|
||||
private static final String THREAD_NAME = "trace.publisher-";
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.trace.publisher");
|
||||
|
||||
private static final int DEFAULT_WAIT_TIME = 60;
|
||||
|
||||
private final Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes = new ConcurrentHashMap<>();
|
||||
|
||||
private volatile boolean initialized = false;
|
||||
|
||||
private volatile boolean shutdown = false;
|
||||
|
||||
private int queueMaxSize = -1;
|
||||
|
||||
private BlockingQueue<Event> queue;
|
||||
|
||||
private String publisherName;
|
||||
|
||||
@Override
|
||||
public void init(Class<? extends Event> type, int bufferSize) {
|
||||
this.queueMaxSize = bufferSize;
|
||||
this.queue = new ArrayBlockingQueue<>(bufferSize);
|
||||
this.publisherName = type.getSimpleName();
|
||||
super.setName(THREAD_NAME + this.publisherName);
|
||||
super.setDaemon(true);
|
||||
super.start();
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long currentEventSize() {
|
||||
return this.queue.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSubscriber(Subscriber subscriber) {
|
||||
addSubscriber(subscriber, subscriber.subscribeType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
|
||||
subscribes.computeIfAbsent(subscribeType, inputType -> new ConcurrentHashSet<>());
|
||||
subscribes.get(subscribeType).add(subscriber);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSubscriber(Subscriber subscriber) {
|
||||
removeSubscriber(subscriber, subscriber.subscribeType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
|
||||
subscribes.computeIfPresent(subscribeType, (inputType, subscribers) -> {
|
||||
subscribers.remove(subscriber);
|
||||
return subscribers.isEmpty() ? null : subscribers;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean publish(Event event) {
|
||||
checkIsStart();
|
||||
boolean success = this.queue.offer(event);
|
||||
if (!success) {
|
||||
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
|
||||
handleEvent(event);
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifySubscriber(Subscriber subscriber, Event event) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
|
||||
}
|
||||
final Runnable job = () -> subscriber.onEvent(event);
|
||||
final Executor executor = subscriber.executor();
|
||||
if (executor != null) {
|
||||
executor.execute(job);
|
||||
} else {
|
||||
try {
|
||||
job.run();
|
||||
} catch (Throwable e) {
|
||||
LOGGER.error("Event callback exception: ", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws NacosException {
|
||||
this.shutdown = true;
|
||||
this.queue.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
waitSubscriberForInit();
|
||||
handleEvents();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Trace Event Publisher {}, stop to handle event due to unexpected exception: ",
|
||||
this.publisherName, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void waitSubscriberForInit() {
|
||||
// To ensure that messages are not lost, enable EventHandler when
|
||||
// waiting for the first Subscriber to register
|
||||
for (int waitTimes = DEFAULT_WAIT_TIME; waitTimes > 0; waitTimes--) {
|
||||
if (shutdown || !subscribes.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
ThreadUtils.sleep(1000L);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleEvents() {
|
||||
while (!shutdown) {
|
||||
try {
|
||||
final Event event = queue.take();
|
||||
handleEvent(event);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Trace Event Publisher {} take event from queue failed:", this.publisherName, e);
|
||||
// set the interrupted flag
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handleEvent(Event event) {
|
||||
Class<? extends Event> eventType = event.getClass();
|
||||
Set<Subscriber<? extends Event>> subscribers = subscribes.get(eventType);
|
||||
if (null == subscribers) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", eventType.getName());
|
||||
}
|
||||
return;
|
||||
}
|
||||
for (Subscriber subscriber : subscribers) {
|
||||
notifySubscriber(subscriber, event);
|
||||
}
|
||||
}
|
||||
|
||||
void checkIsStart() {
|
||||
if (!initialized) {
|
||||
throw new IllegalStateException("Publisher does not start");
|
||||
}
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return String.format("Publisher %-30s: shutdown=%5s, queue=%7d/%-7d", publisherName, shutdown,
|
||||
currentEventSize(), queueMaxSize);
|
||||
}
|
||||
}
|
@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.publisher;
|
||||
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.EventPublisher;
|
||||
import com.alibaba.nacos.common.notify.EventPublisherFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* event publisher factory for trace event.
|
||||
*
|
||||
* @author yanda
|
||||
*/
|
||||
|
||||
public class TraceEventPublisherFactory implements EventPublisherFactory {
|
||||
private static final TraceEventPublisherFactory INSTANCE = new TraceEventPublisherFactory();
|
||||
|
||||
private final Map<Class<? extends Event>, TraceEventPublisher> publisher;
|
||||
|
||||
private TraceEventPublisherFactory() {
|
||||
publisher = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
public static TraceEventPublisherFactory getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventPublisher apply(final Class<? extends Event> eventType, final Integer maxQueueSize) {
|
||||
// Like ClientEvent$ClientChangeEvent cache by ClientEvent
|
||||
Class<? extends Event> cachedEventType =
|
||||
eventType.isMemberClass() ? (Class<? extends Event>) eventType.getEnclosingClass() : eventType;
|
||||
publisher.computeIfAbsent(cachedEventType, eventClass -> {
|
||||
TraceEventPublisher result = new TraceEventPublisher();
|
||||
result.init(eventClass, maxQueueSize);
|
||||
return result;
|
||||
});
|
||||
return publisher.get(cachedEventType);
|
||||
}
|
||||
|
||||
public String getAllPublisherStatues() {
|
||||
StringBuilder result = new StringBuilder("Trace event publisher statues:\n");
|
||||
for (TraceEventPublisher each : publisher.values()) {
|
||||
result.append('\t').append(each.getStatus()).append('\n');
|
||||
}
|
||||
return result.toString();
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.publisher;
|
||||
|
||||
import com.alibaba.nacos.common.notify.EventPublisher;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class TraceEventPublisherFactoryTest {
|
||||
private Map<String, EventPublisher> originalEventPublisherMap;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
originalEventPublisherMap = new HashMap<>(NotifyCenter.getPublisherMap());
|
||||
NotifyCenter.getPublisherMap().clear();
|
||||
// Protect other unit test publisher affect this case.
|
||||
Field field = TraceEventPublisherFactory.class.getDeclaredField("publisher");
|
||||
field.setAccessible(true);
|
||||
Map map = (Map) field.get(TraceEventPublisherFactory.getInstance());
|
||||
map.clear();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
NotifyCenter.getPublisherMap().clear();
|
||||
NotifyCenter.getPublisherMap().putAll(originalEventPublisherMap);
|
||||
originalEventPublisherMap = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApply() {
|
||||
TraceEventPublisherFactory.getInstance().apply(TraceTestEvent.TraceTestEvent1.class, Byte.SIZE);
|
||||
TraceEventPublisherFactory.getInstance().apply(TraceTestEvent.TraceTestEvent2.class, Byte.SIZE);
|
||||
TraceEventPublisherFactory.getInstance().apply(TraceTestEvent.class, Byte.SIZE);
|
||||
String expectedStatus = "Trace event publisher statues:\n"
|
||||
+ "\tPublisher TraceTestEvent : shutdown=false, queue= 0/8 \n";
|
||||
assertThat(TraceEventPublisherFactory.getInstance().getAllPublisherStatues(), is(expectedStatus));
|
||||
}
|
||||
}
|
@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.publisher;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
|
||||
import com.alibaba.nacos.common.notify.listener.Subscriber;
|
||||
import com.alibaba.nacos.common.utils.ThreadUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TraceEventPublisherTest {
|
||||
|
||||
@Mock
|
||||
private Subscriber subscriber;
|
||||
|
||||
@Mock
|
||||
private SmartSubscriber smartSubscriber;
|
||||
|
||||
private TraceEventPublisher traceEventPublisher;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
traceEventPublisher = new TraceEventPublisher();
|
||||
traceEventPublisher.init(TraceTestEvent.class, Byte.SIZE);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
traceEventPublisher.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddSubscriber() {
|
||||
traceEventPublisher.addSubscriber(subscriber, TraceTestEvent.TraceTestEvent1.class);
|
||||
traceEventPublisher.addSubscriber(smartSubscriber, TraceTestEvent.TraceTestEvent2.class);
|
||||
TraceTestEvent.TraceTestEvent1 traceTestEvent1 = new TraceTestEvent.TraceTestEvent1();
|
||||
TraceTestEvent.TraceTestEvent2 traceTestEvent2 = new TraceTestEvent.TraceTestEvent2();
|
||||
traceEventPublisher.publish(traceTestEvent1);
|
||||
traceEventPublisher.publish(traceTestEvent2);
|
||||
ThreadUtils.sleep(2000L);
|
||||
verify(subscriber).onEvent(traceTestEvent1);
|
||||
verify(smartSubscriber).onEvent(traceTestEvent2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveSubscriber() {
|
||||
traceEventPublisher.addSubscriber(subscriber, TraceTestEvent.TraceTestEvent1.class);
|
||||
traceEventPublisher.addSubscriber(smartSubscriber, TraceTestEvent.TraceTestEvent1.class);
|
||||
TraceTestEvent.TraceTestEvent1 traceTestEvent1 = new TraceTestEvent.TraceTestEvent1();
|
||||
traceEventPublisher.publish(traceTestEvent1);
|
||||
ThreadUtils.sleep(2000L);
|
||||
verify(subscriber).onEvent(traceTestEvent1);
|
||||
verify(smartSubscriber).onEvent(traceTestEvent1);
|
||||
traceEventPublisher.removeSubscriber(smartSubscriber, TraceTestEvent.TraceTestEvent1.class);
|
||||
traceTestEvent1 = new TraceTestEvent.TraceTestEvent1();
|
||||
traceEventPublisher.publish(traceTestEvent1);
|
||||
ThreadUtils.sleep(500L);
|
||||
verify(subscriber).onEvent(traceTestEvent1);
|
||||
verify(smartSubscriber, never()).onEvent(traceTestEvent1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishOverFlow() {
|
||||
TraceTestEvent testEvent = new TraceTestEvent();
|
||||
for (int i = 0; i < Byte.SIZE; i++) {
|
||||
traceEventPublisher.publish(testEvent);
|
||||
}
|
||||
traceEventPublisher.addSubscriber(subscriber, TraceTestEvent.class);
|
||||
traceEventPublisher.publish(testEvent);
|
||||
verify(subscriber).onEvent(testEvent);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getStatus() throws NacosException {
|
||||
traceEventPublisher.publish(new TraceTestEvent());
|
||||
traceEventPublisher.publish(new TraceTestEvent.TraceTestEvent1());
|
||||
traceEventPublisher.publish(new TraceTestEvent.TraceTestEvent2());
|
||||
String expectedStatus = "Publisher TraceTestEvent : shutdown=false, queue= 3/8 ";
|
||||
assertThat(traceEventPublisher.getStatus(), is(expectedStatus));
|
||||
traceEventPublisher.addSubscriber(subscriber, TraceTestEvent.TraceTestEvent1.class);
|
||||
ThreadUtils.sleep(2000L);
|
||||
expectedStatus = "Publisher TraceTestEvent : shutdown=false, queue= 0/8 ";
|
||||
assertThat(traceEventPublisher.getStatus(), is(expectedStatus));
|
||||
traceEventPublisher.shutdown();
|
||||
expectedStatus = "Publisher TraceTestEvent : shutdown= true, queue= 0/8 ";
|
||||
assertThat(traceEventPublisher.getStatus(), is(expectedStatus));
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 1999-2018 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.common.trace.publisher;
|
||||
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
|
||||
public class TraceTestEvent extends Event {
|
||||
private static final long serialVersionUID = 8568231862586636388L;
|
||||
|
||||
static class TraceTestEvent1 extends TraceTestEvent {
|
||||
|
||||
private static final long serialVersionUID = 4188906203345433816L;
|
||||
}
|
||||
|
||||
static class TraceTestEvent2 extends TraceTestEvent {
|
||||
|
||||
private static final long serialVersionUID = -7358195336881398548L;
|
||||
}
|
||||
}
|
@ -22,7 +22,10 @@ import com.alibaba.nacos.api.naming.CommonParams;
|
||||
import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.utils.NamingUtils;
|
||||
import com.alibaba.nacos.auth.annotation.Secured;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.spi.NacosServiceLoader;
|
||||
import com.alibaba.nacos.common.trace.DeregisterInstanceReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.ConvertUtils;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
@ -115,6 +118,9 @@ public class InstanceController {
|
||||
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
|
||||
|
||||
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
|
||||
false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
|
||||
instance.toInetAddr()));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
@ -136,6 +142,9 @@ public class InstanceController {
|
||||
NamingUtils.checkServiceNameFormat(serviceName);
|
||||
|
||||
getInstanceOperator().removeInstance(namespaceId, serviceName, instance);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
|
||||
false, DeregisterInstanceReason.REQUEST, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
|
||||
instance.toInetAddr()));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,9 @@ import com.alibaba.nacos.api.naming.pojo.Instance;
|
||||
import com.alibaba.nacos.api.naming.pojo.builder.InstanceBuilder;
|
||||
import com.alibaba.nacos.api.naming.utils.NamingUtils;
|
||||
import com.alibaba.nacos.auth.annotation.Secured;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.DeregisterInstanceReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.core.utils.WebUtils;
|
||||
@ -110,6 +113,9 @@ public class InstanceControllerV2 {
|
||||
instance.setEphemeral((switchDomain.isDefaultInstanceEphemeral()));
|
||||
}
|
||||
instanceServiceV2.registerInstance(namespaceId, serviceName, instance);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
|
||||
false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
|
||||
instance.toInetAddr()));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
@ -148,6 +154,9 @@ public class InstanceControllerV2 {
|
||||
}
|
||||
|
||||
instanceServiceV2.removeInstance(namespaceId, serviceName, instance);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
|
||||
false, DeregisterInstanceReason.REQUEST, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
|
||||
instance.toInetAddr()));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,8 @@ import com.alibaba.nacos.api.selector.Selector;
|
||||
import com.alibaba.nacos.auth.annotation.Secured;
|
||||
import com.alibaba.nacos.common.model.RestResult;
|
||||
import com.alibaba.nacos.common.model.RestResultUtils;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.IoUtils;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.NumberUtils;
|
||||
@ -119,6 +121,8 @@ public class ServiceController {
|
||||
serviceMetadata.setExtendData(UtilsAndCommons.parseMetadata(metadata));
|
||||
serviceMetadata.setEphemeral(false);
|
||||
getServiceOperator().create(namespaceId, serviceName, serviceMetadata);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.RegisterServiceTraceEvent(System.currentTimeMillis(),
|
||||
namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName)));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
@ -136,6 +140,8 @@ public class ServiceController {
|
||||
@RequestParam String serviceName) throws Exception {
|
||||
|
||||
getServiceOperator().delete(namespaceId, serviceName);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterServiceTraceEvent(System.currentTimeMillis(),
|
||||
namespaceId, "", serviceName));
|
||||
return "ok";
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,8 @@ import com.alibaba.nacos.auth.annotation.Secured;
|
||||
import com.alibaba.nacos.common.Beta;
|
||||
import com.alibaba.nacos.common.model.RestResult;
|
||||
import com.alibaba.nacos.common.model.RestResultUtils;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.common.utils.NumberUtils;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
@ -98,6 +100,8 @@ public class ServiceControllerV2 {
|
||||
serviceMetadata.setExtendData(UtilsAndCommons.parseMetadata(metadata));
|
||||
serviceMetadata.setEphemeral(ephemeral);
|
||||
serviceOperatorV2.create(Service.newService(namespaceId, groupName, serviceName, ephemeral), serviceMetadata);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.RegisterServiceTraceEvent(System.currentTimeMillis(),
|
||||
namespaceId, groupName, serviceName));
|
||||
return RestResultUtils.success("ok");
|
||||
}
|
||||
|
||||
@ -115,6 +119,8 @@ public class ServiceControllerV2 {
|
||||
@PathVariable String serviceName, @RequestParam(defaultValue = Constants.DEFAULT_GROUP) String groupName)
|
||||
throws Exception {
|
||||
serviceOperatorV2.delete(Service.newService(namespaceId, groupName, serviceName));
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterServiceTraceEvent(System.currentTimeMillis(),
|
||||
namespaceId, groupName, serviceName));
|
||||
return RestResultUtils.success("ok");
|
||||
}
|
||||
|
||||
|
@ -101,7 +101,7 @@ public class ConnectionBasedClientManager extends ClientConnectionEventListener
|
||||
return true;
|
||||
}
|
||||
client.release();
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client)));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,7 @@ public class EphemeralIpPortClientManager implements ClientManager {
|
||||
if (null == client) {
|
||||
return true;
|
||||
}
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client)));
|
||||
client.release();
|
||||
return true;
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public class PersistentIpPortClientManager implements ClientManager {
|
||||
if (null == client) {
|
||||
return true;
|
||||
}
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsibleClient(client)));
|
||||
client.release();
|
||||
return true;
|
||||
}
|
||||
|
@ -58,8 +58,15 @@ public class ClientEvent extends Event {
|
||||
|
||||
private static final long serialVersionUID = 370348024867174101L;
|
||||
|
||||
public ClientDisconnectEvent(Client client) {
|
||||
private boolean isNative;
|
||||
|
||||
public boolean isNative() {
|
||||
return isNative;
|
||||
}
|
||||
|
||||
public ClientDisconnectEvent(Client client, boolean isNative) {
|
||||
super(client);
|
||||
this.isNative = isNative;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ package com.alibaba.nacos.naming.core.v2.index;
|
||||
import com.alibaba.nacos.common.notify.Event;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
|
||||
import com.alibaba.nacos.common.trace.DeregisterInstanceReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
|
||||
@ -99,8 +101,14 @@ public class ClientServiceIndexesManager extends SmartSubscriber {
|
||||
for (Service each : client.getAllSubscribeService()) {
|
||||
removeSubscriberIndexes(each, client.getClientId());
|
||||
}
|
||||
DeregisterInstanceReason reason = event.isNative()
|
||||
? DeregisterInstanceReason.NATIVE_DISCONNECTED : DeregisterInstanceReason.SYNCED_DISCONNECTED;
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
for (Service each : client.getAllPublishedService()) {
|
||||
removePublisherIndexes(each, client.getClientId());
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterInstanceTraceEvent(currentTimeMillis,
|
||||
"", false, reason, each.getNamespace(), each.getGroup(),
|
||||
each.getName(), client.getInstancePublishInfo(each).getIp()));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,9 @@
|
||||
|
||||
package com.alibaba.nacos.naming.healthcheck;
|
||||
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.HealthStateChangeReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.naming.healthcheck.heartbeat.BeatProcessor;
|
||||
import com.alibaba.nacos.sys.utils.ApplicationUtils;
|
||||
import com.alibaba.nacos.naming.core.Cluster;
|
||||
@ -86,6 +89,9 @@ public class ClientBeatProcessor implements BeatProcessor {
|
||||
cluster.getService().getName(), ip, port, cluster.getName(),
|
||||
UtilsAndCommons.LOCALHOST_SITE);
|
||||
getPushService().serviceChanged(service);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespaceId(), service.getGroupName(), service.getName(), instance.getIp(),
|
||||
true, HealthStateChangeReason.HEARTBEAT_REFRESH));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,9 @@
|
||||
|
||||
package com.alibaba.nacos.naming.healthcheck;
|
||||
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.HealthStateChangeReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.naming.core.Cluster;
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.core.Instance;
|
||||
@ -101,6 +104,9 @@ public class HealthCheckCommon {
|
||||
Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(),
|
||||
true, HealthStateChangeReason.HEALTH_CHECK_SUCCESS));
|
||||
} else {
|
||||
if (!ip.isMockValid()) {
|
||||
ip.setMockValid(true);
|
||||
@ -150,6 +156,9 @@ public class HealthCheckCommon {
|
||||
.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(),
|
||||
false, HealthStateChangeReason.HEALTH_CHECK_FAIL));
|
||||
} else {
|
||||
Loggers.EVT_LOG
|
||||
.info("serviceName: {} {PROBE} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
|
||||
@ -196,6 +205,10 @@ public class HealthCheckCommon {
|
||||
.info("serviceName: {} {POS} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespaceId(), service.getGroupName(), service.getName(), ip.getIp(),
|
||||
false, HealthStateChangeReason.HEALTH_CHECK_FAIL));
|
||||
|
||||
} else {
|
||||
if (ip.isMockValid()) {
|
||||
ip.setMockValid(false);
|
||||
@ -203,6 +216,7 @@ public class HealthCheckCommon {
|
||||
.info("serviceName: {} {PROBE} {IP-DISABLED} invalid-now: {}:{}@{}, region: {}, msg: {}",
|
||||
cluster.getService().getName(), ip.getIp(), ip.getPort(), cluster.getName(),
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
Service service = cluster.getService();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,8 @@ package com.alibaba.nacos.naming.healthcheck.heartbeat;
|
||||
|
||||
import com.alibaba.nacos.api.naming.utils.NamingUtils;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.HealthStateChangeReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.event.service.ServiceEvent;
|
||||
@ -68,6 +70,9 @@ public class ClientBeatProcessorV2 implements BeatProcessor {
|
||||
rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(),
|
||||
true, HealthStateChangeReason.HEARTBEAT_REFRESH));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ package com.alibaba.nacos.naming.healthcheck.heartbeat;
|
||||
import com.alibaba.nacos.api.common.Constants;
|
||||
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.DeregisterInstanceReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.ConvertUtils;
|
||||
import com.alibaba.nacos.common.utils.JacksonUtils;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
@ -76,5 +78,7 @@ public class ExpiredInstanceChecker implements InstanceBeatChecker {
|
||||
client.removeServiceInstance(service);
|
||||
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
|
||||
NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterInstanceTraceEvent(System.currentTimeMillis(), "",
|
||||
false, DeregisterInstanceReason.HEARTBEAT_EXPIRE, service.getNamespace(), service.getGroup(), service.getName(), instance.getIp()));
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ package com.alibaba.nacos.naming.healthcheck.heartbeat;
|
||||
import com.alibaba.nacos.api.common.Constants;
|
||||
import com.alibaba.nacos.api.naming.PreservedMetadataKeys;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.HealthStateChangeReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.common.utils.ConvertUtils;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
|
||||
@ -77,5 +79,8 @@ public class UnhealthyInstanceChecker implements InstanceBeatChecker {
|
||||
instance.getLastHeartBeatTime());
|
||||
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
|
||||
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(),
|
||||
false, HealthStateChangeReason.HEARTBEAT_TIMEOUT));
|
||||
}
|
||||
}
|
||||
|
@ -16,6 +16,9 @@
|
||||
|
||||
package com.alibaba.nacos.naming.healthcheck.v2.processor;
|
||||
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.HealthStateChangeReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.HealthCheckInstancePublishInfo;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
@ -104,6 +107,9 @@ public class HealthCheckCommonV2 {
|
||||
Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: {}",
|
||||
serviceName, instance.getIp(), instance.getPort(), clusterName,
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(),
|
||||
true, HealthStateChangeReason.HEALTH_CHECK_SUCCESS));
|
||||
}
|
||||
} else {
|
||||
Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-ENABLED} pre-valid: {}:{}@{} in {}, msg: {}",
|
||||
@ -145,6 +151,9 @@ public class HealthCheckCommonV2 {
|
||||
.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
|
||||
serviceName, instance.getIp(), instance.getPort(), clusterName,
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(),
|
||||
false, HealthStateChangeReason.HEALTH_CHECK_FAIL));
|
||||
}
|
||||
} else {
|
||||
Loggers.EVT_LOG.info("serviceName: {} {OTHER} {IP-DISABLED} pre-invalid: {}:{}@{} in {}, msg: {}",
|
||||
@ -185,6 +194,9 @@ public class HealthCheckCommonV2 {
|
||||
Loggers.EVT_LOG.info("serviceName: {} {POS} {IP-DISABLED} invalid: {}:{}@{}, region: {}, msg: {}",
|
||||
serviceName, instance.getIp(), instance.getPort(), clusterName,
|
||||
UtilsAndCommons.LOCALHOST_SITE, msg);
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.HealthStateChangeTraceEvent(System.currentTimeMillis(),
|
||||
service.getNamespace(), service.getGroup(), service.getName(), instance.getIp(),
|
||||
false, HealthStateChangeReason.HEALTH_CHECK_FAIL));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -18,7 +18,9 @@ package com.alibaba.nacos.naming.push.v2.task;
|
||||
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.remote.PushCallBack;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.task.AbstractExecuteTask;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.ServiceMetadata;
|
||||
@ -132,6 +134,7 @@ public class PushExecuteTask extends AbstractExecuteTask {
|
||||
}
|
||||
PushResult result = PushResult.pushSuccess(service, clientId, serviceInfo, subscriber,
|
||||
pushCostTimeForNetWork, pushCostTimeForAll, serviceLevelAgreementTime, isPushToAll);
|
||||
NotifyCenter.publishEvent(getPushServiceTraceEvent(pushFinishTime, result));
|
||||
PushResultHookHolder.getInstance().pushSuccess(result);
|
||||
}
|
||||
|
||||
@ -157,5 +160,12 @@ public class PushExecuteTask extends AbstractExecuteTask {
|
||||
return ServiceUtil.selectInstancesWithHealthyProtection(serviceInfo, serviceMetadata, false, true,
|
||||
subscriber);
|
||||
}
|
||||
|
||||
private NamingTraceEvent.PushServiceTraceEvent getPushServiceTraceEvent(long eventTime, PushResult result) {
|
||||
return new NamingTraceEvent.PushServiceTraceEvent(eventTime, result.getNetworkCost(), result.getAllCost(),
|
||||
result.getSla(), result.getSubscriber().getIp(), result.getService().getNamespace(),
|
||||
result.getService().getGroup(), result.getService().getName(), result.getData().getHosts().size());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,9 @@ import com.alibaba.nacos.api.naming.remote.request.InstanceRequest;
|
||||
import com.alibaba.nacos.api.naming.remote.response.InstanceResponse;
|
||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.auth.annotation.Secured;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.DeregisterInstanceReason;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.core.remote.RequestHandler;
|
||||
import com.alibaba.nacos.naming.core.v2.pojo.Service;
|
||||
import com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl;
|
||||
@ -60,11 +63,16 @@ public class InstanceRequestHandler extends RequestHandler<InstanceRequest, Inst
|
||||
|
||||
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
|
||||
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.RegisterInstanceTraceEvent(System.currentTimeMillis(), meta.getClientIp(),
|
||||
true, service.getNamespace(), service.getGroup(), service.getName(), request.getInstance().toInetAddr()));
|
||||
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
|
||||
}
|
||||
|
||||
private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
|
||||
clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.DeregisterInstanceTraceEvent(System.currentTimeMillis(),
|
||||
meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
|
||||
service.getGroup(), service.getName(), request.getInstance().toInetAddr()));
|
||||
return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,8 @@ import com.alibaba.nacos.api.naming.utils.NamingUtils;
|
||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.api.remote.response.ResponseCode;
|
||||
import com.alibaba.nacos.auth.annotation.Secured;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.common.trace.event.NamingTraceEvent;
|
||||
import com.alibaba.nacos.core.remote.RequestHandler;
|
||||
import com.alibaba.nacos.naming.core.v2.index.ServiceStorage;
|
||||
import com.alibaba.nacos.naming.core.v2.metadata.NamingMetadataManager;
|
||||
@ -71,8 +73,12 @@ public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServ
|
||||
metadataManager.getServiceMetadata(service).orElse(null), subscriber);
|
||||
if (request.isSubscribe()) {
|
||||
clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.SubscribeServiceTraceEvent(System.currentTimeMillis(),
|
||||
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
|
||||
} else {
|
||||
clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
|
||||
NotifyCenter.publishEvent(new NamingTraceEvent.UnsubscribeServiceTraceEvent(System.currentTimeMillis(),
|
||||
meta.getClientIp(), service.getNamespace(), service.getGroup(), service.getName()));
|
||||
}
|
||||
return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
|
||||
}
|
||||
|
@ -56,9 +56,9 @@ public class HealthCheckTaskV2Test {
|
||||
@Before
|
||||
public void setUp() {
|
||||
ApplicationUtils.injectContext(context);
|
||||
when(ApplicationUtils.getBean(SwitchDomain.class)).thenReturn(switchDomain);
|
||||
when(context.getBean(SwitchDomain.class)).thenReturn(switchDomain);
|
||||
when(switchDomain.getTcpHealthParams()).thenReturn(new SwitchDomain.TcpHealthParams());
|
||||
when(ApplicationUtils.getBean(NamingMetadataManager.class)).thenReturn(new NamingMetadataManager());
|
||||
when(context.getBean(NamingMetadataManager.class)).thenReturn(new NamingMetadataManager());
|
||||
healthCheckTaskV2 = new HealthCheckTaskV2(ipPortBasedClient);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user