/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.dispatcher;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import java.util.Map;
import lombok.Generated;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.utils.MetricUtils;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.AsyncQueryHandler;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

public class BatchQueryHandler
extends AsyncQueryHandler {
    private final EMRServerlessClient emrServerlessClient;
    private final JobExecutionResponseReader jobExecutionResponseReader;
    protected final LeaseManager leaseManager;

    @Override
    protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        return this.jobExecutionResponseReader.getResultFromOpensearchIndex(asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());
    }

    @Override
    protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        JSONObject result = new JSONObject();
        GetJobRunResult getJobRunResult = this.emrServerlessClient.getJobRunResult(asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
        String jobState = getJobRunResult.getJobRun().getState();
        result.put("status", (Object)jobState);
        result.put("error", (Object)"");
        return result;
    }

    @Override
    public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
        this.emrServerlessClient.cancelJobRun(asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
        return asyncQueryJobMetadata.getQueryId().getId();
    }

    @Override
    public DispatchQueryResponse submit(DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
        this.leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
        String clusterName = dispatchQueryRequest.getClusterName();
        Map<String, String> tags = context.getTags();
        DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
        tags.put("type", JobType.BATCH.getText());
        StartJobRequest startJobRequest = new StartJobRequest(clusterName + ":" + JobType.BATCH.getText(), dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder().clusterName(clusterName).dataSource(context.getDataSourceMetadata()).query(dispatchQueryRequest.getQuery()).extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()).build().toString(), tags, false, dataSourceMetadata.getResultIndex());
        String jobId = this.emrServerlessClient.startJobRun(startJobRequest);
        MetricUtils.incrementNumericalMetric((MetricName)MetricName.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);
        return new DispatchQueryResponse(context.getQueryId(), jobId, dataSourceMetadata.getResultIndex(), null);
    }

    @Generated
    public BatchQueryHandler(EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, LeaseManager leaseManager) {
        this.emrServerlessClient = emrServerlessClient;
        this.jobExecutionResponseReader = jobExecutionResponseReader;
        this.leaseManager = leaseManager;
    }
}

