/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ad.cluster;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentParserUtils;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;

public class ADDataMigrator {
    private final Logger logger = LogManager.getLogger(this.getClass());
    private final Client client;
    private final ClusterService clusterService;
    private final NamedXContentRegistry xContentRegistry;
    private final AnomalyDetectionIndices detectionIndices;
    private final AtomicBoolean dataMigrated;

    public ADDataMigrator(Client client, ClusterService clusterService, NamedXContentRegistry xContentRegistry, AnomalyDetectionIndices detectionIndices) {
        this.client = client;
        this.clusterService = clusterService;
        this.xContentRegistry = xContentRegistry;
        this.detectionIndices = detectionIndices;
        this.dataMigrated = new AtomicBoolean(false);
    }

    public void migrateData() {
        if (!this.dataMigrated.getAndSet(true)) {
            this.logger.info("Start migrating AD data");
            if (!this.detectionIndices.doesAnomalyDetectorJobIndexExist()) {
                this.logger.info("AD job index doesn't exist, no need to migrate");
                return;
            }
            if (this.detectionIndices.doesDetectorStateIndexExist()) {
                this.migrateDetectorInternalStateToRealtimeTask();
            } else {
                this.detectionIndices.initDetectionStateIndex((ActionListener<CreateIndexResponse>)ActionListener.wrap(r -> {
                    if (r.isAcknowledged()) {
                        this.logger.info("Created {} with mappings.", (Object)".opendistro-anomaly-detection-state");
                        this.migrateDetectorInternalStateToRealtimeTask();
                    } else {
                        String error = "Create index .opendistro-anomaly-detection-state with mappings not acknowledged";
                        this.logger.warn(error);
                    }
                }, e -> {
                    if (ExceptionsHelper.unwrapCause((Throwable)e) instanceof ResourceAlreadyExistsException) {
                        this.migrateDetectorInternalStateToRealtimeTask();
                    } else {
                        this.logger.error("Failed to init anomaly detection state index", (Throwable)e);
                    }
                }));
            }
        }
    }

