/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectCommunicator;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessFactory;
import org.elasticsearch.xpack.ml.job.process.autodetect.ProcessContext;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateProcessMessage;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.ScoresUpdater;
import org.elasticsearch.xpack.ml.job.process.normalizer.ShortCircuitingRenormalizer;
import org.elasticsearch.xpack.ml.notifications.Auditor;

public class AutodetectProcessManager
extends AbstractComponent {
    @Deprecated
    public static final Setting<Integer> MAX_RUNNING_JOBS_PER_NODE = Setting.intSetting((String)"max_running_jobs", (int)20, (int)1, (int)512, (Setting.Property[])new Setting.Property[]{Setting.Property.NodeScope, Setting.Property.Deprecated});
    public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE = Setting.intSetting((String)"xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, (int)1, (Setting.Property[])new Setting.Property[]{Setting.Property.NodeScope});
    public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP = Setting.byteSizeSetting((String)"xpack.ml.min_disk_space_off_heap", (ByteSizeValue)new ByteSizeValue(5L, ByteSizeUnit.GB), (Setting.Property[])new Setting.Property[]{Setting.Property.NodeScope});
    private final Client client;
    private final Environment environment;
    private final ThreadPool threadPool;
    private final JobManager jobManager;
    private final JobResultsProvider jobResultsProvider;
    private final AutodetectProcessFactory autodetectProcessFactory;
    private final NormalizerFactory normalizerFactory;
    private final JobResultsPersister jobResultsPersister;
    private final JobDataCountsPersister jobDataCountsPersister;
    private NativeStorageProvider nativeStorageProvider;
    private final ConcurrentMap<Long, ProcessContext> processByAllocation = new ConcurrentHashMap<Long, ProcessContext>();
    private final ConcurrentMap<String, Path> nativeTmpStorage = new ConcurrentHashMap<String, Path>();
    private final int maxAllowedRunningJobs;
    private final NamedXContentRegistry xContentRegistry;
    private final Auditor auditor;

    public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, NamedXContentRegistry xContentRegistry, Auditor auditor) {
        super(settings);
        this.environment = environment;
        this.client = client;
        this.threadPool = threadPool;
        this.xContentRegistry = xContentRegistry;
        this.maxAllowedRunningJobs = (Integer)MAX_OPEN_JOBS_PER_NODE.get(settings);
        this.autodetectProcessFactory = autodetectProcessFactory;
        this.normalizerFactory = normalizerFactory;
        this.jobManager = jobManager;
        this.jobResultsProvider = jobResultsProvider;
        this.jobResultsPersister = jobResultsPersister;
        this.jobDataCountsPersister = jobDataCountsPersister;
        this.auditor = auditor;
        this.nativeStorageProvider = new NativeStorageProvider(environment, (ByteSizeValue)MIN_DISK_SPACE_OFF_HEAP.get(settings));
    }

    public void onNodeStartup() {
        try {
            this.nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
        }
        catch (Exception e) {
            this.logger.warn("Failed to cleanup native storage from previous invocation", (Throwable)e);
        }
    }

    public synchronized void closeAllJobsOnThisNode(String reason) {
        int numJobs = this.processByAllocation.size();
        if (numJobs != 0) {
            this.logger.info("Closing [{}] jobs, because [{}]", (Object)numJobs, (Object)reason);
            for (ProcessContext process : this.processByAllocation.values()) {
                this.closeJob(process.getJobTask(), false, reason);
            }
        }
    }

    public void killProcess(TransportOpenJobAction.JobTask jobTask, boolean awaitCompletion, String reason) {
        this.logger.trace("[{}] Killing process: awaitCompletion = [{}]; reason = [{}]", (Object)jobTask.getJobId(), (Object)awaitCompletion, (Object)reason);
        ProcessContext processContext = (ProcessContext)this.processByAllocation.remove(jobTask.getAllocationId());
        if (processContext != null) {
            processContext.newKillBuilder().setAwaitCompletion(awaitCompletion).setFinish(true).setReason(reason).kill();
        } else {
            this.logger.trace("[{}] Marking job task as completed", (Object)jobTask.getJobId());
            jobTask.markAsCompleted();
        }
    }

    public void killAllProcessesOnThisNode() {
        Iterator iterator = this.processByAllocation.values().iterator();
        while (iterator.hasNext()) {
            ProcessContext processContext = (ProcessContext)iterator.next();
            processContext.newKillBuilder().setAwaitCompletion(false).setFinish(false).setSilent(true).kill();
            iterator.remove();
        }
    }

    public void persistJob(TransportOpenJobAction.JobTask jobTask, Consumer<Exception> handler) {
        AutodetectCommunicator communicator = this.getOpenAutodetectCommunicator(jobTask);
        communicator.persistJob((aVoid, e) -> handler.accept((Exception)e));
    }

    public void processData(TransportOpenJobAction.JobTask jobTask, AnalysisRegistry analysisRegistry, InputStream input, XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
        AutodetectCommunicator communicator = this.getOpenAutodetectCommunicator(jobTask);
        if (communicator == null) {
            throw ExceptionsHelper.conflictStatusException((String)("Cannot process data because job [" + jobTask.getJobId() + "] is not open"), (Object[])new Object[0]);
        }
        communicator.writeToJob(input, analysisRegistry, xContentType, params, handler);
    }

    public void flushJob(TransportOpenJobAction.JobTask jobTask, FlushJobParams params, ActionListener<FlushAcknowledgement> handler) {
        this.logger.debug("Flushing job {}", (Object)jobTask.getJobId());
        AutodetectCommunicator communicator = this.getOpenAutodetectCommunicator(jobTask);
        if (communicator == null) {
            String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
            this.logger.debug(message);
            handler.onFailure((Exception)ExceptionsHelper.conflictStatusException((String)message, (Object[])new Object[0]));
            return;
        }
        communicator.flushJob(params, (flushAcknowledgement, e) -> {
            if (e != null) {
                String msg = String.format(Locale.ROOT, "[%s] exception while flushing job", jobTask.getJobId());
                this.logger.error(msg);
                handler.onFailure((Exception)ExceptionsHelper.serverError((String)msg, (Throwable)e));
            } else {
                handler.onResponse(flushAcknowledgement);
            }
        });
    }

    public Path tryGetTmpStorage(TransportOpenJobAction.JobTask jobTask, ByteSizeValue requestedSize) {
        String jobId = jobTask.getJobId();
        Path path = (Path)this.nativeTmpStorage.get(jobId);
        if (path == null) {
            path = this.nativeStorageProvider.tryGetLocalTmpStorage(jobId, requestedSize);
            if (path != null) {
                this.nativeTmpStorage.put(jobId, path);
            }
        } else if (!this.nativeStorageProvider.localTmpStorageHasEnoughSpace(path, requestedSize)) {
            return null;
        }
        return path;
    }

    public void forecastJob(TransportOpenJobAction.JobTask jobTask, ForecastParams params, Consumer<Exception> handler) {
        String jobId = jobTask.getJobId();
        this.logger.debug("Forecasting job {}", (Object)jobId);
        AutodetectCommunicator communicator = this.getOpenAutodetectCommunicator(jobTask);
        if (communicator == null) {
            String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
            this.logger.debug(message);
            handler.accept((Exception)ExceptionsHelper.conflictStatusException((String)message, (Object[])new Object[0]));
            return;
        }
        communicator.forecastJob(params, (aVoid, e) -> {
            if (e == null) {
                handler.accept(null);
            } else {
                String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobId);
                this.logger.error(msg, (Throwable)e);
                handler.accept((Exception)ExceptionsHelper.serverError((String)msg, (Throwable)e));
            }
        });
    }

    public void writeUpdateProcessMessage(TransportOpenJobAction.JobTask jobTask, UpdateParams updateParams, final Consumer<Exception> handler) {
        AutodetectCommunicator communicator = this.getOpenAutodetectCommunicator(jobTask);
        if (communicator == null) {
            String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
            this.logger.debug(message);
            handler.accept((Exception)ExceptionsHelper.conflictStatusException((String)message, (Object[])new Object[0]));
            return;
        }
        UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder();
        updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig());
        updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates());
        ActionListener eventsListener = ActionListener.wrap(events -> {
            updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
            communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
                if (e == null) {
                    handler.accept(null);
                } else {
                    handler.accept((Exception)e);
                }
            });
        }, handler);
        final ActionListener filterListener = ActionListener.wrap(filter -> {
            updateProcessMessage.setFilter((MlFilter)filter);
            if (updateParams.isUpdateScheduledEvents()) {
                Job job = this.jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
                DataCounts dataCounts = (DataCounts)this.getStatistics(jobTask).get().v1();
                ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
                this.jobResultsProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, (ActionListener<QueryPage<ScheduledEvent>>)eventsListener);
            } else {
                eventsListener.onResponse(null);
            }
        }, handler);
        if (updateParams.getFilter() == null) {
            filterListener.onResponse(null);
        } else {
            GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request();
            getFilterRequest.setFilterId(updateParams.getFilter().getId());
            ClientHelper.executeAsyncWithOrigin((Client)this.client, (String)"ml", (Action)GetFiltersAction.INSTANCE, (ActionRequest)getFilterRequest, (ActionListener)new ActionListener<GetFiltersAction.Response>(){

                public void onResponse(GetFiltersAction.Response response) {
                    filterListener.onResponse((Object)((MlFilter)response.getFilters().results().get(0)));
                }

                public void onFailure(Exception e) {
                    handler.accept(e);
                }
            });
        }
    }

    public void openJob(final TransportOpenJobAction.JobTask jobTask, final Consumer<Exception> handler) {
        final String jobId = jobTask.getJobId();
        Job job = this.jobManager.getJobOrThrowIfUnknown(jobId);
        if (job.getJobVersion() == null) {
            handler.accept((Exception)ExceptionsHelper.badRequestException((String)("Cannot open job [" + jobId + "] because jobs created prior to version 5.5 are not supported"), (Object[])new Object[0]));
            return;
        }
        this.logger.info("Opening job [{}]", (Object)jobId);
        this.processByAllocation.putIfAbsent(jobTask.getAllocationId(), new ProcessContext(jobTask));
        this.jobResultsProvider.getAutodetectParams(job, params -> this.threadPool.executor("ml_utility").execute((Runnable)new AbstractRunnable((AutodetectParams)params){
            final /* synthetic */ AutodetectParams val$params;
            {
                this.val$params = autodetectParams;
            }

            public void onFailure(Exception e) {
                handler.accept(e);
            }

            protected void doRun() throws Exception {
                ProcessContext processContext = (ProcessContext)AutodetectProcessManager.this.processByAllocation.get(jobTask.getAllocationId());
                if (processContext == null) {
                    AutodetectProcessManager.this.logger.debug("Aborted opening job [{}] as it has been closed", (Object)jobId);
                    return;
                }
                if (processContext.getState() != ProcessContext.ProcessStateName.NOT_RUNNING) {
                    AutodetectProcessManager.this.logger.debug("Cannot open job [{}] when its state is [{}]", (Object)jobId, (Object)((Object)((Object)processContext.getState())).getClass().getName());
                    return;
                }
                try {
                    AutodetectProcessManager.this.createProcessAndSetRunning(processContext, this.val$params, handler);
                    processContext.getAutodetectCommunicator().init(this.val$params.modelSnapshot());
                    AutodetectProcessManager.this.setJobState(jobTask, JobState.OPENED);
                }
                catch (Exception e1) {
                    try {
                        processContext.newKillBuilder().setAwaitCompletion(false).setFinish(false).kill();
                        AutodetectProcessManager.this.processByAllocation.remove(jobTask.getAllocationId());
                    }
                    finally {
                        AutodetectProcessManager.this.setJobState(jobTask, JobState.FAILED, (CheckedConsumer<Exception, IOException>)((CheckedConsumer)e2 -> handler.accept(e1)));
                    }
                }
            }
        }), e1 -> {
            this.logger.warn("Failed to gather information required to open job [" + jobId + "]", (Throwable)e1);
            this.setJobState(jobTask, JobState.FAILED, (CheckedConsumer<Exception, IOException>)((CheckedConsumer)e2 -> handler.accept((Exception)e1)));
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void createProcessAndSetRunning(ProcessContext processContext, AutodetectParams params, Consumer<Exception> handler) {
        processContext.tryLock();
        try {
            AutodetectCommunicator communicator = this.create(processContext.getJobTask(), params, handler);
            processContext.setRunning(communicator);
        }
        finally {
            processContext.unlock();
        }
    }

    AutodetectCommunicator create(TransportOpenJobAction.JobTask jobTask, AutodetectParams autodetectParams, Consumer<Exception> handler) {
        ExecutorService autodetectWorkerExecutor;
        int currentRunningJobs = this.processByAllocation.size();
        if (currentRunningJobs > this.maxAllowedRunningJobs) {
            throw new ElasticsearchStatusException("max running job capacity [" + this.maxAllowedRunningJobs + "] reached", RestStatus.TOO_MANY_REQUESTS, new Object[0]);
        }
        String jobId = jobTask.getJobId();
        this.notifyLoadingSnapshot(jobId, autodetectParams);
        if (autodetectParams.dataCounts().getProcessedRecordCount() > 0L) {
            String msg;
            if (autodetectParams.modelSnapshot() == null) {
                msg = "No model snapshot could be found for a job with processed records";
                this.logger.warn("[{}] {}", (Object)jobId, (Object)msg);
                this.auditor.warning(jobId, "No model snapshot could be found for a job with processed records");
            }
            if (autodetectParams.quantiles() == null) {
                msg = "No quantiles could be found for a job with processed records";
                this.logger.warn("[{}] {}", (Object)jobId, (Object)msg);
                this.auditor.warning(jobId, msg);
            }
        }
        Job job = this.jobManager.getJobOrThrowIfUnknown(jobId);
        ExecutorService autoDetectExecutorService = this.threadPool.executor("ml_autodetect");
        DataCountsReporter dataCountsReporter = new DataCountsReporter(this.settings, job, autodetectParams.dataCounts(), this.jobDataCountsPersister);
        ScoresUpdater scoresUpdater = new ScoresUpdater(job, this.jobResultsProvider, new JobRenormalizedResultsPersister(job.getId(), this.settings, this.client), this.normalizerFactory);
        ExecutorService renormalizerExecutorService = this.threadPool.executor("ml_utility");
        ShortCircuitingRenormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater, renormalizerExecutorService);
        AutodetectProcess process = this.autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService, this.onProcessCrash(jobTask));
        AutoDetectResultProcessor processor = new AutoDetectResultProcessor(this.client, this.auditor, jobId, renormalizer, this.jobResultsPersister, this.jobResultsProvider, autodetectParams.modelSizeStats(), autodetectParams.modelSnapshot() != null);
        try (ThreadContext.StoredContext ignore = this.threadPool.getThreadContext().stashContext();){
            autodetectWorkerExecutor = this.createAutodetectExecutorService(autoDetectExecutorService);
            autoDetectExecutorService.submit(() -> processor.process(process));
        }
        catch (EsRejectedExecutionException e) {
            try {
                IOUtils.close((Closeable[])new Closeable[]{process});
            }
            catch (IOException ioe) {
                this.logger.error("Can't close autodetect", (Throwable)ioe);
            }
            throw e;
        }
        return new AutodetectCommunicator(job, this.environment, process, new StateStreamer(this.client), dataCountsReporter, processor, handler, this.xContentRegistry, autodetectWorkerExecutor);
    }

    private void notifyLoadingSnapshot(String jobId, AutodetectParams autodetectParams) {
        ModelSnapshot modelSnapshot = autodetectParams.modelSnapshot();
        StringBuilder msgBuilder = new StringBuilder("Loading model snapshot [");
        if (modelSnapshot == null) {
            msgBuilder.append("N/A");
        } else {
            msgBuilder.append(modelSnapshot.getSnapshotId());
            msgBuilder.append("] with latest_record_timestamp [");
            Date snapshotLatestRecordTimestamp = modelSnapshot.getLatestRecordTimeStamp();
            msgBuilder.append(snapshotLatestRecordTimestamp == null ? "N/A" : XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(snapshotLatestRecordTimestamp.getTime()));
        }
        msgBuilder.append("], job latest_record_timestamp [");
        Date jobLatestRecordTimestamp = autodetectParams.dataCounts().getLatestRecordTimeStamp();
        msgBuilder.append(jobLatestRecordTimestamp == null ? "N/A" : XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(jobLatestRecordTimestamp.getTime()));
        msgBuilder.append("]");
        String msg = msgBuilder.toString();
        this.logger.info("[{}] {}", (Object)jobId, (Object)msg);
        this.auditor.info(jobId, msg);
    }

    private Runnable onProcessCrash(TransportOpenJobAction.JobTask jobTask) {
        return () -> {
            AutodetectCommunicator communicator;
            ProcessContext processContext = (ProcessContext)this.processByAllocation.remove(jobTask.getAllocationId());
            if (processContext != null && (communicator = processContext.getAutodetectCommunicator()) != null) {
                communicator.destroyCategorizationAnalyzer();
            }
            this.setJobState(jobTask, JobState.FAILED);
            try {
                this.removeTmpStorage(jobTask.getJobId());
            }
            catch (IOException e) {
                this.logger.error((Message)new ParameterizedMessage("[{}] Failed to delete temporary files", (Object)jobTask.getJobId()), (Throwable)e);
            }
        };
    }

    public void closeJob(TransportOpenJobAction.JobTask jobTask, boolean restart, String reason) {
        String jobId = jobTask.getJobId();
        long allocationId = jobTask.getAllocationId();
        this.logger.debug("Attempting to close job [{}], because [{}]", (Object)jobId, (Object)reason);
        ProcessContext processContext = (ProcessContext)this.processByAllocation.get(allocationId);
        if (processContext == null) {
            this.logger.debug("Cannot close job [{}] as it has already been closed", (Object)jobId);
            return;
        }
        processContext.tryLock();
        try {
            if (!processContext.setDying()) {
                this.logger.debug("Cannot close job [{}] as it has already been closed", (Object)jobId);
                return;
            }
            if (reason == null) {
                this.logger.info("Closing job [{}]", (Object)jobId);
            } else {
                this.logger.info("Closing job [{}], because [{}]", (Object)jobId, (Object)reason);
            }
            AutodetectCommunicator communicator = processContext.getAutodetectCommunicator();
            if (communicator == null) {
                this.logger.debug("Job [{}] is being closed before its process is started", (Object)jobId);
                jobTask.markAsCompleted();
                return;
            }
            communicator.close(restart, reason);
            this.processByAllocation.remove(allocationId);
        }
        catch (Exception e) {
            if (e instanceof ElasticsearchStatusException && ((ElasticsearchStatusException)e).status() == RestStatus.CONFLICT) {
                throw e;
            }
            this.logger.warn("[" + jobId + "] Exception closing autodetect process", (Throwable)e);
            this.setJobState(jobTask, JobState.FAILED);
            throw ExceptionsHelper.serverError((String)"Exception closing autodetect process", (Throwable)e);
        }
        finally {
            processContext.unlock();
        }
        try {
            this.removeTmpStorage(jobId);
        }
        catch (IOException e) {
            this.logger.error((Message)new ParameterizedMessage("[{}]Failed to delete temporary files", (Object)jobId), (Throwable)e);
        }
    }

    int numberOfOpenJobs() {
        return (int)this.processByAllocation.values().stream().filter(p -> p.getState() != ProcessContext.ProcessStateName.DYING).count();
    }

    boolean jobHasActiveAutodetectProcess(TransportOpenJobAction.JobTask jobTask) {
        return this.getAutodetectCommunicator(jobTask) != null;
    }

    private AutodetectCommunicator getAutodetectCommunicator(TransportOpenJobAction.JobTask jobTask) {
        return this.processByAllocation.getOrDefault(jobTask.getAllocationId(), new ProcessContext(jobTask)).getAutodetectCommunicator();
    }

    private AutodetectCommunicator getOpenAutodetectCommunicator(TransportOpenJobAction.JobTask jobTask) {
        ProcessContext processContext = (ProcessContext)this.processByAllocation.get(jobTask.getAllocationId());
        if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
            return processContext.getAutodetectCommunicator();
        }
        return null;
    }

    public Optional<Duration> jobOpenTime(TransportOpenJobAction.JobTask jobTask) {
        AutodetectCommunicator communicator = this.getAutodetectCommunicator(jobTask);
        if (communicator == null) {
            return Optional.empty();
        }
        return Optional.of(Duration.between(communicator.getProcessStartTime(), ZonedDateTime.now()));
    }

    void setJobState(final TransportOpenJobAction.JobTask jobTask, final JobState state) {
        JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
        jobTask.updatePersistentTaskState((PersistentTaskState)jobTaskState, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                AutodetectProcessManager.this.logger.info("Successfully set job state to [{}] for job [{}]", (Object)state, (Object)jobTask.getJobId());
            }

            public void onFailure(Exception e) {
                AutodetectProcessManager.this.logger.error("Could not set job state to [" + state + "] for job [" + jobTask.getJobId() + "]", (Throwable)e);
            }
        });
    }

    void setJobState(TransportOpenJobAction.JobTask jobTask, JobState state, final CheckedConsumer<Exception, IOException> handler) {
        JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId());
        jobTask.updatePersistentTaskState((PersistentTaskState)jobTaskState, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>(){

            public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
                try {
                    handler.accept(null);
                }
                catch (IOException e1) {
                    AutodetectProcessManager.this.logger.warn("Error while delegating response", (Throwable)e1);
                }
            }

            public void onFailure(Exception e) {
                try {
                    handler.accept((Object)e);
                }
                catch (IOException e1) {
                    AutodetectProcessManager.this.logger.warn("Error while delegating exception [" + e.getMessage() + "]", (Throwable)e1);
                }
            }
        });
    }

    public Optional<Tuple<DataCounts, ModelSizeStats>> getStatistics(TransportOpenJobAction.JobTask jobTask) {
        AutodetectCommunicator communicator = this.getAutodetectCommunicator(jobTask);
        if (communicator == null) {
            return Optional.empty();
        }
        return Optional.of(new Tuple((Object)communicator.getDataCounts(), (Object)communicator.getModelSizeStats()));
    }

    private void removeTmpStorage(String jobId) throws IOException {
        Path path = (Path)this.nativeTmpStorage.get(jobId);
        if (path != null) {
            this.nativeStorageProvider.cleanupLocalTmpStorage(path);
        }
    }

    ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
        AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(this.threadPool.getThreadContext());
        executorService.submit(autoDetectWorkerExecutor::start);
        return autoDetectWorkerExecutor;
    }

    class AutodetectWorkerExecutorService
    extends AbstractExecutorService {
        private final ThreadContext contextHolder;
        private final CountDownLatch awaitTermination = new CountDownLatch(1);
        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
        private volatile boolean running = true;

        AutodetectWorkerExecutorService(ThreadContext contextHolder) {
            this.contextHolder = contextHolder;
        }

        @Override
        public void shutdown() {
            this.running = false;
        }

        @Override
        public List<Runnable> shutdownNow() {
            throw new UnsupportedOperationException("not supported");
        }

        @Override
        public boolean isShutdown() {
            return !this.running;
        }

        @Override
        public boolean isTerminated() {
            return this.awaitTermination.getCount() == 0L;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return this.awaitTermination.await(timeout, unit);
        }

        @Override
        public void execute(Runnable command) {
            boolean added = this.queue.offer(this.contextHolder.preserveContext(command));
            if (!added) {
                throw new ElasticsearchStatusException("Unable to submit operation", RestStatus.TOO_MANY_REQUESTS, new Object[0]);
            }
        }

        void start() {
            block9: {
                block7: while (true) {
                    try {
                        while (this.running) {
                            Runnable runnable = this.queue.poll(500L, TimeUnit.MILLISECONDS);
                            if (runnable == null) continue;
                            try {
                                runnable.run();
                                continue block7;
                            }
                            catch (Exception e) {
                                AutodetectProcessManager.this.logger.error("error handling job operation", (Throwable)e);
                            }
                        }
                        break block9;
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break block9;
                    }
                }
                finally {
                    this.awaitTermination.countDown();
                }
            }
        }
    }
}

