/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.rollup.job;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.rollup.job.CronSchedule;
import org.elasticsearch.xpack.rollup.job.RollupClientHelper;
import org.elasticsearch.xpack.rollup.job.RollupIndexer;

/**
 * Persistent task that owns the {@link RollupIndexer} for a single rollup job.
 *
 * <p>The task registers itself as a {@link SchedulerEngine.Listener}; every time the
 * job's schedule fires, {@link #triggered(SchedulerEngine.Event)} asks the indexer to
 * start a run if appropriate. Start/stop requests move the indexer between states and
 * persist the resulting {@link RollupJobStatus} through
 * {@code updatePersistentStatus}.
 *
 * <p>All state-changing entry points are {@code synchronized} on the task instance.
 */
public class RollupJobTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
    private static final Logger logger = Logger.getLogger(RollupJobTask.class.getName());

    /** Prefix of the scheduler job name; the rollup job id is appended after an underscore. */
    static final String SCHEDULE_NAME = "xpack/rollup/job/schedule";

    private final RollupJob job;
    private final SchedulerEngine schedulerEngine;
    private final ThreadPool threadPool;
    private final RollupIndexer indexer;

    /**
     * Creates the task and its indexer, optionally resuming from a previously persisted status.
     *
     * <p>Without a status the indexer starts in {@code STOPPED} with no position. With a
     * status, the persisted position is reused and the persisted state is mapped as follows:
     * {@code INDEXING} becomes {@code STARTED}; {@code ABORTING} and {@code STOPPING} become
     * {@code STOPPED}; any other state is kept as is.
     *
     * @param id              task id, forwarded to the superclass
     * @param type            task type, forwarded to the superclass
     * @param action          task action, forwarded to the superclass
     * @param parentTask      parent task id, forwarded to the superclass
     * @param job             the rollup job this task executes
     * @param status          previously persisted status, or {@code null} if there is none
     * @param client          client used by the indexer for search and bulk requests
     * @param schedulerEngine engine that delivers schedule triggers to this task
     * @param threadPool      pool whose {@code "generic"} executor runs the indexer
     * @param headers         task headers, forwarded to the superclass
     */
    RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus status, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
        super(id, type, action, "rollup_" + job.getConfig().getId(), parentTask, headers);
        this.job = job;
        this.schedulerEngine = schedulerEngine;
        this.threadPool = threadPool;

        Map<String, Object> initialPosition = null;
        IndexerState initialState = IndexerState.STOPPED;
        if (status != null) {
            IndexerState persistedState = status.getState();
            logger.debug("We have existing status, setting state to [" + persistedState + "] and current position to [" + status.getPosition() + "] for job [" + job.getConfig().getId() + "]");
            // Transitional states cannot be resumed directly, so they are collapsed
            // to the stable state they were heading from/to.
            switch (persistedState) {
                case INDEXING: {
                    initialState = IndexerState.STARTED;
                    break;
                }
                case ABORTING:
                case STOPPING: {
                    initialState = IndexerState.STOPPED;
                    break;
                }
                default: {
                    initialState = persistedState;
                }
            }
            initialPosition = status.getPosition();
        }
        Client taskClient = new ParentTaskAssigningClient(client, new TaskId(this.getPersistentTaskId()));
        this.indexer = new ClientRollupPageManager(job, initialState, initialPosition, taskClient);
    }

    /** Builds the scheduler job name for {@code job}: {@link #SCHEDULE_NAME}, an underscore, and the job id. */
    private static String schedulerJobName(RollupJob job) {
        return SCHEDULE_NAME + "_" + job.getConfig().getId();
    }

    /** Returns the id of the rollup job configuration this task runs. */
    private String jobId() {
        return this.job.getConfig().getId();
    }

    /**
     * Returns a fresh {@link RollupJobStatus} built from the indexer's current state and position.
     */
    public Task.Status getStatus() {
        return new RollupJobStatus(this.indexer.getState(), this.indexer.getPosition());
    }

    /** Returns the statistics reported by the indexer. */
    public RollupJobStats getStats() {
        return this.indexer.getStats();
    }

    /** Returns the configuration of the rollup job this task runs. */
    public RollupJobConfig getConfig() {
        return this.job.getConfig();
    }

    /**
     * Starts the indexer and persists the {@code STARTED} status.
     *
     * <p>The listener fails with an {@link ElasticsearchException} if the indexer is not
     * {@code STOPPED} beforehand, if {@code indexer.start()} does not report
     * {@code STARTED}, or if persisting the status fails. On success it receives a
     * {@code StartRollupJobAction.Response(true)}.
     *
     * @param listener notified with the outcome of the start request
     */
    public synchronized void start(ActionListener<StartRollupJobAction.Response> listener) {
        IndexerState prevState = this.indexer.getState();
        if (prevState != IndexerState.STOPPED) {
            listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + this.jobId() + "] because state was [" + prevState + "]"));
            return;
        }

        IndexerState newState = this.indexer.start();
        if (newState != IndexerState.STARTED) {
            listener.onFailure(new ElasticsearchException("Cannot start task for Rollup Job [" + this.jobId() + "] because state was [" + newState + "]"));
            return;
        }

        RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, this.indexer.getPosition());
        logger.debug("Updating status for rollup job [" + this.jobId() + "] to [" + status.getState() + "][" + status.getPosition() + "]");
        this.updatePersistentStatus(status, ActionListener.wrap(task -> {
            logger.debug("Succesfully updated status for rollup job [" + this.jobId() + "] to [" + status.getState() + "][" + status.getPosition() + "]");
            listener.onResponse(new StartRollupJobAction.Response(true));
        }, exc -> listener.onFailure(new ElasticsearchException("Error while updating status for rollup job [" + this.jobId() + "] to [" + status.getState() + "].", exc))));
    }

    /**
     * Asks the indexer to stop and reports the outcome to {@code listener}.
     *
     * <p>If {@code indexer.stop()} reports {@code STOPPED} the listener is answered
     * immediately. If it reports {@code STOPPING}, a {@code STOPPED} status with the
     * indexer's current position is persisted first and the listener is answered from the
     * persistence callback. Any other state fails the listener.
     *
     * @param listener notified with the outcome of the stop request
     */
    public synchronized void stop(ActionListener<StopRollupJobAction.Response> listener) {
        IndexerState newState = this.indexer.stop();
        switch (newState) {
            case STOPPED: {
                listener.onResponse(new StopRollupJobAction.Response(true));
                break;
            }
            case STOPPING: {
                // Persist STOPPED (not STOPPING) even though the indexer has only been asked to stop.
                RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, this.indexer.getPosition());
                this.updatePersistentStatus(status, ActionListener.wrap(task -> {
                    logger.debug("Succesfully updated status for rollup job [" + this.jobId() + "] to [" + status.getState() + "]");
                    listener.onResponse(new StopRollupJobAction.Response(true));
                }, exc -> listener.onFailure(new ElasticsearchException("Error while updating status for rollup job [" + this.jobId() + "] to [" + status.getState() + "].", exc))));
                break;
            }
            default: {
                listener.onFailure(new ElasticsearchException("Cannot stop task for Rollup Job [" + this.jobId() + "] because state was [" + newState + "]"));
            }
        }
    }

    /**
     * Detaches this task from the scheduler and finishes it.
     *
     * <p>Removes the job's schedule, unregisters this listener, and marks the task as
     * completed. If any of the scheduler calls throws, the task is marked as failed with
     * that exception instead.
     */
    synchronized void shutdown() {
        try {
            logger.info("Rollup indexer [" + this.jobId() + "] received abort request, stopping indexer.");
            this.schedulerEngine.remove(schedulerJobName(this.job));
            this.schedulerEngine.unregister(this);
        }
        catch (Exception e) {
            this.markAsFailed(e);
            return;
        }
        this.markAsCompleted();
    }

    /**
     * Handles a cancellation request by aborting the indexer.
     *
     * <p>The task shuts down right away only when {@code indexer.abort()} returns
     * {@code true}; otherwise shutdown is left to the indexer's {@code onAbort()} callback.
     * NOTE(review): presumably {@code abort()} returns {@code false} while a run is still
     * in flight - confirm against {@code RollupIndexer}.
     */
    protected synchronized void onCancelled() {
        logger.info("Received cancellation request for Rollup job [" + this.jobId() + "], state: [" + this.indexer.getState() + "]");
        if (this.indexer.abort()) {
            this.shutdown();
        }
    }

    /**
     * Scheduler callback. Events whose job name is not this job's schedule name are ignored;
     * for a matching event the indexer is asked to start a run, passing the current wall-clock time.
     *
     * @param event the schedule event delivered by the {@link SchedulerEngine}
     */
    public synchronized void triggered(SchedulerEngine.Event event) {
        if (!event.getJobName().equals(schedulerJobName(this.job))) {
            return;
        }
        logger.debug("Rollup indexer [" + event.getJobName() + "] schedule has triggered, state: [" + this.indexer.getState() + "]");
        this.indexer.maybeTriggerAsyncJob(System.currentTimeMillis());
    }

    /**
     * {@link RollupIndexer} that issues its search and bulk requests through a
     * {@link Client} and persists its state through the enclosing task.
     */
    protected class ClientRollupPageManager extends RollupIndexer {
        private final Client client;
        private final RollupJob job;

        /**
         * @param job             the rollup job being indexed
         * @param initialState    state the indexer starts in
         * @param initialPosition position to resume from, may be {@code null}
         * @param client          client used to execute search and bulk requests
         */
        ClientRollupPageManager(RollupJob job, IndexerState initialState, Map<String, Object> initialPosition, Client client) {
            super(RollupJobTask.this.threadPool.executor("generic"), job, new AtomicReference<IndexerState>(initialState), initialPosition);
            this.client = client;
            this.job = job;
        }

        /** Executes the search asynchronously via {@link RollupClientHelper}. */
        @Override
        protected void doNextSearch(SearchRequest request, ActionListener<SearchResponse> nextPhase) {
            RollupClientHelper.executeAsync(this.client, this.job, SearchAction.INSTANCE, request, nextPhase);
        }

        /** Executes the bulk request asynchronously via {@link RollupClientHelper}. */
        @Override
        protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> nextPhase) {
            RollupClientHelper.executeAsync(this.client, this.job, BulkAction.INSTANCE, request, nextPhase);
        }

        /**
         * Persists the given state and then runs {@code next}.
         *
         * <p>When the state is {@code ABORTING} nothing is persisted and {@code next} runs
         * immediately. Otherwise {@code next} runs once the persistence call completes,
         * whether it succeeded or failed.
         */
        @Override
        protected void doSaveState(IndexerState state, Map<String, Object> position, Runnable next) {
            if (state.equals(IndexerState.ABORTING)) {
                next.run();
                return;
            }
            // NOTE(review): the `position` argument is not used; the persisted status is built
            // from getPosition() instead - confirm the two are always equivalent here.
            RollupJobStatus status = new RollupJobStatus(state, this.getPosition());
            logger.debug("Updating persistent status of job [" + this.job.getConfig().getId() + "] to [" + state.toString() + "]");
            // A failed status update is deliberately not propagated: `next` runs either way.
            RollupJobTask.this.updatePersistentStatus(status, ActionListener.wrap(task -> next.run(), exc -> next.run()));
        }

        /** Logs that a run finished. */
        @Override
        protected void onFinish() {
            logger.debug("Finished indexing for job [" + this.job.getConfig().getId() + "]");
        }

        /** Logs the failure; the exception is not rethrown. */
        @Override
        protected void onFailure(Exception exc) {
            logger.warn("Rollup job [" + this.job.getConfig().getId() + "] failed with an exception: ", exc);
        }

        /** Shuts the enclosing task down. */
        @Override
        protected void onAbort() {
            RollupJobTask.this.shutdown();
        }
    }

    /**
     * Executor that creates {@link RollupJobTask} instances and, when one is run on
     * this node, hooks it up to the {@link SchedulerEngine}.
     */
    public static class RollupJobPersistentTasksExecutor extends PersistentTasksExecutor<RollupJob> {
        private final Client client;
        private final SchedulerEngine schedulerEngine;
        private final ThreadPool threadPool;

        /**
         * @param settings        node settings, forwarded to the superclass
         * @param client          client handed to every created task
         * @param schedulerEngine engine that created tasks are registered with
         * @param threadPool      thread pool handed to every created task
         */
        public RollupJobPersistentTasksExecutor(Settings settings, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool) {
            super(settings, "xpack/rollup/job", "rollup_indexing");
            this.client = client;
            this.schedulerEngine = schedulerEngine;
            this.threadPool = threadPool;
        }

        /**
         * Registers the task as a scheduler listener and adds a cron schedule built from
         * the job configuration.
         *
         * <p>NOTE(review): {@code params} is annotated {@code @Nullable} but is dereferenced
         * unconditionally; a {@code null} value results in a {@link NullPointerException}.
         *
         * @param task   the allocated task; must be a {@link RollupJobTask}
         * @param params the rollup job to schedule
         * @param status persisted status; not used here
         */
        protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, Task.Status status) {
            RollupJobTask rollupJobTask = (RollupJobTask)task;
            SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(schedulerJobName(params), new CronSchedule(params.getConfig().getCron()));
            // Register the listener before adding the schedule, as the original ordering does.
            this.schedulerEngine.register(rollupJobTask);
            this.schedulerEngine.add(schedulerJob);
            this.logger.info("Rollup job [" + params.getConfig().getId() + "] created.");
        }

        /**
         * Creates a {@link RollupJobTask} from the persistent task's params and status,
         * sharing this executor's client, scheduler engine and thread pool.
         */
        protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId, PersistentTasksCustomMetaData.PersistentTask<RollupJob> persistentTask, Map<String, String> headers) {
            RollupJob rollupJob = (RollupJob)persistentTask.getParams();
            RollupJobStatus rollupStatus = (RollupJobStatus)persistentTask.getStatus();
            return new RollupJobTask(id, type, action, parentTaskId, rollupJob, rollupStatus, this.client, this.schedulerEngine, this.threadPool, headers);
        }
    }
}

