/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.http.nio;

import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpHandlingSettings;
import org.opensearch.http.HttpPipelinedRequest;
import org.opensearch.http.HttpPipelinedResponse;
import org.opensearch.http.HttpReadTimeoutException;
import org.opensearch.http.HttpRequest;
import org.opensearch.http.nio.HttpWriteOperation;
import org.opensearch.http.nio.NettyAdaptor;
import org.opensearch.http.nio.NioHttpChannel;
import org.opensearch.http.nio.NioHttpPipeliningHandler;
import org.opensearch.http.nio.NioHttpRequestCreator;
import org.opensearch.http.nio.NioHttpResponse;
import org.opensearch.http.nio.NioHttpResponseCreator;
import org.opensearch.http.nio.NioHttpServerTransport;
import org.opensearch.nio.FlushOperation;
import org.opensearch.nio.InboundChannelBuffer;
import org.opensearch.nio.NioChannelHandler;
import org.opensearch.nio.SocketChannelContext;
import org.opensearch.nio.TaskScheduler;
import org.opensearch.nio.WriteOperation;

/**
 * {@link NioChannelHandler} that pushes the bytes of a single {@link NioHttpChannel} through a chain of
 * Netty HTTP handlers wrapped in a {@link NettyAdaptor}, hands every decoded request to the
 * {@link NioHttpServerTransport}, and turns outgoing {@link HttpPipelinedResponse}s back into
 * {@link FlushOperation}s.
 *
 * <p>When the configured read timeout is positive, the handler also reports an
 * {@link HttpReadTimeoutException} to the transport once a full timeout interval passes with no new
 * request and no request still awaiting its response.
 *
 * <p>NOTE(review): the mutable state below is neither {@code volatile} nor synchronized; presumably every
 * method is invoked from a single thread - confirm against the caller.
 */
public class HttpReadWriteHandler implements NioChannelHandler {
    private final NettyAdaptor adaptor;
    private final NioHttpChannel nioHttpChannel;
    private final NioHttpServerTransport transport;
    private final TaskScheduler taskScheduler;
    private final LongSupplier nanoClock;
    /** Read timeout in nanoseconds; the timeout is only scheduled when this value is positive. */
    private final long readTimeoutNanos;
    /** Set by {@link #channelActive()}; the read and write paths assert on it. */
    private boolean channelActive = false;
    /** True when at least one request was decoded since the last read-timeout check. */
    private boolean requestSinceReadTimeoutTrigger = false;
    /** Requests handed to the transport minus write operations passed to {@link #writeToBytes}. */
    private int inFlightRequests = 0;

    /**
     * Builds the Netty handler chain from {@code settings} and wraps it in a {@link NettyAdaptor}.
     *
     * @param nioHttpChannel channel passed to the transport with every request; closed when the adaptor closes
     * @param transport      receiver of decoded requests and of read-timeout exceptions
     * @param settings       source of decoder limits, compression options, pipelining limit and read timeout
     * @param taskScheduler  scheduler used for the read-timeout checks
     * @param nanoClock      supplies the current time in nanoseconds for scheduling those checks
     */
    public HttpReadWriteHandler(NioHttpChannel nioHttpChannel, NioHttpServerTransport transport, HttpHandlingSettings settings, TaskScheduler taskScheduler, LongSupplier nanoClock) {
        this.nioHttpChannel = nioHttpChannel;
        this.transport = transport;
        this.taskScheduler = taskScheduler;
        this.nanoClock = nanoClock;
        this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());

