/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport.netty;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotCompressedException;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.TransportMessage;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.transport.netty.NettyTransportChannel;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.WriteCompletionEvent;

/**
 * Netty upstream handler that decodes one transport message per {@link ChannelBuffer} event and
 * dispatches it either to a registered request handler or to the response handler that is
 * waiting for the message's request id.
 *
 * <p>Every message starts with three fixed fields: a {@code long} request id, a status
 * {@code byte} and an {@code int} version id. The status byte decides whether the remainder is a
 * request, a response or an error response, and whether the remainder is compressed.
 */
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {

    /** Executor name for which handlers are invoked directly instead of being submitted to the thread pool. */
    private static final String SAME_EXECUTOR = "same";

    /** Size in bytes of the fixed fields read first from every message: request id (8) + status (1) + version id (4). */
    private static final int FIXED_FIELDS_SIZE = 8 + 1 + 4;

    protected final ESLogger logger;
    protected final ThreadPool threadPool;
    protected final TransportServiceAdapter transportServiceAdapter;
    protected final NettyTransport transport;
    protected final String profileName;

    /**
     * Creates a handler bound to the given transport.
     *
     * @param transport   transport that supplies the thread pool, the service adapter and the
     *                    named-writeable registry used while decoding
     * @param logger      logger used for failures that cannot be reported back to the peer
     * @param profileName profile name handed to every {@link NettyTransportChannel} created for
     *                    an incoming request
     */
    public MessageChannelHandler(NettyTransport transport, ESLogger logger, String profileName) {
        this.threadPool = transport.threadPool();
        this.transportServiceAdapter = transport.transportServiceAdapter();
        this.transport = transport;
        this.logger = logger;
        this.profileName = profileName;
    }

    /**
     * Reports the number of bytes just written to the service adapter, then lets the superclass
     * continue its own processing of the event.
     */
    @Override
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        this.transportServiceAdapter.sent(e.getWrittenAmount());
        super.writeComplete(ctx, e);
    }

    /**
     * Decodes a single message and dispatches it.
     *
     * <p>Events whose payload is not a {@link ChannelBuffer} are forwarded upstream untouched.
     * For buffer payloads the stream is always closed (close failures are suppressed) and the
     * buffer's reader index is always moved to the end of the message, whether or not decoding
     * succeeded, so that a failure on one message does not leave the buffer mid-message.
     *
     * @throws IllegalStateException if the message is flagged as compressed but no compressor
     *                               recognises it, or if the handler consumed fewer or more
     *                               bytes than the message size announces
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Transports.assertTransportThread();
        final Object message = e.getMessage();
        if (!(message instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            return;
        }

        final ChannelBuffer buffer = (ChannelBuffer) message;
        // The message size is stored in the four bytes immediately before the reader index.
        final int size = buffer.getInt(buffer.readerIndex() - 4);
        // NOTE(review): the extra 6 bytes presumably account for the framing prefix that precedes
        // the message (marker + length) - confirm against the frame decoder.
        this.transportServiceAdapter.received(size + 6);

        // Anything beyond the fixed fields is the (possibly compressed) message body.
        final boolean hasMessageBytesToRead = size != FIXED_FIELDS_SIZE;
        final int expectedIndexReader = buffer.readerIndex() + size;

        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
        try {
            final long requestId = streamIn.readLong();
            final byte status = streamIn.readByte();
            final Version version = Version.fromId(streamIn.readInt());

            if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
                streamIn = resolveCompressor(buffer, size).streamInput(streamIn);
            }
            streamIn.setVersion(version);

            if (TransportStatus.isRequest(status)) {
                final String action = handleRequest(ctx.getChannel(), streamIn, requestId, version);
                ensureRequestFullyRead(streamIn, buffer, expectedIndexReader, requestId, action);
            } else {
                final TransportResponseHandler handler = this.transportServiceAdapter.onResponseReceived(requestId);
                // No handler registered for this request id: nothing to dispatch and nothing to verify.
                if (handler != null) {
                    final boolean isError = TransportStatus.isError(status);
                    if (isError) {
                        handlerResponseError(streamIn, handler);
                    } else {
                        handleResponse(ctx.getChannel(), streamIn, handler);
                    }
                    ensureResponseFullyRead(streamIn, buffer, expectedIndexReader, requestId, handler, isError);
                }
            }
        } finally {
            try {
                // Close failures are deliberately suppressed on every path.
                IOUtils.closeWhileHandlingException(streamIn);
            } finally {
                // Position the buffer at the end of this message no matter what happened above.
                buffer.readerIndex(expectedIndexReader);
            }
        }
    }

    /**
     * Looks up the compressor matching the bytes at the buffer's current reader index.
     *
     * @param buffer buffer positioned at the start of the compressed body
     * @param size   announced message size, used only for the failure message
     * @return the compressor reported by {@link CompressorFactory}
     * @throws IllegalStateException if the factory reports the content as not compressed; the
     *                               message lists up to the first ten body bytes for diagnosis
     */
    private static Compressor resolveCompressor(ChannelBuffer buffer, int size) {
        try {
            return CompressorFactory.compressor(buffer);
        } catch (NotCompressedException ex) {
            final int maxToRead = Math.min(buffer.readableBytes(), 10);
            final int offset = buffer.readerIndex();
            final StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [")
                    .append(maxToRead)
                    .append("] content bytes out of [")
                    .append(buffer.readableBytes())
                    .append("] readable bytes with message size [")
                    .append(size)
                    .append("] are [");
            for (int i = 0; i < maxToRead; ++i) {
                // getByte does not move the reader index, so the buffer state is left untouched.
                sb.append(buffer.getByte(offset + i)).append(",");
            }
            sb.append("]");
            throw new IllegalStateException(sb.toString(), ex);
        }
    }

    /**
     * Verifies that handling a request consumed exactly the announced number of bytes.
     *
     * @throws IllegalStateException if the stream still yields data, or if the buffer's reader
     *                               index is short of or beyond {@code expectedIndexReader}
     * @throws IOException           if probing the stream for a trailing byte fails
     */
    private static void ensureRequestFullyRead(StreamInput streamIn, ChannelBuffer buffer, int expectedIndexReader,
                                               long requestId, String action) throws IOException {
        // Reading one more byte must hit end-of-stream if the handler consumed the whole body.
        final int nextByte = streamIn.read();
        if (nextByte != -1) {
            throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
        }
        if (buffer.readerIndex() < expectedIndexReader) {
            throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
        }
        if (buffer.readerIndex() > expectedIndexReader) {
            throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action [" + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
        }
    }

    /**
     * Verifies that handling a response consumed exactly the announced number of bytes.
     *
     * @throws IllegalStateException if the stream still yields data, or if the buffer's reader
     *                               index is short of or beyond {@code expectedIndexReader}
     * @throws IOException           if probing the stream for a trailing byte fails
     */
    private static void ensureResponseFullyRead(StreamInput streamIn, ChannelBuffer buffer, int expectedIndexReader,
                                                long requestId, TransportResponseHandler handler, boolean isError)
            throws IOException {
        // Reading one more byte must hit end-of-stream if the handler consumed the whole body.
        final int nextByte = streamIn.read();
        if (nextByte != -1) {
            throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler [" + handler + "], error [" + isError + "]; resetting");
        }
        if (buffer.readerIndex() < expectedIndexReader) {
            throw new IllegalStateException("Message is fully read (response), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
        }
        if (buffer.readerIndex() > expectedIndexReader) {
            throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler [" + handler + "], error [" + isError + "]; resetting");
        }
    }

    /**
     * Deserializes a successful response and hands it to its handler.
     *
     * <p>A deserialization failure is reported to the handler as a
     * {@link TransportSerializationException}; a failure while invoking or scheduling the handler
     * is reported as a {@link ResponseHandlerFailureTransportException}. Neither propagates to
     * the caller.
     *
     * @param channel channel the response arrived on; its remote address is recorded on the response
     * @param buffer  stream positioned at the start of the response body
     * @param handler handler that creates the response instance and consumes it
     */
    protected void handleResponse(Channel channel, StreamInput buffer, TransportResponseHandler handler) {
        final StreamInput in = new NamedWriteableAwareStreamInput(buffer, this.transport.namedWriteableRegistry);
        final TransportResponse response = handler.newInstance();
        response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
        try {
            response.readFrom(in);
        } catch (Throwable e) {
            handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + response.getClass().getName() + "]", e));
            return;
        }
        try {
            if (SAME_EXECUTOR.equals(handler.executor())) {
                handler.handleResponse(response);
            } else {
                this.threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, response));
            }
        } catch (Throwable e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
    }

    /**
     * Reads the throwable carried by an error response and passes it to the handler. If the
     * throwable itself cannot be read, the handler receives a
     * {@link TransportSerializationException} wrapping the read failure instead.
     */
    private void handlerResponseError(StreamInput buffer, TransportResponseHandler handler) {
        Throwable error;
        try {
            error = buffer.readThrowable();
        } catch (Throwable e) {
            error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
        }
        handleException(handler, error);
    }

    /**
     * Delivers a failure to the handler, wrapping it in a {@link RemoteTransportException} unless
     * it already is one. Delivery happens directly for the {@code "same"} executor and on the
     * handler's executor otherwise; anything the handler throws while processing the failure is
     * logged and not rethrown.
     */
    private void handleException(final TransportResponseHandler handler, Throwable error) {
        final RemoteTransportException rtx;
        if (error instanceof RemoteTransportException) {
            rtx = (RemoteTransportException) error;
        } else {
            rtx = new RemoteTransportException(error.getMessage(), error);
        }

        if (SAME_EXECUTOR.equals(handler.executor())) {
            try {
                handler.handleException(rtx);
            } catch (Throwable e) {
                this.logger.error("failed to handle exception response [{}]", e, handler);
            }
            return;
        }

        this.threadPool.executor(handler.executor()).execute(new Runnable() {
            @Override
            public void run() {
                try {
                    handler.handleException(rtx);
                } catch (Throwable e) {
                    MessageChannelHandler.this.logger.error("failed to handle exception response [{}]", e, handler);
                }
            }
        });
    }

    /**
     * Reads the action name and request body, then dispatches the request to the handler
     * registered for that action.
     *
     * <p>Any failure after the action name has been read (unknown action, deserialization error,
     * handler or scheduling failure) is sent back to the peer through the request's channel. If
     * sending that error response fails with an {@link IOException}, both failures are logged.
     *
     * @param channel   channel the request arrived on
     * @param buffer    stream positioned at the action name
     * @param requestId id read from the message header
     * @param version   version read from the message header
     * @return the action name read from the stream
     * @throws IOException if the action name cannot be read
     */
    protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
        final StreamInput in = new NamedWriteableAwareStreamInput(buffer, this.transport.namedWriteableRegistry);
        final String action = in.readString();
        this.transportServiceAdapter.onRequestReceived(requestId, action);
        final NettyTransportChannel transportChannel = new NettyTransportChannel(this.transport, this.transportServiceAdapter, action, channel, requestId, version, this.profileName);
        try {
            final RequestHandlerRegistry reg = this.transportServiceAdapter.getRequestHandler(action);
            if (reg == null) {
                throw new ActionNotFoundTransportException(action);
            }
            final TransportRequest request = reg.newRequest();
            request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
            request.readFrom(in);
            if (SAME_EXECUTOR.equals(reg.getExecutor())) {
                reg.processMessageReceived(request, transportChannel);
            } else {
                this.threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
            }
        } catch (Throwable e) {
            try {
                transportChannel.sendResponse(e);
            } catch (IOException sendFailure) {
                // Log the send failure first, then the failure we were trying to report,
                // matching RequestHandler#onFailure.
                this.logger.warn("Failed to send error message back to client for action [" + action + "]", sendFailure);
                this.logger.warn("Actual Exception", e);
            }
        }
        return action;
    }

    /** Delegates channel exceptions to the owning transport. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        this.transport.exceptionCaught(ctx, e);
    }

    /**
     * Runnable that processes a request on the executor chosen by its handler registry.
     * NOTE(review): relies on {@link AbstractRunnable} routing exceptions thrown by
     * {@link #doRun()} to {@link #onFailure(Throwable)} - confirm against AbstractRunnable.
     */
    class RequestHandler extends AbstractRunnable {
        private final RequestHandlerRegistry reg;
        private final TransportRequest request;
        private final NettyTransportChannel transportChannel;

        public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) {
            this.reg = reg;
            this.request = request;
            this.transportChannel = transportChannel;
        }

        @Override
        protected void doRun() throws Exception {
            this.reg.processMessageReceived(this.request, this.transportChannel);
        }

        @Override
        public boolean isForceExecution() {
            return this.reg.isForceExecution();
        }

        /**
         * Sends the failure back to the peer, but only while the transport is in the
         * {@code STARTED} state; in any other state the failure is dropped. If sending fails,
         * both the send failure and the original failure are logged.
         */
        @Override
        public void onFailure(Throwable e) {
            if (MessageChannelHandler.this.transport.lifecycleState() != Lifecycle.State.STARTED) {
                return;
            }
            try {
                this.transportChannel.sendResponse(e);
            } catch (Throwable sendFailure) {
                MessageChannelHandler.this.logger.warn("Failed to send error message back to client for action [" + this.reg.getAction() + "]", sendFailure);
                MessageChannelHandler.this.logger.warn("Actual Exception", e);
            }
        }
    }

    /**
     * Runnable that delivers an already deserialized response to its handler. Anything the
     * handler throws is reported back to the same handler as a
     * {@link ResponseHandlerFailureTransportException}.
     */
    class ResponseHandler implements Runnable {
        private final TransportResponseHandler handler;
        private final TransportResponse response;

        public ResponseHandler(TransportResponseHandler handler, TransportResponse response) {
            this.handler = handler;
            this.response = response;
        }

        @Override
        public void run() {
            try {
                this.handler.handleResponse(this.response);
            } catch (Throwable e) {
                MessageChannelHandler.this.handleException(this.handler, new ResponseHandlerFailureTransportException(e));
            }
        }
    }
}

