/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.logging;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Deque;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessage;

public class CppLogMessageHandler
implements Closeable {
    private static final Logger LOGGER = Loggers.getLogger(CppLogMessageHandler.class);
    private static final int DEFAULT_READBUF_SIZE = 1024;
    private static final int DEFAULT_ERROR_STORE_SIZE = 5;
    private static final long MAX_MESSAGE_INTERVAL_SECONDS = 10L;
    private final String jobId;
    private final InputStream inputStream;
    private final int readBufSize;
    private final int errorStoreSize;
    private final Deque<String> errorStore;
    private final CountDownLatch pidLatch;
    private final CountDownLatch cppCopyrightLatch;
    private final CountDownLatch logStreamClosedLatch;
    private MessageSummary lastMessageSummary = new MessageSummary();
    private volatile boolean seenFatalError;
    private volatile long pid;
    private volatile String cppCopyright;

    public CppLogMessageHandler(String jobId, InputStream inputStream) {
        this(inputStream, jobId, 1024, 5);
    }

    CppLogMessageHandler(InputStream inputStream, String jobId, int readBufSize, int errorStoreSize) {
        this.jobId = jobId;
        this.inputStream = Objects.requireNonNull(inputStream);
        this.readBufSize = readBufSize;
        this.errorStoreSize = errorStoreSize;
        this.errorStore = ConcurrentCollections.newDeque();
        this.pidLatch = new CountDownLatch(1);
        this.cppCopyrightLatch = new CountDownLatch(1);
        this.logStreamClosedLatch = new CountDownLatch(1);
    }

    @Override
    public void close() throws IOException {
        this.inputStream.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void tailStream() throws IOException {
        XContent xContent = XContentFactory.xContent((XContentType)XContentType.JSON);
        Object bytesRef = null;
        try {
            byte[] readBuf = new byte[this.readBufSize];
            int bytesRead = this.inputStream.read(readBuf);
            while (bytesRead != -1) {
                bytesRef = bytesRef == null ? new BytesArray(readBuf, 0, bytesRead) : new CompositeBytesReference(new BytesReference[]{bytesRef, new BytesArray(readBuf, 0, bytesRead)});
                bytesRef = this.parseMessages(xContent, (BytesReference)bytesRef);
                readBuf = new byte[this.readBufSize];
                bytesRead = this.inputStream.read(readBuf);
            }
            this.logStreamClosedLatch.countDown();
            if (this.lastMessageSummary.count > 0) {
                this.logSummarizedMessage();
            }
            if (bytesRef != null) {
                this.parseMessage(xContent, (BytesReference)bytesRef);
            }
        }
        catch (Throwable throwable) {
            this.logStreamClosedLatch.countDown();
            if (this.lastMessageSummary.count > 0) {
                this.logSummarizedMessage();
            }
            if (bytesRef != null) {
                this.parseMessage(xContent, (BytesReference)bytesRef);
            }
            throw throwable;
        }
    }

    public boolean hasLogStreamEnded() {
        return this.logStreamClosedLatch.getCount() == 0L;
    }

    public boolean seenFatalError() {
        return this.seenFatalError;
    }

    public boolean waitForLogStreamClose(Duration timeout) {
        try {
            return this.logStreamClosedLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public long getPid(Duration timeout) throws TimeoutException {
        if (this.pid == 0L) {
            try {
                this.pidLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.pid == 0L) {
                throw new TimeoutException("Timed out waiting for C++ process PID");
            }
        }
        return this.pid;
    }

    public String getCppCopyright(Duration timeout) throws TimeoutException {
        if (this.cppCopyright == null) {
            try {
                this.cppCopyrightLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.cppCopyright == null) {
                throw new TimeoutException("Timed out waiting for C++ process copyright");
            }
        }
        return this.cppCopyright;
    }

    public String getErrors() {
        String[] errorSnapshot = this.errorStore.toArray(new String[0]);
        StringBuilder errors = new StringBuilder();
        for (String error : errorSnapshot) {
            errors.append(error).append('\n');
        }
        return errors.toString();
    }

    private BytesReference parseMessages(XContent xContent, BytesReference bytesRef) {
        int nextMarker;
        byte marker = xContent.streamSeparator();
        int from = 0;
        while ((nextMarker = CppLogMessageHandler.findNextMarker(marker, bytesRef, from)) != -1) {
            if (nextMarker > from) {
                this.parseMessage(xContent, bytesRef.slice(from, nextMarker - from));
            }
            if ((from = nextMarker + 1) >= bytesRef.length() || bytesRef.get(from) != 0) continue;
            ++from;
        }
        if (from >= bytesRef.length()) {
            return null;
        }
        return bytesRef.slice(from, bytesRef.length() - from);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void parseMessage(XContent xContent, BytesReference bytesRef) {
        try (StreamInput stream = bytesRef.streamInput();
             XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, (InputStream)stream);){
            CppLogMessage msg = (CppLogMessage)CppLogMessage.PARSER.apply(parser, null);
            Level level = Level.getLevel((String)msg.getLevel());
            if (level == null) {
                level = Level.WARN;
            } else if (level.isMoreSpecificThan(Level.ERROR)) {
                this.storeError(msg.getMessage());
                if (level.isMoreSpecificThan(Level.FATAL)) {
                    this.seenFatalError = true;
                }
            }
            long latestPid = msg.getPid();
            if (this.pid != latestPid) {
                this.pid = latestPid;
                this.pidLatch.countDown();
            }
            String latestMessage = msg.getMessage();
            if (this.cppCopyright == null && latestMessage.contains("Copyright")) {
                this.cppCopyright = latestMessage;
                this.cppCopyrightLatch.countDown();
            }
            if (!LOGGER.isEnabled(level)) {
                return;
            }
            if (!LOGGER.isDebugEnabled()) {
                if (msg.isSimilarTo(this.lastMessageSummary.message) && this.lastMessageSummary.timestamp.until(msg.getTimestamp(), ChronoUnit.SECONDS) < 10L) {
                    ++this.lastMessageSummary.count;
                    this.lastMessageSummary.message = msg;
                    return;
                }
                if (this.lastMessageSummary.count > 0) {
                    this.logSummarizedMessage();
                }
                this.lastMessageSummary.reset(msg.getTimestamp(), msg, level);
            }
            if (this.jobId != null) {
                LOGGER.log(level, "[{}] [{}/{}] [{}@{}] {}", (Object)this.jobId, (Object)msg.getLogger(), (Object)latestPid, (Object)msg.getFile(), (Object)msg.getLine(), (Object)latestMessage);
                return;
            }
            LOGGER.log(level, "[{}/{}] [{}@{}] {}", (Object)msg.getLogger(), (Object)latestPid, (Object)msg.getFile(), (Object)msg.getLine(), (Object)latestMessage);
            return;
        }
        catch (XContentParseException e) {
            String upstreamMessage = "Fatal error: '" + bytesRef.utf8ToString() + "'";
            if (upstreamMessage.contains("bad_alloc")) {
                upstreamMessage = upstreamMessage + ", process ran out of memory.";
            }
            this.storeError(upstreamMessage);
            this.seenFatalError = true;
            return;
        }
        catch (IOException e) {
            if (this.jobId != null) {
                LOGGER.warn((Message)new ParameterizedMessage("[{}] IO failure receiving C++ log message: {}", new Object[]{this.jobId, bytesRef.utf8ToString()}), (Throwable)e);
                return;
            }
            LOGGER.warn((Message)new ParameterizedMessage("IO failure receiving C++ log message: {}", new Object[]{bytesRef.utf8ToString()}), (Throwable)e);
        }
    }

    private void logSummarizedMessage() {
        if (this.lastMessageSummary.count > 1) {
            if (this.jobId != null) {
                LOGGER.log(this.lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {} | repeated [{}]", (Object)this.jobId, (Object)this.lastMessageSummary.message.getLogger(), (Object)this.lastMessageSummary.message.getPid(), (Object)this.lastMessageSummary.message.getFile(), (Object)this.lastMessageSummary.message.getLine(), (Object)this.lastMessageSummary.message.getMessage(), (Object)this.lastMessageSummary.count);
            } else {
                LOGGER.log(this.lastMessageSummary.level, "[{}/{}] [{}@{}] {} | repeated [{}]", (Object)this.lastMessageSummary.message.getLogger(), (Object)this.lastMessageSummary.message.getPid(), (Object)this.lastMessageSummary.message.getFile(), (Object)this.lastMessageSummary.message.getLine(), (Object)this.lastMessageSummary.message.getMessage(), (Object)this.lastMessageSummary.count);
            }
        } else if (this.jobId != null) {
            LOGGER.log(this.lastMessageSummary.level, "[{}] [{}/{}] [{}@{}] {}", (Object)this.jobId, (Object)this.lastMessageSummary.message.getLogger(), (Object)this.lastMessageSummary.message.getPid(), (Object)this.lastMessageSummary.message.getFile(), (Object)this.lastMessageSummary.message.getLine(), (Object)this.lastMessageSummary.message.getMessage());
        } else {
            LOGGER.log(this.lastMessageSummary.level, "[{}/{}] [{}@{}] {}", (Object)this.lastMessageSummary.message.getLogger(), (Object)this.lastMessageSummary.message.getPid(), (Object)this.lastMessageSummary.message.getFile(), (Object)this.lastMessageSummary.message.getLine(), (Object)this.lastMessageSummary.message.getMessage());
        }
    }

    private void storeError(String error) {
        if (Strings.isNullOrEmpty((String)error) || this.errorStoreSize <= 0) {
            return;
        }
        if (this.errorStore.size() >= this.errorStoreSize) {
            this.errorStore.removeFirst();
        }
        this.errorStore.offerLast(error);
    }

    private static int findNextMarker(byte marker, BytesReference bytesRef, int from) {
        for (int i = from; i < bytesRef.length(); ++i) {
            if (bytesRef.get(i) != marker) continue;
            return i;
        }
        return -1;
    }

    private static class MessageSummary {
        Instant timestamp = Instant.EPOCH;
        int count = 0;
        CppLogMessage message = null;
        Level level = Level.OFF;

        MessageSummary() {
        }

        void reset(Instant timestamp, CppLogMessage message, Level level) {
            this.timestamp = timestamp;
            this.message = message;
            this.count = 0;
            this.level = level;
        }
    }
}