        // Typed as ChannelHandler so that a non-handler element is rejected by the compiler instead of
        // surfacing as an ArrayStoreException in toArray(...) below.
        List<ChannelHandler> handlers = new ArrayList<>(8);
        HttpRequestDecoder decoder = new HttpRequestDecoder(settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(), settings.getMaxChunkSize());
        decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
        handlers.add(decoder);
        handlers.add(new HttpContentDecompressor());
        handlers.add(new HttpResponseEncoder());
        handlers.add(new HttpObjectAggregator(settings.getMaxContentLength()));
        // The response compressor is only installed when compression is enabled in the settings.
        if (settings.isCompression()) {
            handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
        }
        handlers.add(new NioHttpRequestCreator());
        handlers.add(new NioHttpResponseCreator());
        handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));

        this.adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
        // Closing the adaptor also closes the channel this handler serves.
        this.adaptor.addCloseListener((v, e) -> nioHttpChannel.close());
    }

    /**
     * Marks the channel as active and, when the read timeout is positive, schedules the first
     * read-timeout check.
     */
    public void channelActive() {
        this.channelActive = true;
        if (this.readTimeoutNanos > 0L) {
            this.scheduleReadTimeout();
        }
    }

    /**
     * Feeds the buffered bytes (up to the buffer's current index) to the adaptor and dispatches every
     * message the adaptor has decoded.
     *
     * @param channelBuffer buffer holding the bytes read from the channel
     * @return the number of bytes the adaptor reports as consumed
     */
    public int consumeReads(InboundChannelBuffer channelBuffer) {
        assert this.channelActive : "channelActive should have been called";
        int bytesConsumed = this.adaptor.read(channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex()));
        Object message;
        while ((message = this.adaptor.pollInboundMessage()) != null) {
            // Both counters are updated before dispatch, so they change even if handleRequest throws.
            ++this.inFlightRequests;
            this.requestSinceReadTimeoutTrigger = true;
            this.handleRequest(message);
        }
        return bytesConsumed;
    }

    /**
     * Wraps {@code message} in an {@link HttpWriteOperation}.
     *
     * @param context  channel context the operation is bound to
     * @param message  response to write; cast to {@link HttpPipelinedResponse}
     * @param listener completion listener handed to the operation
     * @return the new write operation
     */
    public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
        assert assertMessageTypes(message);
        return new HttpWriteOperation(context, (HttpPipelinedResponse)message, listener);
    }

    /**
     * Passes {@code writeOperation} to the adaptor and returns the flush operations it produced.
     * Each call decrements the in-flight request counter.
     *
     * @param writeOperation the operation to write
     * @return the flush operations currently queued in the adaptor
     */
    public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
        assert assertMessageTypes(writeOperation.getObject());
        assert this.channelActive : "channelActive should have been called";
        --this.inFlightRequests;
        assert this.inFlightRequests >= 0 : "Inflight requests should never drop below zero, found: " + this.inFlightRequests;
        this.adaptor.write(writeOperation);
        return this.pollFlushOperations();
    }

    /**
     * Drains every outbound operation currently queued in the adaptor.
     *
     * @return a new list with the drained operations; empty when nothing was queued
     */
    public List<FlushOperation> pollFlushOperations() {
        List<FlushOperation> copiedOperations = new ArrayList<>(this.adaptor.getOutboundCount());
        FlushOperation flushOperation;
        while ((flushOperation = this.adaptor.pollOutboundOperation()) != null) {
            copiedOperations.add(flushOperation);
        }
        return copiedOperations;
    }

    /**
     * @return always {@code false}
     */
    public boolean closeNow() {
        return false;
    }

    /**
     * Closes the adaptor.
     *
     * @throws IOException wrapping any exception thrown while closing the adaptor
     */
    public void close() throws IOException {
        try {
            this.adaptor.close();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Hands a decoded request to the transport. If the transport throws, the request is released
     * before the exception propagates.
     *
     * @param msg message polled from the adaptor; cast to {@link HttpPipelinedRequest}
     */
    private void handleRequest(Object msg) {
        HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest)msg;
        boolean success = false;
        try {
            this.transport.incomingRequest((HttpRequest)pipelinedRequest, this.nioHttpChannel);
            success = true;
        }
        finally {
            // Release only on failure; on success the request has been handed over to the transport.
            if (!success) {
                pipelinedRequest.release();
            }
        }
    }

    /**
     * Read-timeout check. Reports an {@link HttpReadTimeoutException} to the transport when no request
     * arrived since the previous check and none is in flight; otherwise clears the activity flag and
     * schedules the next check. No further check is scheduled after a timeout has been reported.
     */
    private void maybeReadTimeout() {
        if (!this.requestSinceReadTimeoutTrigger && this.inFlightRequests == 0) {
            this.transport.onException(this.nioHttpChannel, (Exception)new HttpReadTimeoutException(TimeValue.nsecToMSec(this.readTimeoutNanos)));
        } else {
            this.requestSinceReadTimeoutTrigger = false;
            this.scheduleReadTimeout();
        }
    }

    /**
     * Schedules {@link #maybeReadTimeout()} at the current nano-clock reading plus the read timeout.
     */
    private void scheduleReadTimeout() {
        this.taskScheduler.scheduleAtRelativeTime(this::maybeReadTimeout, this.nanoClock.getAsLong() + this.readTimeoutNanos);
    }

    /**
     * Assertion helper meant to be used as {@code assert assertMessageTypes(x)}, so that the type checks
     * are skipped entirely when assertions are disabled.
     *
     * @param message the outgoing message to check
     * @return always {@code true}; a violation surfaces as an {@link AssertionError} from the inner asserts
     */
    private static boolean assertMessageTypes(Object message) {
        assert message instanceof HttpPipelinedResponse : "This channel only supports messages that are of type: " + HttpPipelinedResponse.class + ". Found type: " + message.getClass() + ".";
        assert ((HttpPipelinedResponse)message).getDelegateRequest() instanceof NioHttpResponse : "This channel only pipelined responses with a delegate of type: " + NioHttpResponse.class + ". Found type: " + ((HttpPipelinedResponse)message).getDelegateRequest().getClass() + ".";
        return true;
    }
}