    public void migrateDetectorInternalStateToRealtimeTask() {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query((QueryBuilder)new MatchAllQueryBuilder()).size(10000);
        SearchRequest searchRequest = new SearchRequest(new String[]{".opendistro-anomaly-detector-jobs"}).source(searchSourceBuilder);
        this.client.search(searchRequest, ActionListener.wrap(r -> {
            if (r == null || r.getHits().getTotalHits() == null || r.getHits().getTotalHits().value == 0L) {
                this.logger.info("No anomaly detector job found, no need to migrate");
                return;
            }
            ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs = new ConcurrentLinkedQueue<AnomalyDetectorJob>();
            for (SearchHit searchHit : r.getHits()) {
                try {
                    XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, searchHit.getSourceRef());
                    try {
                        XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                        AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
                        detectorJobs.add(job);
                    }
                    finally {
                        if (parser == null) continue;
                        parser.close();
                    }
                }
                catch (IOException e) {
                    this.logger.error("Fail to parse AD job " + searchHit.getId(), (Throwable)e);
                }
            }
            this.logger.info("Total AD jobs to backfill realtime task: {}", (Object)detectorJobs.size());
            this.backfillRealtimeTask(detectorJobs, true);
        }, e -> {
            if (ExceptionUtil.getErrorMessage(e).contains("all shards failed")) {
                this.logger.warn("No available shards of AD job index, reset dataMigrated as false");
                this.dataMigrated.set(false);
            } else if (!(e instanceof IndexNotFoundException)) {
                this.logger.error("Failed to migrate AD data", (Throwable)e);
            }
        }));
    }

    public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs, boolean backfillAllJob) {
        AnomalyDetectorJob job = detectorJobs.poll();
        if (job == null) {
            this.logger.info("AD data migration done.");
            if (backfillAllJob) {
                this.dataMigrated.set(true);
            }
            return;
        }
        String jobId = job.getName();
        AnomalyDetectorFunction createRealtimeTaskFunction = () -> {
            GetRequest getRequest = new GetRequest(".opendistro-anomaly-detection-state", jobId);
            this.client.get(getRequest, ActionListener.wrap(r -> {
                if (r != null && r.isExists()) {
                    try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                        XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                        DetectorInternalState detectorState = DetectorInternalState.parse(parser);
                        this.createRealtimeADTask(job, detectorState.getError(), detectorJobs, backfillAllJob);
                    }
                    catch (IOException e) {
                        this.logger.error("Failed to parse detector internal state " + jobId, (Throwable)e);
                        this.createRealtimeADTask(job, null, detectorJobs, backfillAllJob);
                    }
                } else {
                    this.createRealtimeADTask(job, null, detectorJobs, backfillAllJob);
                }
            }, e -> {
                this.logger.error("Failed to query detector internal state " + jobId, (Throwable)e);
                this.createRealtimeADTask(job, null, detectorJobs, backfillAllJob);
            }));
        };
        this.checkIfRealtimeTaskExistsAndBackfill(job, createRealtimeTaskFunction, detectorJobs, backfillAllJob);
    }

    private void checkIfRealtimeTaskExistsAndBackfill(AnomalyDetectorJob job, AnomalyDetectorFunction createRealtimeTaskFunction, ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs, boolean migrateAll) {
        String jobId = job.getName();
        BoolQueryBuilder query = new BoolQueryBuilder();
        query.filter((QueryBuilder)new TermQueryBuilder("detector_id", jobId));
        if (job.isEnabled()) {
            query.filter((QueryBuilder)new TermQueryBuilder("is_latest", true));
        }
        query.filter((QueryBuilder)new TermsQueryBuilder("task_type", ADTaskType.taskTypeToString(ADTaskType.REALTIME_TASK_TYPES)));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query((QueryBuilder)query).size(1);
        SearchRequest searchRequest = new SearchRequest(new String[]{".opendistro-anomaly-detection-state"}).source(searchSourceBuilder);
        this.client.search(searchRequest, ActionListener.wrap(r -> {
            if (r != null && r.getHits().getTotalHits().value > 0L) {
                this.backfillRealtimeTask(detectorJobs, migrateAll);
                return;
            }
            createRealtimeTaskFunction.execute();
        }, e -> {
            if (e instanceof ResourceNotFoundException) {
                createRealtimeTaskFunction.execute();
            }
            this.logger.error("Failed to search tasks of detector " + jobId);
        }));
    }

    private void createRealtimeADTask(AnomalyDetectorJob job, String error, ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs, boolean migrateAll) {
        this.client.get(new GetRequest(".opendistro-anomaly-detectors", job.getName()), ActionListener.wrap(r -> {
            if (r != null && r.isExists()) {
                try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(this.xContentRegistry, r.getSourceAsBytesRef());){
                    XContentParserUtils.ensureExpectedToken((XContentParser.Token)XContentParser.Token.START_OBJECT, (XContentParser.Token)parser.nextToken(), (XContentParser)parser);
                    AnomalyDetector detector = AnomalyDetector.parse(parser, r.getId());
                    ADTaskType taskType = detector.isMultientityDetector() ? ADTaskType.REALTIME_HC_DETECTOR : ADTaskType.REALTIME_SINGLE_ENTITY;
                    Instant now = Instant.now();
                    String userName = job.getUser() != null ? job.getUser().getName() : null;
                    ADTask adTask = new ADTask.Builder().detectorId(detector.getDetectorId()).detector(detector).error(error).isLatest(true).taskType(taskType.name()).executionStartTime(now).taskProgress(Float.valueOf(0.0f)).initProgress(Float.valueOf(0.0f)).state(ADTaskState.CREATED.name()).lastUpdateTime(now).startedBy(userName).coordinatingNode(null).detectionDateRange(null).user(job.getUser()).build();
                    IndexRequest indexRequest = ((IndexRequest)new IndexRequest(".opendistro-anomaly-detection-state").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(adTask.toXContent(XContentFactory.jsonBuilder(), (ToXContent.Params)RestHandlerUtils.XCONTENT_WITH_TYPE));
                    this.client.index(indexRequest, ActionListener.wrap(indexResponse -> {
                        this.logger.info("Backfill realtime task successfully for detector {}", (Object)job.getName());
                        this.backfillRealtimeTask(detectorJobs, migrateAll);
                    }, ex -> {
                        this.logger.error("Failed to backfill realtime task for detector " + job.getName(), (Throwable)ex);
                        this.backfillRealtimeTask(detectorJobs, migrateAll);
                    }));
                }
                catch (IOException e) {
                    this.logger.error("Fail to parse detector " + job.getName(), (Throwable)e);
                    this.backfillRealtimeTask(detectorJobs, migrateAll);
                }
            } else {
                this.logger.error("Detector doesn't exist " + job.getName());
                this.backfillRealtimeTask(detectorJobs, migrateAll);
            }
        }, e -> {
            this.logger.error("Fail to get detector " + job.getName(), (Throwable)e);
            this.backfillRealtimeTask(detectorJobs, migrateAll);
        }));
    }

    public void skipMigration() {
        this.dataMigrated.set(true);
    }

    public boolean isMigrated() {
        return this.dataMigrated.get();
    }
}

