/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.scheduler.job;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

public class ScheduledAsyncQueryJobRunner
implements ScheduledJobRunner {
    private static final String ASYNC_QUERY_THREAD_POOL_NAME = "sql-worker";
    private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class);
    private static ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner();
    private ClusterService clusterService;
    private ThreadPool threadPool;
    private Client client;
    private AsyncQueryExecutorService asyncQueryExecutorService;

    public static ScheduledAsyncQueryJobRunner getJobRunnerInstance() {
        return INSTANCE;
    }

    private ScheduledAsyncQueryJobRunner() {
    }

    public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool, AsyncQueryExecutorService asyncQueryExecutorService) {
        this.client = client;
        this.clusterService = clusterService;
        this.threadPool = threadPool;
        this.asyncQueryExecutorService = asyncQueryExecutorService;
    }

    public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
        if (!(jobParameter instanceof ScheduledAsyncQueryJobRequest)) {
            throw new IllegalStateException("Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: " + jobParameter.getClass().getCanonicalName());
        }
        if (this.clusterService == null) {
            throw new IllegalStateException("ClusterService is not initialized.");
        }
        if (this.threadPool == null) {
            throw new IllegalStateException("ThreadPool is not initialized.");
        }
        if (this.client == null) {
            throw new IllegalStateException("Client is not initialized.");
        }
        if (this.asyncQueryExecutorService == null) {
            throw new IllegalStateException("AsyncQueryExecutorService is not initialized.");
        }
        Runnable runnable = () -> {
            try {
                this.doRefresh((ScheduledAsyncQueryJobRequest)jobParameter);
            }
            catch (Throwable throwable) {
                LOGGER.error((Object)throwable);
            }
        };
        this.threadPool.executor(ASYNC_QUERY_THREAD_POOL_NAME).submit(runnable);
    }

    void doRefresh(ScheduledAsyncQueryJobRequest request) {
        LOGGER.info("Scheduled refresh index job on job: " + request.getName());
        CreateAsyncQueryRequest createAsyncQueryRequest = new CreateAsyncQueryRequest(request.getScheduledQuery(), request.getDataSource(), request.getQueryLang());
        CreateAsyncQueryResponse createAsyncQueryResponse = this.asyncQueryExecutorService.createAsyncQuery(createAsyncQueryRequest, (AsyncQueryRequestContext)new NullAsyncQueryRequestContext());
        LOGGER.info("Created async query with queryId: " + createAsyncQueryResponse.getQueryId());
    }
}

