/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;

class NativeAutodetectProcess
implements AutodetectProcess {
    private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class);
    private static final Duration WAIT_FOR_KILL_TIMEOUT = Duration.ofMillis(1000L);
    private final String jobId;
    private final CppLogMessageHandler cppLogHandler;
    private final OutputStream processInStream;
    private final InputStream processOutStream;
    private final OutputStream processRestoreStream;
    private final LengthEncodedWriter recordWriter;
    private final ZonedDateTime startTime;
    private final int numberOfFields;
    private final List<Path> filesToDelete;
    private final Runnable onProcessCrash;
    private volatile Future<?> logTailFuture;
    private volatile Future<?> stateProcessorFuture;
    private volatile boolean processCloseInitiated;
    private volatile boolean processKilled;
    private volatile boolean isReady;
    private final AutodetectResultsParser resultsParser;

    NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, OutputStream processRestoreStream, int numberOfFields, List<Path> filesToDelete, AutodetectResultsParser resultsParser, Runnable onProcessCrash) {
        this.jobId = jobId;
        this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
        this.processInStream = new BufferedOutputStream(processInStream);
        this.processOutStream = processOutStream;
        this.processRestoreStream = processRestoreStream;
        this.recordWriter = new LengthEncodedWriter(this.processInStream);
        this.startTime = ZonedDateTime.now();
        this.numberOfFields = numberOfFields;
        this.filesToDelete = filesToDelete;
        this.resultsParser = resultsParser;
        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
    }

    public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
        this.logTailFuture = executorService.submit(() -> {
            try (CppLogMessageHandler h = this.cppLogHandler;){
                h.tailStream();
            }
            catch (IOException e) {
                if (!this.processKilled) {
                    LOGGER.error((Message)new ParameterizedMessage("[{}] Error tailing autodetect process logs", (Object)this.jobId), (Throwable)e);
                }
            }
            finally {
                if (!this.processCloseInitiated && !this.processKilled) {
                    String errors = this.cppLogHandler.getErrors();
                    LOGGER.error("[{}] autodetect process stopped unexpectedly: {}", (Object)this.jobId, (Object)errors);
                    this.onProcessCrash.run();
                }
            }
        });
        this.stateProcessorFuture = executorService.submit(() -> {
            block9: {
                try (InputStream in = persistStream;){
                    stateProcessor.process(this.jobId, in);
                    if (!this.processKilled) {
                        LOGGER.info("[{}] State output finished", (Object)this.jobId);
                    }
                }
                catch (IOException e) {
                    if (this.processKilled) break block9;
                    LOGGER.error((Message)new ParameterizedMessage("[{}] Error reading autodetect state output", (Object)this.jobId), (Throwable)e);
                }
            }
        });
    }

    @Override
    public void restoreState(StateStreamer stateStreamer, ModelSnapshot modelSnapshot) {
        block9: {
            if (modelSnapshot != null) {
                try (OutputStream r = this.processRestoreStream;){
                    stateStreamer.restoreStateToStream(this.jobId, modelSnapshot, r);
                }
                catch (Exception e) {
                    if (this.processKilled) break block9;
                    LOGGER.error("Error restoring model state for job " + this.jobId, (Throwable)e);
                }
            }
        }
        this.isReady = true;
    }

    @Override
    public boolean isReady() {
        return this.isReady;
    }

    @Override
    public void writeRecord(String[] record) throws IOException {
        this.recordWriter.writeRecord(record);
    }

    @Override
    public void writeResetBucketsControlMessage(DataLoadParams params) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeResetBucketsMessage(params);
    }

    @Override
    public void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeUpdateModelPlotMessage(modelPlotConfig);
    }

    @Override
    public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
    }

    @Override
    public void writeUpdateFiltersMessage(List<MlFilter> filters) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeUpdateFiltersMessage(filters);
    }

    @Override
    public void writeUpdateScheduledEventsMessage(List<ScheduledEvent> events, TimeValue bucketSpan) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeUpdateScheduledEventsMessage(events, bucketSpan);
    }

    @Override
    public String flushJob(FlushJobParams params) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeFlushControlMessage(params);
        return writer.writeFlushMessage();
    }

    @Override
    public void forecastJob(ForecastParams params) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeForecastMessage(params);
    }

    @Override
    public void persistJob() throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfFields);
        writer.writeStartBackgroundPersistMessage();
    }

    @Override
    public void flushStream() throws IOException {
        this.recordWriter.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            this.processCloseInitiated = true;
            this.processInStream.close();
            if (this.stateProcessorFuture != null) {
                this.stateProcessorFuture.get(MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
            }
            if (this.logTailFuture != null) {
                this.logTailFuture.get(5L, TimeUnit.SECONDS);
            }
            if (this.cppLogHandler.seenFatalError()) {
                throw ExceptionsHelper.serverError((String)this.cppLogHandler.getErrors());
            }
            LOGGER.debug("[{}] Autodetect process exited", (Object)this.jobId);
        }
        catch (ExecutionException | TimeoutException e) {
            LOGGER.warn((Message)new ParameterizedMessage("[{}] Exception closing the running autodetect process", (Object)this.jobId), (Throwable)e);
        }
        catch (InterruptedException e) {
            LOGGER.warn((Message)new ParameterizedMessage("[{}] Exception closing the running autodetect process", (Object)this.jobId), (Throwable)e);
            Thread.currentThread().interrupt();
        }
        finally {
            this.deleteAssociatedFiles();
        }
    }

    @Override
    public void kill() throws IOException {
        this.processKilled = true;
        try {
            NativeControllerHolder.getNativeController().killProcess(this.cppLogHandler.getPid(Duration.ZERO));
            this.cppLogHandler.waitForLogStreamClose(WAIT_FOR_KILL_TIMEOUT);
        }
        catch (TimeoutException e) {
            LOGGER.warn("[{}] Failed to get PID of autodetect process to kill", (Object)this.jobId);
        }
        finally {
            try {
                this.processInStream.close();
            }
            catch (IOException iOException) {}
            try {
                this.deleteAssociatedFiles();
            }
            catch (IOException iOException) {}
        }
    }

    private synchronized void deleteAssociatedFiles() throws IOException {
        if (this.filesToDelete == null) {
            return;
        }
        for (Path fileToDelete : this.filesToDelete) {
            if (Files.deleteIfExists(fileToDelete)) {
                LOGGER.debug("[{}] Deleted file {}", (Object)this.jobId, (Object)fileToDelete.toString());
                continue;
            }
            LOGGER.warn("[{}] Failed to delete file {}", (Object)this.jobId, (Object)fileToDelete.toString());
        }
        this.filesToDelete.clear();
    }

    @Override
    public Iterator<AutodetectResult> readAutodetectResults() {
        return this.resultsParser.parseResults(this.processOutStream);
    }

    @Override
    public ZonedDateTime getProcessStartTime() {
        return this.startTime;
    }

    @Override
    public boolean isProcessAlive() {
        return !this.cppLogHandler.hasLogStreamEnded();
    }

    @Override
    public boolean isProcessAliveAfterWaiting() {
        this.cppLogHandler.waitForLogStreamClose(Duration.ofMillis(45L));
        return this.isProcessAlive();
    }

    @Override
    public String readError() {
        return this.cppLogHandler.getErrors();
    }
}

