/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.cursors.IntCursor;
import java.io.Closeable;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.transport.PortsRange;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.Node;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.CloseableConnection;
import org.elasticsearch.transport.CompressibleBytesOutputStream;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.TcpServerChannel;
import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportHandshaker;
import org.elasticsearch.transport.TransportKeepAlive;
import org.elasticsearch.transport.TransportLogger;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.transport.TransportMessageListener;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.TransportStats;
import org.elasticsearch.transport.TransportStatus;
import org.elasticsearch.transport.Transports;

public abstract class TcpTransport
extends AbstractLifecycleComponent
implements Transport {
    private static final Logger logger = LogManager.getLogger(TcpTransport.class);
    public static final String TRANSPORT_WORKER_THREAD_NAME_PREFIX = "transport_worker";
    private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = 6;
    private static final long NINETY_PER_HEAP_SIZE = (long)((double)JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
    private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
    private final String[] features;
    protected final Settings settings;
    private final CircuitBreakerService circuitBreakerService;
    private final Version version;
    protected final ThreadPool threadPool;
    protected final BigArrays bigArrays;
    protected final PageCacheRecycler pageCacheRecycler;
    protected final NetworkService networkService;
    protected final Set<ProfileSettings> profileSettings;
    private final DelegatingTransportMessageListener messageListener = new DelegatingTransportMessageListener();
    private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = ConcurrentCollections.newConcurrentMap();
    private final Map<String, List<TcpServerChannel>> serverChannels = ConcurrentCollections.newConcurrentMap();
    private final Set<TcpChannel> acceptedChannels = ConcurrentCollections.newConcurrentSet();
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final boolean compressAllResponses;
    private volatile BoundTransportAddress boundAddress;
    private final String transportName;
    private final MeanMetric readBytesMetric = new MeanMetric();
    private final MeanMetric transmittedBytesMetric = new MeanMetric();
    private volatile Map<String, RequestHandlerRegistry<? extends TransportRequest>> requestHandlers = Collections.emptyMap();
    private final Transport.ResponseHandlers responseHandlers = new Transport.ResponseHandlers();
    private final TransportLogger transportLogger;
    private final TransportHandshaker handshaker;
    private final TransportKeepAlive keepAlive;
    private final String nodeName;
    private static final Pattern BRACKET_PATTERN = Pattern.compile("^\\[(.*:.*)\\](?::([\\d\\-]*))?$");

    public TcpTransport(String transportName, Settings settings, Version version, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
        super(settings);
        this.settings = settings;
        this.profileSettings = TcpTransport.getProfileSettings(settings);
        this.version = version;
        this.threadPool = threadPool;
        this.bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, "in_flight_requests");
        this.pageCacheRecycler = pageCacheRecycler;
        this.circuitBreakerService = circuitBreakerService;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.compressAllResponses = TransportSettings.TRANSPORT_COMPRESS.get(settings);
        this.networkService = networkService;
        this.transportName = transportName;
        this.transportLogger = new TransportLogger();
        this.handshaker = new TransportHandshaker(version, threadPool, (node, channel, requestId, v) -> this.sendRequestToChannel(node, channel, requestId, "internal:tcp/handshake", new TransportHandshaker.HandshakeRequest(version), TransportRequestOptions.EMPTY, v, false, TransportStatus.setHandshake((byte)0)), (v, features, channel, response, requestId) -> this.sendResponse(v, features, channel, response, requestId, "internal:tcp/handshake", false, TransportStatus.setHandshake((byte)0)));
        this.keepAlive = new TransportKeepAlive(threadPool, this::internalSendMessage);
        this.nodeName = Node.NODE_NAME_SETTING.get(settings);
        Settings defaultFeatures = TransportSettings.DEFAULT_FEATURES_SETTING.get(settings);
        if (defaultFeatures == null) {
            this.features = new String[0];
        } else {
            defaultFeatures.names().forEach(key -> {
                if (!Booleans.parseBoolean((String)defaultFeatures.get((String)key))) {
                    throw new IllegalArgumentException("feature settings must have default [true] value");
                }
            });
            this.features = new TreeSet<String>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]);
        }
    }

    @Override
    protected void doStart() {
    }

    @Override
    public void addMessageListener(TransportMessageListener listener) {
        this.messageListener.listeners.add(listener);
    }

    @Override
    public boolean removeMessageListener(TransportMessageListener listener) {
        return this.messageListener.listeners.remove(listener);
    }

    @Override
    public CircuitBreaker getInFlightRequestBreaker() {
        return this.circuitBreakerService.getBreaker("in_flight_requests");
    }

    @Override
    public synchronized <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
        if (this.requestHandlers.containsKey(reg.getAction())) {
            throw new IllegalArgumentException("transport handlers for action " + reg.getAction() + " is already registered");
        }
        this.requestHandlers = MapBuilder.newMapBuilder(this.requestHandlers).put(reg.getAction(), reg).immutableMap();
    }

    protected ConnectionProfile maybeOverrideConnectionProfile(ConnectionProfile connectionProfile) {
        return connectionProfile;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Transport.Connection> listener) {
        Objects.requireNonNull(profile, "connection profile cannot be null");
        if (node == null) {
            throw new ConnectTransportException(null, "can't open connection to a null node");
        }
        ConnectionProfile finalProfile = this.maybeOverrideConnectionProfile(profile);
        this.closeLock.readLock().lock();
        try {
            this.ensureOpen();
            List<TcpChannel> pendingChannels = this.initiateConnection(node, finalProfile, listener);
            Releasable releasable = () -> CloseableChannel.closeChannels(pendingChannels, false);
            return releasable;
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    private List<TcpChannel> initiateConnection(DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Transport.Connection> listener) {
        int numConnections = connectionProfile.getNumConnections();
        assert (numConnections > 0) : "A connection profile must be configured with at least one connection";
        ArrayList<TcpChannel> channels = new ArrayList<TcpChannel>(numConnections);
        for (int i = 0; i < numConnections; ++i) {
            try {
                TcpChannel channel = this.initiateChannel(node);
                logger.trace(() -> new ParameterizedMessage("Tcp transport client channel opened: {}", (Object)channel));
                channels.add(channel);
                continue;
            }
            catch (ConnectTransportException e) {
                CloseableChannel.closeChannels(channels, false);
                listener.onFailure(e);
                return channels;
            }
            catch (Exception e) {
                CloseableChannel.closeChannels(channels, false);
                listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
                return channels;
            }
        }
        ChannelsConnectedListener channelsConnectedListener = new ChannelsConnectedListener(node, connectionProfile, channels, listener);
        for (TcpChannel channel : channels) {
            channel.addConnectListener(channelsConnectedListener);
        }
        TimeValue connectTimeout = connectionProfile.getConnectTimeout();
        this.threadPool.schedule(connectTimeout, "generic", channelsConnectedListener::onTimeout);
        return channels;
    }

    @Override
    public BoundTransportAddress boundAddress() {
        return this.boundAddress;
    }

    @Override
    public Map<String, BoundTransportAddress> profileBoundAddresses() {
        return Collections.unmodifiableMap(new HashMap<String, BoundTransportAddress>(this.profileBoundAddresses));
    }

    @Override
    public List<String> getLocalAddresses() {
        ArrayList<String> local = new ArrayList<String>();
        local.add("127.0.0.1");
        if (NetworkUtils.SUPPORTS_V6) {
            local.add("[::1]");
        }
        return local;
    }

    protected void bindServer(ProfileSettings profileSettings) {
        InetAddress[] hostAddresses;
        List<String> profileBindHosts = profileSettings.bindHosts;
        try {
            hostAddresses = this.networkService.resolveBindHostAddresses(profileBindHosts.toArray(Strings.EMPTY_ARRAY));
        }
        catch (IOException e) {
            throw new BindTransportException("Failed to resolve host " + profileBindHosts, e);
        }
        if (logger.isDebugEnabled()) {
            String[] addresses = new String[hostAddresses.length];
            for (int i = 0; i < hostAddresses.length; ++i) {
                addresses[i] = NetworkAddress.format(hostAddresses[i]);
            }
            logger.debug("binding server bootstrap to: {}", (Object)addresses);
        }
        assert (hostAddresses.length > 0);
        ArrayList<InetSocketAddress> boundAddresses = new ArrayList<InetSocketAddress>();
        for (InetAddress hostAddress : hostAddresses) {
            boundAddresses.add(this.bindToPort(profileSettings.profileName, hostAddress, profileSettings.portOrRange));
        }
        BoundTransportAddress boundTransportAddress = this.createBoundTransportAddress(profileSettings, boundAddresses);
        if (profileSettings.isDefaultProfile) {
            this.boundAddress = boundTransportAddress;
        } else {
            this.profileBoundAddresses.put(profileSettings.profileName, boundTransportAddress);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private InetSocketAddress bindToPort(String name, InetAddress hostAddress, String port) {
        PortsRange portsRange = new PortsRange(port);
        AtomicReference lastException = new AtomicReference();
        AtomicReference boundSocket = new AtomicReference();
        this.closeLock.writeLock().lock();
        try {
            if (!this.lifecycle.initialized() && !this.lifecycle.started()) {
                throw new IllegalStateException("transport has been stopped");
            }
            boolean success = portsRange.iterate(portNumber -> {
                try {
                    TcpServerChannel channel = this.bind(name, new InetSocketAddress(hostAddress, portNumber));
                    this.serverChannels.computeIfAbsent(name, k -> new ArrayList()).add(channel);
                    boundSocket.set(channel.getLocalAddress());
                }
                catch (Exception e) {
                    lastException.set(e);
                    return false;
                }
                return true;
            });
            if (!success) {
                throw new BindTransportException("Failed to bind to [" + port + "]", (Throwable)lastException.get());
            }
        }
        finally {
            this.closeLock.writeLock().unlock();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Bound profile [{}] to address {{}}", (Object)name, (Object)NetworkAddress.format((InetSocketAddress)boundSocket.get()));
        }
        return (InetSocketAddress)boundSocket.get();
    }

    private BoundTransportAddress createBoundTransportAddress(ProfileSettings profileSettings, List<InetSocketAddress> boundAddresses) {
        InetAddress publishInetAddress;
        String[] boundAddressesHostStrings = new String[boundAddresses.size()];
        TransportAddress[] transportBoundAddresses = new TransportAddress[boundAddresses.size()];
        for (int i = 0; i < boundAddresses.size(); ++i) {
            InetSocketAddress boundAddress = boundAddresses.get(i);
            boundAddressesHostStrings[i] = boundAddress.getHostString();
            transportBoundAddresses[i] = new TransportAddress(boundAddress);
        }
        List<String> publishHosts = profileSettings.publishHosts;
        if (!profileSettings.isDefaultProfile && publishHosts.isEmpty()) {
            publishHosts = Arrays.asList(boundAddressesHostStrings);
        }
        if (publishHosts.isEmpty()) {
            publishHosts = NetworkService.GLOBAL_NETWORK_PUBLISH_HOST_SETTING.get(this.settings);
        }
        try {
            publishInetAddress = this.networkService.resolvePublishHostAddresses(publishHosts.toArray(Strings.EMPTY_ARRAY));
        }
        catch (Exception e) {
            throw new BindTransportException("Failed to resolve publish address", e);
        }
        int publishPort = TcpTransport.resolvePublishPort(profileSettings, boundAddresses, publishInetAddress);
        TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
        return new BoundTransportAddress(transportBoundAddresses, publishAddress);
    }

    static int resolvePublishPort(ProfileSettings profileSettings, List<InetSocketAddress> boundAddresses, InetAddress publishInetAddress) {
        int publishPort = profileSettings.publishPort;
        if (publishPort < 0) {
            for (InetSocketAddress boundAddress : boundAddresses) {
                InetAddress boundInetAddress = boundAddress.getAddress();
                if (!boundInetAddress.isAnyLocalAddress() && !boundInetAddress.equals(publishInetAddress)) continue;
                publishPort = boundAddress.getPort();
                break;
            }
        }
        if (publishPort < 0) {
            IntHashSet ports = new IntHashSet();
            for (InetSocketAddress boundAddress : boundAddresses) {
                ports.add(boundAddress.getPort());
            }
            if (ports.size() == 1) {
                publishPort = ((IntCursor)ports.iterator().next()).value;
            }
        }
        if (publishPort < 0) {
            String profileExplanation = profileSettings.isDefaultProfile ? "" : " for profile " + profileSettings.profileName;
            throw new BindTransportException("Failed to auto-resolve publish port" + profileExplanation + ", multiple bound addresses " + boundAddresses + " with distinct ports and none of them matched the publish address (" + publishInetAddress + "). Please specify a unique port by setting " + TransportSettings.PORT.getKey() + " or " + TransportSettings.PUBLISH_PORT.getKey());
        }
        return publishPort;
    }

    @Override
    public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
        return TcpTransport.parse(address, this.settings.get("transport.profiles.default.port", TransportSettings.PORT.get(this.settings)), perAddressLimit);
    }

    static TransportAddress[] parse(String hostPortString, String defaultPortRange, int perAddressLimit) throws UnknownHostException {
        String host;
        Objects.requireNonNull(hostPortString);
        String portString = null;
        if (hostPortString.startsWith("[")) {
            Matcher matcher = BRACKET_PATTERN.matcher(hostPortString);
            if (!matcher.matches()) {
                throw new IllegalArgumentException("Invalid bracketed host/port range: " + hostPortString);
            }
            host = matcher.group(1);
            portString = matcher.group(2);
        } else {
            int colonPos = hostPortString.indexOf(58);
            if (colonPos >= 0 && hostPortString.indexOf(58, colonPos + 1) == -1) {
                host = hostPortString.substring(0, colonPos);
                portString = hostPortString.substring(colonPos + 1);
            } else {
                host = hostPortString;
                if (colonPos >= 0) {
                    throw new IllegalArgumentException("IPv6 addresses must be bracketed: " + hostPortString);
                }
            }
        }
        if (portString == null || portString.isEmpty()) {
            portString = defaultPortRange;
        }
        HashSet<InetAddress> addresses = new HashSet<InetAddress>(Arrays.asList(InetAddress.getAllByName(host)));
        ArrayList<TransportAddress> transportAddresses = new ArrayList<TransportAddress>();
        int[] ports = new PortsRange(portString).ports();
        int limit = Math.min(ports.length, perAddressLimit);
        for (int i = 0; i < limit; ++i) {
            for (InetAddress address : addresses) {
                transportAddresses.add(new TransportAddress(address, ports[i]));
            }
        }
        return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
    }

    @Override
    protected final void doClose() {
    }

    @Override
    protected final void doStop() {
        CountDownLatch latch = new CountDownLatch(1);
        assert (!this.threadPool.generic().isShutdown()) : "Must stop transport before terminating underlying threadpool";
        this.threadPool.generic().execute(() -> {
            this.closeLock.writeLock().lock();
            try {
                this.keepAlive.close();
                for (Map.Entry<String, List<TcpServerChannel>> entry : this.serverChannels.entrySet()) {
                    String profile = entry.getKey();
                    List<TcpServerChannel> channels = entry.getValue();
                    ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {}, e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", (Object)profile), (Throwable)e));
                    channels.forEach(c -> c.addCloseListener(closeFailLogger));
                    CloseableChannel.closeChannels(channels, true);
                }
                this.serverChannels.clear();
                CloseableChannel.closeChannels(new ArrayList<TcpChannel>(this.acceptedChannels), true);
                this.acceptedChannels.clear();
                this.stopInternal();
            }
            finally {
                this.closeLock.writeLock().unlock();
                latch.countDown();
            }
        });
        try {
            latch.await(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void onException(final TcpChannel channel, Exception e) {
        if (!this.lifecycle.started()) {
            CloseableChannel.closeChannel(channel);
            return;
        }
        if (NetworkExceptionHelper.isCloseConnectionException(e)) {
            logger.trace(() -> new ParameterizedMessage("close connection exception caught on transport layer [{}], disconnecting from relevant node", (Object)channel), (Throwable)e);
            CloseableChannel.closeChannel(channel);
        } else if (NetworkExceptionHelper.isConnectException(e)) {
            logger.trace(() -> new ParameterizedMessage("connect exception caught on transport layer [{}]", (Object)channel), (Throwable)e);
            CloseableChannel.closeChannel(channel);
        } else if (e instanceof BindException) {
            logger.trace(() -> new ParameterizedMessage("bind exception caught on transport layer [{}]", (Object)channel), (Throwable)e);
            CloseableChannel.closeChannel(channel);
        } else if (e instanceof CancelledKeyException) {
            logger.trace(() -> new ParameterizedMessage("cancelled key exception caught on transport layer [{}], disconnecting from relevant node", (Object)channel), (Throwable)e);
            CloseableChannel.closeChannel(channel);
        } else if (e instanceof HttpOnTransportException) {
            if (channel.isOpen()) {
                BytesArray message = new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8));
                ActionListener<Void> listener = new ActionListener<Void>(){

                    @Override
                    public void onResponse(Void aVoid) {
                        CloseableChannel.closeChannel(channel);
                    }

                    @Override
                    public void onFailure(Exception e) {
                        logger.debug("failed to send message to httpOnTransport channel", (Throwable)e);
                        CloseableChannel.closeChannel(channel);
                    }
                };
                try {
                    channel.sendMessage(message, new SendListener(channel, message.length(), listener));
                }
                catch (Exception ex) {
                    listener.onFailure(ex);
                }
            }
        } else {
            logger.warn(() -> new ParameterizedMessage("exception caught on transport layer [{}], closing connection", (Object)channel), (Throwable)e);
            CloseableChannel.closeChannel(channel);
        }
    }

    protected void onServerException(TcpServerChannel channel, Exception e) {
        logger.error((Message)new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", (Object)channel), (Throwable)e);
    }

    protected void onNonChannelException(Exception exception) {
        logger.warn((Message)new ParameterizedMessage("exception caught on transport layer [thread={}]", (Object)Thread.currentThread().getName()), (Throwable)exception);
    }

    protected void serverAcceptedChannel(TcpChannel channel) {
        boolean addedOnThisCall = this.acceptedChannels.add(channel);
        assert (addedOnThisCall) : "Channel should only be added to accepted channel set once";
        channel.getChannelStats().markAccessed(this.threadPool.relativeTimeInMillis());
        channel.addCloseListener(ActionListener.wrap(() -> this.acceptedChannels.remove(channel)));
        logger.trace(() -> new ParameterizedMessage("Tcp transport channel accepted: {}", (Object)channel));
    }

    protected abstract TcpServerChannel bind(String var1, InetSocketAddress var2) throws IOException;

    protected abstract TcpChannel initiateChannel(DiscoveryNode var1) throws IOException;

    protected abstract void stopInternal();

    private boolean canCompress(TransportRequest request) {
        return !(request instanceof BytesTransportRequest);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void sendRequestToChannel(DiscoveryNode node, TcpChannel channel, long requestId, String action, TransportRequest request, TransportRequestOptions options, Version channelVersion, boolean compressRequest, byte status) throws IOException, TransportException {
        boolean compressMessage = compressRequest && this.canCompress(request);
        status = TransportStatus.setRequest(status);
        ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(this.bigArrays);
        CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
        boolean addedReleaseListener = false;
        try {
            if (compressMessage) {
                status = TransportStatus.setCompress(status);
            }
            Version version = Version.min(this.version, channelVersion);
            stream.setVersion(version);
            this.threadPool.getThreadContext().writeTo(stream);
            if (version.onOrAfter(Version.V_6_3_0)) {
                stream.writeStringArray(this.features);
            }
            stream.writeString(action);
            BytesReference message = this.buildMessage(requestId, status, node.getVersion(), request, stream);
            TransportRequestOptions finalOptions = options;
            ReleaseListener releaseListener = new ReleaseListener(stream, () -> this.messageListener.onRequestSent(node, requestId, action, request, finalOptions));
            this.internalSendMessage(channel, message, releaseListener);
            return;
        }
        catch (Throwable throwable) {
            if (addedReleaseListener) throw throwable;
            IOUtils.close((Closeable[])new Closeable[]{stream});
            throw throwable;
        }
    }

    private void internalSendMessage(TcpChannel channel, BytesReference message, ActionListener<Void> listener) {
        channel.getChannelStats().markAccessed(this.threadPool.relativeTimeInMillis());
        this.transportLogger.logOutboundMessage(channel, message);
        try {
            channel.sendMessage(message, new SendListener(channel, message.length(), listener));
        }
        catch (Exception ex) {
            listener.onFailure(ex);
            this.onException(channel, ex);
        }
    }

    public void sendErrorResponse(Version nodeVersion, Set<String> features, TcpChannel channel, Exception error, long requestId, String action) throws IOException {
        try (BytesStreamOutput stream = new BytesStreamOutput();){
            stream.setVersion(nodeVersion);
            stream.setFeatures(features);
            RemoteTransportException tx = new RemoteTransportException(this.nodeName, new TransportAddress(channel.getLocalAddress()), action, error);
            this.threadPool.getThreadContext().writeTo(stream);
            stream.writeException(tx);
            byte status = 0;
            status = TransportStatus.setResponse(status);
            status = TransportStatus.setError(status);
            BytesReference bytes = stream.bytes();
            BytesReference header = this.buildHeader(requestId, status, nodeVersion, bytes.length());
            CompositeBytesReference message = new CompositeBytesReference(header, bytes);
            ReleaseListener releaseListener = new ReleaseListener(null, () -> this.messageListener.onResponseSent(requestId, action, error));
            this.internalSendMessage(channel, message, releaseListener);
        }
    }

    public void sendResponse(Version nodeVersion, Set<String> features, TcpChannel channel, TransportResponse response, long requestId, String action, boolean compress) throws IOException {
        this.sendResponse(nodeVersion, features, channel, response, requestId, action, compress, (byte)0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void sendResponse(Version nodeVersion, Set<String> features, TcpChannel channel, TransportResponse response, long requestId, String action, boolean compress, byte status) throws IOException {
        boolean compressMessage = compress || this.compressAllResponses;
        status = TransportStatus.setResponse(status);
        ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(this.bigArrays);
        CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
        boolean addedReleaseListener = false;
        try {
            if (compressMessage) {
                status = TransportStatus.setCompress(status);
            }
            this.threadPool.getThreadContext().writeTo(stream);
            stream.setVersion(nodeVersion);
            stream.setFeatures(features);
            BytesReference message = this.buildMessage(requestId, status, nodeVersion, response, stream);
            ReleaseListener releaseListener = new ReleaseListener(stream, () -> this.messageListener.onResponseSent(requestId, action, response));
            this.internalSendMessage(channel, message, releaseListener);
            return;
        }
        catch (Throwable throwable) {
            if (addedReleaseListener) throw throwable;
            IOUtils.close((Closeable[])new Closeable[]{stream});
            throw throwable;
        }
    }

    private BytesReference buildHeader(long requestId, byte status, Version protocolVersion, int length) throws IOException {
        try (BytesStreamOutput headerOutput = new BytesStreamOutput(19);){
            headerOutput.setVersion(protocolVersion);
            TcpHeader.writeHeader(headerOutput, requestId, status, protocolVersion, length);
            BytesReference bytes = headerOutput.bytes();
            assert (bytes.length() == 19) : "header size mismatch expected: 19 but was: " + bytes.length();
            BytesReference bytesReference = bytes;
            return bytesReference;
        }
    }

    private BytesReference buildMessage(long requestId, byte status, Version nodeVersion, TransportMessage message, CompressibleBytesOutputStream stream) throws IOException {
        BytesReference zeroCopyBuffer;
        if (message instanceof BytesTransportRequest) {
            BytesTransportRequest bRequest = (BytesTransportRequest)message;
            assert (nodeVersion.equals(bRequest.version()));
            bRequest.writeThin(stream);
            zeroCopyBuffer = bRequest.bytes;
        } else {
            message.writeTo(stream);
            zeroCopyBuffer = BytesArray.EMPTY;
        }
        BytesReference messageBody = stream.materializeBytes();
        BytesReference header = this.buildHeader(requestId, status, stream.getVersion(), messageBody.length() + zeroCopyBuffer.length());
        return new CompositeBytesReference(header, messageBody, zeroCopyBuffer);
    }

    public void inboundMessage(TcpChannel channel, BytesReference message) {
        try {
            channel.getChannelStats().markAccessed(this.threadPool.relativeTimeInMillis());
            this.transportLogger.logInboundMessage(channel, message);
            if (message.length() != 0) {
                this.messageReceived(message, channel);
            } else {
                this.keepAlive.receiveKeepAlive(channel);
            }
        }
        catch (Exception e) {
            this.onException(channel, e);
        }
    }

    public int consumeNetworkReads(TcpChannel channel, BytesReference bytesReference) throws IOException {
        BytesReference message = TcpTransport.decodeFrame(bytesReference);
        if (message == null) {
            return 0;
        }
        this.inboundMessage(channel, message);
        return message.length() + 6;
    }

    static BytesReference decodeFrame(BytesReference networkBytes) throws IOException {
        int messageLength = TcpTransport.readMessageLength(networkBytes);
        if (messageLength == -1) {
            return null;
        }
        int totalLength = messageLength + 6;
        if (totalLength > networkBytes.length()) {
            return null;
        }
        if (totalLength == 6) {
            return EMPTY_BYTES_REFERENCE;
        }
        return networkBytes.slice(6, messageLength);
    }

    public static int readMessageLength(BytesReference networkBytes) throws IOException {
        if (networkBytes.length() < 6) {
            return -1;
        }
        return TcpTransport.readHeaderBuffer(networkBytes);
    }

    private static int readHeaderBuffer(BytesReference headerBuffer) throws IOException {
        int messageLength;
        if (headerBuffer.get(0) != 69 || headerBuffer.get(1) != 83) {
            if (TcpTransport.appearsToBeHTTP(headerBuffer)) {
                throw new HttpOnTransportException("This is not an HTTP port");
            }
            throw new StreamCorruptedException("invalid internal transport message format, got (" + Integer.toHexString(headerBuffer.get(0) & 0xFF) + "," + Integer.toHexString(headerBuffer.get(1) & 0xFF) + "," + Integer.toHexString(headerBuffer.get(2) & 0xFF) + "," + Integer.toHexString(headerBuffer.get(3) & 0xFF) + ")");
        }
        try (StreamInput input = headerBuffer.streamInput();){
            input.skip(2L);
            messageLength = input.readInt();
        }
        if (messageLength == -1) {
            return 0;
        }
        if (messageLength <= 0) {
            throw new StreamCorruptedException("invalid data length: " + messageLength);
        }
        if ((long)messageLength > NINETY_PER_HEAP_SIZE) {
            throw new IllegalArgumentException("transport content length received [" + new ByteSizeValue(messageLength) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
        }
        return messageLength;
    }

    private static boolean appearsToBeHTTP(BytesReference headerBuffer) {
        return TcpTransport.bufferStartsWith(headerBuffer, "GET") || TcpTransport.bufferStartsWith(headerBuffer, "POST") || TcpTransport.bufferStartsWith(headerBuffer, "PUT") || TcpTransport.bufferStartsWith(headerBuffer, "HEAD") || TcpTransport.bufferStartsWith(headerBuffer, "DELETE") || TcpTransport.bufferStartsWith(headerBuffer, "OPTION") || TcpTransport.bufferStartsWith(headerBuffer, "PATCH") || TcpTransport.bufferStartsWith(headerBuffer, "TRACE");
    }

    private static boolean bufferStartsWith(BytesReference buffer, String method) {
        char[] chars = method.toCharArray();
        for (int i = 0; i < chars.length; ++i) {
            if (buffer.get(i) == chars[i]) continue;
            return false;
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void messageReceived(BytesReference reference, TcpChannel channel) throws IOException {
        block22: {
            StreamInput streamIn;
            block21: {
                String profileName = channel.getProfile();
                InetSocketAddress remoteAddress = channel.getRemoteAddress();
                int messageLengthBytes = reference.length();
                int totalMessageSize = messageLengthBytes + 2 + 4;
                this.readBytesMetric.inc(totalMessageSize);
                boolean hasMessageBytesToRead = totalMessageSize - 19 > 0;
                streamIn = reference.streamInput();
                boolean success = false;
                try {
                    try (ThreadContext.StoredContext tCtx = this.threadPool.getThreadContext().stashContext();){
                        long requestId = streamIn.readLong();
                        byte status = streamIn.readByte();
                        Version version = Version.fromId(streamIn.readInt());
                        if (TransportStatus.isCompress(status) && hasMessageBytesToRead && streamIn.available() > 0) {
                            Compressor compressor;
                            try {
                                int bytesConsumed = 13;
                                compressor = CompressorFactory.compressor(reference.slice(13, reference.length() - 13));
                            }
                            catch (NotCompressedException ex) {
                                int maxToRead = Math.min(reference.length(), 10);
                                StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(reference.length()).append("] readable bytes with message size [").append(messageLengthBytes).append("] ").append("] are [");
                                for (int i = 0; i < maxToRead; ++i) {
                                    sb.append(reference.get(i)).append(",");
                                }
                                sb.append("]");
                                throw new IllegalStateException(sb.toString());
                            }
                            streamIn = compressor.streamInput(streamIn);
                        }
                        boolean isHandshake = TransportStatus.isHandshake(status);
                        TcpTransport.ensureVersionCompatibility(version, this.version, isHandshake);
                        streamIn = new NamedWriteableAwareStreamInput(streamIn, this.namedWriteableRegistry);
                        streamIn.setVersion(version);
                        this.threadPool.getThreadContext().readHeaders(streamIn);
                        this.threadPool.getThreadContext().putTransient("_remote_address", remoteAddress);
                        if (TransportStatus.isRequest(status)) {
                            this.handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
                        } else {
                            TransportResponseHandler<? extends TransportResponse> theHandler;
                            TransportResponseHandler<TransportResponse> handler = isHandshake ? this.handshaker.removeHandlerForHandshake(requestId) : ((theHandler = this.responseHandlers.onResponseReceived(requestId, this.messageListener)) == null && TransportStatus.isError(status) ? this.handshaker.removeHandlerForHandshake(requestId) : theHandler);
                            if (handler != null) {
                                if (TransportStatus.isError(status)) {
                                    this.handlerResponseError(streamIn, handler);
                                } else {
                                    this.handleResponse(remoteAddress, streamIn, handler);
                                }
                                int nextByte = streamIn.read();
                                if (nextByte != -1) {
                                    throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
                                }
                            }
                        }
                        success = true;
                    }
                    if (!success) break block21;
                }
                catch (Throwable throwable) {
                    if (success) {
                        IOUtils.close((Closeable[])new Closeable[]{streamIn});
                    } else {
                        IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{streamIn});
                    }
                    throw throwable;
                }
                IOUtils.close((Closeable[])new Closeable[]{streamIn});
                break block22;
            }
            IOUtils.closeWhileHandlingException((Closeable[])new Closeable[]{streamIn});
        }
    }

    static void ensureVersionCompatibility(Version version, Version currentVersion, boolean isHandshake) {
        Version compatibilityVersion;
        Version version2 = compatibilityVersion = isHandshake ? currentVersion.minimumCompatibilityVersion() : currentVersion;
        if (!version.isCompatible(compatibilityVersion)) {
            Version minCompatibilityVersion = isHandshake ? compatibilityVersion : compatibilityVersion.minimumCompatibilityVersion();
            String msg = "Received " + (isHandshake ? "handshake " : "") + "message from unsupported version: [";
            throw new IllegalStateException(msg + version + "] minimal compatible version is: [" + minCompatibilityVersion + "]");
        }
    }

    private <T extends TransportResponse> void handleResponse(InetSocketAddress remoteAddress, StreamInput stream, final TransportResponseHandler<T> handler) {
        TransportResponse response;
        try {
            response = (TransportResponse)handler.read(stream);
            response.remoteAddress(new TransportAddress(remoteAddress));
        }
        catch (Exception e) {
            this.handleException(handler, new TransportSerializationException("Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
            return;
        }
        this.threadPool.executor(handler.executor()).execute(new AbstractRunnable(){

            @Override
            public void onFailure(Exception e) {
                TcpTransport.this.handleException(handler, new ResponseHandlerFailureTransportException(e));
            }

            @Override
            protected void doRun() throws Exception {
                handler.handleResponse(response);
            }
        });
    }

    private void handlerResponseError(StreamInput stream, TransportResponseHandler handler) {
        Object error;
        try {
            error = stream.readException();
        }
        catch (Exception e) {
            error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
        }
        this.handleException(handler, (Throwable)error);
    }

    private void handleException(TransportResponseHandler handler, Throwable error) {
        if (!(error instanceof RemoteTransportException)) {
            error = new RemoteTransportException(error.getMessage(), error);
        }
        RemoteTransportException rtx = (RemoteTransportException)error;
        this.threadPool.executor(handler.executor()).execute(() -> {
            try {
                handler.handleException(rtx);
            }
            catch (Exception e) {
                logger.error(() -> new ParameterizedMessage("failed to handle exception response [{}]", (Object)handler), (Throwable)e);
            }
        });
    }

    protected String handleRequest(TcpChannel channel, String profileName, StreamInput stream, long requestId, int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status) throws IOException {
        Set<String> features = version.onOrAfter(Version.V_6_3_0) ? Collections.unmodifiableSet(new TreeSet<String>(Arrays.asList(stream.readStringArray()))) : Collections.emptySet();
        String action = stream.readString();
        this.messageListener.onRequestReceived(requestId, action);
        TransportChannel transportChannel = null;
        try {
            if (TransportStatus.isHandshake(status)) {
                this.handshaker.handleHandshake(version, features, channel, requestId, stream);
            } else {
                RequestHandlerRegistry<? extends TransportRequest> reg = this.getRequestHandler(action);
                if (reg == null) {
                    throw new ActionNotFoundTransportException(action);
                }
                if (reg.canTripCircuitBreaker()) {
                    this.getInFlightRequestBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
                } else {
                    this.getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes);
                }
                transportChannel = new TcpTransportChannel(this, channel, this.transportName, action, requestId, version, features, profileName, messageLengthBytes, TransportStatus.isCompress(status));
                TransportRequest request = reg.newRequest(stream);
                request.remoteAddress(new TransportAddress(remoteAddress));
                this.validateRequest(stream, requestId, action);
                this.threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
            }
        }
        catch (Exception e) {
            if (transportChannel == null) {
                transportChannel = new TcpTransportChannel(this, channel, this.transportName, action, requestId, version, features, profileName, 0L, TransportStatus.isCompress(status));
            }
            try {
                transportChannel.sendResponse(e);
            }
            catch (IOException inner) {
                inner.addSuppressed(e);
                logger.warn(() -> new ParameterizedMessage("Failed to send error message back to client for action [{}]", (Object)action), (Throwable)inner);
            }
        }
        return action;
    }

    protected void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
        int nextByte = stream.read();
        if (nextByte != -1) {
            throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + "], available [" + stream.available() + "]; resetting");
        }
    }

    public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, ActionListener<Version> listener) {
        this.handshaker.sendHandshake(this.responseHandlers.newRequestId(), node, channel, profile.getHandshakeTimeout(), listener);
    }

    final TransportKeepAlive getKeepAlive() {
        return this.keepAlive;
    }

    final int getNumPendingHandshakes() {
        return this.handshaker.getNumPendingHandshakes();
    }

    final long getNumHandshakes() {
        return this.handshaker.getNumHandshakes();
    }

    protected final void ensureOpen() {
        if (!this.lifecycle.started()) {
            throw new IllegalStateException("transport has been stopped");
        }
    }

    @Override
    public final TransportStats getStats() {
        return new TransportStats(this.acceptedChannels.size(), this.readBytesMetric.count(), this.readBytesMetric.sum(), this.transmittedBytesMetric.count(), this.transmittedBytesMetric.sum());
    }

    public static Set<ProfileSettings> getProfileSettings(Settings settings) {
        HashSet<ProfileSettings> profiles = new HashSet<ProfileSettings>();
        boolean isDefaultSet = false;
        for (String profile : settings.getGroups("transport.profiles.", true).keySet()) {
            profiles.add(new ProfileSettings(settings, profile));
            if (!"default".equals(profile)) continue;
            isDefaultSet = true;
        }
        if (!isDefaultSet) {
            profiles.add(new ProfileSettings(settings, "default"));
        }
        return Collections.unmodifiableSet(profiles);
    }

    @Override
    public final Transport.ResponseHandlers getResponseHandlers() {
        return this.responseHandlers;
    }

    @Override
    public final RequestHandlerRegistry<? extends TransportRequest> getRequestHandler(String action) {
        return this.requestHandlers.get(action);
    }

    private final class ChannelsConnectedListener
    implements ActionListener<Void> {
        private final DiscoveryNode node;
        private final ConnectionProfile connectionProfile;
        private final List<TcpChannel> channels;
        private final ActionListener<Transport.Connection> listener;
        private final CountDown countDown;

        private ChannelsConnectedListener(DiscoveryNode node, ConnectionProfile connectionProfile, List<TcpChannel> channels, ActionListener<Transport.Connection> listener) {
            this.node = node;
            this.connectionProfile = connectionProfile;
            this.channels = channels;
            this.listener = listener;
            this.countDown = new CountDown(channels.size());
        }

        @Override
        public void onResponse(Void v) {
            if (this.countDown.countDown()) {
                TcpChannel handshakeChannel = this.channels.get(0);
                try {
                    TcpTransport.this.executeHandshake(this.node, handshakeChannel, this.connectionProfile, new ActionListener<Version>(){

                        @Override
                        public void onResponse(Version version) {
                            NodeChannels nodeChannels = new NodeChannels(ChannelsConnectedListener.this.node, ChannelsConnectedListener.this.channels, ChannelsConnectedListener.this.connectionProfile, version);
                            long relativeMillisTime = TcpTransport.this.threadPool.relativeTimeInMillis();
                            nodeChannels.channels.forEach(ch -> {
                                ch.getChannelStats().markAccessed(relativeMillisTime);
                                ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
                            });
                            TcpTransport.this.keepAlive.registerNodeConnection(nodeChannels.channels, ChannelsConnectedListener.this.connectionProfile);
                            ChannelsConnectedListener.this.listener.onResponse(nodeChannels);
                        }

                        @Override
                        public void onFailure(Exception e) {
                            CloseableChannel.closeChannels(ChannelsConnectedListener.this.channels, false);
                            if (e instanceof ConnectTransportException) {
                                ChannelsConnectedListener.this.listener.onFailure(e);
                            } else {
                                ChannelsConnectedListener.this.listener.onFailure(new ConnectTransportException(ChannelsConnectedListener.this.node, "general node connection failure", e));
                            }
                        }
                    });
                }
                catch (Exception ex) {
                    CloseableChannel.closeChannels(this.channels, false);
                    this.listener.onFailure(ex);
                }
            }
        }

        @Override
        public void onFailure(Exception ex) {
            if (this.countDown.fastForward()) {
                CloseableChannel.closeChannels(this.channels, false);
                this.listener.onFailure(new ConnectTransportException(this.node, "connect_exception", ex));
            }
        }

        public void onTimeout() {
            if (this.countDown.fastForward()) {
                CloseableChannel.closeChannels(this.channels, false);
                this.listener.onFailure(new ConnectTransportException(this.node, "connect_timeout[" + this.connectionProfile.getConnectTimeout() + "]"));
            }
        }
    }

    private static final class DelegatingTransportMessageListener
    implements TransportMessageListener {
        private final List<TransportMessageListener> listeners = new CopyOnWriteArrayList<TransportMessageListener>();

        private DelegatingTransportMessageListener() {
        }

        @Override
        public void onRequestReceived(long requestId, String action) {
            for (TransportMessageListener listener : this.listeners) {
                listener.onRequestReceived(requestId, action);
            }
        }

        @Override
        public void onResponseSent(long requestId, String action, TransportResponse response) {
            for (TransportMessageListener listener : this.listeners) {
                listener.onResponseSent(requestId, action, response);
            }
        }

        @Override
        public void onResponseSent(long requestId, String action, Exception error) {
            for (TransportMessageListener listener : this.listeners) {
                listener.onResponseSent(requestId, action, error);
            }
        }

        @Override
        public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions finalOptions) {
            for (TransportMessageListener listener : this.listeners) {
                listener.onRequestSent(node, requestId, action, request, finalOptions);
            }
        }

        @Override
        public void onResponseReceived(long requestId, Transport.ResponseContext holder) {
            for (TransportMessageListener listener : this.listeners) {
                listener.onResponseReceived(requestId, holder);
            }
        }
    }

    public static final class ProfileSettings {
        public final String profileName;
        public final boolean tcpNoDelay;
        public final boolean tcpKeepAlive;
        public final boolean reuseAddress;
        public final ByteSizeValue sendBufferSize;
        public final ByteSizeValue receiveBufferSize;
        public final List<String> bindHosts;
        public final List<String> publishHosts;
        public final String portOrRange;
        public final int publishPort;
        public final boolean isDefaultProfile;

        public ProfileSettings(Settings settings, String profileName) {
            this.profileName = profileName;
            this.isDefaultProfile = "default".equals(profileName);
            this.tcpKeepAlive = TransportSettings.TCP_KEEP_ALIVE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            this.tcpNoDelay = TransportSettings.TCP_NO_DELAY_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            this.reuseAddress = TransportSettings.TCP_REUSE_ADDRESS_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            this.sendBufferSize = TransportSettings.TCP_SEND_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            this.receiveBufferSize = TransportSettings.TCP_RECEIVE_BUFFER_SIZE_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            List<String> profileBindHosts = TransportSettings.BIND_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            this.bindHosts = profileBindHosts.isEmpty() ? NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING.get(settings) : profileBindHosts;
            this.publishHosts = TransportSettings.PUBLISH_HOST_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            Setting<String> concretePort = TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(profileName);
            if (!concretePort.exists(settings) && !this.isDefaultProfile) {
                throw new IllegalStateException("profile [" + profileName + "] has no port configured");
            }
            this.portOrRange = TransportSettings.PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
            this.publishPort = this.isDefaultProfile ? TransportSettings.PUBLISH_PORT.get(settings) : TransportSettings.PUBLISH_PORT_PROFILE.getConcreteSettingForNamespace(profileName).get(settings);
        }
    }

    private class ReleaseListener
    implements ActionListener<Void> {
        private final Closeable optionalCloseable;
        private final Runnable transportAdaptorCallback;

        private ReleaseListener(Closeable optionalCloseable, Runnable transportAdaptorCallback) {
            this.optionalCloseable = optionalCloseable;
            this.transportAdaptorCallback = transportAdaptorCallback;
        }

        @Override
        public void onResponse(Void aVoid) {
            this.closeAndCallback(null);
        }

        @Override
        public void onFailure(Exception e) {
            this.closeAndCallback(e);
        }

        private void closeAndCallback(Exception e) {
            try {
                Closeable[] closeableArray = new Closeable[2];
                closeableArray[0] = this.optionalCloseable;
                closeableArray[1] = this.transportAdaptorCallback::run;
                IOUtils.close((Closeable[])closeableArray);
            }
            catch (IOException inner) {
                if (e != null) {
                    inner.addSuppressed(e);
                }
                throw new UncheckedIOException(inner);
            }
        }
    }

    private class SendListener
    extends NotifyOnceListener<Void> {
        private final TcpChannel channel;
        private final long messageSize;
        private final ActionListener<Void> delegateListener;

        private SendListener(TcpChannel channel, long messageSize, ActionListener<Void> delegateListener) {
            this.channel = channel;
            this.messageSize = messageSize;
            this.delegateListener = delegateListener;
        }

        @Override
        protected void innerOnResponse(Void v) {
            TcpTransport.this.transmittedBytesMetric.inc(this.messageSize);
            this.delegateListener.onResponse(v);
        }

        @Override
        protected void innerOnFailure(Exception e) {
            logger.warn(() -> new ParameterizedMessage("send message failed [channel: {}]", (Object)this.channel), (Throwable)e);
            this.delegateListener.onFailure(e);
        }
    }

    class RequestHandler
    extends AbstractRunnable {
        private final RequestHandlerRegistry reg;
        private final TransportRequest request;
        private final TransportChannel transportChannel;

        RequestHandler(RequestHandlerRegistry reg, TransportRequest request, TransportChannel transportChannel) {
            this.reg = reg;
            this.request = request;
            this.transportChannel = transportChannel;
        }

        @Override
        protected void doRun() throws Exception {
            this.reg.processMessageReceived(this.request, this.transportChannel);
        }

        @Override
        public boolean isForceExecution() {
            return this.reg.isForceExecution();
        }

        @Override
        public void onFailure(Exception e) {
            if (TcpTransport.this.lifecycleState() == Lifecycle.State.STARTED) {
                try {
                    this.transportChannel.sendResponse(e);
                }
                catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.warn(() -> new ParameterizedMessage("Failed to send error message back to client for action [{}]", (Object)this.reg.getAction()), (Throwable)inner);
                }
            }
        }
    }

    public static class HttpOnTransportException
    extends ElasticsearchException {
        private HttpOnTransportException(String msg) {
            super(msg, new Object[0]);
        }

        @Override
        public RestStatus status() {
            return RestStatus.BAD_REQUEST;
        }

        public HttpOnTransportException(StreamInput in) throws IOException {
            super(in);
        }
    }

    public final class NodeChannels
    extends CloseableConnection {
        private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping;
        private final List<TcpChannel> channels;
        private final DiscoveryNode node;
        private final Version version;
        private final boolean compress;
        private final AtomicBoolean isClosing = new AtomicBoolean(false);

        NodeChannels(DiscoveryNode node, List<TcpChannel> channels, ConnectionProfile connectionProfile, Version handshakeVersion) {
            this.node = node;
            this.channels = Collections.unmodifiableList(channels);
            assert (channels.size() == connectionProfile.getNumConnections()) : "expected channels size to be == " + connectionProfile.getNumConnections() + " but was: [" + channels.size() + "]";
            this.typeMapping = new EnumMap<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle>(TransportRequestOptions.Type.class);
            for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
                for (TransportRequestOptions.Type type : handle.getTypes()) {
                    this.typeMapping.put(type, handle);
                }
            }
            this.version = handshakeVersion;
            this.compress = connectionProfile.getCompressionEnabled();
        }

        @Override
        public Version getVersion() {
            return this.version;
        }

        public List<TcpChannel> getChannels() {
            return this.channels;
        }

        public TcpChannel channel(TransportRequestOptions.Type type) {
            ConnectionProfile.ConnectionTypeHandle connectionTypeHandle = this.typeMapping.get((Object)type);
            if (connectionTypeHandle == null) {
                throw new IllegalArgumentException("no type channel for [" + (Object)((Object)type) + "]");
            }
            return connectionTypeHandle.getChannel(this.channels);
        }

        @Override
        public void close() {
            if (this.isClosing.compareAndSet(false, true)) {
                try {
                    boolean block = TcpTransport.this.lifecycle.stopped() && !Transports.isTransportThread(Thread.currentThread());
                    CloseableChannel.closeChannels(this.channels, block);
                }
                finally {
                    super.close();
                }
            }
        }

        @Override
        public DiscoveryNode getNode() {
            return this.node;
        }

        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
            if (this.isClosing.get()) {
                throw new NodeNotConnectedException(this.node, "connection already closed");
            }
            TcpChannel channel = this.channel(options.type());
            TcpTransport.this.sendRequestToChannel(this.node, channel, requestId, action, request, options, this.getVersion(), this.compress, (byte)0);
        }
    }
}

