/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.discovery.zen.fd;

import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;

public class NodesFaultDetection
extends FaultDetection {
    /** Transport action name under which node pings are sent and handled. */
    public static final String PING_ACTION_NAME = "internal:discovery/zen/fd/ping";
    /** Listeners notified of node failures and of received pings. */
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<Listener>();
    /** Currently monitored nodes, each mapped to the pinger responsible for it. */
    private final ConcurrentMap<DiscoveryNode, NodeFD> nodesFD = ConcurrentCollections.newConcurrentMap();
    /** Cluster state version included in outgoing ping requests; {@code -1} until one is recorded. */
    private volatile long clusterStateVersion = -1L;
    /** The local node; excluded from monitoring and sent as the master node in ping requests. */
    private volatile DiscoveryNode localNode;

    public NodesFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
        super(settings, threadPool, transportService, clusterName);
        this.logger.debug("[node  ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", this.pingInterval, this.pingRetryTimeout, this.pingRetryCount);
        transportService.registerRequestHandler(PING_ACTION_NAME, PingRequest.class, "same", new PingRequestHandler());
    }

    /**
     * Sets the node this detector runs on. The local node is skipped when choosing nodes to
     * monitor, is sent as the master node in outgoing pings, and its id is what incoming pings
     * are validated against.
     *
     * @param localNode the local discovery node
     */
    public void setLocalNode(DiscoveryNode localNode) {
        this.localNode = localNode;
    }

    /**
     * Registers a listener to be notified of node failures and received pings.
     *
     * @param listener the listener to add
     */
    public void addListener(Listener listener) {
        listeners.add(listener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener the listener to remove
     */
    public void removeListener(Listener listener) {
        listeners.remove(listener);
    }

    /**
     * Aligns the set of monitored nodes with the given cluster state: monitoring stops for nodes
     * that are no longer part of the state and starts for nodes that are new to it. The local
     * node is never monitored.
     *
     * <p>The version of the given state is also recorded so that subsequent ping requests carry
     * it; previously the field was never assigned and every ping reported {@code -1}.
     *
     * @param clusterState the cluster state whose nodes should be pinged
     */
    public void updateNodesAndPing(ClusterState clusterState) {
        // Record the version first so that pings scheduled below already report it.
        this.clusterStateVersion = clusterState.version();
        // Dropping the map entry is what stops a pinger: NodeFD checks it is still registered before acting.
        for (DiscoveryNode monitoredNode : this.nodesFD.keySet()) {
            if (clusterState.nodes().nodeExists(monitoredNode.id())) continue;
            this.nodesFD.remove(monitoredNode);
        }
        for (DiscoveryNode node : clusterState.nodes()) {
            if (node.equals(this.localNode) || this.nodesFD.containsKey(node)) continue;
            NodeFD fd = new NodeFD(node);
            this.nodesFD.put(node, fd);
            // Zero delay: the first ping is submitted through the thread pool scheduler right away.
            this.threadPool.schedule(TimeValue.timeValueMillis(0L), "same", fd);
        }
    }

    /**
     * Stops monitoring all nodes by clearing the registry of pingers; a pinger that is no
     * longer registered does nothing on its next run or callback.
     *
     * @return this instance
     */
    public NodesFaultDetection stop() {
        nodesFD.clear();
        return this;
    }

    /**
     * Closes the base fault detection, stops all node pingers and unregisters the ping
     * request handler from the transport service.
     */
    @Override
    public void close() {
        super.close();
        stop();
        transportService.removeHandler(PING_ACTION_NAME);
    }

    /**
     * Reacts to a transport disconnect from {@code node}. Nodes that are not currently monitored
     * are ignored. Otherwise monitoring of the node is stopped and either
     * <ul>
     *   <li>when {@code connectOnNetworkDisconnect} is set, a reconnect is attempted; on success
     *       a fresh pinger is registered and scheduled, on failure listeners are notified; or</li>
     *   <li>listeners are notified of the failure immediately.</li>
     * </ul>
     *
     * @param node the node whose transport connection was lost
     */
    @Override
    protected void handleTransportDisconnect(DiscoveryNode node) {
        NodeFD previous = this.nodesFD.remove(node);
        if (previous == null) {
            // Not monitored (or already handled by someone else): nothing to do.
            return;
        }
        if (!this.connectOnNetworkDisconnect) {
            this.logger.trace("[node  ] [{}] transport disconnected", node);
            this.notifyNodeFailure(node, "transport disconnected");
            return;
        }
        NodeFD fd = new NodeFD(node);
        try {
            this.transportService.connectToNode(node);
            this.nodesFD.put(node, fd);
            this.threadPool.schedule(TimeValue.timeValueMillis(0L), "same", fd);
        }
        catch (Exception e) {
            // Keep the reconnect failure as the log cause instead of silently dropping it.
            this.logger.trace("[node  ] [{}] transport disconnected (with verified connect)", e, node);
            // Only removes the entry if it is still the pinger created above.
            this.nodesFD.remove(node, fd);
            this.notifyNodeFailure(node, "transport disconnected (with verified connect)");
        }
    }

    /**
     * Notifies every registered listener, on the generic thread pool, that {@code node} failed.
     * If the pool rejects the task the notification is dropped and the rejection is logged at
     * trace level.
     *
     * @param node   the node considered failed
     * @param reason human readable description of why the node is considered failed
     */
    private void notifyNodeFailure(final DiscoveryNode node, final String reason) {
        final Runnable dispatch = new Runnable(){

            @Override
            public void run() {
                for (Listener subscriber : listeners) {
                    subscriber.onNodeFailure(node, reason);
                }
            }
        };
        try {
            threadPool.generic().execute(dispatch);
        }
        catch (EsRejectedExecutionException ex) {
            logger.trace("[node  ] [{}] ignoring node failure (reason [{}]). Local node is shutting down", ex, node, reason);
        }
    }

    /**
     * Notifies every registered listener, on the generic thread pool, that a ping was received.
     * Unlike {@code notifyNodeFailure}, a rejection by the pool is not caught here and
     * propagates to the caller.
     *
     * @param pingRequest the ping request that was received
     */
    private void notifyPingReceived(final PingRequest pingRequest) {
        final Runnable dispatch = new Runnable(){

            @Override
            public void run() {
                for (Listener subscriber : listeners) {
                    subscriber.onPingReceived(pingRequest);
                }
            }
        };
        threadPool.generic().execute(dispatch);
    }

    /**
     * Acknowledgement sent back for a ping. It declares no fields of its own, so the
     * serialization inherited from {@link TransportResponse} is used as is.
     */
    private static class PingResponse
    extends TransportResponse {
        private PingResponse() {
        }
    }

    /**
     * Ping sent to a monitored node. It carries the id of the node being pinged, the sender's
     * cluster name, the node sending the ping (exposed as the master node) and a cluster state
     * version.
     */
    public static class PingRequest
    extends TransportRequest {
        private String nodeId;
        private ClusterName clusterName;
        private DiscoveryNode masterNode;
        private long clusterStateVersion = -1L;

        /** Creates an empty request to be populated by {@link #readFrom(StreamInput)}. */
        PingRequest() {
        }

        /**
         * @param nodeId              id of the node being pinged
         * @param clusterName         cluster name of the sender
         * @param masterNode          the node sending the ping
         * @param clusterStateVersion cluster state version reported by the sender
         */
        PingRequest(String nodeId, ClusterName clusterName, DiscoveryNode masterNode, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.clusterName = clusterName;
            this.masterNode = masterNode;
            this.clusterStateVersion = clusterStateVersion;
        }

        /** Reads the fields in the same order in which {@link #writeTo(StreamOutput)} writes them. */
        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            nodeId = in.readString();
            clusterName = ClusterName.readClusterName(in);
            masterNode = DiscoveryNode.readNode(in);
            clusterStateVersion = in.readLong();
        }

        /** Writes node id, cluster name, master node and cluster state version, in that order. */
        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(nodeId);
            clusterName.writeTo(out);
            masterNode.writeTo(out);
            out.writeLong(clusterStateVersion);
        }

        /** @return id of the node this ping is addressed to */
        public String nodeId() {
            return nodeId;
        }

        /** @return cluster name of the sender */
        public ClusterName clusterName() {
            return clusterName;
        }

        /** @return the node that sent the ping */
        public DiscoveryNode masterNode() {
            return masterNode;
        }

        /** @return cluster state version reported by the sender */
        public long clusterStateVersion() {
            return clusterStateVersion;
        }
    }

    /**
     * Handles incoming pings: validates that the ping is addressed to the local node and comes
     * from the same cluster, informs listeners and replies with an empty response.
     */
    class PingRequestHandler
    implements TransportRequestHandler<PingRequest> {
        PingRequestHandler() {
        }

        /**
         * @throws IllegalStateException if the ping targets a different node id or carries a
         *                               different (non-null) cluster name
         */
        @Override
        public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
            String pingedId = request.nodeId();
            String localId = localNode.id();
            if (!localId.equals(pingedId)) {
                throw new IllegalStateException("Got pinged as node [" + pingedId + "], but I am node [" + localId + "]");
            }
            ClusterName senderCluster = request.clusterName();
            // A request without a cluster name is accepted without this check.
            boolean foreignCluster = senderCluster != null && !senderCluster.equals(clusterName);
            if (foreignCluster) {
                throw new IllegalStateException("Got pinged with cluster name [" + senderCluster + "], but I'm part of cluster [" + clusterName + "]");
            }
            notifyPingReceived(request);
            channel.sendResponse(new PingResponse());
        }
    }

    /**
     * Pinger for a single monitored node. An instance is only active while it is the value
     * registered for its node in {@code nodesFD}; once removed or replaced, it stops on its
     * next run or callback.
     */
    private class NodeFD
    implements Runnable {
        /** Consecutive failed pings; reset to zero on every successful response. */
        volatile int retryCount;
        private final DiscoveryNode node;

        private NodeFD(DiscoveryNode node) {
            this.node = node;
        }

        /** @return whether this instance is still the pinger registered for its node */
        private boolean running() {
            NodeFD registered = nodesFD.get(node);
            return this.equals(registered);
        }

        /**
         * Sends one ping. A response re-schedules this pinger after the ping interval; a failure
         * is retried immediately until the retry limit is reached, at which point the node is
         * reported as failed. Connection failures are delegated to the disconnect handling.
         */
        @Override
        public void run() {
            if (!running()) {
                return;
            }
            final PingRequest ping = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion);
            final TransportRequestOptions pingOptions = TransportRequestOptions.options()
                .withType(TransportRequestOptions.Type.PING)
                .withTimeout(pingRetryTimeout);
            transportService.sendRequest(node, PING_ACTION_NAME, ping, pingOptions, new BaseTransportResponseHandler<PingResponse>(){

                @Override
                public PingResponse newInstance() {
                    return new PingResponse();
                }

                @Override
                public void handleResponse(PingResponse response) {
                    if (running()) {
                        retryCount = 0;
                        threadPool.schedule(pingInterval, "same", NodeFD.this);
                    }
                }

                @Override
                public void handleException(TransportException exp) {
                    if (!running()) {
                        return;
                    }
                    boolean connectFailure = exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException;
                    if (connectFailure) {
                        handleTransportDisconnect(node);
                        return;
                    }
                    int attempt = ++retryCount;
                    logger.trace("[node  ] failed to ping [{}], retry [{}] out of [{}]", exp, node, attempt, pingRetryCount);
                    if (attempt < pingRetryCount) {
                        // Retry at once with the same request, options and handler.
                        transportService.sendRequest(node, PING_ACTION_NAME, ping, pingOptions, this);
                        return;
                    }
                    logger.debug("[node  ] failed to ping [{}], tried [{}] times, each with  maximum [{}] timeout", node, pingRetryCount, pingRetryTimeout);
                    // Report the failure only if this pinger was still the registered one.
                    boolean unregistered = nodesFD.remove(node, NodeFD.this);
                    if (unregistered) {
                        notifyNodeFailure(node, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
                    }
                }

                @Override
                public String executor() {
                    return "same";
                }
            });
        }
    }

    /**
     * Callback for fault detection events. Both methods do nothing by default, so subclasses
     * override only what they need.
     */
    public abstract static class Listener {
        /**
         * Called when a monitored node is considered failed.
         *
         * @param node   the failed node
         * @param reason human readable failure reason
         */
        public void onNodeFailure(DiscoveryNode node, String reason) {
            // no-op by default
        }

        /**
         * Called when a ping from another node has been received and validated.
         *
         * @param pingRequest the received ping
         */
        public void onPingReceived(PingRequest pingRequest) {
            // no-op by default
        }
    }
}

