/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

final class RemoteClusterConnection
extends AbstractComponent
implements TransportConnectionListener,
Closeable {
    private final TransportService transportService;
    private final ConnectionManager connectionManager;
    private final ConnectionProfile remoteProfile;
    private final ConnectedNodes connectedNodes;
    private final String clusterAlias;
    private final int maxNumRemoteConnections;
    private final Predicate<DiscoveryNode> nodePredicate;
    private final ThreadPool threadPool;
    private volatile String proxyAddress;
    private volatile List<Supplier<DiscoveryNode>> seedNodes;
    private volatile boolean skipUnavailable;
    private final ConnectHandler connectHandler;
    private SetOnce<ClusterName> remoteClusterName = new SetOnce();

    RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
        this(settings, clusterAlias, seedNodes, transportService, connectionManager, maxNumRemoteConnections, nodePredicate, null);
    }

    RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes, TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate, String proxyAddress) {
        super(settings);
        this.transportService = transportService;
        this.maxNumRemoteConnections = maxNumRemoteConnections;
        this.nodePredicate = nodePredicate;
        this.clusterAlias = clusterAlias;
        ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
        builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
        builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings));
        builder.addConnections(6, TransportRequestOptions.Type.REG, TransportRequestOptions.Type.PING);
        builder.addConnections(0, TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY);
        this.remoteProfile = builder.build();
        this.connectedNodes = new ConnectedNodes(clusterAlias);
        this.seedNodes = Collections.unmodifiableList(seedNodes);
        this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(clusterAlias).get(settings);
        this.connectHandler = new ConnectHandler();
        this.threadPool = transportService.threadPool;
        this.connectionManager = connectionManager;
        connectionManager.addListener(this);
        connectionManager.addListener(transportService);
        this.proxyAddress = proxyAddress;
    }

    private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
        if (proxyAddress == null || proxyAddress.isEmpty()) {
            return node;
        }
        InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
        return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
    }

    synchronized void updateSeedNodes(String proxyAddress, List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
        this.seedNodes = Collections.unmodifiableList(new ArrayList<Supplier<DiscoveryNode>>(seedNodes));
        this.proxyAddress = proxyAddress;
        this.connectHandler.connect(connectListener);
    }

    void updateSkipUnavailable(boolean skipUnavailable) {
        this.skipUnavailable = skipUnavailable;
    }

    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        boolean remove = this.connectedNodes.remove(node);
        if (remove && this.connectedNodes.size() < this.maxNumRemoteConnections) {
            this.connectHandler.forceConnect();
        }
    }

    public void fetchSearchShards(ClusterSearchShardsRequest searchRequest, ActionListener<ClusterSearchShardsResponse> listener) {
        ActionListener<ClusterSearchShardsResponse> searchShardsListener;
        Consumer<Exception> onConnectFailure;
        if (this.skipUnavailable) {
            onConnectFailure = exception -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
            searchShardsListener = ActionListener.wrap(listener::onResponse, e -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
        } else {
            onConnectFailure = listener::onFailure;
            searchShardsListener = listener;
        }
        this.ensureConnected(ActionListener.wrap(x -> this.fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
    }

    public void ensureConnected(ActionListener<Void> voidActionListener) {
        if (this.connectedNodes.size() == 0) {
            this.connectHandler.connect(voidActionListener);
        } else {
            voidActionListener.onResponse(null);
        }
    }

    private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest, final ActionListener<ClusterSearchShardsResponse> listener) {
        DiscoveryNode node = this.getAnyConnectedNode();
        Transport.Connection connection = this.connectionManager.getConnection(node);
        this.transportService.sendRequest(connection, "indices:admin/shards/search_shards", (TransportRequest)searchShardsRequest, TransportRequestOptions.EMPTY, new TransportResponseHandler<ClusterSearchShardsResponse>(){

            @Override
            public ClusterSearchShardsResponse newInstance() {
                return new ClusterSearchShardsResponse();
            }

            @Override
            public void handleResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                listener.onResponse(clusterSearchShardsResponse);
            }

            @Override
            public void handleException(TransportException e) {
                listener.onFailure(e);
            }

            @Override
            public String executor() {
                return "search";
            }
        });
    }

    void collectNodes(final ActionListener<Function<String, DiscoveryNode>> listener) {
        Runnable runnable = () -> {
            ClusterStateRequest request = new ClusterStateRequest();
            request.clear();
            request.nodes(true);
            request.local(true);
            DiscoveryNode node = this.getAnyConnectedNode();
            Transport.Connection connection = this.connectionManager.getConnection(node);
            this.transportService.sendRequest(connection, "cluster:monitor/state", (TransportRequest)request, TransportRequestOptions.EMPTY, new TransportResponseHandler<ClusterStateResponse>(){

                @Override
                public ClusterStateResponse read(StreamInput in) throws IOException {
                    ClusterStateResponse response = new ClusterStateResponse();
                    response.readFrom(in);
                    return response;
                }

                @Override
                public void handleResponse(ClusterStateResponse response) {
                    DiscoveryNodes nodes = response.getState().nodes();
                    listener.onResponse(nodes::get);
                }

                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }

                @Override
                public String executor() {
                    return "same";
                }
            });
        };
        try {
            this.ensureConnected(ActionListener.wrap(x -> runnable.run(), listener::onFailure));
        }
        catch (Exception ex) {
            listener.onFailure(ex);
        }
    }

    Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
        if (this.connectionManager.nodeConnected(remoteClusterNode)) {
            return this.connectionManager.getConnection(remoteClusterNode);
        }
        DiscoveryNode discoveryNode = this.getAnyConnectedNode();
        Transport.Connection connection = this.connectionManager.getConnection(discoveryNode);
        return new ProxyConnection(connection, remoteClusterNode);
    }

    Transport.Connection getConnection() {
        return this.connectionManager.getConnection(this.getAnyConnectedNode());
    }

    @Override
    public void close() throws IOException {
        IOUtils.close(this.connectHandler, this.connectionManager);
    }

    public boolean isClosed() {
        return this.connectHandler.isClosed();
    }

    boolean assertNoRunningConnections() {
        assert (this.connectHandler.running.availablePermits() == 1);
        return true;
    }

    boolean isNodeConnected(DiscoveryNode node) {
        return this.connectedNodes.contains(node);
    }

    DiscoveryNode getAnyConnectedNode() {
        return this.connectedNodes.getAny();
    }

    void addConnectedNode(DiscoveryNode node) {
        this.connectedNodes.add(node);
    }

    public void getConnectionInfo(final ActionListener<RemoteConnectionInfo> listener) {
        Optional<DiscoveryNode> anyNode = this.connectedNodes.getAnyConnectedNode();
        if (!anyNode.isPresent()) {
            RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(this.clusterAlias, Collections.emptyList(), Collections.emptyList(), this.maxNumRemoteConnections, 0, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(this.settings), this.skipUnavailable);
            listener.onResponse(remoteConnectionStats);
        } else {
            NodesInfoRequest request = new NodesInfoRequest();
            request.clear();
            request.http(true);
            Transport.Connection connection = this.connectionManager.getConnection(anyNode.get());
            this.transportService.sendRequest(connection, "cluster:monitor/nodes/info", (TransportRequest)request, TransportRequestOptions.EMPTY, new TransportResponseHandler<NodesInfoResponse>(){

                @Override
                public NodesInfoResponse newInstance() {
                    return new NodesInfoResponse();
                }

                @Override
                public void handleResponse(NodesInfoResponse response) {
                    HashSet<TransportAddress> httpAddresses = new HashSet<TransportAddress>();
                    for (NodeInfo info : response.getNodes()) {
                        if (!RemoteClusterConnection.this.connectedNodes.contains(info.getNode()) || info.getHttp() == null) continue;
                        httpAddresses.add(info.getHttp().getAddress().publishAddress());
                    }
                    if (httpAddresses.size() < RemoteClusterConnection.this.maxNumRemoteConnections) {
                        for (NodeInfo info : response.getNodes()) {
                            if (RemoteClusterConnection.this.nodePredicate.test(info.getNode()) && info.getHttp() != null) {
                                httpAddresses.add(info.getHttp().getAddress().publishAddress());
                            }
                            if (httpAddresses.size() != RemoteClusterConnection.this.maxNumRemoteConnections) continue;
                            break;
                        }
                    }
                    RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(RemoteClusterConnection.this.clusterAlias, RemoteClusterConnection.this.seedNodes.stream().map(sup -> ((DiscoveryNode)sup.get()).getAddress()).collect(Collectors.toList()), new ArrayList<TransportAddress>(httpAddresses), RemoteClusterConnection.this.maxNumRemoteConnections, RemoteClusterConnection.this.connectedNodes.size(), RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(RemoteClusterConnection.this.settings), RemoteClusterConnection.this.skipUnavailable);
                    listener.onResponse(remoteConnectionInfo);
                }

                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }

                @Override
                public String executor() {
                    return "same";
                }
            });
        }
    }

    RemoteConnectionInfo getLocalConnectionInfo() {
        List<TransportAddress> seedNodeAddresses = this.seedNodes.stream().map(node -> ((DiscoveryNode)node.get()).getAddress()).collect(Collectors.toList());
        TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(this.settings);
        return new RemoteConnectionInfo(this.clusterAlias, Collections.emptyList(), seedNodeAddresses, this.maxNumRemoteConnections, this.connectedNodes.size(), initialConnectionTimeout, this.skipUnavailable);
    }

    int getNumNodesConnected() {
        return this.connectedNodes.size();
    }

    ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    private static final class ConnectedNodes {
        private final Set<DiscoveryNode> nodeSet = new HashSet<DiscoveryNode>();
        private final String clusterAlias;
        private Iterator<DiscoveryNode> currentIterator = null;

        private ConnectedNodes(String clusterAlias) {
            this.clusterAlias = clusterAlias;
        }

        public synchronized DiscoveryNode getAny() {
            this.ensureIteratorAvailable();
            if (this.currentIterator.hasNext()) {
                return this.currentIterator.next();
            }
            throw new IllegalStateException("No node available for cluster: " + this.clusterAlias);
        }

        synchronized boolean remove(DiscoveryNode node) {
            boolean setRemoval = this.nodeSet.remove(node);
            if (setRemoval) {
                this.currentIterator = null;
            }
            return setRemoval;
        }

        synchronized boolean add(DiscoveryNode node) {
            boolean added = this.nodeSet.add(node);
            if (added) {
                this.currentIterator = null;
            }
            return added;
        }

        synchronized int size() {
            return this.nodeSet.size();
        }

        synchronized boolean contains(DiscoveryNode node) {
            return this.nodeSet.contains(node);
        }

        synchronized Optional<DiscoveryNode> getAnyConnectedNode() {
            this.ensureIteratorAvailable();
            if (this.currentIterator.hasNext()) {
                return Optional.of(this.currentIterator.next());
            }
            return Optional.empty();
        }

        private synchronized void ensureIteratorAvailable() {
            if (this.currentIterator == null) {
                this.currentIterator = this.nodeSet.iterator();
            } else if (!this.currentIterator.hasNext() && !this.nodeSet.isEmpty()) {
                this.currentIterator = this.nodeSet.iterator();
            }
        }
    }

    private class ConnectHandler
    implements Closeable {
        private final Semaphore running = new Semaphore(1);
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final BlockingQueue<ActionListener<Void>> queue = new ArrayBlockingQueue<ActionListener<Void>>(100);
        private final CancellableThreads cancellableThreads = new CancellableThreads();

        private ConnectHandler() {
        }

        void maybeConnect() {
            this.connect(null);
        }

        void connect(ActionListener<Void> connectListener) {
            this.connect(connectListener, false);
        }

        void forceConnect() {
            this.connect(null, true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void connect(ActionListener<Void> connectListener, boolean forceRun) {
            List toNotify;
            boolean runConnect;
            ContextPreservingActionListener<Void> listener = connectListener == null ? null : ContextPreservingActionListener.wrapPreservingContext(connectListener, RemoteClusterConnection.this.threadPool.getThreadContext());
            BlockingQueue<ActionListener<Void>> blockingQueue = this.queue;
            synchronized (blockingQueue) {
                if (listener != null && !this.queue.offer(listener)) {
                    listener.onFailure(new RejectedExecutionException("connect queue is full"));
                    return;
                }
                if (!forceRun && this.queue.isEmpty()) {
                    return;
                }
                runConnect = this.running.tryAcquire();
                if (runConnect) {
                    toNotify = new ArrayList();
                    this.queue.drainTo(toNotify);
                    if (this.closed.get()) {
                        this.running.release();
                        ActionListener.onFailure(toNotify, new AlreadyClosedException("connect handler is already closed"));
                        return;
                    }
                } else {
                    toNotify = Collections.emptyList();
                }
            }
            if (runConnect) {
                this.forkConnect(toNotify);
            }
        }

        private void forkConnect(final Collection<ActionListener<Void>> toNotify) {
            ExecutorService executor = RemoteClusterConnection.this.threadPool.executor("management");
            executor.submit(new AbstractRunnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void onFailure(Exception e) {
                    BlockingQueue blockingQueue = ConnectHandler.this.queue;
                    synchronized (blockingQueue) {
                        ConnectHandler.this.running.release();
                    }
                    try {
                        ActionListener.onFailure(toNotify, e);
                    }
                    finally {
                        ConnectHandler.this.maybeConnect();
                    }
                }

                @Override
                protected void doRun() {
                    ActionListener<Void> listener = ActionListener.wrap(x -> {
                        BlockingQueue blockingQueue = ConnectHandler.this.queue;
                        synchronized (blockingQueue) {
                            ConnectHandler.this.running.release();
                        }
                        try {
                            ActionListener.onResponse(toNotify, x);
                        }
                        finally {
                            ConnectHandler.this.maybeConnect();
                        }
                    }, e -> {
                        BlockingQueue blockingQueue = ConnectHandler.this.queue;
                        synchronized (blockingQueue) {
                            ConnectHandler.this.running.release();
                        }
                        try {
                            ActionListener.onFailure(toNotify, e);
                        }
                        finally {
                            ConnectHandler.this.maybeConnect();
                        }
                    });
                    ConnectHandler.this.collectRemoteNodes(RemoteClusterConnection.this.seedNodes.iterator(), RemoteClusterConnection.this.transportService, RemoteClusterConnection.this.connectionManager, listener);
                }
            });
        }

        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, TransportService transportService, ConnectionManager manager, ActionListener<Void> listener) {
            if (Thread.currentThread().isInterrupted()) {
                listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
            }
            try {
                if (seedNodes.hasNext()) {
                    this.cancellableThreads.executeIO(() -> {
                        DiscoveryNode seedNode = RemoteClusterConnection.maybeAddProxyAddress(RemoteClusterConnection.this.proxyAddress, (DiscoveryNode)((Supplier)seedNodes.next()).get());
                        RemoteClusterConnection.this.logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", (Object)RemoteClusterConnection.this.clusterAlias, (Object)seedNode, (Object)RemoteClusterConnection.this.proxyAddress);
                        Transport.Connection connection = manager.openConnection(seedNode, ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
                        boolean success = false;
                        try {
                            TransportService.HandshakeResponse handshakeResponse;
                            try {
                                handshakeResponse = transportService.handshake(connection, RemoteClusterConnection.this.remoteProfile.getHandshakeTimeout().millis(), c -> RemoteClusterConnection.this.remoteClusterName.get() == null ? true : c.equals(RemoteClusterConnection.this.remoteClusterName.get()));
                            }
                            catch (IllegalStateException ex) {
                                RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected cluster name {}", (Object)connection.getNode(), RemoteClusterConnection.this.remoteClusterName.get()), (Throwable)ex);
                                throw ex;
                            }
                            DiscoveryNode handshakeNode = RemoteClusterConnection.maybeAddProxyAddress(RemoteClusterConnection.this.proxyAddress, handshakeResponse.getDiscoveryNode());
                            if (RemoteClusterConnection.this.nodePredicate.test(handshakeNode) && RemoteClusterConnection.this.connectedNodes.size() < RemoteClusterConnection.this.maxNumRemoteConnections) {
                                manager.connectToNode(handshakeNode, RemoteClusterConnection.this.remoteProfile, transportService.connectionValidator(handshakeNode));
                                if (RemoteClusterConnection.this.remoteClusterName.get() == null) {
                                    assert (handshakeResponse.getClusterName().value() != null);
                                    RemoteClusterConnection.this.remoteClusterName.set(handshakeResponse.getClusterName());
                                }
                                RemoteClusterConnection.this.connectedNodes.add(handshakeNode);
                            }
                            ClusterStateRequest request = new ClusterStateRequest();
                            request.clear();
                            request.nodes(true);
                            ThreadPool threadPool = transportService.getThreadPool();
                            ThreadContext threadContext = threadPool.getThreadContext();
                            TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService.ContextRestoreResponseHandler<ClusterStateResponse>(threadContext.newRestorableContext(false), new SniffClusterStateResponseHandler(connection, listener, seedNodes, this.cancellableThreads));
                            try (ThreadContext.StoredContext ignore = threadContext.stashContext();){
                                threadContext.markAsSystemContext();
                                transportService.sendRequest(connection, "cluster:monitor/state", (TransportRequest)request, TransportRequestOptions.EMPTY, responseHandler);
                            }
                            success = true;
                        }
                        finally {
                            if (!success) {
                                connection.close();
                            }
                        }
                    });
                } else {
                    listener.onFailure(new IllegalStateException("no seed node left"));
                }
            }
            catch (CancellableThreads.ExecutionCancelledException ex) {
                RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)ex);
                listener.onFailure(ex);
            }
            catch (IOException | IllegalStateException | ConnectTransportException ex) {
                if (seedNodes.hasNext()) {
                    RemoteClusterConnection.this.logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed moving to next node", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)ex);
                    this.collectRemoteNodes(seedNodes, transportService, manager, listener);
                }
                RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)ex);
                listener.onFailure(ex);
            }
        }

        @Override
        public void close() throws IOException {
            try {
                if (this.closed.compareAndSet(false, true)) {
                    this.cancellableThreads.cancel("connect handler is closed");
                    this.running.acquire();
                    this.running.release();
                    this.maybeConnect();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        final boolean isClosed() {
            return this.closed.get();
        }

        private class SniffClusterStateResponseHandler
        implements TransportResponseHandler<ClusterStateResponse> {
            private final Transport.Connection connection;
            private final ActionListener<Void> listener;
            private final Iterator<Supplier<DiscoveryNode>> seedNodes;
            private final CancellableThreads cancellableThreads;

            SniffClusterStateResponseHandler(Transport.Connection connection, ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes, CancellableThreads cancellableThreads) {
                this.connection = connection;
                this.listener = listener;
                this.seedNodes = seedNodes;
                this.cancellableThreads = cancellableThreads;
            }

            @Override
            public ClusterStateResponse newInstance() {
                return new ClusterStateResponse();
            }

            @Override
            public void handleResponse(ClusterStateResponse response) {
                try {
                    if (RemoteClusterConnection.this.remoteClusterName.get() == null) {
                        assert (response.getClusterName().value() != null);
                        RemoteClusterConnection.this.remoteClusterName.set(response.getClusterName());
                    }
                    try (Transport.Connection theConnection = this.connection;){
                        this.cancellableThreads.executeIO(() -> {
                            DiscoveryNodes nodes = response.getState().nodes();
                            Iterable nodesIter = nodes.getNodes()::valuesIt;
                            for (DiscoveryNode n : nodesIter) {
                                DiscoveryNode node = RemoteClusterConnection.maybeAddProxyAddress(RemoteClusterConnection.this.proxyAddress, n);
                                if (!RemoteClusterConnection.this.nodePredicate.test(node) || RemoteClusterConnection.this.connectedNodes.size() >= RemoteClusterConnection.this.maxNumRemoteConnections) continue;
                                try {
                                    RemoteClusterConnection.this.connectionManager.connectToNode(node, RemoteClusterConnection.this.remoteProfile, RemoteClusterConnection.this.transportService.connectionValidator(node));
                                    RemoteClusterConnection.this.connectedNodes.add(node);
                                }
                                catch (IllegalStateException | ConnectTransportException ex) {
                                    RemoteClusterConnection.this.logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", (Object)node), (Throwable)ex);
                                }
                            }
                        });
                    }
                    this.listener.onResponse(null);
                }
                catch (CancellableThreads.ExecutionCancelledException ex) {
                    this.listener.onFailure(ex);
                }
                catch (Exception ex) {
                    RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)ex);
                    ConnectHandler.this.collectRemoteNodes(this.seedNodes, RemoteClusterConnection.this.transportService, RemoteClusterConnection.this.connectionManager, this.listener);
                }
            }

            @Override
            public void handleException(TransportException exp) {
                RemoteClusterConnection.this.logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)exp);
                try {
                    IOUtils.closeWhileHandlingException(this.connection);
                }
                finally {
                    ConnectHandler.this.collectRemoteNodes(this.seedNodes, RemoteClusterConnection.this.transportService, RemoteClusterConnection.this.connectionManager, this.listener);
                }
            }

            @Override
            public String executor() {
                return "management";
            }
        }
    }

    static final class ProxyConnection
    implements Transport.Connection {
        private final Transport.Connection proxyConnection;
        private final DiscoveryNode targetNode;

        private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
            this.proxyConnection = proxyConnection;
            this.targetNode = targetNode;
        }

        @Override
        public DiscoveryNode getNode() {
            return this.targetNode;
        }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
            this.proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), TransportActionProxy.wrapRequest(this.targetNode, request), options);
        }

        @Override
        public boolean sendPing() {
            return this.proxyConnection.sendPing();
        }

        @Override
        public void close() {
            assert (false) : "proxy connections must not be closed";
        }

        @Override
        public void addCloseListener(ActionListener<Void> listener) {
            this.proxyConnection.addCloseListener(listener);
        }

        @Override
        public boolean isClosed() {
            return this.proxyConnection.isClosed();
        }

        @Override
        public Version getVersion() {
            return this.proxyConnection.getVersion();
        }
    }
}

