/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

final class RemoteClusterConnection
implements TransportConnectionListener,
Closeable {
    private static final Logger logger = LogManager.getLogger(RemoteClusterConnection.class);
    private final TransportService transportService;
    private final ConnectionManager connectionManager;
    private final String clusterAlias;
    private final int maxNumRemoteConnections;
    private final Predicate<DiscoveryNode> nodePredicate;
    private final ThreadPool threadPool;
    private volatile String proxyAddress;
    private volatile List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes;
    private volatile boolean skipUnavailable;
    private final ConnectHandler connectHandler;
    private final TimeValue initialConnectionTimeout;
    private final SetOnce<ClusterName> remoteClusterName = new SetOnce();
    private final AtomicLong nextNodeId = new AtomicLong();

    RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate, String proxyAddress, ConnectionProfile connectionProfile) {
        this(settings, clusterAlias, seedNodes, transportService, maxNumRemoteConnections, nodePredicate, proxyAddress, RemoteClusterConnection.createConnectionManager(connectionProfile, transportService));
    }

    RemoteClusterConnection(Settings settings, String clusterAlias, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes, TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate, String proxyAddress, ConnectionManager connectionManager) {
        this.transportService = transportService;
        this.maxNumRemoteConnections = maxNumRemoteConnections;
        this.nodePredicate = nodePredicate;
        this.clusterAlias = clusterAlias;
        this.connectionManager = connectionManager;
        this.seedNodes = Collections.unmodifiableList(seedNodes);
        this.skipUnavailable = RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(clusterAlias).get(settings);
        this.connectHandler = new ConnectHandler();
        this.threadPool = transportService.threadPool;
        connectionManager.addListener(this);
        connectionManager.addListener(transportService);
        this.proxyAddress = proxyAddress;
        this.initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
    }

    private static DiscoveryNode maybeAddProxyAddress(String proxyAddress, DiscoveryNode node) {
        if (proxyAddress == null || proxyAddress.isEmpty()) {
            return node;
        }
        InetSocketAddress proxyInetAddress = RemoteClusterAware.parseSeedAddress(proxyAddress);
        return new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), node.getHostAddress(), new TransportAddress(proxyInetAddress), node.getAttributes(), node.getRoles(), node.getVersion());
    }

    synchronized void updateSeedNodes(String proxyAddress, List<Tuple<String, Supplier<DiscoveryNode>>> seedNodes, ActionListener<Void> connectListener) {
        this.seedNodes = Collections.unmodifiableList(new ArrayList<Tuple<String, Supplier<DiscoveryNode>>>(seedNodes));
        this.proxyAddress = proxyAddress;
        this.connectHandler.connect(connectListener);
    }

    void updateSkipUnavailable(boolean skipUnavailable) {
        this.skipUnavailable = skipUnavailable;
    }

    boolean isSkipUnavailable() {
        return this.skipUnavailable;
    }

    @Override
    public void onNodeDisconnected(DiscoveryNode node) {
        if (this.connectionManager.size() < this.maxNumRemoteConnections) {
            this.connectHandler.connect(ActionListener.wrap(ignore -> logger.trace("successfully connected after disconnect of {}", (Object)node), e -> logger.trace(() -> new ParameterizedMessage("failed to connect after disconnect of {}", (Object)node), (Throwable)e)));
        }
    }

    void ensureConnected(ActionListener<Void> voidActionListener) {
        if (this.connectionManager.size() == 0) {
            this.connectHandler.connect(voidActionListener);
        } else {
            voidActionListener.onResponse(null);
        }
    }

    void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
        Runnable runnable = () -> {
            ThreadContext threadContext = this.threadPool.getThreadContext();
            final ContextPreservingActionListener contextPreservingActionListener = new ContextPreservingActionListener(threadContext.newRestorableContext(false), listener);
            try (ThreadContext.StoredContext ignore = threadContext.stashContext();){
                threadContext.markAsSystemContext();
                ClusterStateRequest request = new ClusterStateRequest();
                request.clear();
                request.nodes(true);
                request.local(true);
                DiscoveryNode node = this.getAnyConnectedNode();
                Transport.Connection connection = this.connectionManager.getConnection(node);
                this.transportService.sendRequest(connection, "cluster:monitor/state", (TransportRequest)request, TransportRequestOptions.EMPTY, new TransportResponseHandler<ClusterStateResponse>(){

                    @Override
                    public ClusterStateResponse read(StreamInput in) throws IOException {
                        return new ClusterStateResponse(in);
                    }

                    @Override
                    public void handleResponse(ClusterStateResponse response) {
                        DiscoveryNodes nodes = response.getState().nodes();
                        contextPreservingActionListener.onResponse(nodes::get);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        contextPreservingActionListener.onFailure(exp);
                    }

                    @Override
                    public String executor() {
                        return "same";
                    }
                });
            }
        };
        try {
            this.ensureConnected(ActionListener.wrap(x -> runnable.run(), listener::onFailure));
        }
        catch (Exception ex) {
            listener.onFailure(ex);
        }
    }

    Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
        if (this.connectionManager.nodeConnected(remoteClusterNode)) {
            return this.connectionManager.getConnection(remoteClusterNode);
        }
        DiscoveryNode discoveryNode = this.getAnyConnectedNode();
        Transport.Connection connection = this.connectionManager.getConnection(discoveryNode);
        return new ProxyConnection(connection, remoteClusterNode);
    }

    private Predicate<ClusterName> getRemoteClusterNamePredicate() {
        return new Predicate<ClusterName>(){

            @Override
            public boolean test(ClusterName c) {
                return RemoteClusterConnection.this.remoteClusterName.get() == null || c.equals(RemoteClusterConnection.this.remoteClusterName.get());
            }

            public String toString() {
                return RemoteClusterConnection.this.remoteClusterName.get() == null ? "any cluster name" : "expected remote cluster name [" + ((ClusterName)RemoteClusterConnection.this.remoteClusterName.get()).value() + "]";
            }
        };
    }

    Transport.Connection getConnection() {
        return this.connectionManager.getConnection(this.getAnyConnectedNode());
    }

    @Override
    public void close() throws IOException {
        IOUtils.close((Closeable[])new Closeable[]{this.connectHandler});
        this.connectionManager.closeNoBlock();
    }

    public boolean isClosed() {
        return this.connectHandler.isClosed();
    }

    public String getProxyAddress() {
        return this.proxyAddress;
    }

    public List<Tuple<String, Supplier<DiscoveryNode>>> getSeedNodes() {
        return this.seedNodes;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean assertNoRunningConnections() {
        Object object = this.connectHandler.mutex;
        synchronized (object) {
            assert (this.connectHandler.listeners.isEmpty());
        }
        return true;
    }

    boolean isNodeConnected(DiscoveryNode node) {
        return this.connectionManager.nodeConnected(node);
    }

    DiscoveryNode getAnyConnectedNode() {
        long curr;
        ArrayList<DiscoveryNode> nodes = new ArrayList<DiscoveryNode>(this.connectionManager.connectedNodes());
        if (nodes.isEmpty()) {
            throw new NoSuchRemoteClusterException(this.clusterAlias);
        }
        while ((curr = this.nextNodeId.incrementAndGet()) == Long.MIN_VALUE) {
        }
        return (DiscoveryNode)nodes.get(Math.toIntExact(Math.floorMod(curr, (long)nodes.size())));
    }

    public RemoteConnectionInfo getConnectionInfo() {
        return new RemoteConnectionInfo(this.clusterAlias, this.seedNodes.stream().map(Tuple::v1).collect(Collectors.toList()), this.maxNumRemoteConnections, this.getNumNodesConnected(), this.initialConnectionTimeout, this.skipUnavailable);
    }

    int getNumNodesConnected() {
        return this.connectionManager.size();
    }

    private static ConnectionManager createConnectionManager(ConnectionProfile connectionProfile, TransportService transportService) {
        return new ConnectionManager(connectionProfile, transportService.transport);
    }

    ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    private class ConnectHandler
    implements Closeable {
        private static final int MAX_LISTENERS = 100;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final Object mutex = new Object();
        private List<ActionListener<Void>> listeners = new ArrayList<ActionListener<Void>>();

        private ConnectHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void connect(ActionListener<Void> connectListener) {
            boolean closed;
            boolean runConnect = false;
            ContextPreservingActionListener<Void> listener = ContextPreservingActionListener.wrapPreservingContext(connectListener, RemoteClusterConnection.this.threadPool.getThreadContext());
            Object object = this.mutex;
            synchronized (object) {
                closed = this.closed.get();
                if (closed) {
                    assert (this.listeners.isEmpty());
                } else {
                    if (this.listeners.size() >= 100) {
                        assert (this.listeners.size() == 100);
                        listener.onFailure(new RejectedExecutionException("connect queue is full"));
                        return;
                    }
                    this.listeners.add(listener);
                    runConnect = this.listeners.size() == 1;
                }
            }
            if (closed) {
                connectListener.onFailure((Exception)((Object)new AlreadyClosedException("connect handler is already closed")));
                return;
            }
            if (runConnect) {
                ExecutorService executor = RemoteClusterConnection.this.threadPool.executor("management");
                executor.submit(new AbstractRunnable(){

                    @Override
                    public void onFailure(Exception e) {
                        ActionListener.onFailure(ConnectHandler.this.getAndClearListeners(), e);
                    }

                    @Override
                    protected void doRun() {
                        ConnectHandler.this.collectRemoteNodes(RemoteClusterConnection.this.seedNodes.stream().map(Tuple::v2).iterator(), new ActionListener<Void>(){

                            @Override
                            public void onResponse(Void aVoid) {
                                ActionListener.onResponse(ConnectHandler.this.getAndClearListeners(), aVoid);
                            }

                            @Override
                            public void onFailure(Exception e) {
                                ActionListener.onFailure(ConnectHandler.this.getAndClearListeners(), e);
                            }
                        });
                    }
                });
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private List<ActionListener<Void>> getAndClearListeners() {
            List<ActionListener<Void>> result;
            Object object = this.mutex;
            synchronized (object) {
                if (this.listeners.isEmpty()) {
                    result = Collections.emptyList();
                } else {
                    result = this.listeners;
                    this.listeners = new ArrayList<ActionListener<Void>>();
                }
            }
            return result;
        }

        private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> listener) {
            if (Thread.currentThread().isInterrupted()) {
                listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
            }
            if (seedNodes.hasNext()) {
                Consumer<Exception> onFailure = e -> {
                    if ((e instanceof ConnectTransportException || e instanceof IOException || e instanceof IllegalStateException) && seedNodes.hasNext()) {
                        logger.debug(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed moving to next node", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)e);
                        this.collectRemoteNodes(seedNodes, listener);
                        return;
                    }
                    logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster [{}] failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)e);
                    listener.onFailure((Exception)e);
                };
                DiscoveryNode seedNode = RemoteClusterConnection.maybeAddProxyAddress(RemoteClusterConnection.this.proxyAddress, seedNodes.next().get());
                logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", (Object)RemoteClusterConnection.this.clusterAlias, (Object)seedNode, (Object)RemoteClusterConnection.this.proxyAddress);
                ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG);
                StepListener<Transport.Connection> openConnectionStep = new StepListener<Transport.Connection>();
                try {
                    RemoteClusterConnection.this.connectionManager.openConnection(seedNode, profile, openConnectionStep);
                }
                catch (Exception e2) {
                    onFailure.accept(e2);
                }
                StepListener<TransportService.HandshakeResponse> handShakeStep = new StepListener<TransportService.HandshakeResponse>();
                openConnectionStep.whenComplete(connection -> {
                    ConnectionProfile connectionProfile = RemoteClusterConnection.this.connectionManager.getConnectionProfile();
                    RemoteClusterConnection.this.transportService.handshake((Transport.Connection)connection, connectionProfile.getHandshakeTimeout().millis(), RemoteClusterConnection.this.getRemoteClusterNamePredicate(), (ActionListener<TransportService.HandshakeResponse>)handShakeStep);
                }, onFailure);
                StepListener<Void> fullConnectionStep = new StepListener<Void>();
                handShakeStep.whenComplete(handshakeResponse -> {
                    DiscoveryNode handshakeNode = RemoteClusterConnection.maybeAddProxyAddress(RemoteClusterConnection.this.proxyAddress, handshakeResponse.getDiscoveryNode());
                    if (RemoteClusterConnection.this.nodePredicate.test(handshakeNode) && RemoteClusterConnection.this.connectionManager.size() < RemoteClusterConnection.this.maxNumRemoteConnections) {
                        RemoteClusterConnection.this.connectionManager.connectToNode(handshakeNode, null, RemoteClusterConnection.this.transportService.connectionValidator(handshakeNode), fullConnectionStep);
                    } else {
                        fullConnectionStep.onResponse(null);
                    }
                }, e -> {
                    Transport.Connection connection = (Transport.Connection)openConnectionStep.result();
                    logger.warn((Message)new ParameterizedMessage("failed to connect to seed node [{}]", (Object)connection.getNode()), (Throwable)e);
                    IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{connection});
                    onFailure.accept((Exception)e);
                });
                fullConnectionStep.whenComplete(aVoid -> {
                    if (RemoteClusterConnection.this.remoteClusterName.get() == null) {
                        TransportService.HandshakeResponse handshakeResponse = (TransportService.HandshakeResponse)handShakeStep.result();
                        assert (handshakeResponse.getClusterName().value() != null);
                        RemoteClusterConnection.this.remoteClusterName.set((Object)handshakeResponse.getClusterName());
                    }
                    Transport.Connection connection = (Transport.Connection)openConnectionStep.result();
                    ClusterStateRequest request = new ClusterStateRequest();
                    request.clear();
                    request.nodes(true);
                    ThreadPool threadPool = RemoteClusterConnection.this.transportService.getThreadPool();
                    ThreadContext threadContext = threadPool.getThreadContext();
                    TransportService.ContextRestoreResponseHandler<ClusterStateResponse> responseHandler = new TransportService.ContextRestoreResponseHandler<ClusterStateResponse>(threadContext.newRestorableContext(false), new SniffClusterStateResponseHandler(connection, listener, seedNodes));
                    try (ThreadContext.StoredContext ignore = threadContext.stashContext();){
                        threadContext.markAsSystemContext();
                        RemoteClusterConnection.this.transportService.sendRequest(connection, "cluster:monitor/state", (TransportRequest)request, TransportRequestOptions.EMPTY, responseHandler);
                    }
                }, e -> {
                    IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{(Closeable)openConnectionStep.result()});
                    onFailure.accept((Exception)e);
                });
            } else {
                listener.onFailure(new IllegalStateException("no seed node left"));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() throws IOException {
            List toNotify;
            Object object = this.mutex;
            synchronized (object) {
                if (this.closed.compareAndSet(false, true)) {
                    toNotify = this.listeners;
                    this.listeners = Collections.emptyList();
                } else {
                    toNotify = Collections.emptyList();
                }
            }
            ActionListener.onFailure(toNotify, (Exception)((Object)new AlreadyClosedException("connect handler is already closed")));
        }

        final boolean isClosed() {
            return this.closed.get();
        }

        private class SniffClusterStateResponseHandler
        implements TransportResponseHandler<ClusterStateResponse> {
            private final Transport.Connection connection;
            private final ActionListener<Void> listener;
            private final Iterator<Supplier<DiscoveryNode>> seedNodes;

            SniffClusterStateResponseHandler(Transport.Connection connection, ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes) {
                this.connection = connection;
                this.listener = listener;
                this.seedNodes = seedNodes;
            }

            @Override
            public ClusterStateResponse read(StreamInput in) throws IOException {
                return new ClusterStateResponse(in);
            }

            @Override
            public void handleResponse(ClusterStateResponse response) {
                this.handleNodes(response.getState().nodes().getNodes().valuesIt());
            }

            private void handleNodes(final Iterator<DiscoveryNode> nodesIter) {
                while (nodesIter.hasNext()) {
                    final DiscoveryNode node = RemoteClusterConnection.maybeAddProxyAddress(RemoteClusterConnection.this.proxyAddress, nodesIter.next());
                    if (!RemoteClusterConnection.this.nodePredicate.test(node) || RemoteClusterConnection.this.connectionManager.size() >= RemoteClusterConnection.this.maxNumRemoteConnections) continue;
                    RemoteClusterConnection.this.connectionManager.connectToNode(node, null, RemoteClusterConnection.this.transportService.connectionValidator(node), new ActionListener<Void>(){

                        @Override
                        public void onResponse(Void aVoid) {
                            SniffClusterStateResponseHandler.this.handleNodes(nodesIter);
                        }

                        @Override
                        public void onFailure(Exception e) {
                            if (e instanceof ConnectTransportException || e instanceof IllegalStateException) {
                                logger.debug(() -> new ParameterizedMessage("failed to connect to node {}", (Object)node), (Throwable)e);
                                SniffClusterStateResponseHandler.this.handleNodes(nodesIter);
                            } else {
                                logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)e);
                                IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{SniffClusterStateResponseHandler.this.connection});
                                ConnectHandler.this.collectRemoteNodes(SniffClusterStateResponseHandler.this.seedNodes, SniffClusterStateResponseHandler.this.listener);
                            }
                        }
                    });
                    return;
                }
                IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{this.connection});
                this.listener.onResponse(null);
            }

            @Override
            public void handleException(TransportException exp) {
                logger.warn(() -> new ParameterizedMessage("fetching nodes from external cluster {} failed", (Object)RemoteClusterConnection.this.clusterAlias), (Throwable)exp);
                try {
                    IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{this.connection});
                }
                finally {
                    ConnectHandler.this.collectRemoteNodes(this.seedNodes, this.listener);
                }
            }

            @Override
            public String executor() {
                return "management";
            }
        }
    }

    static final class ProxyConnection
    implements Transport.Connection {
        private final Transport.Connection proxyConnection;
        private final DiscoveryNode targetNode;

        private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
            this.proxyConnection = proxyConnection;
            this.targetNode = targetNode;
        }

        @Override
        public DiscoveryNode getNode() {
            return this.targetNode;
        }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
            this.proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action), TransportActionProxy.wrapRequest(this.targetNode, request), options);
        }

        @Override
        public void close() {
            assert (false) : "proxy connections must not be closed";
        }

        @Override
        public void addCloseListener(ActionListener<Void> listener) {
            this.proxyConnection.addCloseListener(listener);
        }

        @Override
        public boolean isClosed() {
            return this.proxyConnection.isClosed();
        }

        @Override
        public Version getVersion() {
            return this.proxyConnection.getVersion();
        }
    }
}

