/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.dispatcher;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import java.util.Map;
import lombok.Generated;
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.AsyncQueryHandler;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.metrics.EmrMetrics;
import org.opensearch.sql.spark.metrics.MetricsService;
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
 * {@link AsyncQueryHandler} for batch queries that are run through an {@link EMRServerlessClient}.
 *
 * <p>Results are read through the {@link JobExecutionResponseReader}; the job state is obtained
 * from the EMR Serverless client. The {@code leaseManager} field is not referenced by this class
 * itself; it is {@code protected}, so it remains reachable from subclasses.
 */
public class BatchQueryHandler extends AsyncQueryHandler {
    protected final EMRServerlessClient emrServerlessClient;
    protected final JobExecutionResponseReader jobExecutionResponseReader;
    protected final LeaseManager leaseManager;
    protected final MetricsService metricsService;
    protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

    /**
     * Reads the query response through the response reader, using the job id and the result index
     * recorded in the job metadata.
     *
     * @param asyncQueryJobMetadata metadata providing the job id and result index
     * @param asyncQueryRequestContext request context (not used by this implementation)
     * @return whatever the response reader returns for that job id and result index
     */
    @Override
    protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) {
        return jobExecutionResponseReader.getResultWithJobId(
                asyncQueryJobMetadata.getJobId(),
                asyncQueryJobMetadata.getResultIndex());
    }

    /**
     * Asks the EMR Serverless client for the job run and reports its state.
     *
     * @param asyncQueryJobMetadata metadata providing the application id and job id
     * @param asyncQueryRequestContext request context (not used by this implementation)
     * @return a JSON object with {@code "status"} set to the job run state and {@code "error"} set
     *     to an empty string
     */
    @Override
    protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) {
        GetJobRunResult jobRunResult = emrServerlessClient.getJobRunResult(
                asyncQueryJobMetadata.getApplicationId(),
                asyncQueryJobMetadata.getJobId());
        String state = jobRunResult.getJobRun().getState();
        // The error field is always present and always empty on this path.
        return new JSONObject()
                .put("status", state)
                .put("error", "");
    }

    /**
     * Requests cancellation of the job run identified by the metadata.
     *
     * @param asyncQueryJobMetadata metadata providing the application id, job id and query id
     * @param asyncQueryRequestContext request context (not used by this implementation)
     * @return the query id taken from the metadata
     */
    @Override
    public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata, AsyncQueryRequestContext asyncQueryRequestContext) {
        // NOTE(review): the meaning of the trailing `false` flag is defined by
        // EMRServerlessClient#cancelJobRun, which is not visible here.
        emrServerlessClient.cancelJobRun(
                asyncQueryJobMetadata.getApplicationId(),
                asyncQueryJobMetadata.getJobId(),
                false);
        return asyncQueryJobMetadata.getQueryId();
    }

    /**
     * Starts a batch job run for the given request and describes it in the returned response.
     *
     * <p>The tag map obtained from the context is modified in place: a {@code "type"} entry with
     * the batch job type text is put into it before the job is started. After the job run has
     * been started, the batch job creation metric is incremented.
     *
     * @param dispatchQueryRequest the request providing cluster name, account id, application id,
     *     execution role ARN, query and parameter modifier
     * @param context the dispatch context providing tags, data source metadata, query id and the
     *     async query request context
     * @return a response carrying the query id, the started job id, the result index and name of
     *     the data source, and the batch job type
     */
    @Override
    public DispatchQueryResponse submit(DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
        String cluster = dispatchQueryRequest.getClusterName();
        Map<String, String> jobTags = context.getTags();
        DataSourceMetadata metadata = context.getDataSourceMetadata();
        jobTags.put("type", JobType.BATCH.getText());

        // NOTE(review): the meaning of the `false` flag is defined by the StartJobRequest
        // constructor, which is not visible here.
        StartJobRequest jobRequest = new StartJobRequest(
                cluster + ":" + JobType.BATCH.getText(),
                dispatchQueryRequest.getAccountId(),
                dispatchQueryRequest.getApplicationId(),
                dispatchQueryRequest.getExecutionRoleARN(),
                buildSparkSubmitParameters(cluster, dispatchQueryRequest, context),
                jobTags,
                false,
                metadata.getResultIndex());

        String startedJobId = emrServerlessClient.startJobRun(jobRequest);
        metricsService.incrementNumericalMetric(EmrMetrics.EMR_BATCH_QUERY_JOBS_CREATION_COUNT);

        return DispatchQueryResponse.builder()
                .queryId(context.getQueryId())
                .jobId(startedJobId)
                .resultIndex(metadata.getResultIndex())
                .datasourceName(metadata.getName())
                .jobType(JobType.BATCH)
                .build();
    }

    /**
     * Renders the Spark submit parameter string for a batch job from the request and context.
     */
    private String buildSparkSubmitParameters(String clusterName, DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
        return sparkSubmitParametersBuilderProvider
                .getSparkSubmitParametersBuilder()
                .clusterName(clusterName)
                .query(dispatchQueryRequest.getQuery())
                .dataSource(context.getDataSourceMetadata(), dispatchQueryRequest, context.getAsyncQueryRequestContext())
                .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier())
                .acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext())
                .toString();
    }

    /**
     * Creates a handler wired with its collaborators; each argument is stored as-is in the field
     * of the same name.
     *
     * @param emrServerlessClient client used to start, query and cancel job runs
     * @param jobExecutionResponseReader reader used to fetch results by job id
     * @param leaseManager lease manager kept for subclasses; not used by this class
     * @param metricsService service used to increment the batch job creation metric
     * @param sparkSubmitParametersBuilderProvider provider of the Spark submit parameters builder
     */
    @Generated
    public BatchQueryHandler(EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, LeaseManager leaseManager, MetricsService metricsService, SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider) {
        this.emrServerlessClient = emrServerlessClient;
        this.jobExecutionResponseReader = jobExecutionResponseReader;
        this.leaseManager = leaseManager;
        this.metricsService = metricsService;
        this.sparkSubmitParametersBuilderProvider = sparkSubmitParametersBuilderProvider;
    }
}

