/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.scheduler;

import com.google.common.annotations.VisibleForTesting;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import lombok.Generated;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;

/**
 * {@link AsyncQueryScheduler} implementation that stores each scheduled job as a document in the
 * {@code .async-query-scheduler} index.
 *
 * <p>Every write issued by this class uses {@link WriteRequest.RefreshPolicy#IMMEDIATE} and blocks
 * on the returned future, so a method only returns once the change has been applied and
 * refreshed.
 */
public class OpenSearchAsyncQueryScheduler
implements AsyncQueryScheduler {
    /** Name of the index that holds one document per scheduled job. */
    public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
    /**
     * Job type identifier. Not referenced inside this class; presumably used when registering
     * with the job scheduler plugin (confirm against the registering code).
     */
    public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
    /** Classpath resource holding the YAML mapping applied when the index is created. */
    private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME = "async-query-scheduler-index-mapping.yml";
    /** Classpath resource holding the YAML settings applied when the index is created. */
    private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME = "async-query-scheduler-index-settings.yml";
    private static final Logger LOG = LogManager.getLogger();
    private final Client client;
    private final ClusterService clusterService;

    /**
     * Creates the job document for the given request, creating the scheduler index first when the
     * cluster routing table does not list it yet.
     *
     * <p>The document id is {@code request.getName()} and the operation type is {@code CREATE}, so
     * an existing document with the same id is never overwritten.
     *
     * @param asyncQuerySchedulerRequest description of the job to schedule
     * @throws IllegalArgumentException if a job document with the same name already exists
     * @throws RuntimeException if the index cannot be created, the document cannot be built or
     *     indexed, or the write completes with a result other than {@code CREATED}
     */
    public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
        ScheduledAsyncQueryJobRequest request = ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
        // Check-then-create is two separate calls; a concurrent creator makes the create call fail,
        // which surfaces as the RuntimeException thrown by createAsyncQuerySchedulerIndex().
        if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
            this.createAsyncQuerySchedulerIndex();
        }
        IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
        indexRequest.id(request.getName());
        indexRequest.opType(DocWriteRequest.OpType.CREATE);
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        IndexResponse indexResponse;
        try {
            indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS));
            ActionFuture<IndexResponse> indexFuture = this.client.index(indexRequest);
            indexResponse = indexFuture.actionGet();
        } catch (VersionConflictEngineException exception) {
            // CREATE on an existing id is reported as a version conflict; keep it as the cause.
            throw new IllegalArgumentException("A job already exists with name: " + request.getName(), exception);
        } catch (Throwable e) {
            // Throwable is kept so callers continue to see every failure as a RuntimeException.
            LOG.error("Failed to schedule job : {}", request.getName(), e);
            throw new RuntimeException(e);
        }
        if (indexResponse.getResult() != DocWriteResponse.Result.CREATED) {
            throw new RuntimeException("Schedule job failed with result : " + indexResponse.getResult().getLowercase());
        }
        LOG.debug("Job : {}  successfully created", request.getName());
    }

    /**
     * Disables a job by updating its document with {@code enabled=false} and a fresh
     * {@code lastUpdateTime}.
     *
     * <p>A missing scheduler index ({@link IllegalStateException} from {@link #updateJob}) is
     * logged and swallowed. All other failures raised by {@link #updateJob} propagate.
     *
     * @param jobId id of the job to disable
     * @throws IllegalArgumentException if {@link #updateJob} reports that the job does not exist
     * @throws RuntimeException if the update fails for any other reason
     */
    public void unscheduleJob(String jobId) {
        ScheduledAsyncQueryJobRequest request = ScheduledAsyncQueryJobRequest.builder()
            .jobId(jobId)
            .enabled(false)
            .lastUpdateTime(Instant.now())
            .build();
        try {
            this.updateJob(request);
            LOG.info("Unscheduled job for jobId: {}", jobId);
        } catch (IllegalStateException | DocumentMissingException e) {
            // NOTE(review): updateJob() converts DocumentMissingException from the client into
            // IllegalArgumentException, so the DocumentMissingException arm is not expected to
            // fire and a missing job currently propagates as IllegalArgumentException. Confirm
            // whether a missing job should be tolerated here like a missing index is.
            LOG.error("Failed to unschedule job: {}", jobId, e);
        }
    }

    /**
     * Applies the fields of the given request to the existing job document whose id is
     * {@code request.getName()}.
     *
     * @param asyncQuerySchedulerRequest new state of the job
     * @throws IllegalStateException if the scheduler index does not exist
     * @throws IllegalArgumentException if no job document with that name exists
     * @throws RuntimeException if the document cannot be built or updated, or the update
     *     completes with a result other than {@code UPDATED} or {@code NOOP}
     */
    public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
        ScheduledAsyncQueryJobRequest request = ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
        this.assertIndexExists();
        UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        UpdateResponse updateResponse;
        try {
            // Built inside the try (as in scheduleJob) so serialization failures are logged and
            // wrapped instead of escaping as an undeclared exception.
            updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS));
            ActionFuture<UpdateResponse> updateFuture = this.client.update(updateRequest);
            updateResponse = updateFuture.actionGet();
        } catch (DocumentMissingException exception) {
            throw new IllegalArgumentException("Job: " + request.getName() + " doesn't exist", exception);
        } catch (Throwable e) {
            // Throwable is kept so callers continue to see every failure as a RuntimeException.
            LOG.error("Failed to update job : {}", request.getName(), e);
            throw new RuntimeException(e);
        }
        DocWriteResponse.Result result = updateResponse.getResult();
        if (result != DocWriteResponse.Result.UPDATED && result != DocWriteResponse.Result.NOOP) {
            throw new RuntimeException("Update job failed with result : " + result.getLowercase());
        }
        LOG.debug("Job : {} successfully updated", request.getName());
    }

    /**
     * Deletes the job document with the given id.
     *
     * @param jobId id of the job document to delete
     * @throws IllegalStateException if the scheduler index does not exist
     * @throws IllegalArgumentException if the delete reports {@code NOT_FOUND}
     * @throws RuntimeException if the delete completes with any other non-{@code DELETED} result
     */
    public void removeJob(String jobId) {
        this.assertIndexExists();
        DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        ActionFuture<DeleteResponse> deleteFuture = this.client.delete(deleteRequest);
        DeleteResponse deleteResponse = deleteFuture.actionGet();
        DocWriteResponse.Result result = deleteResponse.getResult();
        if (result != DocWriteResponse.Result.DELETED) {
            if (result == DocWriteResponse.Result.NOT_FOUND) {
                throw new IllegalArgumentException("Job : " + jobId + " doesn't exist");
            }
            throw new RuntimeException("Remove job failed with result : " + result.getLowercase());
        }
        LOG.debug("Job : {} successfully deleted", jobId);
    }

    /**
     * Creates the scheduler index using the YAML mapping and settings bundled on the classpath.
     *
     * @throws RuntimeException if a resource is missing or unreadable, the create call fails, or
     *     the creation is not acknowledged; the underlying failure is attached as the cause
     */
    @VisibleForTesting
    void createAsyncQuerySchedulerIndex() {
        ClassLoader classLoader = OpenSearchAsyncQueryScheduler.class.getClassLoader();
        // try-with-resources closes both streams on every path; a null resource is skipped on close.
        try (InputStream mappingFileStream = classLoader.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
             InputStream settingsFileStream = classLoader.getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME)) {
            // getResourceAsStream returns null for a missing resource; fail with a clear message.
            if (mappingFileStream == null) {
                throw new IllegalStateException("Resource not found on classpath: " + SCHEDULER_INDEX_MAPPING_FILE_NAME);
            }
            if (settingsFileStream == null) {
                throw new IllegalStateException("Resource not found on classpath: " + SCHEDULER_INDEX_SETTINGS_FILE_NAME);
            }
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
            createIndexRequest.mapping(IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
            createIndexRequest.settings(IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
            ActionFuture<CreateIndexResponse> createFuture = this.client.admin().indices().create(createIndexRequest);
            CreateIndexResponse createIndexResponse = createFuture.actionGet();
            if (!createIndexResponse.isAcknowledged()) {
                // Caught by the handler below and re-wrapped with the index-creation message.
                throw new RuntimeException("Index creation is not acknowledged.");
            }
            LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
        } catch (Throwable e) {
            LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
            throw new RuntimeException("Internal server error while creating .async-query-scheduler index: " + e.getMessage(), e);
        }
    }

    /**
     * Verifies that the cluster routing table lists the scheduler index.
     *
     * @throws IllegalStateException if the index is absent
     */
    private void assertIndexExists() {
        if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
            throw new IllegalStateException("Job index does not exist.");
        }
    }

    /**
     * Returns the job runner obtained from
     * {@link ScheduledAsyncQueryJobRunner#getJobRunnerInstance()}.
     *
     * @return the runner instance supplied by {@link ScheduledAsyncQueryJobRunner}
     */
    public static ScheduledJobRunner getJobRunner() {
        return ScheduledAsyncQueryJobRunner.getJobRunnerInstance();
    }

    /**
     * Creates a scheduler backed by the given client and cluster service.
     *
     * @param client client used for all index, update, delete and create-index calls
     * @param clusterService source of the cluster state used to check whether the index exists
     */
    @Generated
    public OpenSearchAsyncQueryScheduler(Client client, ClusterService clusterService) {
        this.client = client;
        this.clusterService = clusterService;
    }
}

