/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.sql.spark.client;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepStatus;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.sql.spark.client.SparkClient;
import org.opensearch.sql.spark.helper.FlintHelper;
import org.opensearch.sql.spark.response.SparkResponse;

public class EmrClientImpl
implements SparkClient {
    private final AmazonElasticMapReduce emr;
    private final String emrCluster;
    private final FlintHelper flint;
    private final String sparkApplicationJar;
    private static final Logger logger = LogManager.getLogger(EmrClientImpl.class);
    private SparkResponse sparkResponse;

    public EmrClientImpl(AmazonElasticMapReduce emr, String emrCluster, FlintHelper flint, SparkResponse sparkResponse, String sparkApplicationJar) {
        this.emr = emr;
        this.emrCluster = emrCluster;
        this.flint = flint;
        this.sparkResponse = sparkResponse;
        this.sparkApplicationJar = sparkApplicationJar == null ? "s3://spark-datasource/sql-job.jar" : sparkApplicationJar;
    }

    @Override
    public JSONObject sql(String query) throws IOException {
        this.runEmrApplication(query);
        return this.sparkResponse.getResultFromOpensearchIndex();
    }

    @VisibleForTesting
    void runEmrApplication(String query) {
        HadoopJarStepConfig stepConfig = new HadoopJarStepConfig().withJar("command-runner.jar").withArgs(new String[]{"spark-submit", "--class", "org.opensearch.sql.SQLJob", "--jars", this.flint.getFlintIntegrationJar(), this.sparkApplicationJar, query, ".query_execution_result", this.flint.getFlintHost(), this.flint.getFlintPort(), this.flint.getFlintScheme(), this.flint.getFlintAuth(), this.flint.getFlintRegion()});
        StepConfig emrstep = new StepConfig().withName("Spark Application").withActionOnFailure(ActionOnFailure.CONTINUE).withHadoopJarStep(stepConfig);
        AddJobFlowStepsRequest request = new AddJobFlowStepsRequest().withJobFlowId(this.emrCluster).withSteps(new StepConfig[]{emrstep});
        AddJobFlowStepsResult result = this.emr.addJobFlowSteps(request);
        logger.info("EMR step ID: " + result.getStepIds());
        String stepId = (String)result.getStepIds().get(0);
        DescribeStepRequest stepRequest = new DescribeStepRequest().withClusterId(this.emrCluster).withStepId(stepId);
        this.waitForStepExecution(stepRequest);
        this.sparkResponse.setValue(stepId);
    }

    private void waitForStepExecution(DescribeStepRequest stepRequest) {
        boolean completed = false;
        while (!completed) {
            StepStatus statusDetail = this.emr.describeStep(stepRequest).getStep().getStatus();
            if (statusDetail.getState().equals("COMPLETED")) {
                completed = true;
                logger.info("EMR step completed successfully.");
                continue;
            }
            if (statusDetail.getState().equals("FAILED") || statusDetail.getState().equals("CANCELLED")) {
                logger.error("EMR step failed or cancelled.");
                throw new RuntimeException("Spark SQL application failed.");
            }
            Thread.sleep(2500L);
        }
    }
}

