/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ad.task;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkAction;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.common.exception.ADTaskCancelledException;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.DuplicateTaskException;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.LimitExceededException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.ADEntityTaskProfile;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskAction;
import org.opensearch.ad.model.ADTaskProfile;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectionDateRange;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.stats.InternalStatNames;
import org.opensearch.ad.task.ADRealtimeTaskCache;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskCancellationState;
import org.opensearch.ad.transport.ADBatchAnomalyResultAction;
import org.opensearch.ad.transport.ADBatchAnomalyResultRequest;
import org.opensearch.ad.transport.ADCancelTaskAction;
import org.opensearch.ad.transport.ADCancelTaskRequest;
import org.opensearch.ad.transport.ADStatsNodeResponse;
import org.opensearch.ad.transport.ADStatsNodesAction;
import org.opensearch.ad.transport.ADStatsRequest;
import org.opensearch.ad.transport.ADTaskProfileAction;
import org.opensearch.ad.transport.ADTaskProfileNodeResponse;
import org.opensearch.ad.transport.ADTaskProfileRequest;
import org.opensearch.ad.transport.AnomalyDetectorJobResponse;
import org.opensearch.ad.transport.ForwardADTaskAction;
import org.opensearch.ad.transport.ForwardADTaskRequest;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.DeprecationHandler;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentParserUtils;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.commons.authuser.User;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.NestedQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.index.reindex.UpdateByQueryAction;
import org.opensearch.index.reindex.UpdateByQueryRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.Script;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

public class ADTaskManager {
    public static final String AD_TASK_LEAD_NODE_MODEL_ID = "ad_task_lead_node_model_id";
    public static final String AD_TASK_MAINTAINENCE_NODE_MODEL_ID = "ad_task_maintainence_node_model_id";
    public static final int HC_BATCH_TASK_CACHE_TIMEOUT_IN_MILLIS = 600000;
    private final Logger logger = LogManager.getLogger(this.getClass());
    static final String STATE_INDEX_NOT_EXIST_MSG = "State index does not exist.";
    private final Set<String> retryableErrors = ImmutableSet.of((Object)CommonErrorMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT, (Object)CommonErrorMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR);
    private final Client client;
    private final ClusterService clusterService;
    private final NamedXContentRegistry xContentRegistry;
    private final AnomalyDetectionIndices detectionIndices;
    private final DiscoveryNodeFilterer nodeFilter;
    private final ADTaskCacheManager adTaskCacheManager;
    private final HashRing hashRing;
    private volatile Integer maxOldAdTaskDocsPerDetector;
    private volatile Integer pieceIntervalSeconds;
    private volatile boolean deleteADResultWhenDeleteDetector;
    private volatile TransportRequestOptions transportRequestOptions;
    private final ThreadPool threadPool;
    private static int DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS = 5;
    private final Semaphore checkingTaskSlot;
    private volatile Integer maxAdBatchTaskPerNode;
    private volatile Integer maxRunningEntitiesPerDetector;
    private final Semaphore scaleEntityTaskLane;
    private static final int SCALE_ENTITY_TASK_LANE_INTERVAL_IN_MILLIS = 10000;

    public ADTaskManager(Settings settings, ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry, AnomalyDetectionIndices detectionIndices, DiscoveryNodeFilterer nodeFilter, HashRing hashRing, ADTaskCacheManager adTaskCacheManager, ThreadPool threadPool) {
        this.client = client;
        this.xContentRegistry = xContentRegistry;
        this.detectionIndices = detectionIndices;
        this.nodeFilter = nodeFilter;
        this.clusterService = clusterService;
        this.adTaskCacheManager = adTaskCacheManager;
        this.hashRing = hashRing;
        this.maxOldAdTaskDocsPerDetector = (Integer)AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR, it -> {
            this.maxOldAdTaskDocsPerDetector = it;
        });
        this.pieceIntervalSeconds = (Integer)AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS, it -> {
            this.pieceIntervalSeconds = it;
        });
        this.deleteADResultWhenDeleteDetector = (Boolean)AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR, it -> {
            this.deleteADResultWhenDeleteDetector = it;
        });
        this.maxAdBatchTaskPerNode = (Integer)AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE, it -> {
            this.maxAdBatchTaskPerNode = it;
        });
        this.maxRunningEntitiesPerDetector = (Integer)AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS, it -> {
            this.maxRunningEntitiesPerDetector = it;
        });
        this.transportRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout((TimeValue)AnomalyDetectorSettings.REQUEST_TIMEOUT.get(settings)).build();
        clusterService.getClusterSettings().addSettingsUpdateConsumer(AnomalyDetectorSettings.REQUEST_TIMEOUT, it -> {
            this.transportRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout(it).build();
        });
        this.threadPool = threadPool;
        this.checkingTaskSlot = new Semaphore(1);
        this.scaleEntityTaskLane = new Semaphore(1);
    }

    public void startDetector(String detectorId, DetectionDateRange detectionDateRange, IndexAnomalyDetectorJobActionHandler handler, User user, TransportService transportService, ThreadContext.StoredContext context, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.detectionIndices.update();
        this.getDetector(detectorId, detector -> {
            if (!detector.isPresent()) {
                listener.onFailure((Exception)new OpenSearchStatusException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, RestStatus.NOT_FOUND, new Object[0]));
                return;
            }
            String errorMessage = this.validateDetector((AnomalyDetector)detector.get());
            if (errorMessage != null) {
                listener.onFailure((Exception)new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, new Object[0]));
                return;
            }
            String resultIndex = ((AnomalyDetector)detector.get()).getResultIndex();
            if (resultIndex == null) {
                this.startRealtimeOrHistoricalDetection(detectionDateRange, handler, user, transportService, listener, (Optional<AnomalyDetector>)detector);
                return;
            }
            context.restore();
            this.detectionIndices.initCustomResultIndexAndExecute(resultIndex, () -> this.startRealtimeOrHistoricalDetection(detectionDateRange, handler, user, transportService, listener, (Optional<AnomalyDetector>)detector), listener);
        }, listener);
    }

    private void startRealtimeOrHistoricalDetection(DetectionDateRange detectionDateRange, IndexAnomalyDetectorJobActionHandler handler, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener, Optional<AnomalyDetector> detector) {
        try (ThreadContext.StoredContext context = this.client.threadPool().getThreadContext().stashContext();){
            if (detectionDateRange == null) {
                handler.startAnomalyDetectorJob(detector.get(), listener);
            } else {
                this.forwardApplyForTaskSlotsRequestToLeadNode(detector.get(), detectionDateRange, user, transportService, listener);
            }
        }
        catch (Exception e) {
            this.logger.error("Failed to stash context", (Throwable)e);
            listener.onFailure(e);
        }
    }

    protected void forwardApplyForTaskSlotsRequestToLeadNode(AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        ForwardADTaskRequest forwardADTaskRequest = new ForwardADTaskRequest(detector, detectionDateRange, user, ADTaskAction.APPLY_FOR_TASK_SLOTS);
        this.forwardRequestToLeadNode(forwardADTaskRequest, transportService, listener);
    }

    public void forwardScaleTaskSlotRequestToLeadNode(ADTask adTask, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.forwardRequestToLeadNode(new ForwardADTaskRequest(adTask, ADTaskAction.CHECK_AVAILABLE_TASK_SLOTS), transportService, listener);
    }

    public void forwardRequestToLeadNode(ForwardADTaskRequest forwardADTaskRequest, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.hashRing.buildAndGetOwningNodeWithSameLocalAdVersion(AD_TASK_LEAD_NODE_MODEL_ID, node -> {
            if (!node.isPresent()) {
                listener.onFailure((Exception)new ResourceNotFoundException("Can't find AD task lead node"));
                return;
            }
            transportService.sendRequest((DiscoveryNode)node.get(), ForwardADTaskAction.NAME, (TransportRequest)forwardADTaskRequest, this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, AnomalyDetectorJobResponse::new));
        }, listener);
    }

    public void startHistoricalAnalysis(AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, int availableTaskSlots, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        String detectorId = detector.getDetectorId();
        this.hashRing.buildAndGetOwningNodeWithSameLocalAdVersion(detectorId, owningNode -> {
            if (!owningNode.isPresent()) {
                this.logger.debug("Can't find eligible node to run as AD task's coordinating node");
                listener.onFailure((Exception)new OpenSearchStatusException("No eligible node to run detector", RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                return;
            }
            this.logger.debug("coordinating node is : {} for detector: {}", (Object)((DiscoveryNode)owningNode.get()).getId(), (Object)detectorId);
            this.forwardDetectRequestToCoordinatingNode(detector, detectionDateRange, user, availableTaskSlots, ADTaskAction.START, transportService, (DiscoveryNode)owningNode.get(), listener);
        }, listener);
    }

    protected void forwardDetectRequestToCoordinatingNode(AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, Integer availableTaskSlots, ADTaskAction adTaskAction, TransportService transportService, DiscoveryNode node, ActionListener<AnomalyDetectorJobResponse> listener) {
        Version adVersion = this.hashRing.getAdVersion(node.getId());
        transportService.sendRequest(node, ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(detector, detectionDateRange, user, adTaskAction, availableTaskSlots, adVersion), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, AnomalyDetectorJobResponse::new));
    }

    protected void forwardADTaskToCoordinatingNode(ADTask adTask, ADTaskAction adTaskAction, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.logger.debug("Forward AD task to coordinating node, task id: {}, action: {}", (Object)adTask.getTaskId(), (Object)adTaskAction.name());
        transportService.sendRequest(this.getCoordinatingNode(adTask), ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(adTask, adTaskAction), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, AnomalyDetectorJobResponse::new));
    }

    protected void forwardStaleRunningEntitiesToCoordinatingNode(ADTask adTask, ADTaskAction adTaskAction, TransportService transportService, List<String> staleRunningEntity, ActionListener<AnomalyDetectorJobResponse> listener) {
        transportService.sendRequest(this.getCoordinatingNode(adTask), ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(adTask, adTaskAction, staleRunningEntity), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, AnomalyDetectorJobResponse::new));
    }

    public void checkTaskSlots(ADTask adTask, AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, ADTaskAction afterCheckAction, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        String detectorId = detector.getDetectorId();
        this.logger.debug("Start checking task slots for detector: {}, task action: {}", (Object)detectorId, (Object)afterCheckAction);
        if (!this.checkingTaskSlot.tryAcquire()) {
            this.logger.info("Can't acquire checking task slot semaphore for detector {}", (Object)detectorId);
            listener.onFailure((Exception)new OpenSearchStatusException("Too many historical analysis requests in short time. Please retry later.", RestStatus.FORBIDDEN, new Object[0]));
            return;
        }
        ActionListener wrappedActionListener = ActionListener.runAfter(listener, () -> {
            this.checkingTaskSlot.release(1);
            this.logger.debug("Release checking task slot semaphore on lead node for detector {}", (Object)detectorId);
        });
        this.hashRing.getNodesWithSameLocalAdVersion(nodes -> {
            int maxAdTaskSlots = ((DiscoveryNode[])nodes).length * this.maxAdBatchTaskPerNode;
            ADStatsRequest adStatsRequest = new ADStatsRequest((DiscoveryNode)nodes);
            adStatsRequest.addAll((Set<String>)ImmutableSet.of((Object)InternalStatNames.AD_USED_BATCH_TASK_SLOT_COUNT.getName(), (Object)InternalStatNames.AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName()));
            this.client.execute((ActionType)ADStatsNodesAction.INSTANCE, (ActionRequest)adStatsRequest, ActionListener.wrap(adStatsResponse -> {
                int totalUsedTaskSlots = 0;
                int totalAssignedTaskSlots = 0;
                for (ADStatsNodeResponse response : adStatsResponse.getNodes()) {
                    totalUsedTaskSlots += ((Integer)response.getStatsMap().get(InternalStatNames.AD_USED_BATCH_TASK_SLOT_COUNT.getName())).intValue();
                    totalAssignedTaskSlots += ((Integer)response.getStatsMap().get(InternalStatNames.AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName())).intValue();
                }
                this.logger.info("Current total used task slots is {}, total detector assigned task slots is {} when start historical analysis for detector {}", (Object)totalUsedTaskSlots, (Object)totalAssignedTaskSlots, (Object)detectorId);
                int currentUsedTaskSlots = Math.max(totalUsedTaskSlots, totalAssignedTaskSlots);
                if (currentUsedTaskSlots >= maxAdTaskSlots) {
                    wrappedActionListener.onFailure((Exception)new OpenSearchStatusException("No available task slot", RestStatus.BAD_REQUEST, new Object[0]));
                    return;
                }
                int availableAdTaskSlots = maxAdTaskSlots - currentUsedTaskSlots;
                this.logger.info("Current available task slots is {} for historical analysis of detector {}", (Object)availableAdTaskSlots, (Object)detectorId);
                if (ADTaskAction.SCALE_ENTITY_TASK_SLOTS == afterCheckAction) {
                    this.forwardToCoordinatingNode(adTask, detector, detectionDateRange, user, afterCheckAction, transportService, (ActionListener<AnomalyDetectorJobResponse>)wrappedActionListener, availableAdTaskSlots);
                    return;
                }
                int approvedTaskSlots = detector.isMultientityDetector() ? Math.min(this.maxRunningEntitiesPerDetector, availableAdTaskSlots) : 1;
                this.forwardToCoordinatingNode(adTask, detector, detectionDateRange, user, afterCheckAction, transportService, (ActionListener<AnomalyDetectorJobResponse>)wrappedActionListener, approvedTaskSlots);
            }, exception -> {
                this.logger.error("Failed to get node's task stats for detector " + detectorId, (Throwable)exception);
                wrappedActionListener.onFailure(exception);
            }));
        }, wrappedActionListener);
    }

    private void forwardToCoordinatingNode(ADTask adTask, AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, ADTaskAction targetActionOfTaskSlotChecking, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> wrappedActionListener, int approvedTaskSlots) {
        switch (targetActionOfTaskSlotChecking) {
            case START: {
                this.logger.info("Will assign {} task slots to run historical analysis for detector {}", (Object)approvedTaskSlots, (Object)detector.getDetectorId());
                this.startHistoricalAnalysis(detector, detectionDateRange, user, approvedTaskSlots, transportService, wrappedActionListener);
                break;
            }
            case SCALE_ENTITY_TASK_SLOTS: {
                this.logger.info("There are {} task slots available now to scale historical analysis task lane for detector {}", (Object)approvedTaskSlots, (Object)adTask.getDetectorId());
                this.scaleTaskLaneOnCoordinatingNode(adTask, approvedTaskSlots, transportService, wrappedActionListener);
                break;
            }
            default: {
                wrappedActionListener.onFailure((Exception)new AnomalyDetectionException("Unknown task action " + targetActionOfTaskSlotChecking));
            }
        }
    }

    protected void scaleTaskLaneOnCoordinatingNode(ADTask adTask, int approvedTaskSlot, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        DiscoveryNode coordinatingNode = this.getCoordinatingNode(adTask);
        transportService.sendRequest(coordinatingNode, ForwardADTaskAction.NAME, (TransportRequest)new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS), this.transportRequestOptions, (TransportResponseHandler)new ActionListenerResponseHandler(listener, AnomalyDetectorJobResponse::new));
    }

    private DiscoveryNode getCoordinatingNode(ADTask adTask) {
        String coordinatingNode = adTask.getCoordinatingNode();
        DiscoveryNode[] eligibleDataNodes = this.nodeFilter.getEligibleDataNodes();
        DiscoveryNode targetNode = null;
        for (DiscoveryNode node : eligibleDataNodes) {
            if (!node.getId().equals(coordinatingNode)) continue;
            targetNode = node;
            break;
        }
        if (targetNode == null) {
            throw new ResourceNotFoundException(adTask.getDetectorId(), "AD task coordinating node not found");
        }
        return targetNode;
    }

    public void startDetector(AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        try {
            if (this.detectionIndices.doesDetectorStateIndexExist()) {
                this.getAndExecuteOnLatestDetectorLevelTask(detector.getDetectorId(), this.getADTaskTypes(detectionDateRange), adTask -> {
                    if (!adTask.isPresent() || ((ADTask)adTask.get()).isDone()) {
                        this.updateLatestFlagOfOldTasksAndCreateNewTask(detector, detectionDateRange, user, listener);
                    } else {
                        listener.onFailure((Exception)new OpenSearchStatusException(CommonErrorMessages.DETECTOR_IS_RUNNING, RestStatus.BAD_REQUEST, new Object[0]));
                    }
                }, transportService, true, listener);
            } else {
                this.detectionIndices.initDetectionStateIndex((ActionListener<CreateIndexResponse>)ActionListener.wrap(r -> {
                    if (r.isAcknowledged()) {
                        this.logger.info("Created {} with mappings.", (Object)".opendistro-anomaly-detection-state");
                        this.updateLatestFlagOfOldTasksAndCreateNewTask(detector, detectionDateRange, user, listener);
                    } else {
                        String error = String.format(Locale.ROOT, "Create index %S not acknowledged", ".opendistro-anomaly-detection-state");
                        this.logger.warn(error);
                        listener.onFailure((Exception)new OpenSearchStatusException(error, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                    }
                }, e -> {
                    if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof ResourceAlreadyExistsException) {
                        this.updateLatestFlagOfOldTasksAndCreateNewTask(detector, detectionDateRange, user, listener);
                    } else {
                        this.logger.error("Failed to init anomaly detection state index", (Throwable)e);
                        listener.onFailure(e);
                    }
                }));
            }
        }
        catch (Exception e2) {
            this.logger.error("Failed to start detector " + detector.getDetectorId(), (Throwable)e2);
            listener.onFailure(e2);
        }
    }

    private ADTaskType getADTaskType(AnomalyDetector detector, DetectionDateRange detectionDateRange) {
        if (detectionDateRange == null) {
            return detector.isMultientityDetector() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
        }
        return detector.isMultientityDetector() ? ADTaskType.HISTORICAL_HC_DETECTOR : ADTaskType.HISTORICAL_SINGLE_ENTITY;
    }

    private List<ADTaskType> getADTaskTypes(DetectionDateRange detectionDateRange) {
        return this.getADTaskTypes(detectionDateRange, false);
    }

    private List<ADTaskType> getADTaskTypes(DetectionDateRange detectionDateRange, boolean resetLatestTaskStateFlag) {
        if (detectionDateRange == null) {
            return ADTaskType.REALTIME_TASK_TYPES;
        }
        if (resetLatestTaskStateFlag) {
            return ADTaskType.ALL_HISTORICAL_TASK_TYPES;
        }
        return ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
    }

    public void stopDetector(String detectorId, boolean historical, IndexAnomalyDetectorJobActionHandler handler, User user, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.getDetector(detectorId, detector -> {
            if (!detector.isPresent()) {
                listener.onFailure((Exception)new OpenSearchStatusException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, RestStatus.NOT_FOUND, new Object[0]));
                return;
            }
            if (historical) {
                this.getAndExecuteOnLatestDetectorLevelTask(detectorId, ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES, task -> this.stopHistoricalAnalysis(detectorId, (Optional<ADTask>)task, user, listener), transportService, false, listener);
            } else {
                handler.stopAnomalyDetectorJob(detectorId, listener);
            }
        }, listener);
    }

    public <T> void getDetector(String detectorId, Consumer<Optional<AnomalyDetector>> function, ActionListener<T> listener) {
        GetRequest getRequest = new GetRequest(".opendistro-anomaly-detectors", detectorId);
        this.client.get(getRequest, ActionListener.wrap(response -> {
            if (!response.isExists()) {
                function.accept(Optional.empty());
                return;
            }
            try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, response.getSourceAsBytesRef());){
                XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion());
                function.accept(Optional.of(detector));
            }
            catch (Exception e) {
                String message = "Failed to parse anomaly detector " + detectorId;
                this.logger.error(message, (Throwable)e);
                listener.onFailure((Exception)new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
            }
        }, exception -> {
            this.logger.error("Failed to get detector " + detectorId, (Throwable)exception);
            listener.onFailure(exception);
        }));
    }

    public <T> void getAndExecuteOnLatestDetectorLevelTask(String detectorId, List<ADTaskType> adTaskTypes, Consumer<Optional<ADTask>> function, TransportService transportService, boolean resetTaskState, ActionListener<T> listener) {
        this.getAndExecuteOnLatestADTask(detectorId, null, null, adTaskTypes, function, transportService, resetTaskState, listener);
    }

    public <T> void getAndExecuteOnLatestADTask(String detectorId, String parentTaskId, Entity entity, List<ADTaskType> adTaskTypes, Consumer<Optional<ADTask>> function, TransportService transportService, boolean resetTaskState, ActionListener<T> listener) {
        this.getAndExecuteOnLatestADTasks(detectorId, parentTaskId, entity, adTaskTypes, taskList -> {
            if (taskList != null && taskList.size() > 0) {
                function.accept(Optional.ofNullable((ADTask)taskList.get(0)));
            } else {
                function.accept(Optional.empty());
            }
        }, transportService, resetTaskState, 1, listener);
    }

    public <T> void getAndExecuteOnLatestADTasks(String detectorId, String parentTaskId, Entity entity, List<ADTaskType> adTaskTypes, Consumer<List<ADTask>> function, TransportService transportService, boolean resetTaskState, int size, ActionListener<T> listener) {
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("detector_id", detectorId));
        query.filter((QueryBuilder)new TermQueryBuilder("is_latest", true));
        if (parentTaskId != null) {
            query.filter((QueryBuilder)new TermQueryBuilder("parent_task_id", parentTaskId));
        }
        if (adTaskTypes != null && adTaskTypes.size() > 0) {
            query.filter((QueryBuilder)new TermsQueryBuilder("task_type", ADTaskType.taskTypeToString(adTaskTypes)));
        }
        if (entity != null && !ParseUtils.isNullOrEmpty(entity.getAttributes())) {
            String path = "entity";
            String entityKeyFieldName = path + ".name";
            String entityValueFieldName = path + ".value";
            for (Map.Entry<String, String> attribute : entity.getAttributes().entrySet()) {
                BoolQueryBuilder entityBoolQuery = new BoolQueryBuilder();
                TermQueryBuilder entityKeyFilterQuery = QueryBuilders.termQuery((String)entityKeyFieldName, (String)attribute.getKey());
                TermQueryBuilder entityValueFilterQuery = QueryBuilders.termQuery((String)entityValueFieldName, (String)attribute.getValue());
                entityBoolQuery.filter((QueryBuilder)entityKeyFilterQuery).filter((QueryBuilder)entityValueFilterQuery);
                NestedQueryBuilder nestedQueryBuilder = new NestedQueryBuilder(path, (QueryBuilder)entityBoolQuery, ScoreMode.None);
                query.filter((QueryBuilder)nestedQueryBuilder);
            }
        }
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query((QueryBuilder)query).sort("execution_start_time", SortOrder.DESC).size(size);
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.source(sourceBuilder);
        searchRequest.indices(new String[]{".opendistro-anomaly-detection-state"});
        this.client.search(searchRequest, ActionListener.wrap(r -> {
            ArrayList<ADTask> adTasks = new ArrayList<ADTask>();
            if (r == null || r.getHits().getTotalHits() == null || r.getHits().getTotalHits().value == 0L) {
                function.accept(adTasks);
                return;
            }
            for (SearchHit searchHit : r.getHits()) {
                try {
                    XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, searchHit.getSourceRef());
                    try {
                        XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                        ADTask adTask = ADTask.parse(parser, searchHit.getId());
                        adTasks.add(adTask);
                    }
                    finally {
                        if (parser == null) continue;
                        parser.close();
                    }
                }
                catch (Exception e) {
                    String message = "Failed to parse AD task for detector " + detectorId + ", task id " + searchHit.getId();
                    this.logger.error(message, (Throwable)e);
                    listener.onFailure((Exception)new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                }
            }
            if (resetTaskState) {
                this.resetLatestDetectorTaskState(adTasks, function, transportService, listener);
            } else {
                function.accept(adTasks);
            }
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                function.accept(new ArrayList());
            } else {
                this.logger.error("Failed to search AD task for detector " + detectorId, (Throwable)e);
                listener.onFailure(e);
            }
        }));
    }

    private <T> void resetLatestDetectorTaskState(List<ADTask> adTasks, Consumer<List<ADTask>> function, TransportService transportService, ActionListener<T> listener) {
        ArrayList<ADTask> runningHistoricalTasks = new ArrayList<ADTask>();
        ArrayList<ADTask> runningRealtimeTasks = new ArrayList<ADTask>();
        for (ADTask adTask : adTasks) {
            if (adTask.isEntityTask() || adTask.isDone()) continue;
            if (!adTask.isHistoricalTask()) {
                runningRealtimeTasks.add(adTask);
                continue;
            }
            runningHistoricalTasks.add(adTask);
        }
        this.resetHistoricalDetectorTaskState(runningHistoricalTasks, () -> this.resetRealtimeDetectorTaskState(runningRealtimeTasks, () -> function.accept(adTasks), transportService, listener), transportService, listener);
    }

    private <T> void resetRealtimeDetectorTaskState(List<ADTask> runningRealtimeTasks, AnomalyDetectorFunction function, TransportService transportService, ActionListener<T> listener) {
        if (ParseUtils.isNullOrEmpty(runningRealtimeTasks)) {
            function.execute();
            return;
        }
        ADTask adTask = runningRealtimeTasks.get(0);
        String detectorId = adTask.getDetectorId();
        GetRequest getJobRequest = new GetRequest(".opendistro-anomaly-detector-jobs").id(detectorId);
        this.client.get(getJobRequest, ActionListener.wrap(r -> {
            if (r.isExists()) {
                try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
                    if (!job.isEnabled()) {
                        this.logger.debug("AD job is disabled, reset realtime task as stopped for detector {}", (Object)detectorId);
                        this.resetTaskStateAsStopped(adTask, function, transportService, listener);
                    } else {
                        function.execute();
                    }
                }
                catch (IOException e) {
                    this.logger.error(" Failed to parse AD job " + detectorId, (Throwable)e);
                    listener.onFailure((Exception)e);
                }
            } else {
                this.logger.debug("AD job is not found, reset realtime task as stopped for detector {}", (Object)detectorId);
                this.resetTaskStateAsStopped(adTask, function, transportService, listener);
            }
        }, e -> {
            this.logger.error("Fail to get AD realtime job for detector " + detectorId, (Throwable)e);
            listener.onFailure(e);
        }));
    }

    private <T> void resetHistoricalDetectorTaskState(List<ADTask> runningHistoricalTasks, AnomalyDetectorFunction function, TransportService transportService, ActionListener<T> listener) {
        if (ParseUtils.isNullOrEmpty(runningHistoricalTasks)) {
            function.execute();
            return;
        }
        ADTask adTask = runningHistoricalTasks.get(0);
        if (!this.lastUpdateTimeOfHistoricalTaskExpired(adTask)) {
            function.execute();
            return;
        }
        String taskId = adTask.getTaskId();
        AnomalyDetector detector = adTask.getDetector();
        this.getADTaskProfile(adTask, (ActionListener<ADTaskProfile>)ActionListener.wrap(taskProfile -> {
            boolean taskStopped = this.isTaskStopped(taskId, detector, (ADTaskProfile)taskProfile);
            if (taskStopped) {
                this.logger.debug("Reset task state as stopped, task id: {}", (Object)adTask.getTaskId());
                if (taskProfile.getTaskId() == null && detector.isMultientityDetector() && !ParseUtils.isNullOrEmpty(taskProfile.getEntityTaskProfiles())) {
                    this.stopHistoricalAnalysis(adTask.getDetectorId(), Optional.of(adTask), null, (ActionListener<AnomalyDetectorJobResponse>)ActionListener.wrap(r -> {
                        this.logger.debug("Restop detector successfully");
                        this.resetTaskStateAsStopped(adTask, function, transportService, listener);
                    }, e -> {
                        this.logger.error("Failed to restop detector ", (Throwable)e);
                        listener.onFailure(e);
                    }));
                } else {
                    this.resetTaskStateAsStopped(adTask, function, transportService, listener);
                }
            } else {
                function.execute();
                if (ADTaskType.HISTORICAL_HC_DETECTOR.name().equals(adTask.getTaskType()) && !ParseUtils.isNullOrEmpty(taskProfile.getRunningEntities()) && this.hcBatchTaskExpired(taskProfile.getLatestHCTaskRunTime())) {
                    ArrayList<String> runningTasksInCoordinatingNodeCache = new ArrayList<String>(taskProfile.getRunningEntities());
                    ArrayList runningTasksOnWorkerNode = new ArrayList();
                    if (taskProfile.getEntityTaskProfiles() != null && taskProfile.getEntityTaskProfiles().size() > 0) {
                        taskProfile.getEntityTaskProfiles().forEach(entryTask -> runningTasksOnWorkerNode.add(this.convertEntityToString(entryTask.getEntity(), detector)));
                    }
                    if (runningTasksInCoordinatingNodeCache.size() > runningTasksOnWorkerNode.size()) {
                        runningTasksInCoordinatingNodeCache.removeAll(runningTasksOnWorkerNode);
                        this.forwardStaleRunningEntitiesToCoordinatingNode(adTask, ADTaskAction.CLEAN_STALE_RUNNING_ENTITIES, transportService, runningTasksInCoordinatingNodeCache, (ActionListener<AnomalyDetectorJobResponse>)ActionListener.wrap(res -> this.logger.debug("Forwarded task to clean stale running entity, task id {}", (Object)taskId), ex -> this.logger.error("Failed to forward clean stale running entity for task " + taskId, (Throwable)ex)));
                    }
                }
            }
        }, e -> {
            this.logger.error("Failed to get AD task profile for task " + adTask.getTaskId(), (Throwable)e);
            function.execute();
        }));
    }

    private boolean isTaskStopped(String taskId, AnomalyDetector detector, ADTaskProfile taskProfile) {
        String detectorId = detector.getDetectorId();
        if (taskProfile == null || !Objects.equals(taskId, taskProfile.getTaskId())) {
            this.logger.debug("AD task not found for task {} detector {}", (Object)taskId, (Object)detectorId);
            return true;
        }
        if (!detector.isMultientityDetector() && taskProfile.getNodeId() == null) {
            this.logger.debug("AD task not running for single entity detector {}, task {}", (Object)detectorId, (Object)taskId);
            return true;
        }
        if (detector.isMultientityDetector() && taskProfile.getTotalEntitiesInited() && ParseUtils.isNullOrEmpty(taskProfile.getRunningEntities()) && ParseUtils.isNullOrEmpty(taskProfile.getEntityTaskProfiles()) && this.hcBatchTaskExpired(taskProfile.getLatestHCTaskRunTime())) {
            this.logger.debug("AD task not running for HC detector {}, task {}", (Object)detectorId, (Object)taskId);
            return true;
        }
        return false;
    }

    public boolean hcBatchTaskExpired(Long latestHCTaskRunTime) {
        if (latestHCTaskRunTime == null) {
            return true;
        }
        return latestHCTaskRunTime + 600000L < Instant.now().toEpochMilli();
    }

    private void stopHistoricalAnalysis(String detectorId, Optional<ADTask> adTask, User user, ActionListener<AnomalyDetectorJobResponse> listener) {
        if (!adTask.isPresent()) {
            listener.onFailure((Exception)new ResourceNotFoundException(detectorId, "Detector not started"));
            return;
        }
        if (adTask.get().isDone()) {
            listener.onFailure((Exception)new ResourceNotFoundException(detectorId, "No running task found"));
            return;
        }
        String taskId = adTask.get().getTaskId();
        DiscoveryNode[] dataNodes = this.hashRing.getNodesWithSameLocalAdVersion();
        String userName = user == null ? null : user.getName();
        ADCancelTaskRequest cancelTaskRequest = new ADCancelTaskRequest(detectorId, taskId, userName, dataNodes);
        this.client.execute((ActionType)ADCancelTaskAction.INSTANCE, (ActionRequest)cancelTaskRequest, ActionListener.wrap(response -> listener.onResponse((Object)new AnomalyDetectorJobResponse(taskId, 0L, 0L, 0L, RestStatus.OK)), e -> {
            this.logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, (Throwable)e);
            listener.onFailure(e);
        }));
    }

    private boolean lastUpdateTimeOfHistoricalTaskExpired(ADTask adTask) {
        int waitingTime = Math.max(2 * this.pieceIntervalSeconds, 10);
        return adTask.getLastUpdateTime().plus((long)waitingTime, ChronoUnit.SECONDS).isBefore(Instant.now());
    }

    private <T> void resetTaskStateAsStopped(ADTask adTask, AnomalyDetectorFunction function, TransportService transportService, ActionListener<T> listener) {
        this.cleanDetectorCache(adTask, transportService, () -> {
            String taskId = adTask.getTaskId();
            ImmutableMap updatedFields = ImmutableMap.of((Object)"state", (Object)ADTaskState.STOPPED.name());
            this.updateADTask(taskId, (Map<String, Object>)updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(r -> {
                adTask.setState(ADTaskState.STOPPED.name());
                if (function != null) {
                    function.execute();
                }
                if (ADTaskType.HISTORICAL_HC_DETECTOR.name().equals(adTask.getTaskType())) {
                    this.resetEntityTasksAsStopped(taskId);
                }
            }, e -> {
                this.logger.error("Failed to update task state as STOPPED for task " + taskId, (Throwable)e);
                listener.onFailure(e);
            }));
        }, listener);
    }

    private void resetEntityTasksAsStopped(String detectorTaskId) {
        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
        updateByQueryRequest.indices(new String[]{".opendistro-anomaly-detection-state"});
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("parent_task_id", detectorTaskId));
        query.filter((QueryBuilder)new TermQueryBuilder("task_type", ADTaskType.HISTORICAL_HC_ENTITY.name()));
        query.filter((QueryBuilder)new TermsQueryBuilder("state", ADTaskState.NOT_ENDED_STATES));
        updateByQueryRequest.setQuery((QueryBuilder)query);
        updateByQueryRequest.setRefresh(true);
        String script = String.format(Locale.ROOT, "ctx._source.%s='%s';", "state", ADTaskState.STOPPED.name());
        updateByQueryRequest.setScript(new Script(script));
        this.client.execute((ActionType)UpdateByQueryAction.INSTANCE, (ActionRequest)updateByQueryRequest, ActionListener.wrap(r -> {
            List bulkFailures = r.getBulkFailures();
            if (ParseUtils.isNullOrEmpty(bulkFailures)) {
                this.logger.debug("Updated {} child entity tasks state for detector task {}", (Object)r.getUpdated(), (Object)detectorTaskId);
            } else {
                this.logger.error("Failed to update child entity task's state for detector task {} ", (Object)detectorTaskId);
            }
        }, e -> this.logger.error("Exception happened when update child entity task's state for detector task " + detectorTaskId, (Throwable)e)));
    }

    public <T> void cleanDetectorCache(ADTask adTask, TransportService transportService, AnomalyDetectorFunction function, ActionListener<T> listener) {
        String coordinatingNode = adTask.getCoordinatingNode();
        String detectorId = adTask.getDetectorId();
        String taskId = adTask.getTaskId();
        try {
            this.forwardADTaskToCoordinatingNode(adTask, ADTaskAction.CLEAN_CACHE, transportService, (ActionListener<AnomalyDetectorJobResponse>)ActionListener.wrap(r -> function.execute(), e -> {
                this.logger.error("Failed to clear detector cache on coordinating node " + coordinatingNode, (Throwable)e);
                listener.onFailure(e);
            }));
        }
        catch (ResourceNotFoundException e2) {
            this.logger.warn("Task coordinating node left cluster, taskId: {}, detectorId: {}, coordinatingNode: {}", (Object)taskId, (Object)detectorId, (Object)coordinatingNode);
            function.execute();
        }
        catch (Exception e3) {
            this.logger.error("Failed to forward clean cache event for detector " + detectorId + ", task " + taskId, (Throwable)e3);
            listener.onFailure(e3);
        }
    }

    protected void cleanDetectorCache(ADTask adTask, TransportService transportService, AnomalyDetectorFunction function) {
        String detectorId = adTask.getDetectorId();
        String taskId = adTask.getTaskId();
        this.cleanDetectorCache(adTask, transportService, function, ActionListener.wrap(r -> this.logger.debug("Successfully cleaned cache for detector {}, task {}", (Object)detectorId, (Object)taskId), e -> this.logger.error("Failed to clean cache for detector " + detectorId + ", task " + taskId, (Throwable)e)));
    }

    public void getLatestHistoricalTaskProfile(String detectorId, TransportService transportService, DetectorProfile profile, ActionListener<DetectorProfile> listener) {
        this.getAndExecuteOnLatestADTask(detectorId, null, null, ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES, adTask -> {
            if (adTask.isPresent()) {
                this.getADTaskProfile((ADTask)adTask.get(), (ActionListener<ADTaskProfile>)ActionListener.wrap(adTaskProfile -> {
                    DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
                    profileBuilder.adTaskProfile((ADTaskProfile)adTaskProfile);
                    DetectorProfile detectorProfile = profileBuilder.build();
                    detectorProfile.merge(profile);
                    listener.onResponse((Object)detectorProfile);
                }, e -> {
                    this.logger.error("Failed to get AD task profile for task " + ((ADTask)adTask.get()).getTaskId(), (Throwable)e);
                    listener.onFailure(e);
                }));
            } else {
                DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder();
                listener.onResponse((Object)profileBuilder.build());
            }
        }, transportService, false, listener);
    }

    private void getADTaskProfile(ADTask adDetectorLevelTask, ActionListener<ADTaskProfile> listener) {
        String detectorId = adDetectorLevelTask.getDetectorId();
        this.hashRing.getAllEligibleDataNodesWithKnownAdVersion(dataNodes -> {
            ADTaskProfileRequest adTaskProfileRequest = new ADTaskProfileRequest(detectorId, (DiscoveryNode)dataNodes);
            this.client.execute((ActionType)ADTaskProfileAction.INSTANCE, (ActionRequest)adTaskProfileRequest, ActionListener.wrap(response -> {
                if (response.hasFailures()) {
                    listener.onFailure((Exception)response.failures().get(0));
                    return;
                }
                ArrayList<ADEntityTaskProfile> adEntityTaskProfiles = new ArrayList<ADEntityTaskProfile>();
                ADTaskProfile detectorTaskProfile = new ADTaskProfile(adDetectorLevelTask);
                for (ADTaskProfileNodeResponse node : response.getNodes()) {
                    ADTaskProfile taskProfile = node.getAdTaskProfile();
                    if (taskProfile == null) continue;
                    if (taskProfile.getNodeId() != null) {
                        detectorTaskProfile.setTaskId(taskProfile.getTaskId());
                        detectorTaskProfile.setShingleSize(taskProfile.getShingleSize());
                        detectorTaskProfile.setRcfTotalUpdates(taskProfile.getRcfTotalUpdates());
                        detectorTaskProfile.setThresholdModelTrained(taskProfile.getThresholdModelTrained());
                        detectorTaskProfile.setThresholdModelTrainingDataSize(taskProfile.getThresholdModelTrainingDataSize());
                        detectorTaskProfile.setModelSizeInBytes(taskProfile.getModelSizeInBytes());
                        detectorTaskProfile.setNodeId(taskProfile.getNodeId());
                        detectorTaskProfile.setTotalEntitiesCount(taskProfile.getTotalEntitiesCount());
                        detectorTaskProfile.setDetectorTaskSlots(taskProfile.getDetectorTaskSlots());
                        detectorTaskProfile.setPendingEntitiesCount(taskProfile.getPendingEntitiesCount());
                        detectorTaskProfile.setRunningEntitiesCount(taskProfile.getRunningEntitiesCount());
                        detectorTaskProfile.setRunningEntities(taskProfile.getRunningEntities());
                        detectorTaskProfile.setAdTaskType(taskProfile.getAdTaskType());
                    }
                    if (taskProfile.getEntityTaskProfiles() == null) continue;
                    adEntityTaskProfiles.addAll(taskProfile.getEntityTaskProfiles());
                }
                if (adEntityTaskProfiles != null && adEntityTaskProfiles.size() > 0) {
                    detectorTaskProfile.setEntityTaskProfiles(adEntityTaskProfiles);
                }
                listener.onResponse((Object)detectorTaskProfile);
            }, e -> {
                this.logger.error("Failed to get task profile for task " + adDetectorLevelTask.getTaskId(), (Throwable)e);
                listener.onFailure(e);
            }));
        }, listener);
    }

    private String validateDetector(AnomalyDetector detector) {
        String error = null;
        if (detector.getFeatureAttributes().size() == 0) {
            error = "Can't start detector job as no features configured";
        } else if (detector.getEnabledFeatureIds().size() == 0) {
            error = "Can't start detector job as no enabled features configured";
        }
        return error;
    }

    private void updateLatestFlagOfOldTasksAndCreateNewTask(AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, ActionListener<AnomalyDetectorJobResponse> listener) {
        UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
        updateByQueryRequest.indices(new String[]{".opendistro-anomaly-detection-state"});
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("detector_id", detector.getDetectorId()));
        query.filter((QueryBuilder)new TermQueryBuilder("is_latest", true));
        query.filter((QueryBuilder)new TermsQueryBuilder("task_type", ADTaskType.taskTypeToString(this.getADTaskTypes(detectionDateRange, true))));
        updateByQueryRequest.setQuery((QueryBuilder)query);
        updateByQueryRequest.setRefresh(true);
        String script = String.format(Locale.ROOT, "ctx._source.%s=%s;", "is_latest", false);
        updateByQueryRequest.setScript(new Script(script));
        this.client.execute((ActionType)UpdateByQueryAction.INSTANCE, (ActionRequest)updateByQueryRequest, ActionListener.wrap(r -> {
            List bulkFailures = r.getBulkFailures();
            if (bulkFailures.isEmpty()) {
                String coordinatingNode = detectionDateRange == null ? null : this.clusterService.localNode().getId();
                this.createNewADTask(detector, detectionDateRange, user, coordinatingNode, listener);
            } else {
                this.logger.error("Failed to update old task's state for detector: {}, response: {} ", (Object)detector.getDetectorId(), (Object)r.toString());
                listener.onFailure(((BulkItemResponse.Failure)bulkFailures.get(0)).getCause());
            }
        }, e -> {
            this.logger.error("Failed to reset old tasks as not latest for detector " + detector.getDetectorId(), (Throwable)e);
            listener.onFailure(e);
        }));
    }

    private void createNewADTask(AnomalyDetector detector, DetectionDateRange detectionDateRange, User user, String coordinatingNode, ActionListener<AnomalyDetectorJobResponse> listener) {
        String userName = user == null ? null : user.getName();
        Instant now = Instant.now();
        String taskType = this.getADTaskType(detector, detectionDateRange).name();
        ADTask adTask = new ADTask.Builder().detectorId(detector.getDetectorId()).detector(detector).isLatest(true).taskType(taskType).executionStartTime(now).taskProgress(Float.valueOf(0.0f)).initProgress(Float.valueOf(0.0f)).state(ADTaskState.CREATED.name()).lastUpdateTime(now).startedBy(userName).coordinatingNode(coordinatingNode).detectionDateRange(detectionDateRange).user(user).build();
        this.createADTaskDirectly(adTask, r -> this.onIndexADTaskResponse((IndexResponse)r, adTask, (response, delegatedListener) -> this.cleanOldAdTaskDocs((IndexResponse)response, adTask, (ActionListener<AnomalyDetectorJobResponse>)delegatedListener), listener), listener);
    }

    public <T> void createADTaskDirectly(ADTask adTask, Consumer<IndexResponse> function, ActionListener<T> listener) {
        IndexRequest request = new IndexRequest(".opendistro-anomaly-detection-state");
        try (XContentBuilder builder = XContentFactory.jsonBuilder();){
            request.source(adTask.toXContent(builder, (ToXContent.Params)RestHandlerUtils.XCONTENT_WITH_TYPE)).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            this.client.index(request, ActionListener.wrap(r -> function.accept((IndexResponse)r), e -> {
                this.logger.error("Failed to create AD task for detector " + adTask.getDetectorId(), (Throwable)e);
                listener.onFailure(e);
            }));
        }
        catch (Exception e2) {
            this.logger.error("Failed to create AD task for detector " + adTask.getDetectorId(), (Throwable)e2);
            listener.onFailure(e2);
        }
    }

    private void onIndexADTaskResponse(IndexResponse response, ADTask adTask, BiConsumer<IndexResponse, ActionListener<AnomalyDetectorJobResponse>> function, ActionListener<AnomalyDetectorJobResponse> listener) {
        if (response == null || response.getResult() != DocWriteResponse.Result.CREATED) {
            String errorMsg = ExceptionUtil.getShardsFailure(response);
            listener.onFailure((Exception)new OpenSearchStatusException(errorMsg, response.status(), new Object[0]));
            return;
        }
        adTask.setTaskId(response.getId());
        ActionListener delegatedListener = ActionListener.wrap(r -> listener.onResponse((Object)r), e -> {
            this.handleADTaskException(adTask, (Exception)e);
            if (e instanceof DuplicateTaskException) {
                listener.onFailure((Exception)new OpenSearchStatusException(CommonErrorMessages.DETECTOR_IS_RUNNING, RestStatus.BAD_REQUEST, new Object[0]));
            } else {
                if (adTask.isHistoricalTask()) {
                    this.adTaskCacheManager.removeHistoricalTaskCache(adTask.getDetectorId());
                }
                listener.onFailure(e);
            }
        });
        try {
            if (adTask.isHistoricalTask()) {
                this.adTaskCacheManager.add(adTask.getDetectorId(), adTask);
            }
        }
        catch (Exception e2) {
            delegatedListener.onFailure(e2);
            return;
        }
        if (function != null) {
            function.accept(response, (ActionListener<AnomalyDetectorJobResponse>)delegatedListener);
        }
    }

    private void cleanOldAdTaskDocs(IndexResponse response, ADTask adTask, ActionListener<AnomalyDetectorJobResponse> delegatedListener) {
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("detector_id", adTask.getDetectorId()));
        query.filter((QueryBuilder)new TermQueryBuilder("is_latest", false));
        if (adTask.isHistoricalTask()) {
            query.filter((QueryBuilder)new TermsQueryBuilder("task_type", ADTaskType.taskTypeToString(ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES)));
        } else {
            query.filter((QueryBuilder)new TermsQueryBuilder("task_type", ADTaskType.taskTypeToString(ADTaskType.REALTIME_TASK_TYPES)));
        }
        SearchRequest searchRequest = new SearchRequest();
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query((QueryBuilder)query).sort("execution_start_time", SortOrder.DESC).from(this.maxOldAdTaskDocsPerDetector.intValue()).size(AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS);
        searchRequest.source(sourceBuilder).indices(new String[]{".opendistro-anomaly-detection-state"});
        String detectorId = adTask.getDetectorId();
        this.deleteTaskDocs(detectorId, searchRequest, () -> {
            if (adTask.isHistoricalTask()) {
                this.runBatchResultAction(response, adTask, delegatedListener);
            } else {
                AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse(response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), RestStatus.OK);
                delegatedListener.onResponse((Object)anomalyDetectorJobResponse);
            }
        }, delegatedListener);
    }

    protected <T> void deleteTaskDocs(String detectorId, SearchRequest searchRequest, AnomalyDetectorFunction function, ActionListener<T> listener) {
        ActionListener searchListener = ActionListener.wrap(r -> {
            Iterator iterator = r.getHits().iterator();
            if (iterator.hasNext()) {
                BulkRequest bulkRequest = new BulkRequest();
                while (iterator.hasNext()) {
                    SearchHit searchHit = (SearchHit)iterator.next();
                    try {
                        XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, searchHit.getSourceRef());
                        try {
                            XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                            ADTask adTask = ADTask.parse(parser, searchHit.getId());
                            this.logger.debug("Delete old task: {} of detector: {}", (Object)adTask.getTaskId(), (Object)adTask.getDetectorId());
                            bulkRequest.add(new DeleteRequest(".opendistro-anomaly-detection-state").id(adTask.getTaskId()));
                        }
                        finally {
                            if (parser == null) continue;
                            parser.close();
                        }
                    }
                    catch (Exception e2) {
                        listener.onFailure(e2);
                    }
                }
                this.client.execute((ActionType)BulkAction.INSTANCE, (ActionRequest)bulkRequest, ActionListener.wrap(res -> {
                    this.logger.info("Old AD tasks deleted for detector {}", (Object)detectorId);
                    BulkItemResponse[] bulkItemResponses = res.getItems();
                    if (bulkItemResponses != null && bulkItemResponses.length > 0) {
                        for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
                            if (bulkItemResponse.isFailed()) continue;
                            this.logger.debug("Add detector task into cache. Task id: {}", (Object)bulkItemResponse.getId());
                            this.adTaskCacheManager.addDeletedDetectorTask(bulkItemResponse.getId());
                        }
                    }
                    this.cleanChildTasksAndADResultsOfDeletedTask();
                    function.execute();
                }, e -> {
                    this.logger.warn("Failed to clean AD tasks for detector " + detectorId, (Throwable)e);
                    listener.onFailure(e);
                }));
            } else {
                function.execute();
            }
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                function.execute();
            } else {
                listener.onFailure(e);
            }
        });
        this.client.search(searchRequest, searchListener);
    }

    public void cleanChildTasksAndADResultsOfDeletedTask() {
        if (!this.adTaskCacheManager.hasDeletedDetectorTask()) {
            return;
        }
        this.threadPool.schedule(() -> {
            String taskId = this.adTaskCacheManager.pollDeletedDetectorTask();
            if (taskId == null) {
                return;
            }
            DeleteByQueryRequest deleteADResultsRequest = new DeleteByQueryRequest(new String[]{".opendistro-anomaly-results*"});
            deleteADResultsRequest.setQuery((QueryBuilder)new TermsQueryBuilder("task_id", new String[]{taskId}));
            this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)deleteADResultsRequest, ActionListener.wrap(res -> {
                this.logger.debug("Successfully deleted AD results of task " + taskId);
                DeleteByQueryRequest deleteChildTasksRequest = new DeleteByQueryRequest(new String[]{".opendistro-anomaly-detection-state"});
                deleteChildTasksRequest.setQuery((QueryBuilder)new TermsQueryBuilder("parent_task_id", new String[]{taskId}));
                this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)deleteChildTasksRequest, ActionListener.wrap(r -> {
                    this.logger.debug("Successfully deleted child tasks of task " + taskId);
                    this.cleanChildTasksAndADResultsOfDeletedTask();
                }, e -> this.logger.error("Failed to delete child tasks of task " + taskId, (Throwable)e)));
            }, ex -> this.logger.error("Failed to delete AD results for task " + taskId, (Throwable)ex)));
        }, TimeValue.timeValueSeconds((long)DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), "ad-batch-task-threadpool");
    }

    private void runBatchResultAction(IndexResponse response, ADTask adTask, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.client.execute((ActionType)ADBatchAnomalyResultAction.INSTANCE, (ActionRequest)new ADBatchAnomalyResultRequest(adTask), ActionListener.wrap(r -> {
            String remoteOrLocal = r.isRunTaskRemotely() ? "remote" : "local";
            this.logger.info("AD task {} of detector {} dispatched to {} node {}", (Object)adTask.getTaskId(), (Object)adTask.getDetectorId(), (Object)remoteOrLocal, (Object)r.getNodeId());
            AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse(response.getId(), response.getVersion(), response.getSeqNo(), response.getPrimaryTerm(), RestStatus.OK);
            listener.onResponse((Object)anomalyDetectorJobResponse);
        }, e -> listener.onFailure(e)));
    }

    public void handleADTaskException(ADTask adTask, Exception e) {
        String state = ADTaskState.FAILED.name();
        HashMap<String, Object> updatedFields = new HashMap<String, Object>();
        if (e instanceof DuplicateTaskException) {
            this.logger.warn("There is already one running task for detector, detectorId:" + adTask.getDetectorId() + ". Will delete task " + adTask.getTaskId());
            this.deleteADTask(adTask.getTaskId());
            return;
        }
        if (e instanceof ADTaskCancelledException) {
            this.logger.info("AD task cancelled, taskId: {}, detectorId: {}", (Object)adTask.getTaskId(), (Object)adTask.getDetectorId());
            state = ADTaskState.STOPPED.name();
            String stoppedBy = ((ADTaskCancelledException)e).getCancelledBy();
            if (stoppedBy != null) {
                updatedFields.put("stopped_by", stoppedBy);
            }
        } else {
            this.logger.error("Failed to execute AD batch task, task id: " + adTask.getTaskId() + ", detector id: " + adTask.getDetectorId(), (Throwable)e);
        }
        updatedFields.put("error", ExceptionUtil.getErrorMessage(e));
        updatedFields.put("state", state);
        updatedFields.put("execution_end_time", Instant.now().toEpochMilli());
        this.updateADTask(adTask.getTaskId(), updatedFields);
    }

    public void updateADTask(String taskId, Map<String, Object> updatedFields) {
        this.updateADTask(taskId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(response -> {
            if (response.status() == RestStatus.OK) {
                this.logger.debug("Updated AD task successfully: {}, task id: {}", (Object)response.status(), (Object)taskId);
            } else {
                this.logger.error("Failed to update AD task {}, status: {}", (Object)taskId, (Object)response.status());
            }
        }, e -> this.logger.error("Failed to update task: " + taskId, (Throwable)e)));
    }

    public void updateADTask(String taskId, Map<String, Object> updatedFields, ActionListener<UpdateResponse> listener) {
        UpdateRequest updateRequest = new UpdateRequest(".opendistro-anomaly-detection-state", taskId);
        HashMap<String, Object> updatedContent = new HashMap<String, Object>();
        updatedContent.putAll(updatedFields);
        updatedContent.put("last_update_time", Instant.now().toEpochMilli());
        updateRequest.doc(updatedContent);
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.update(updateRequest, listener);
    }

    public void deleteADTask(String taskId) {
        this.deleteADTask(taskId, (ActionListener<DeleteResponse>)ActionListener.wrap(r -> this.logger.info("Deleted AD task {} with status: {}", (Object)taskId, (Object)r.status()), e -> this.logger.error("Failed to delete AD task " + taskId, (Throwable)e)));
    }

    public void deleteADTask(String taskId, ActionListener<DeleteResponse> listener) {
        DeleteRequest deleteRequest = new DeleteRequest(".opendistro-anomaly-detection-state", taskId);
        this.client.delete(deleteRequest, listener);
    }

    public ADTaskCancellationState cancelLocalTaskByDetectorId(String detectorId, String detectorTaskId, String reason, String userName) {
        ADTaskCancellationState cancellationState = this.adTaskCacheManager.cancelByDetectorId(detectorId, detectorTaskId, reason, userName);
        this.logger.debug("Cancelled AD task for detector: {}, state: {}, cancelled by: {}, reason: {}", (Object)detectorId, (Object)cancellationState, (Object)userName, (Object)reason);
        return cancellationState;
    }

    public void deleteADTasks(String detectorId, AnomalyDetectorFunction function, ActionListener<DeleteResponse> listener) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(new String[]{".opendistro-anomaly-detection-state"});
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("detector_id", detectorId));
        request.setQuery((QueryBuilder)query);
        this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)request, ActionListener.wrap(r -> {
            if (r.getBulkFailures() == null || r.getBulkFailures().size() == 0) {
                this.logger.info("AD tasks deleted for detector {}", (Object)detectorId);
                this.deleteADResultOfDetector(detectorId);
                function.execute();
            } else {
                listener.onFailure((Exception)new OpenSearchStatusException("Failed to delete all AD tasks", RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
            }
        }, e -> {
            this.logger.info("Failed to delete AD tasks for " + detectorId, (Throwable)e);
            if (e instanceof IndexNotFoundException) {
                this.deleteADResultOfDetector(detectorId);
                function.execute();
            } else {
                listener.onFailure(e);
            }
        }));
    }

    private void deleteADResultOfDetector(String detectorId) {
        if (!this.deleteADResultWhenDeleteDetector) {
            this.logger.info("Won't delete ad result for {} as delete AD result setting is disabled", (Object)detectorId);
            return;
        }
        this.logger.info("Start to delete AD results of detector {}", (Object)detectorId);
        DeleteByQueryRequest deleteADResultsRequest = new DeleteByQueryRequest(new String[]{".opendistro-anomaly-results*"});
        deleteADResultsRequest.setQuery((QueryBuilder)new TermQueryBuilder("detector_id", detectorId));
        this.client.execute((ActionType)DeleteByQueryAction.INSTANCE, (ActionRequest)deleteADResultsRequest, ActionListener.wrap(response -> this.logger.debug("Successfully deleted AD results of detector " + detectorId), exception -> {
            this.logger.error("Failed to delete AD results of detector " + detectorId, (Throwable)exception);
            this.adTaskCacheManager.addDeletedDetector(detectorId);
        }));
    }

    public void cleanADResultOfDeletedDetector() {
        String detectorId = this.adTaskCacheManager.pollDeletedDetector();
        if (detectorId != null) {
            this.deleteADResultOfDetector(detectorId);
        }
    }

    public void updateLatestADTask(String detectorId, List<ADTaskType> taskTypes, Map<String, Object> updatedFields, ActionListener<UpdateResponse> listener) {
        this.getAndExecuteOnLatestDetectorLevelTask(detectorId, taskTypes, adTask -> {
            if (adTask.isPresent()) {
                this.updateADTask(((ADTask)adTask.get()).getTaskId(), updatedFields, listener);
            } else {
                listener.onFailure((Exception)new ResourceNotFoundException(detectorId, CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK));
            }
        }, null, false, listener);
    }

    public void stopLatestRealtimeTask(String detectorId, ADTaskState state, Exception error, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        this.getAndExecuteOnLatestDetectorLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTask -> {
            if (adTask.isPresent() && !((ADTask)adTask.get()).isDone()) {
                HashMap<String, String> updatedFields = new HashMap<String, String>();
                updatedFields.put("state", state.name());
                if (error != null) {
                    updatedFields.put("error", error.getMessage());
                }
                AnomalyDetectorFunction function = () -> this.updateADTask(((ADTask)adTask.get()).getTaskId(), updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(r -> {
                    if (error == null) {
                        listener.onResponse((Object)new AnomalyDetectorJobResponse(detectorId, 0L, 0L, 0L, RestStatus.OK));
                    } else {
                        listener.onFailure(error);
                    }
                }, e -> listener.onFailure(e)));
                String coordinatingNode = ((ADTask)adTask.get()).getCoordinatingNode();
                if (coordinatingNode != null && transportService != null) {
                    this.cleanDetectorCache((ADTask)adTask.get(), transportService, function, listener);
                } else {
                    function.execute();
                }
            } else {
                listener.onFailure((Exception)new OpenSearchStatusException("Anomaly detector job is already stopped: " + detectorId, RestStatus.OK, new Object[0]));
            }
        }, null, false, listener);
    }

    public void updateLatestRealtimeTaskOnCoordinatingNode(String detectorId, String state, Long rcfTotalUpdates, Long detectorIntervalInMinutes, String error, ActionListener<UpdateResponse> listener) {
        Float initProgress = null;
        String newState = null;
        if (detectorIntervalInMinutes != null && rcfTotalUpdates != null) {
            newState = ADTaskState.INIT.name();
            if (rcfTotalUpdates < 32L) {
                initProgress = Float.valueOf((float)rcfTotalUpdates.longValue() / 32.0f);
            } else {
                newState = ADTaskState.RUNNING.name();
                initProgress = Float.valueOf(1.0f);
            }
        }
        if (state != null) {
            newState = state;
        }
        if (!this.adTaskCacheManager.isRealtimeTaskChangeNeeded(detectorId, newState, initProgress, error = Optional.ofNullable(error).orElse(""))) {
            listener.onResponse(null);
            return;
        }
        HashMap<String, Object> updatedFields = new HashMap<String, Object>();
        updatedFields.put("coordinating_node", this.clusterService.localNode().getId());
        if (initProgress != null) {
            updatedFields.put("init_progress", initProgress);
            updatedFields.put("estimated_minutes_left", Math.max(0L, 32L - rcfTotalUpdates) * detectorIntervalInMinutes);
        }
        if (newState != null) {
            updatedFields.put("state", newState);
        }
        if (error != null) {
            updatedFields.put("error", error);
        }
        Float finalInitProgress = initProgress;
        String finalError = error;
        String finalNewState = newState;
        this.updateLatestADTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, updatedFields, (ActionListener<UpdateResponse>)ActionListener.wrap(r -> {
            this.logger.debug("Updated latest realtime AD task successfully for detector {}", (Object)detectorId);
            this.adTaskCacheManager.updateRealtimeTaskCache(detectorId, finalNewState, finalInitProgress, finalError);
            listener.onResponse(r);
        }, e -> {
            this.logger.error("Failed to update realtime task for detector " + detectorId, (Throwable)e);
            listener.onFailure(e);
        }));
    }

    public void initRealtimeTaskCacheAndCleanupStaleCache(String detectorId, AnomalyDetector detector, TransportService transportService, ActionListener<Boolean> listener) {
        try {
            if (this.adTaskCacheManager.getRealtimeTaskCache(detectorId) != null) {
                listener.onResponse((Object)false);
                return;
            }
            this.getAndExecuteOnLatestDetectorLevelTask(detectorId, ADTaskType.REALTIME_TASK_TYPES, adTaskOptional -> {
                if (!adTaskOptional.isPresent()) {
                    this.logger.debug("Can't find realtime task for detector {}, init realtime task cache directly", (Object)detectorId);
                    AnomalyDetectorFunction function = () -> this.createNewADTask(detector, null, detector.getUser(), this.clusterService.localNode().getId(), (ActionListener<AnomalyDetectorJobResponse>)ActionListener.wrap(r -> {
                        this.logger.info("Recreate realtime task successfully for detector {}", (Object)detectorId);
                        this.adTaskCacheManager.initRealtimeTaskCache(detectorId, detector.getDetectorIntervalInMilliseconds());
                        listener.onResponse((Object)true);
                    }, e -> {
                        this.logger.error("Failed to recreate realtime task for detector " + detectorId, (Throwable)e);
                        listener.onFailure(e);
                    }));
                    this.recreateRealtimeTask(function, listener);
                    return;
                }
                ADTask adTask = (ADTask)adTaskOptional.get();
                String localNodeId = this.clusterService.localNode().getId();
                String oldCoordinatingNode = adTask.getCoordinatingNode();
                if (oldCoordinatingNode != null && !localNodeId.equals(oldCoordinatingNode)) {
                    this.logger.warn("AD realtime job coordinating node changed from {} to this node {} for detector {}", (Object)oldCoordinatingNode, (Object)localNodeId, (Object)detectorId);
                    this.cleanDetectorCache(adTask, transportService, () -> {
                        this.logger.info("Realtime task cache cleaned on old coordinating node {} for detector {}", (Object)oldCoordinatingNode, (Object)detectorId);
                        this.adTaskCacheManager.initRealtimeTaskCache(detectorId, detector.getDetectorIntervalInMilliseconds());
                        listener.onResponse((Object)true);
                    }, listener);
                } else {
                    this.logger.info("Init realtime task cache for detector {}", (Object)detectorId);
                    this.adTaskCacheManager.initRealtimeTaskCache(detectorId, detector.getDetectorIntervalInMilliseconds());
                    listener.onResponse((Object)true);
                }
            }, transportService, false, listener);
        }
        catch (Exception e) {
            this.logger.error("Failed to init realtime task cache for " + detectorId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    private void recreateRealtimeTask(AnomalyDetectorFunction function, ActionListener<Boolean> listener) {
        if (this.detectionIndices.doesDetectorStateIndexExist()) {
            function.execute();
        } else {
            this.detectionIndices.initDetectionStateIndex((ActionListener<CreateIndexResponse>)ActionListener.wrap(r -> {
                if (r.isAcknowledged()) {
                    this.logger.info("Created {} with mappings.", (Object)".opendistro-anomaly-detection-state");
                    function.execute();
                } else {
                    String error = String.format(Locale.ROOT, "Create index %S not acknowledged", ".opendistro-anomaly-detection-state");
                    this.logger.warn(error);
                    listener.onFailure((Exception)new OpenSearchStatusException(error, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                }
            }, e -> {
                if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof ResourceAlreadyExistsException) {
                    function.execute();
                } else {
                    this.logger.error("Failed to init anomaly detection state index", (Throwable)e);
                    listener.onFailure(e);
                }
            }));
        }
    }

    public void refreshRealtimeJobRunTime(String detectorId) {
        this.adTaskCacheManager.refreshRealtimeJobRunTime(detectorId);
    }

    public void removeRealtimeTaskCache(String detectorId) {
        this.adTaskCacheManager.removeRealtimeTaskCache(detectorId);
    }

    protected void entityTaskDone(ADTask adTask, Exception exception, TransportService transportService) {
        this.entityTaskDone(adTask, exception, transportService, (ActionListener<AnomalyDetectorJobResponse>)ActionListener.wrap(r -> this.logger.debug("AD task forwarded to coordinating node, task id {}", (Object)adTask.getTaskId()), e -> this.logger.error("AD task failed to forward to coordinating node " + adTask.getCoordinatingNode() + " for task " + adTask.getTaskId(), (Throwable)e)));
    }

    private void entityTaskDone(ADTask adTask, Exception exception, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        try {
            ADTaskAction action = this.getAdEntityTaskAction(adTask, exception);
            this.forwardADTaskToCoordinatingNode(adTask, action, transportService, listener);
        }
        catch (Exception e) {
            listener.onFailure(e);
        }
    }

    private ADTaskAction getAdEntityTaskAction(ADTask adTask, Exception exception) {
        ADTaskAction action = ADTaskAction.NEXT_ENTITY;
        if (exception != null) {
            adTask.setError(ExceptionUtil.getErrorMessage(exception));
            if (exception instanceof LimitExceededException && this.isRetryableError(exception.getMessage())) {
                action = ADTaskAction.PUSH_BACK_ENTITY;
            } else if (exception instanceof ADTaskCancelledException || exception instanceof EndRunException) {
                action = ADTaskAction.CANCEL;
            }
        }
        return action;
    }

    public boolean isRetryableError(String error) {
        if (error == null) {
            return false;
        }
        return this.retryableErrors.stream().filter(e -> error.contains((CharSequence)e)).findFirst().isPresent();
    }

    public void setHCDetectorTaskDone(ADTask adTask, ADTaskState state, ActionListener<AnomalyDetectorJobResponse> listener) {
        String detectorId = adTask.getDetectorId();
        String taskId = adTask.isEntityTask() ? adTask.getParentTaskId() : adTask.getTaskId();
        String detectorTaskId = adTask.getDetectorLevelTaskId();
        ActionListener wrappedListener = ActionListener.wrap(response -> {
            this.logger.info("Historical HC detector done with state: {}. Remove from cache, detector id:{}", (Object)state.name(), (Object)detectorId);
            this.adTaskCacheManager.removeHistoricalTaskCache(detectorId);
        }, e -> {
            if (e instanceof LimitExceededException && e.getMessage().contains(CommonErrorMessages.HC_DETECTOR_TASK_IS_UPDATING)) {
                this.logger.warn("HC task is updating, skip this update for task: " + taskId);
            } else {
                this.logger.error("Failed to update task: " + taskId, (Throwable)e);
            }
            this.adTaskCacheManager.removeHistoricalTaskCache(detectorId);
        });
        long timeoutInMillis = 2000L;
        if (state == ADTaskState.FINISHED) {
            this.countEntityTasksByState(detectorTaskId, (List<ADTaskState>)ImmutableList.of((Object)((Object)ADTaskState.FINISHED)), (ActionListener<Long>)ActionListener.wrap(r -> {
                this.logger.info("number of finished entity tasks: {}, for detector {}", r, (Object)adTask.getDetectorId());
                ADTaskState hcDetectorTaskState = r == 0L ? ADTaskState.FAILED : ADTaskState.FINISHED;
                this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.updateADHCDetectorTask(detectorId, taskId, (Map<String, Object>)ImmutableMap.of((Object)"state", (Object)hcDetectorTaskState.name(), (Object)"task_progress", (Object)1.0, (Object)"execution_end_time", (Object)Instant.now().toEpochMilli()), timeoutInMillis, (ActionListener<UpdateResponse>)wrappedListener));
            }, e -> {
                this.logger.error("Failed to get finished entity tasks", (Throwable)e);
                String errorMessage = ExceptionUtil.getErrorMessage(e);
                this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.updateADHCDetectorTask(detectorId, taskId, (Map<String, Object>)ImmutableMap.of((Object)"state", (Object)ADTaskState.FAILED.name(), (Object)"task_progress", (Object)1.0, (Object)"error", (Object)errorMessage, (Object)"execution_end_time", (Object)Instant.now().toEpochMilli()), timeoutInMillis, (ActionListener<UpdateResponse>)wrappedListener));
            }));
        } else {
            this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.updateADHCDetectorTask(detectorId, taskId, (Map<String, Object>)ImmutableMap.of((Object)"state", (Object)state.name(), (Object)"error", (Object)adTask.getError(), (Object)"execution_end_time", (Object)Instant.now().toEpochMilli()), timeoutInMillis, (ActionListener<UpdateResponse>)wrappedListener));
        }
        listener.onResponse((Object)new AnomalyDetectorJobResponse(taskId, 0L, 0L, 0L, RestStatus.OK));
    }

    public void countEntityTasksByState(String detectorTaskId, List<ADTaskState> taskStates, ActionListener<Long> listener) {
        BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
        queryBuilder.filter((QueryBuilder)new TermQueryBuilder("parent_task_id", detectorTaskId));
        if (taskStates != null && taskStates.size() > 0) {
            queryBuilder.filter((QueryBuilder)new TermsQueryBuilder("state", (Iterable)taskStates.stream().map(s -> s.name()).collect(Collectors.toList())));
        }
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query((QueryBuilder)queryBuilder);
        sourceBuilder.size(0);
        sourceBuilder.trackTotalHits(true);
        SearchRequest request = new SearchRequest();
        request.source(sourceBuilder);
        request.indices(new String[]{".opendistro-anomaly-detection-state"});
        this.client.search(request, ActionListener.wrap(r -> {
            TotalHits totalHits = r.getHits().getTotalHits();
            listener.onResponse((Object)totalHits.value);
        }, e -> listener.onFailure(e)));
    }

    public void updateADHCDetectorTask(String detectorId, String taskId, Map<String, Object> updatedFields) {
        this.updateADHCDetectorTask(detectorId, taskId, updatedFields, 0L, (ActionListener<UpdateResponse>)ActionListener.wrap(response -> {
            if (response == null) {
                this.logger.debug("Skip updating AD task: {}", (Object)taskId);
            } else if (response.status() == RestStatus.OK) {
                this.logger.debug("Updated AD task successfully: {}, taskId: {}", (Object)response.status(), (Object)taskId);
            } else {
                this.logger.error("Failed to update AD task {}, status: {}", (Object)taskId, (Object)response.status());
            }
        }, e -> {
            if (e instanceof LimitExceededException && e.getMessage().contains(CommonErrorMessages.HC_DETECTOR_TASK_IS_UPDATING)) {
                this.logger.warn("AD HC detector task is updating, skip this update for task: " + taskId);
            } else {
                this.logger.error("Failed to update AD HC detector task: " + taskId, (Throwable)e);
            }
        }));
    }

    private void updateADHCDetectorTask(String detectorId, String taskId, Map<String, Object> updatedFields, long timeoutInMillis, ActionListener<UpdateResponse> listener) {
        try {
            if (this.adTaskCacheManager.tryAcquireTaskUpdatingSemaphore(detectorId, timeoutInMillis)) {
                try {
                    this.updateADTask(taskId, updatedFields, (ActionListener<UpdateResponse>)ActionListener.runAfter(listener, () -> this.adTaskCacheManager.releaseTaskUpdatingSemaphore(detectorId)));
                }
                catch (Exception e) {
                    this.logger.error("Failed to update detector task " + taskId, (Throwable)e);
                    this.adTaskCacheManager.releaseTaskUpdatingSemaphore(detectorId);
                    listener.onFailure(e);
                }
            } else if (!this.adTaskCacheManager.isHCTaskCoordinatingNode(detectorId)) {
                this.logger.info("HC detector task cache does not exist, detectorId:{}, taskId:{}", (Object)detectorId, (Object)taskId);
                listener.onResponse(null);
            } else {
                this.logger.info("HC detector task is updating, detectorId:{}, taskId:{}", (Object)detectorId, (Object)taskId);
                listener.onFailure((Exception)new LimitExceededException(CommonErrorMessages.HC_DETECTOR_TASK_IS_UPDATING));
            }
        }
        catch (Exception e) {
            this.logger.error("Failed to get AD HC detector task updating semaphore " + taskId, (Throwable)e);
            listener.onFailure(e);
        }
    }

    public void runNextEntityForHCADHistorical(ADTask adTask, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        String detectorId = adTask.getDetectorId();
        int scaleDelta = this.scaleTaskSlots(adTask, transportService, (ActionListener<AnomalyDetectorJobResponse>)ActionListener.wrap(r -> this.logger.debug("Scale up task slots done for detector {}, task {}", (Object)detectorId, (Object)adTask.getTaskId()), e -> this.logger.error("Failed to scale up task slots for task " + adTask.getTaskId(), (Throwable)e)));
        if (scaleDelta < 0) {
            this.logger.warn("Have scaled down task slots. Will not poll next entity for detector {}, task {}, task slots: {}", (Object)detectorId, (Object)adTask.getTaskId(), (Object)this.adTaskCacheManager.getDetectorTaskSlots(detectorId));
            listener.onResponse((Object)new AnomalyDetectorJobResponse(detectorId, 0L, 0L, 0L, RestStatus.ACCEPTED));
            return;
        }
        this.client.execute((ActionType)ADBatchAnomalyResultAction.INSTANCE, (ActionRequest)new ADBatchAnomalyResultRequest(adTask), ActionListener.wrap(r -> {
            String remoteOrLocal = r.isRunTaskRemotely() ? "remote" : "local";
            this.logger.info("AD entity task {} of detector {} dispatched to {} node {}", (Object)adTask.getTaskId(), (Object)detectorId, (Object)remoteOrLocal, (Object)r.getNodeId());
            AnomalyDetectorJobResponse anomalyDetectorJobResponse = new AnomalyDetectorJobResponse(detectorId, 0L, 0L, 0L, RestStatus.OK);
            listener.onResponse((Object)anomalyDetectorJobResponse);
        }, e -> listener.onFailure(e)));
    }

    protected int scaleTaskSlots(ADTask adTask, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> scaleUpListener) {
        String detectorId = adTask.getDetectorId();
        if (!this.scaleEntityTaskLane.tryAcquire()) {
            this.logger.debug("Can't get scaleEntityTaskLane semaphore");
            return 0;
        }
        try {
            int scaleDelta = this.detectorTaskSlotScaleDelta(detectorId);
            this.logger.debug("start to scale task slots for detector {} with delta {}", (Object)detectorId, (Object)scaleDelta);
            if (this.adTaskCacheManager.getAvailableNewEntityTaskLanes(detectorId) <= 0 && scaleDelta > 0) {
                Instant lastScaleEntityTaskLaneTime = this.adTaskCacheManager.getLastScaleEntityTaskLaneTime(detectorId);
                if (lastScaleEntityTaskLaneTime == null) {
                    this.logger.debug("lastScaleEntityTaskLaneTime is null for detector {}", (Object)detectorId);
                    this.scaleEntityTaskLane.release();
                    return 0;
                }
                boolean lastScaleTimeExpired = lastScaleEntityTaskLaneTime.plusMillis(10000L).isBefore(Instant.now());
                if (lastScaleTimeExpired) {
                    this.adTaskCacheManager.refreshLastScaleEntityTaskLaneTime(detectorId);
                    this.logger.debug("Forward scale entity task lane request to lead node for detector {}", (Object)detectorId);
                    this.forwardScaleTaskSlotRequestToLeadNode(adTask, transportService, (ActionListener<AnomalyDetectorJobResponse>)ActionListener.runAfter(scaleUpListener, () -> this.scaleEntityTaskLane.release()));
                } else {
                    this.logger.debug("lastScaleEntityTaskLaneTime is not expired yet: {} for detector {}", (Object)lastScaleEntityTaskLaneTime, (Object)detectorId);
                    this.scaleEntityTaskLane.release();
                }
            } else {
                if (scaleDelta < 0) {
                    int runningEntityCount = this.adTaskCacheManager.getRunningEntityCount(detectorId) + this.adTaskCacheManager.getTempEntityCount(detectorId);
                    int assignedTaskSlots = this.adTaskCacheManager.getDetectorTaskSlots(detectorId);
                    int scaleDownDelta = Math.min(assignedTaskSlots - runningEntityCount, 0 - scaleDelta);
                    this.logger.debug("Scale down task slots, scaleDelta: {}, assignedTaskSlots: {}, runningEntityCount: {}, scaleDownDelta: {}", (Object)scaleDelta, (Object)assignedTaskSlots, (Object)runningEntityCount, (Object)scaleDownDelta);
                    this.adTaskCacheManager.scaleDownHCDetectorTaskSlots(detectorId, scaleDownDelta);
                }
                this.scaleEntityTaskLane.release();
            }
            return scaleDelta;
        }
        catch (Exception e) {
            this.logger.error("Failed to forward scale entity task lane request to lead node for detector " + detectorId, (Throwable)e);
            this.scaleEntityTaskLane.release();
            return 0;
        }
    }

    public int detectorTaskSlotScaleDelta(String detectorId) {
        DiscoveryNode[] eligibleDataNodes = this.hashRing.getNodesWithSameLocalAdVersion();
        int unfinishedEntities = this.adTaskCacheManager.getUnfinishedEntityCount(detectorId);
        int totalTaskSlots = eligibleDataNodes.length * this.maxAdBatchTaskPerNode;
        int taskLaneLimit = Math.min(unfinishedEntities, Math.min(totalTaskSlots, this.maxRunningEntitiesPerDetector));
        this.adTaskCacheManager.setDetectorTaskLaneLimit(detectorId, taskLaneLimit);
        int assignedTaskSlots = this.adTaskCacheManager.getDetectorTaskSlots(detectorId);
        int scaleDelta = taskLaneLimit - assignedTaskSlots;
        this.logger.debug("Calculate task slot scale delta for detector {}, totalTaskSlots: {}, maxRunningEntitiesPerDetector: {}, unfinishedEntities: {}, taskLaneLimit: {}, assignedTaskSlots: {}, scaleDelta: {}", (Object)detectorId, (Object)totalTaskSlots, (Object)this.maxRunningEntitiesPerDetector, (Object)unfinishedEntities, (Object)taskLaneLimit, (Object)assignedTaskSlots, (Object)scaleDelta);
        return scaleDelta;
    }

    public float hcDetectorProgress(String detectorId) {
        int entityCount = this.adTaskCacheManager.getTopEntityCount(detectorId);
        int leftEntities = this.adTaskCacheManager.getPendingEntityCount(detectorId) + this.adTaskCacheManager.getRunningEntityCount(detectorId);
        return 1.0f - (float)leftEntities / (float)entityCount;
    }

    public ADTaskProfile getLocalADTaskProfilesByDetectorId(String detectorId) {
        List<String> tasksOfDetector = this.adTaskCacheManager.getTasksOfDetector(detectorId);
        ADTaskProfile detectorTaskProfile = null;
        String localNodeId = this.clusterService.localNode().getId();
        if (this.adTaskCacheManager.isHCTaskRunning(detectorId)) {
            detectorTaskProfile = new ADTaskProfile();
            if (this.adTaskCacheManager.isHCTaskCoordinatingNode(detectorId)) {
                detectorTaskProfile.setNodeId(localNodeId);
                detectorTaskProfile.setTaskId(this.adTaskCacheManager.getDetectorTaskId(detectorId));
                detectorTaskProfile.setDetectorTaskSlots(this.adTaskCacheManager.getDetectorTaskSlots(detectorId));
                detectorTaskProfile.setTotalEntitiesInited(this.adTaskCacheManager.topEntityInited(detectorId));
                detectorTaskProfile.setTotalEntitiesCount(this.adTaskCacheManager.getTopEntityCount(detectorId));
                detectorTaskProfile.setPendingEntitiesCount(this.adTaskCacheManager.getPendingEntityCount(detectorId));
                detectorTaskProfile.setRunningEntitiesCount(this.adTaskCacheManager.getRunningEntityCount(detectorId));
                detectorTaskProfile.setRunningEntities(this.adTaskCacheManager.getRunningEntities(detectorId));
                detectorTaskProfile.setAdTaskType(ADTaskType.HISTORICAL_HC_DETECTOR.name());
                Instant latestHCTaskRunTime = this.adTaskCacheManager.getLatestHCTaskRunTime(detectorId);
                if (latestHCTaskRunTime != null) {
                    detectorTaskProfile.setLatestHCTaskRunTime(latestHCTaskRunTime.toEpochMilli());
                }
            }
            if (tasksOfDetector.size() > 0) {
                ArrayList<ADEntityTaskProfile> entityTaskProfiles = new ArrayList<ADEntityTaskProfile>();
                tasksOfDetector.forEach(taskId -> {
                    ADEntityTaskProfile entityTaskProfile = new ADEntityTaskProfile(this.adTaskCacheManager.getShingle((String)taskId).size(), this.adTaskCacheManager.getTRcfModel((String)taskId).getForest().getTotalUpdates(), this.adTaskCacheManager.isThresholdModelTrained((String)taskId), this.adTaskCacheManager.getThresholdModelTrainingDataSize((String)taskId), this.adTaskCacheManager.getModelSize((String)taskId), localNodeId, this.adTaskCacheManager.getEntity((String)taskId), (String)taskId, ADTaskType.HISTORICAL_HC_ENTITY.name());
                    entityTaskProfiles.add(entityTaskProfile);
                });
                detectorTaskProfile.setEntityTaskProfiles(entityTaskProfiles);
            }
        } else {
            if (tasksOfDetector.size() > 1) {
                String error = "Multiple tasks are running for detector: " + detectorId + ". You can stop detector to kill all running tasks.";
                this.logger.error(error);
                throw new LimitExceededException(error);
            }
            if (tasksOfDetector.size() == 1) {
                String taskId2 = tasksOfDetector.get(0);
                detectorTaskProfile = new ADTaskProfile(this.adTaskCacheManager.getDetectorTaskId(detectorId), this.adTaskCacheManager.getShingle(taskId2).size(), this.adTaskCacheManager.getTRcfModel(taskId2).getForest().getTotalUpdates(), this.adTaskCacheManager.isThresholdModelTrained(taskId2), this.adTaskCacheManager.getThresholdModelTrainingDataSize(taskId2), this.adTaskCacheManager.getModelSize(taskId2), localNodeId);
                detectorTaskProfile.setDetectorTaskSlots(1);
            }
        }
        this.threadPool.executor("ad-batch-task-threadpool").execute(() -> this.adTaskCacheManager.cleanExpiredHCBatchTaskRunStates());
        this.logger.debug("Local AD task profile of detector {}: {}", (Object)detectorId, (Object)detectorTaskProfile);
        return detectorTaskProfile;
    }

    public synchronized void removeStaleRunningEntity(ADTask adTask, String entity, TransportService transportService, ActionListener<AnomalyDetectorJobResponse> listener) {
        String detectorId = adTask.getDetectorId();
        boolean removed = this.adTaskCacheManager.removeRunningEntity(detectorId, entity);
        if (removed && this.adTaskCacheManager.getPendingEntityCount(detectorId) > 0) {
            this.logger.debug("kick off next pending entities");
            this.runNextEntityForHCADHistorical(adTask, transportService, listener);
        } else if (!this.adTaskCacheManager.hasEntity(detectorId)) {
            this.setHCDetectorTaskDone(adTask, ADTaskState.STOPPED, listener);
        }
    }

    public boolean skipUpdateHCRealtimeTask(String detectorId, String error) {
        ADRealtimeTaskCache realtimeTaskCache = this.adTaskCacheManager.getRealtimeTaskCache(detectorId);
        return realtimeTaskCache != null && realtimeTaskCache.getInitProgress() != null && (double)realtimeTaskCache.getInitProgress().floatValue() == 1.0 && Objects.equals(error, realtimeTaskCache.getError());
    }

    public boolean isHCRealtimeTaskStartInitializing(String detectorId) {
        ADRealtimeTaskCache realtimeTaskCache = this.adTaskCacheManager.getRealtimeTaskCache(detectorId);
        return realtimeTaskCache != null && realtimeTaskCache.getInitProgress() != null && realtimeTaskCache.getInitProgress().floatValue() > 0.0f;
    }

    public String convertEntityToString(ADTask adTask) {
        if (adTask == null || !adTask.isEntityTask()) {
            return null;
        }
        AnomalyDetector detector = adTask.getDetector();
        return this.convertEntityToString(adTask.getEntity(), detector);
    }

    public String convertEntityToString(Entity entity, AnomalyDetector detector) {
        if (detector.isMultiCategoryDetector()) {
            try {
                XContentBuilder builder = entity.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS);
                return BytesReference.bytes((XContentBuilder)builder).utf8ToString();
            }
            catch (IOException e) {
                String error = "Failed to parse entity into string";
                this.logger.debug(error, (Throwable)e);
                throw new AnomalyDetectionException(error);
            }
        }
        if (detector.isMultientityDetector()) {
            String categoryField = detector.getCategoryField().get(0);
            return entity.getAttributes().get(categoryField);
        }
        return null;
    }

    public Entity parseEntityFromString(String entityValue, ADTask adTask) {
        AnomalyDetector detector = adTask.getDetector();
        if (detector.isMultiCategoryDetector()) {
            try {
                XContentParser parser = XContentType.JSON.xContent().createParser(this.xContentRegistry, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, entityValue);
                XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_ARRAY, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                return Entity.parse(parser);
            }
            catch (IOException e) {
                String error = "Failed to parse string into entity";
                this.logger.debug(error, (Throwable)e);
                throw new AnomalyDetectionException(error);
            }
        }
        if (detector.isMultientityDetector()) {
            return Entity.createSingleAttributeEntity(detector.getCategoryField().get(0), entityValue);
        }
        throw new IllegalArgumentException("Fail to parse to Entity for single flow detector");
    }

    public void getADTask(String taskId, ActionListener<Optional<ADTask>> listener) {
        GetRequest request = new GetRequest(".opendistro-anomaly-detection-state", taskId);
        this.client.get(request, ActionListener.wrap(r -> {
            if (r != null && r.isExists()) {
                try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    ADTask adTask = ADTask.parse(parser, r.getId());
                    listener.onResponse(Optional.ofNullable(adTask));
                }
                catch (Exception e) {
                    this.logger.error("Failed to parse AD task " + r.getId(), (Throwable)e);
                    listener.onFailure(e);
                }
            } else {
                listener.onResponse(Optional.empty());
            }
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                listener.onResponse(Optional.empty());
            } else {
                this.logger.error("Failed to get AD task " + taskId, (Throwable)e);
                listener.onFailure(e);
            }
        }));
    }

    public void resetLatestFlagAsFalse(List<ADTask> adTasks) {
        if (adTasks == null || adTasks.size() == 0) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        adTasks.forEach(task -> {
            try {
                task.setLatest(false);
                task.setLastUpdateTime(Instant.now());
                IndexRequest indexRequest = new IndexRequest(".opendistro-anomaly-detection-state").id(task.getTaskId()).source(task.toXContent(XContentBuilder.builder((XContent)XContentType.JSON.xContent()), (ToXContent.Params)RestHandlerUtils.XCONTENT_WITH_TYPE));
                bulkRequest.add(indexRequest);
            }
            catch (Exception e) {
                this.logger.error("Fail to parse task AD task to XContent, task id " + task.getTaskId(), (Throwable)e);
            }
        });
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        this.client.execute((ActionType)BulkAction.INSTANCE, (ActionRequest)bulkRequest, ActionListener.wrap(res -> {
            BulkItemResponse[] bulkItemResponses = res.getItems();
            if (bulkItemResponses != null && bulkItemResponses.length > 0) {
                for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
                    if (!bulkItemResponse.isFailed()) {
                        this.logger.warn("Reset AD tasks latest flag as false Successfully. Task id: {}", (Object)bulkItemResponse.getId());
                        continue;
                    }
                    this.logger.warn("Failed to reset AD tasks latest flag as false. Task id: " + bulkItemResponse.getId());
                }
            }
        }, e -> this.logger.warn("Failed to reset AD tasks latest flag as false", (Throwable)e)));
    }

    public int getLocalAdUsedBatchTaskSlot() {
        return this.adTaskCacheManager.getTotalBatchTaskCount();
    }

    public int getLocalAdAssignedBatchTaskSlot() {
        return this.adTaskCacheManager.getTotalDetectorTaskSlots();
    }

    public void maintainRunningHistoricalTasks(TransportService transportService, int size) {
        this.adTaskCacheManager.cleanExpiredHCBatchTaskRunStates();
        Optional<DiscoveryNode> owningNode = this.hashRing.getOwningNodeWithHighestAdVersion(AD_TASK_MAINTAINENCE_NODE_MODEL_ID);
        if (!owningNode.isPresent() || !this.clusterService.localNode().getId().equals(owningNode.get().getId())) {
            return;
        }
        this.logger.info("Start to maintain running historical tasks");
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("is_latest", true));
        query.filter((QueryBuilder)new TermsQueryBuilder("task_type", ADTaskType.taskTypeToString(ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES)));
        query.filter((QueryBuilder)new TermsQueryBuilder("state", ADTaskState.NOT_ENDED_STATES));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query((QueryBuilder)query).sort("last_update_time", SortOrder.DESC).size(size);
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.source(sourceBuilder);
        searchRequest.indices(new String[]{".opendistro-anomaly-detection-state"});
        this.client.search(searchRequest, ActionListener.wrap(r -> {
            if (r == null || r.getHits().getTotalHits() == null || r.getHits().getTotalHits().value == 0L) {
                return;
            }
            ConcurrentLinkedQueue<ADTask> taskQueue = new ConcurrentLinkedQueue<ADTask>();
            for (SearchHit searchHit : r.getHits()) {
                try {
                    XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, searchHit.getSourceRef());
                    try {
                        XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                        taskQueue.add(ADTask.parse(parser, searchHit.getId()));
                    }
                    finally {
                        if (parser == null) continue;
                        parser.close();
                    }
                }
                catch (Exception e) {
                    this.logger.error("Maintaining running historical task: failed to parse AD task " + searchHit.getId(), (Throwable)e);
                }
            }
            this.maintainRunningHistoricalTask(taskQueue, transportService);
        }, e -> {
            if (e instanceof IndexNotFoundException) {
                this.logger.debug(STATE_INDEX_NOT_EXIST_MSG);
            } else {
                this.logger.error("Failed to search historical tasks in maintaining job", (Throwable)e);
            }
        }));
    }

    private void maintainRunningHistoricalTask(ConcurrentLinkedQueue<ADTask> taskQueue, TransportService transportService) {
        ADTask adTask = taskQueue.poll();
        if (adTask == null) {
            return;
        }
        this.threadPool.schedule(() -> this.resetHistoricalDetectorTaskState((List<ADTask>)ImmutableList.of((Object)adTask), () -> {
            this.logger.debug("Finished maintaining running historical task {}", (Object)adTask.getTaskId());
            this.maintainRunningHistoricalTask(taskQueue, transportService);
        }, transportService, ActionListener.wrap(r -> this.logger.debug("Reset historical task state done for task {}, detector {}", (Object)adTask.getTaskId(), (Object)adTask.getDetectorId()), e -> this.logger.error("Failed to reset historical task state for task " + adTask.getTaskId(), (Throwable)e))), TimeValue.timeValueSeconds((long)DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), "ad-batch-task-threadpool");
    }

    public void maintainRunningRealtimeTasks() {
        String[] detectorIds = this.adTaskCacheManager.getDetectorIdsInRealtimeTaskCache();
        if (detectorIds == null || detectorIds.length == 0) {
            return;
        }
        for (int i = 0; i < detectorIds.length; ++i) {
            String detectorId = detectorIds[i];
            ADRealtimeTaskCache taskCache = this.adTaskCacheManager.getRealtimeTaskCache(detectorId);
            if (taskCache == null || !taskCache.expired()) continue;
            this.adTaskCacheManager.removeRealtimeTaskCache(detectorId);
        }
    }
}

